Skip to content

snapshot

ExpireSnapshots

Bases: UpdateTableMetadata['ExpireSnapshots']

Expire snapshots by ID.

Use `table.expire_snapshots().<operation>().commit()` to run a specific operation. Use `table.expire_snapshots().<operation-one>().<operation-two>().commit()` to run multiple operations. Pending changes are applied on commit.

Source code in pyiceberg/table/update/snapshot.py
class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]):
    """Expire snapshots by ID.

    Use table.expire_snapshots().<operation>().commit() to run a specific operation.
    Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.
    """

    _updates: tuple[TableUpdate, ...]
    _requirements: tuple[TableRequirement, ...]
    # Snapshot IDs staged by by_id / by_ids / older_than; consumed by _commit().
    _snapshot_ids_to_expire: set[int]

    def __init__(self, transaction: Transaction) -> None:
        """Start with no pending updates, requirements, or staged snapshot IDs."""
        super().__init__(transaction)
        self._updates = ()
        self._requirements = ()
        self._snapshot_ids_to_expire = set()

    def _commit(self) -> UpdatesAndRequirements:
        """
        Commit the staged updates and requirements.

        This will remove the snapshots with the given IDs, but will always skip protected snapshots (branch/tag heads).

        Returns:
            Tuple of updates and requirements to be committed,
            as required by the calling parent apply functions.
        """
        # Remove any protected snapshot IDs from the set to expire, just in case
        # a ref was pointed at one of them after it was staged.
        protected_ids = self._get_protected_snapshot_ids()
        self._snapshot_ids_to_expire -= protected_ids
        update = RemoveSnapshotsUpdate(snapshot_ids=self._snapshot_ids_to_expire)
        self._updates += (update,)
        return self._updates, self._requirements

    def _get_protected_snapshot_ids(self) -> set[int]:
        """
        Get the IDs of protected snapshots.

        These are the HEAD snapshots of all branches and all tagged snapshots.  These ids are to be excluded from expiration.

        Returns:
            Set of protected snapshot IDs to exclude from expiration.
        """
        return {
            ref.snapshot_id
            for ref in self._transaction.table_metadata.refs.values()
            if ref.snapshot_ref_type in (SnapshotRefType.TAG, SnapshotRefType.BRANCH)
        }

    def _validate_expirable(self, snapshot_id: int, protected_ids: set[int]) -> None:
        """Raise ValueError unless the snapshot exists and is not in protected_ids."""
        if self._transaction.table_metadata.snapshot_by_id(snapshot_id) is None:
            raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.")

        if snapshot_id in protected_ids:
            raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.")

    def by_id(self, snapshot_id: int) -> ExpireSnapshots:
        """
        Expire a snapshot by its ID.

        This will mark the snapshot for expiration.

        Args:
            snapshot_id (int): The ID of the snapshot to expire.
        Returns:
            This for method chaining.
        Raises:
            ValueError: If the snapshot does not exist or is protected.
        """
        self._validate_expirable(snapshot_id, self._get_protected_snapshot_ids())
        self._snapshot_ids_to_expire.add(snapshot_id)
        return self

    def by_ids(self, snapshot_ids: list[int]) -> ExpireSnapshots:
        """
        Expire multiple snapshots by their IDs.

        This will mark the snapshots for expiration. The IDs are validated as a
        whole before any of them is staged, so a failing call stages nothing.

        Args:
            snapshot_ids (List[int]): List of snapshot IDs to expire.
        Returns:
            This for method chaining.
        Raises:
            ValueError: If any snapshot does not exist or is protected.
        """
        # Materialize once: the IDs are walked twice (validate, then stage).
        ids = list(snapshot_ids)
        # The protected set does not change while validating, so build it once.
        protected_ids = self._get_protected_snapshot_ids()
        for snapshot_id in ids:
            self._validate_expirable(snapshot_id, protected_ids)
        self._snapshot_ids_to_expire.update(ids)
        return self

    def older_than(self, dt: datetime) -> ExpireSnapshots:
        """
        Expire all unprotected snapshots with a timestamp older than a given value.

        Args:
            dt (datetime): Only snapshots with datetime < this value will be expired.

        Returns:
            This for method chaining.
        """
        protected_ids = self._get_protected_snapshot_ids()
        expire_from = datetime_to_millis(dt)
        for snapshot in self._transaction.table_metadata.snapshots:
            # Strictly-older comparison; protected snapshots are silently skipped here.
            if snapshot.timestamp_ms < expire_from and snapshot.snapshot_id not in protected_ids:
                self._snapshot_ids_to_expire.add(snapshot.snapshot_id)
        return self

by_id(snapshot_id)

Expire a snapshot by its ID.

This will mark the snapshot for expiration.

Parameters:

Name Type Description Default
snapshot_id int

The ID of the snapshot to expire.

required

Returns: This for method chaining.

Source code in pyiceberg/table/update/snapshot.py
def by_id(self, snapshot_id: int) -> ExpireSnapshots:
    """
    Mark a single snapshot for expiration.

    The snapshot is only staged here; it is removed when the operation is committed.

    Args:
        snapshot_id (int): ID of the snapshot to stage for expiration.
    Returns:
        This for method chaining.
    Raises:
        ValueError: If no snapshot has this ID, or the snapshot is protected.
    """
    table_metadata = self._transaction.table_metadata
    found = table_metadata.snapshot_by_id(snapshot_id)
    if found is None:
        raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.")

    protected = self._get_protected_snapshot_ids()
    if snapshot_id in protected:
        raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.")

    staged = self._snapshot_ids_to_expire
    staged.add(snapshot_id)
    return self

by_ids(snapshot_ids)

Expire multiple snapshots by their IDs.

This will mark the snapshots for expiration.

Parameters:

Name Type Description Default
snapshot_ids List[int]

List of snapshot IDs to expire.

required

Returns: This for method chaining.

Source code in pyiceberg/table/update/snapshot.py
def by_ids(self, snapshot_ids: list[int]) -> ExpireSnapshots:
    """
    Expire multiple snapshots by their IDs.

    This will mark the snapshots for expiration. Every ID is validated before
    any of them is staged, so a failing call stages nothing.

    Args:
        snapshot_ids (List[int]): List of snapshot IDs to expire.
    Returns:
        This for method chaining.
    Raises:
        ValueError: If any snapshot does not exist or is protected.
    """
    # Materialize once: the IDs are walked twice (validate, then stage).
    ids = list(snapshot_ids)
    # Neither lookup changes while validating, so resolve them once up front.
    table_metadata = self._transaction.table_metadata
    protected_ids = self._get_protected_snapshot_ids()

    for snapshot_id in ids:
        if table_metadata.snapshot_by_id(snapshot_id) is None:
            raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.")

        if snapshot_id in protected_ids:
            raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.")

    self._snapshot_ids_to_expire.update(ids)
    return self

older_than(dt)

Expire all unprotected snapshots with a timestamp older than a given value.

Parameters:

Name Type Description Default
dt datetime

Only snapshots with datetime < this value will be expired.

required

Returns:

Type Description
ExpireSnapshots

This for method chaining.

Source code in pyiceberg/table/update/snapshot.py
def older_than(self, dt: datetime) -> ExpireSnapshots:
    """
    Mark every unprotected snapshot older than the given datetime for expiration.

    Args:
        dt (datetime): Cut-off; only snapshots with a timestamp strictly before it are staged.

    Returns:
        This for method chaining.
    """
    excluded = self._get_protected_snapshot_ids()
    cutoff_ms = datetime_to_millis(dt)
    self._snapshot_ids_to_expire.update(
        candidate.snapshot_id
        for candidate in self._transaction.table_metadata.snapshots
        if candidate.timestamp_ms < cutoff_ms and candidate.snapshot_id not in excluded
    )
    return self

ManageSnapshots

Bases: UpdateTableMetadata['ManageSnapshots']

Run snapshot management operations using APIs.

APIs include create branch, create tag, etc.

Use `table.manage_snapshots().<operation>().commit()` to run a specific operation. Use `table.manage_snapshots().<operation-one>().<operation-two>().commit()` to run multiple operations. Pending changes are applied on commit.

We can also use context managers to make more changes. For example,

    with table.manage_snapshots() as ms:
        ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")

Source code in pyiceberg/table/update/snapshot.py
class ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):
    """
    Run snapshot management operations using APIs.

    APIs include create branch, create tag, etc.

    Use table.manage_snapshots().<operation>().commit() to run a specific operation.
    Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.

    We can also use context managers to make more changes. For example,

    with table.manage_snapshots() as ms:
        ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
    """

    # Ref updates / requirements accumulated by the chained operations and not yet staged.
    _updates: tuple[TableUpdate, ...]
    _requirements: tuple[TableRequirement, ...]

    def __init__(self, transaction: Transaction) -> None:
        """Start with no pending updates or requirements."""
        super().__init__(transaction)
        self._updates = ()
        self._requirements = ()

    def _commit(self) -> UpdatesAndRequirements:
        """Apply the pending changes and commit."""
        return self._updates, self._requirements

    def _commit_if_ref_updates_exist(self) -> None:
        """Stage any pending ref updates to the transaction state."""
        if self._updates:
            self._transaction._stage(*self._commit())
            # Reset so the already-staged changes are not handed over a second time.
            self._updates = ()
            self._requirements = ()

    def _remove_ref_snapshot(self, ref_name: str) -> ManageSnapshots:
        """Remove a snapshot ref.

        Stages the updates and requirements for the remove-snapshot-ref.

        Args:
            ref_name: branch / tag name to remove

        Returns:
            This for method chaining
        """
        refs = self._transaction.table_metadata.refs
        updates = (RemoveSnapshotRefUpdate(ref_name=ref_name),)
        requirements = (
            AssertRefSnapshotId(
                # An unknown ref is asserted with snapshot_id=None.
                snapshot_id=refs[ref_name].snapshot_id if ref_name in refs else None,
                ref=ref_name,
            ),
        )
        self._updates += updates
        self._requirements += requirements
        return self

    def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: int | None = None) -> ManageSnapshots:
        """
        Create a new tag pointing to the given snapshot id.

        Args:
            snapshot_id (int): snapshot id of the existing snapshot to tag
            tag_name (str): name of the tag
            max_ref_age_ms (Optional[int]): max ref age in milliseconds

        Returns:
            This for method chaining
        """
        update, requirement = self._transaction._set_ref_snapshot(
            snapshot_id=snapshot_id,
            ref_name=tag_name,
            type=SnapshotRefType.TAG,
            max_ref_age_ms=max_ref_age_ms,
        )
        self._updates += update
        self._requirements += requirement
        return self

    def remove_tag(self, tag_name: str) -> ManageSnapshots:
        """
        Remove a tag.

        Args:
            tag_name (str): name of tag to remove
        Returns:
            This for method chaining
        """
        return self._remove_ref_snapshot(ref_name=tag_name)

    def create_branch(
        self,
        snapshot_id: int,
        branch_name: str,
        max_ref_age_ms: int | None = None,
        max_snapshot_age_ms: int | None = None,
        min_snapshots_to_keep: int | None = None,
    ) -> ManageSnapshots:
        """
        Create a new branch pointing to the given snapshot id.

        Args:
            snapshot_id (int): snapshot id of existing snapshot at which the branch is created.
            branch_name (str): name of the new branch
            max_ref_age_ms (Optional[int]): max ref age in milliseconds
            max_snapshot_age_ms (Optional[int]): max age of snapshots to keep in milliseconds
            min_snapshots_to_keep (Optional[int]): min number of snapshots to keep for the branch
        Returns:
            This for method chaining
        """
        update, requirement = self._transaction._set_ref_snapshot(
            snapshot_id=snapshot_id,
            ref_name=branch_name,
            type=SnapshotRefType.BRANCH,
            max_ref_age_ms=max_ref_age_ms,
            max_snapshot_age_ms=max_snapshot_age_ms,
            min_snapshots_to_keep=min_snapshots_to_keep,
        )
        self._updates += update
        self._requirements += requirement
        return self

    def remove_branch(self, branch_name: str) -> ManageSnapshots:
        """
        Remove a branch.

        Args:
            branch_name (str): name of branch to remove
        Returns:
            This for method chaining
        """
        return self._remove_ref_snapshot(ref_name=branch_name)

    def set_current_snapshot(self, snapshot_id: int | None = None, ref_name: str | None = None) -> ManageSnapshots:
        """Set the current snapshot to a specific snapshot ID or ref.

        Args:
            snapshot_id: The ID of the snapshot to set as current.
            ref_name: The snapshot reference (branch or tag) to set as current.

        Returns:
            This for method chaining.

        Raises:
            ValueError: If neither or both arguments are provided, or if the snapshot/ref does not exist.
        """
        # Reject a malformed call before touching the transaction, so that it has no side effects.
        if (snapshot_id is None) == (ref_name is None):
            raise ValueError("Either snapshot_id or ref_name must be provided, not both")

        # Pending ref updates are staged before the lookups below --
        # presumably so refs created earlier in the chain are visible to them (confirm against Transaction).
        self._commit_if_ref_updates_exist()

        target_snapshot_id: int
        if snapshot_id is not None:
            target_snapshot_id = snapshot_id
        else:
            if ref_name not in self._transaction.table_metadata.refs:
                raise ValueError(f"Cannot find matching snapshot ID for ref: {ref_name}")
            target_snapshot_id = self._transaction.table_metadata.refs[ref_name].snapshot_id

        if self._transaction.table_metadata.snapshot_by_id(target_snapshot_id) is None:
            raise ValueError(f"Cannot set current snapshot to unknown snapshot id: {target_snapshot_id}")

        update, requirement = self._transaction._set_ref_snapshot(
            snapshot_id=target_snapshot_id,
            ref_name=MAIN_BRANCH,
            type=SnapshotRefType.BRANCH,
        )
        # Staged directly on the transaction rather than accumulated in self._updates.
        self._transaction._stage(update, requirement)
        return self

    def rollback_to_snapshot(self, snapshot_id: int) -> ManageSnapshots:
        """Rollback the table to the given snapshot id.

        The snapshot needs to be an ancestor of the current table state.

        Args:
            snapshot_id (int): rollback to this snapshot_id that used to be current.

        Returns:
            This for method chaining

        Raises:
            ValueError: If the snapshot does not exist or is not an ancestor of the current table state.
        """
        # Explicit None check: only a missing snapshot counts as unknown.
        if self._transaction.table_metadata.snapshot_by_id(snapshot_id) is None:
            raise ValueError(f"Cannot roll back to unknown snapshot id: {snapshot_id}")

        if not self._is_current_ancestor(snapshot_id):
            raise ValueError(f"Cannot roll back to snapshot, not an ancestor of the current state: {snapshot_id}")

        return self.set_current_snapshot(snapshot_id=snapshot_id)

    def rollback_to_timestamp(self, timestamp_ms: int) -> ManageSnapshots:
        """Rollback the table to the latest snapshot before the given timestamp.

        Finds the latest ancestor snapshot whose timestamp is before the given timestamp and rolls back to it.

        Args:
            timestamp_ms: Rollback to the latest snapshot before this timestamp in milliseconds.

        Returns:
            This for method chaining

        Raises:
            ValueError: If no valid snapshot exists older than the given timestamp.
        """
        snapshot = latest_ancestor_before_timestamp(self._transaction.table_metadata, timestamp_ms)
        if snapshot is None:
            raise ValueError(f"Cannot roll back, no valid snapshot older than: {timestamp_ms}")

        return self.set_current_snapshot(snapshot_id=snapshot.snapshot_id)

    def _is_current_ancestor(self, snapshot_id: int) -> bool:
        """Return whether snapshot_id is among the IDs yielded by _current_ancestors()."""
        return snapshot_id in self._current_ancestors()

    def _current_ancestors(self) -> set[int]:
        """Collect the snapshot IDs that ancestors_of yields for the table's current snapshot."""
        return {
            a.snapshot_id
            for a in ancestors_of(
                self._transaction.table_metadata.current_snapshot(),
                self._transaction.table_metadata,
            )
        }

create_branch(snapshot_id, branch_name, max_ref_age_ms=None, max_snapshot_age_ms=None, min_snapshots_to_keep=None)

Create a new branch pointing to the given snapshot id.

Parameters:

Name Type Description Default
snapshot_id int

snapshot id of existing snapshot at which the branch is created.

required
branch_name str

name of the new branch

required
max_ref_age_ms Optional[int]

max ref age in milliseconds

None
max_snapshot_age_ms Optional[int]

max age of snapshots to keep in milliseconds

None
min_snapshots_to_keep Optional[int]

min number of snapshots to keep for the branch

None

Returns: This for method chaining

Source code in pyiceberg/table/update/snapshot.py
def create_branch(
    self,
    snapshot_id: int,
    branch_name: str,
    max_ref_age_ms: int | None = None,
    max_snapshot_age_ms: int | None = None,
    min_snapshots_to_keep: int | None = None,
) -> ManageSnapshots:
    """
    Queue the creation of a branch whose head is the given snapshot.

    Args:
        snapshot_id (int): ID of the existing snapshot the branch is created at.
        branch_name (str): Name of the new branch.
        max_ref_age_ms (Optional[int]): Max ref age in milliseconds.
        max_snapshot_age_ms (Optional[int]): Max age of snapshots to keep, in milliseconds.
        min_snapshots_to_keep (Optional[int]): Min number of snapshots to keep for the branch.
    Returns:
        This for method chaining
    """
    branch_updates, branch_requirements = self._transaction._set_ref_snapshot(
        snapshot_id=snapshot_id,
        ref_name=branch_name,
        type=SnapshotRefType.BRANCH,
        max_ref_age_ms=max_ref_age_ms,
        max_snapshot_age_ms=max_snapshot_age_ms,
        min_snapshots_to_keep=min_snapshots_to_keep,
    )
    # Only accumulated here; nothing is staged on the transaction by this method.
    self._updates += branch_updates
    self._requirements += branch_requirements
    return self

create_tag(snapshot_id, tag_name, max_ref_age_ms=None)

Create a new tag pointing to the given snapshot id.

Parameters:

Name Type Description Default
snapshot_id int

snapshot id of the existing snapshot to tag

required
tag_name str

name of the tag

required
max_ref_age_ms Optional[int]

max ref age in milliseconds

None

Returns:

Type Description
ManageSnapshots

This for method chaining

Source code in pyiceberg/table/update/snapshot.py
def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: int | None = None) -> ManageSnapshots:
    """
    Queue the creation of a tag that points at the given snapshot.

    Args:
        snapshot_id (int): ID of the existing snapshot to tag.
        tag_name (str): Name of the tag.
        max_ref_age_ms (Optional[int]): Max ref age in milliseconds.

    Returns:
        This for method chaining
    """
    tag_updates, tag_requirements = self._transaction._set_ref_snapshot(
        snapshot_id=snapshot_id,
        ref_name=tag_name,
        type=SnapshotRefType.TAG,
        max_ref_age_ms=max_ref_age_ms,
    )
    # Only accumulated here; nothing is staged on the transaction by this method.
    self._updates += tag_updates
    self._requirements += tag_requirements
    return self

remove_branch(branch_name)

Remove a branch.

Parameters:

Name Type Description Default
branch_name str

name of branch to remove

required

Returns: This for method chaining

Source code in pyiceberg/table/update/snapshot.py
def remove_branch(self, branch_name: str) -> ManageSnapshots:
    """
    Queue the removal of a branch.

    Delegates to the shared ref-removal helper, passing the branch name as the ref name.

    Args:
        branch_name (str): Name of the branch to remove.
    Returns:
        This for method chaining
    """
    chained = self._remove_ref_snapshot(ref_name=branch_name)
    return chained

remove_tag(tag_name)

Remove a tag.

Parameters:

Name Type Description Default
tag_name str

name of tag to remove

required

Returns: This for method chaining

Source code in pyiceberg/table/update/snapshot.py
def remove_tag(self, tag_name: str) -> ManageSnapshots:
    """
    Queue the removal of a tag.

    Delegates to the shared ref-removal helper, passing the tag name as the ref name.

    Args:
        tag_name (str): Name of the tag to remove.
    Returns:
        This for method chaining
    """
    chained = self._remove_ref_snapshot(ref_name=tag_name)
    return chained

rollback_to_snapshot(snapshot_id)

Rollback the table to the given snapshot id.

The snapshot needs to be an ancestor of the current table state.

Parameters:

Name Type Description Default
snapshot_id int

rollback to this snapshot_id that used to be current.

required

Returns:

Type Description
ManageSnapshots

This for method chaining

Raises:

Type Description
ValueError

If the snapshot does not exist or is not an ancestor of the current table state.

Source code in pyiceberg/table/update/snapshot.py
def rollback_to_snapshot(self, snapshot_id: int) -> ManageSnapshots:
    """Rollback the table to the given snapshot id.

    The snapshot needs to be an ancestor of the current table state.

    Args:
        snapshot_id (int): rollback to this snapshot_id that used to be current.

    Returns:
        This for method chaining

    Raises:
        ValueError: If the snapshot does not exist or is not an ancestor of the current table state.
    """
    # Explicit None check: only a missing snapshot counts as unknown,
    # never a snapshot object that merely evaluates as falsy.
    if self._transaction.table_metadata.snapshot_by_id(snapshot_id) is None:
        raise ValueError(f"Cannot roll back to unknown snapshot id: {snapshot_id}")

    if not self._is_current_ancestor(snapshot_id):
        raise ValueError(f"Cannot roll back to snapshot, not an ancestor of the current state: {snapshot_id}")

    return self.set_current_snapshot(snapshot_id=snapshot_id)

rollback_to_timestamp(timestamp_ms)

Rollback the table to the latest snapshot before the given timestamp.

Finds the latest ancestor snapshot whose timestamp is before the given timestamp and rolls back to it.

Parameters:

Name Type Description Default
timestamp_ms int

Rollback to the latest snapshot before this timestamp in milliseconds.

required

Returns:

Type Description
ManageSnapshots

This for method chaining

Raises:

Type Description
ValueError

If no valid snapshot exists older than the given timestamp.

Source code in pyiceberg/table/update/snapshot.py
def rollback_to_timestamp(self, timestamp_ms: int) -> ManageSnapshots:
    """Roll the table back to the latest ancestor snapshot before a timestamp.

    Looks up the latest ancestor snapshot whose timestamp is before the given one and makes it current.

    Args:
        timestamp_ms: Cut-off in milliseconds; the latest snapshot before it is chosen.

    Returns:
        This for method chaining

    Raises:
        ValueError: If no valid snapshot exists older than the given timestamp.
    """
    table_metadata = self._transaction.table_metadata
    target = latest_ancestor_before_timestamp(table_metadata, timestamp_ms)
    if target is not None:
        return self.set_current_snapshot(snapshot_id=target.snapshot_id)

    raise ValueError(f"Cannot roll back, no valid snapshot older than: {timestamp_ms}")

set_current_snapshot(snapshot_id=None, ref_name=None)

Set the current snapshot to a specific snapshot ID or ref.

Parameters:

Name Type Description Default
snapshot_id int | None

The ID of the snapshot to set as current.

None
ref_name str | None

The snapshot reference (branch or tag) to set as current.

None

Returns:

Type Description
ManageSnapshots

This for method chaining.

Raises:

Type Description
ValueError

If neither or both arguments are provided, or if the snapshot/ref does not exist.

Source code in pyiceberg/table/update/snapshot.py
def set_current_snapshot(self, snapshot_id: int | None = None, ref_name: str | None = None) -> ManageSnapshots:
    """Set the current snapshot to a specific snapshot ID or ref.

    Args:
        snapshot_id: The ID of the snapshot to set as current.
        ref_name: The snapshot reference (branch or tag) to set as current.

    Returns:
        This for method chaining.

    Raises:
        ValueError: If neither or both arguments are provided, or if the snapshot/ref does not exist.
    """
    # Reject a malformed call before touching the transaction, so that it has no side effects.
    if (snapshot_id is None) == (ref_name is None):
        raise ValueError("Either snapshot_id or ref_name must be provided, not both")

    # Pending ref updates are staged before the lookups below --
    # presumably so refs created earlier in the chain are visible to them (confirm against Transaction).
    self._commit_if_ref_updates_exist()

    target_snapshot_id: int
    if snapshot_id is not None:
        target_snapshot_id = snapshot_id
    else:
        if ref_name not in self._transaction.table_metadata.refs:
            raise ValueError(f"Cannot find matching snapshot ID for ref: {ref_name}")
        target_snapshot_id = self._transaction.table_metadata.refs[ref_name].snapshot_id

    if self._transaction.table_metadata.snapshot_by_id(target_snapshot_id) is None:
        raise ValueError(f"Cannot set current snapshot to unknown snapshot id: {target_snapshot_id}")

    update, requirement = self._transaction._set_ref_snapshot(
        snapshot_id=target_snapshot_id,
        ref_name=MAIN_BRANCH,
        type=SnapshotRefType.BRANCH,
    )
    # Staged directly on the transaction rather than accumulated in self._updates.
    self._transaction._stage(update, requirement)
    return self