Skip to content

snapshot

ExpireSnapshots

Bases: UpdateTableMetadata['ExpireSnapshots']

Expire snapshots by ID.

Use `table.expire_snapshots().<operation>().commit()` to run a specific operation. Use `table.expire_snapshots().<operation-one>().<operation-two>().commit()` to run multiple operations. Pending changes are applied on commit.

Source code in pyiceberg/table/update/snapshot.py
class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]):
    """Expire snapshots by ID.

    Use table.expire_snapshots().<operation>().commit() to run a specific operation.
    Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.
    """

    # Snapshot IDs staged for removal. Deliberately has NO class-level default:
    # a class-level ``set()`` is a single object shared by every ExpireSnapshots
    # instance, and both ``.add()`` and ``-=`` (in-place for sets) mutate it, so
    # IDs staged on one builder would leak into every other builder.
    _snapshot_ids_to_expire: Set[int]
    # Tuples are immutable; ``+=`` rebinds on the instance, so sharing is harmless.
    _updates: Tuple[TableUpdate, ...] = ()
    _requirements: Tuple[TableRequirement, ...] = ()

    def __init__(self, transaction: Transaction) -> None:
        """
        Create an expiration builder for a transaction.

        Args:
            transaction: Passed unchanged to the base class; the methods below
                read table metadata from ``self._transaction``.
        """
        super().__init__(transaction)
        # Fresh per-instance set so separate builders never share staged IDs.
        self._snapshot_ids_to_expire = set()

    def _commit(self) -> UpdatesAndRequirements:
        """
        Commit the staged updates and requirements.

        This will remove the snapshots with the given IDs, but will always skip protected snapshots (branch/tag heads).

        Returns:
            Tuple of updates and requirements to be committed,
            as required by the calling parent apply functions.
        """
        # Filter protected IDs again at commit time, in case a ref was pointed at
        # a staged snapshot after it was added to the set.
        protected_ids = self._get_protected_snapshot_ids()
        self._snapshot_ids_to_expire -= protected_ids
        update = RemoveSnapshotsUpdate(snapshot_ids=self._snapshot_ids_to_expire)
        self._updates += (update,)
        return self._updates, self._requirements

    def _get_protected_snapshot_ids(self) -> Set[int]:
        """
        Get the IDs of protected snapshots.

        These are the snapshots referenced by any branch or tag in the current
        table metadata. These IDs are excluded from expiration.

        Returns:
            Set of protected snapshot IDs to exclude from expiration.
        """
        return {
            ref.snapshot_id
            for ref in self._transaction.table_metadata.refs.values()
            if ref.snapshot_ref_type in [SnapshotRefType.TAG, SnapshotRefType.BRANCH]
        }

    def by_id(self, snapshot_id: int) -> ExpireSnapshots:
        """
        Expire a snapshot by its ID.

        This will mark the snapshot for expiration.

        Args:
            snapshot_id (int): The ID of the snapshot to expire.

        Returns:
            This for method chaining.

        Raises:
            ValueError: If the snapshot does not exist or is protected
                (referenced by a branch or tag).
        """
        if self._transaction.table_metadata.snapshot_by_id(snapshot_id) is None:
            raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.")

        if snapshot_id in self._get_protected_snapshot_ids():
            raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.")

        self._snapshot_ids_to_expire.add(snapshot_id)

        return self

    def by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots":
        """
        Expire multiple snapshots by their IDs.

        This will mark the snapshots for expiration.

        Args:
            snapshot_ids (List[int]): List of snapshot IDs to expire.

        Returns:
            This for method chaining.

        Raises:
            ValueError: As raised by ``by_id``. IDs processed before the failing
                one remain staged.
        """
        for snapshot_id in snapshot_ids:
            self.by_id(snapshot_id)
        return self

    def older_than(self, dt: datetime) -> "ExpireSnapshots":
        """
        Expire all unprotected snapshots with a timestamp older than a given value.

        Args:
            dt (datetime): Only snapshots with datetime < this value will be expired.

        Returns:
            This for method chaining.
        """
        protected_ids = self._get_protected_snapshot_ids()
        expire_from = datetime_to_millis(dt)
        for snapshot in self._transaction.table_metadata.snapshots:
            if snapshot.timestamp_ms < expire_from and snapshot.snapshot_id not in protected_ids:
                self._snapshot_ids_to_expire.add(snapshot.snapshot_id)
        return self

by_id(snapshot_id)

Expire a snapshot by its ID.

This will mark the snapshot for expiration.

Parameters:

Name Type Description Default
snapshot_id int

The ID of the snapshot to expire.

required

Returns: This for method chaining.

Source code in pyiceberg/table/update/snapshot.py
def by_id(self, snapshot_id: int) -> ExpireSnapshots:
    """
    Stage a single snapshot for expiration.

    Args:
        snapshot_id (int): ID of the snapshot that should be expired.

    Returns:
        This instance, so further operations can be chained.

    Raises:
        ValueError: If no snapshot has this ID, or if the snapshot is protected.
    """
    metadata = self._transaction.table_metadata
    missing = metadata.snapshot_by_id(snapshot_id) is None
    if missing:
        raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.")

    protected = self._get_protected_snapshot_ids()
    if snapshot_id in protected:
        raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.")

    pending = self._snapshot_ids_to_expire
    pending.add(snapshot_id)
    return self

by_ids(snapshot_ids)

Expire multiple snapshots by their IDs.

This will mark the snapshots for expiration.

Parameters:

Name Type Description Default
snapshot_ids List[int]

List of snapshot IDs to expire.

required

Returns: This for method chaining.

Source code in pyiceberg/table/update/snapshot.py
def by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots":
    """
    Stage several snapshots for expiration, one after another.

    Each ID is handed to ``by_id`` in order; if one is rejected, the IDs before
    it stay staged and the error propagates.

    Args:
        snapshot_ids (List[int]): IDs of the snapshots that should be expired.

    Returns:
        This instance, so further operations can be chained.
    """
    stage_one = self.by_id
    for each_id in snapshot_ids:
        stage_one(each_id)
    return self

older_than(dt)

Expire all unprotected snapshots with a timestamp older than a given value.

Parameters:

Name Type Description Default
dt datetime

Only snapshots with datetime < this value will be expired.

required

Returns:

Type Description
'ExpireSnapshots'

This for method chaining.

Source code in pyiceberg/table/update/snapshot.py
def older_than(self, dt: datetime) -> "ExpireSnapshots":
    """
    Stage every unprotected snapshot whose timestamp is strictly before ``dt``.

    Args:
        dt (datetime): Cut-off point; snapshots with a timestamp < this value are staged.

    Returns:
        This instance, so further operations can be chained.
    """
    keep = self._get_protected_snapshot_ids()
    cutoff_ms = datetime_to_millis(dt)
    candidates = self._transaction.table_metadata.snapshots
    self._snapshot_ids_to_expire.update(
        snap.snapshot_id
        for snap in candidates
        if snap.timestamp_ms < cutoff_ms and snap.snapshot_id not in keep
    )
    return self

ManageSnapshots

Bases: UpdateTableMetadata['ManageSnapshots']

Run snapshot management operations using APIs.

APIs include create branch, create tag, etc.

Use `table.manage_snapshots().<operation>().commit()` to run a specific operation. Use `table.manage_snapshots().<operation-one>().<operation-two>().commit()` to run multiple operations. Pending changes are applied on commit.

We can also use context managers to make more changes. For example,

with table.manage_snapshots() as ms:
    ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")

Source code in pyiceberg/table/update/snapshot.py
class ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):
    """
    Stage snapshot-reference operations (create/remove branches and tags).

    Use table.manage_snapshots().<operation>().commit() to run a specific operation.
    Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.

    A context manager can also be used to stage several changes at once, e.g.

    with table.manage_snapshots() as ms:
       ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
    """

    # Tuples are immutable; ``+=`` in the methods below rebinds on the instance.
    _updates: Tuple[TableUpdate, ...] = ()
    _requirements: Tuple[TableRequirement, ...] = ()

    def _commit(self) -> UpdatesAndRequirements:
        """Hand the staged updates and requirements to the commit machinery."""
        staged = (self._updates, self._requirements)
        return staged

    def _remove_ref_snapshot(self, ref_name: str) -> ManageSnapshots:
        """
        Stage removal of a snapshot ref (branch or tag).

        A requirement is staged alongside the removal that asserts the ref still
        points at the snapshot it points at now (or, if the ref is currently
        absent, asserts it with ``snapshot_id=None``).

        Args:
            ref_name: Name of the branch / tag to remove.

        Returns:
            This instance, so further operations can be chained.
        """
        refs = self._transaction.table_metadata.refs
        current_snapshot_id = refs[ref_name].snapshot_id if ref_name in refs else None
        removal = RemoveSnapshotRefUpdate(ref_name=ref_name)
        guard = AssertRefSnapshotId(snapshot_id=current_snapshot_id, ref=ref_name)
        self._updates += (removal,)
        self._requirements += (guard,)
        return self

    def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots:
        """
        Stage a new tag that points at an existing snapshot.

        Args:
            snapshot_id (int): ID of the snapshot the tag should reference.
            tag_name (str): Name given to the new tag.
            max_ref_age_ms (Optional[int]): Maximum age of the ref, in milliseconds.

        Returns:
            This instance, so further operations can be chained.
        """
        staged = self._transaction._set_ref_snapshot(
            snapshot_id=snapshot_id,
            ref_name=tag_name,
            type="tag",
            max_ref_age_ms=max_ref_age_ms,
        )
        tag_updates, tag_requirements = staged
        self._updates += tag_updates
        self._requirements += tag_requirements
        return self

    def remove_tag(self, tag_name: str) -> ManageSnapshots:
        """
        Stage removal of a tag.

        Args:
            tag_name (str): Name of the tag to drop.

        Returns:
            This instance, so further operations can be chained.
        """
        result = self._remove_ref_snapshot(ref_name=tag_name)
        return result

    def create_branch(
        self,
        snapshot_id: int,
        branch_name: str,
        max_ref_age_ms: Optional[int] = None,
        max_snapshot_age_ms: Optional[int] = None,
        min_snapshots_to_keep: Optional[int] = None,
    ) -> ManageSnapshots:
        """
        Stage a new branch that starts at an existing snapshot.

        Args:
            snapshot_id (int): ID of the snapshot the branch is created at.
            branch_name (str): Name given to the new branch.
            max_ref_age_ms (Optional[int]): Maximum age of the ref, in milliseconds.
            max_snapshot_age_ms (Optional[int]): Maximum age of snapshots to keep, in milliseconds.
            min_snapshots_to_keep (Optional[int]): Minimum number of snapshots to keep on the branch.

        Returns:
            This instance, so further operations can be chained.
        """
        staged = self._transaction._set_ref_snapshot(
            snapshot_id=snapshot_id,
            ref_name=branch_name,
            type="branch",
            max_ref_age_ms=max_ref_age_ms,
            max_snapshot_age_ms=max_snapshot_age_ms,
            min_snapshots_to_keep=min_snapshots_to_keep,
        )
        branch_updates, branch_requirements = staged
        self._updates += branch_updates
        self._requirements += branch_requirements
        return self

    def remove_branch(self, branch_name: str) -> ManageSnapshots:
        """
        Stage removal of a branch.

        Args:
            branch_name (str): Name of the branch to drop.

        Returns:
            This instance, so further operations can be chained.
        """
        result = self._remove_ref_snapshot(ref_name=branch_name)
        return result

create_branch(snapshot_id, branch_name, max_ref_age_ms=None, max_snapshot_age_ms=None, min_snapshots_to_keep=None)

Create a new branch pointing to the given snapshot id.

Parameters:

Name Type Description Default
snapshot_id int

snapshot id of existing snapshot at which the branch is created.

required
branch_name str

name of the new branch

required
max_ref_age_ms Optional[int]

max ref age in milliseconds

None
max_snapshot_age_ms Optional[int]

max age of snapshots to keep in milliseconds

None
min_snapshots_to_keep Optional[int]

min number of snapshots to keep for the branch

None

Returns: This for method chaining

Source code in pyiceberg/table/update/snapshot.py
def create_branch(
    self,
    snapshot_id: int,
    branch_name: str,
    max_ref_age_ms: Optional[int] = None,
    max_snapshot_age_ms: Optional[int] = None,
    min_snapshots_to_keep: Optional[int] = None,
) -> ManageSnapshots:
    """
    Stage a new branch that starts at an existing snapshot.

    Args:
        snapshot_id (int): ID of the snapshot the branch is created at.
        branch_name (str): Name given to the new branch.
        max_ref_age_ms (Optional[int]): Maximum age of the ref, in milliseconds.
        max_snapshot_age_ms (Optional[int]): Maximum age of snapshots to keep, in milliseconds.
        min_snapshots_to_keep (Optional[int]): Minimum number of snapshots to keep on the branch.

    Returns:
        This instance, so further operations can be chained.
    """
    staged = self._transaction._set_ref_snapshot(
        snapshot_id=snapshot_id,
        ref_name=branch_name,
        type="branch",
        max_ref_age_ms=max_ref_age_ms,
        max_snapshot_age_ms=max_snapshot_age_ms,
        min_snapshots_to_keep=min_snapshots_to_keep,
    )
    branch_updates, branch_requirements = staged
    self._updates += branch_updates
    self._requirements += branch_requirements
    return self

create_tag(snapshot_id, tag_name, max_ref_age_ms=None)

Create a new tag pointing to the given snapshot id.

Parameters:

Name Type Description Default
snapshot_id int

snapshot id of the existing snapshot to tag

required
tag_name str

name of the tag

required
max_ref_age_ms Optional[int]

max ref age in milliseconds

None

Returns:

Type Description
ManageSnapshots

This for method chaining

Source code in pyiceberg/table/update/snapshot.py
def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots:
    """
    Stage a new tag that points at an existing snapshot.

    Args:
        snapshot_id (int): ID of the snapshot the tag should reference.
        tag_name (str): Name given to the new tag.
        max_ref_age_ms (Optional[int]): Maximum age of the ref, in milliseconds.

    Returns:
        This instance, so further operations can be chained.
    """
    staged = self._transaction._set_ref_snapshot(
        snapshot_id=snapshot_id,
        ref_name=tag_name,
        type="tag",
        max_ref_age_ms=max_ref_age_ms,
    )
    tag_updates, tag_requirements = staged
    self._updates += tag_updates
    self._requirements += tag_requirements
    return self

remove_branch(branch_name)

Remove a branch.

Parameters:

Name Type Description Default
branch_name str

name of branch to remove

required

Returns: This for method chaining

Source code in pyiceberg/table/update/snapshot.py
def remove_branch(self, branch_name: str) -> ManageSnapshots:
    """
    Stage removal of a branch.

    Args:
        branch_name (str): Name of the branch to drop.

    Returns:
        Whatever ``_remove_ref_snapshot`` returns (this instance, for chaining).
    """
    result = self._remove_ref_snapshot(ref_name=branch_name)
    return result

remove_tag(tag_name)

Remove a tag.

Parameters:

Name Type Description Default
tag_name str

name of tag to remove

required

Returns: This for method chaining

Source code in pyiceberg/table/update/snapshot.py
def remove_tag(self, tag_name: str) -> ManageSnapshots:
    """
    Stage removal of a tag.

    Args:
        tag_name (str): Name of the tag to drop.

    Returns:
        Whatever ``_remove_ref_snapshot`` returns (this instance, for chaining).
    """
    result = self._remove_ref_snapshot(ref_name=tag_name)
    return result