table

AddFileTask dataclass

Task with the parameters for adding a Parquet file as a DataFile.

Source code in pyiceberg/table/__init__.py
@dataclass(frozen=True)
class AddFileTask:
    """Task with the parameters for adding a Parquet file as a DataFile."""

    file_path: str
    partition_field_value: Record

CommitTableRequest

Bases: IcebergBaseModel

A pydantic BaseModel for a table commit request.

Source code in pyiceberg/table/__init__.py
class CommitTableRequest(IcebergBaseModel):
    """A pydantic BaseModel for a table commit request."""

    identifier: TableIdentifier = Field()
    requirements: Tuple[TableRequirement, ...] = Field(default_factory=tuple)
    updates: Tuple[TableUpdate, ...] = Field(default_factory=tuple)

CommitTableResponse

Bases: IcebergBaseModel

A pydantic BaseModel for a table commit response.

Source code in pyiceberg/table/__init__.py
class CommitTableResponse(IcebergBaseModel):
    """A pydantic BaseModel for a table commit response."""

    metadata: TableMetadata
    metadata_location: str = Field(alias="metadata-location")

CreateTableTransaction

Bases: Transaction

A transaction that involves the creation of a new table.

Source code in pyiceberg/table/__init__.py
class CreateTableTransaction(Transaction):
    """A transaction that involves the creation of a a new table."""

    def _initial_changes(self, table_metadata: TableMetadata) -> None:
        """Set the initial changes that can reconstruct the initial table metadata when creating the CreateTableTransaction."""
        self._updates += (
            AssignUUIDUpdate(uuid=table_metadata.table_uuid),
            UpgradeFormatVersionUpdate(format_version=table_metadata.format_version),
        )

        schema: Schema = table_metadata.schema()
        self._updates += (
            AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id),
            SetCurrentSchemaUpdate(schema_id=-1),
        )

        spec: PartitionSpec = table_metadata.spec()
        if spec.is_unpartitioned():
            self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),)
        else:
            self._updates += (AddPartitionSpecUpdate(spec=spec),)
        self._updates += (SetDefaultSpecUpdate(spec_id=-1),)

        sort_order: Optional[SortOrder] = table_metadata.sort_order_by_id(table_metadata.default_sort_order_id)
        if sort_order is None or sort_order.is_unsorted:
            self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),)
        else:
            self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
        self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),)

        self._updates += (
            SetLocationUpdate(location=table_metadata.location),
            SetPropertiesUpdate(updates=table_metadata.properties),
        )

    def __init__(self, table: StagedTable):
        super().__init__(table, autocommit=False)
        self._initial_changes(table.metadata)

    def commit_transaction(self) -> Table:
        """Commit the changes to the catalog.

        In the case of a CreateTableTransaction, the only requirement is AssertCreate.
        Returns:
            The table with the updates applied.
        """
        self._requirements = (AssertCreate(),)
        self._table._do_commit(  # pylint: disable=W0212
            updates=self._updates,
            requirements=self._requirements,
        )
        return self._table

commit_transaction()

Commit the changes to the catalog.

In the case of a CreateTableTransaction, the only requirement is AssertCreate.

Returns:

The table with the updates applied.

Source code in pyiceberg/table/__init__.py
def commit_transaction(self) -> Table:
    """Commit the changes to the catalog.

    In the case of a CreateTableTransaction, the only requirement is AssertCreate.
    Returns:
        The table with the updates applied.
    """
    self._requirements = (AssertCreate(),)
    self._table._do_commit(  # pylint: disable=W0212
        updates=self._updates,
        requirements=self._requirements,
    )
    return self._table

DataScan

Bases: TableScan

Source code in pyiceberg/table/__init__.py
class DataScan(TableScan):
    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id])
        return project(self.row_filter)

    @cached_property
    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
        return KeyDefaultDict(self._build_partition_projection)

    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
        spec = self.table_metadata.specs()[spec_id]
        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
        spec = self.table_metadata.specs()[spec_id]
        partition_type = spec.partition_type(self.table_metadata.schema())
        partition_schema = Schema(*partition_type.fields)
        partition_expr = self.partition_filters[spec_id]

        # The lambda created here is run in multiple threads.
        # So we avoid creating _EvaluatorExpression methods bound to a single
        # shared instance across multiple threads.
        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

    def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
        """Ensure that no manifests are loaded that contain deletes that are older than the data.

        Args:
            min_sequence_number (int): The minimal sequence number.
            manifest (ManifestFile): A ManifestFile that can be either data or deletes.

        Returns:
            Boolean indicating if it is either a data file, or a relevant delete file.
        """
        return manifest.content == ManifestContent.DATA or (
            # Not interested in deletes that are older than the data
            manifest.content == ManifestContent.DELETES
            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
        )

    def plan_files(self) -> Iterable[FileScanTask]:
        """Plans the relevant files by filtering on the PartitionSpecs.

        Returns:
            List of FileScanTasks that contain both data and delete files.
        """
        snapshot = self.snapshot()
        if not snapshot:
            return iter([])

        # step 1: filter manifests using partition summaries
        # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

        manifests = [
            manifest_file
            for manifest_file in snapshot.manifests(self.io)
            if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
        ]

        # step 2: filter the data files in each manifest
        # this filter depends on the partition spec used to write the manifest file

        partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
        metrics_evaluator = _InclusiveMetricsEvaluator(
            self.table_metadata.schema(),
            self.row_filter,
            self.case_sensitive,
            strtobool(self.options.get("include_empty_files", "false")),
        ).eval

        min_sequence_number = _min_sequence_number(manifests)

        data_entries: List[ManifestEntry] = []
        positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

        executor = ExecutorFactory.get_or_create()
        for manifest_entry in chain(
            *executor.map(
                lambda args: _open_manifest(*args),
                [
                    (
                        self.io,
                        manifest,
                        partition_evaluators[manifest.partition_spec_id],
                        metrics_evaluator,
                    )
                    for manifest in manifests
                    if self._check_sequence_number(min_sequence_number, manifest)
                ],
            )
        ):
            data_file = manifest_entry.data_file
            if data_file.content == DataFileContent.DATA:
                data_entries.append(manifest_entry)
            elif data_file.content == DataFileContent.POSITION_DELETES:
                positional_delete_entries.add(manifest_entry)
            elif data_file.content == DataFileContent.EQUALITY_DELETES:
                raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
            else:
                raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

        return [
            FileScanTask(
                data_entry.data_file,
                delete_files=_match_deletes_to_data_file(
                    data_entry,
                    positional_delete_entries,
                ),
            )
            for data_entry in data_entries
        ]

    def to_arrow(self) -> pa.Table:
        """Read an Arrow table eagerly from this DataScan.

        All rows will be loaded into memory at once.

        Returns:
            pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
        """
        from pyiceberg.io.pyarrow import ArrowScan

        return ArrowScan(
            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
        ).to_table(self.plan_files())

    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
        """Return an Arrow RecordBatchReader from this DataScan.

        For large results, using a RecordBatchReader requires less memory than
        loading an Arrow Table for the same DataScan, because a RecordBatch
        is read one at a time.

        Returns:
            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
                which can be used to read a stream of record batches one by one.
        """
        import pyarrow as pa

        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

        target_schema = schema_to_pyarrow(self.projection())
        batches = ArrowScan(
            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
        ).to_record_batches(self.plan_files())

        return pa.RecordBatchReader.from_batches(
            target_schema,
            batches,
        )

    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
        """Read a Pandas DataFrame eagerly from this Iceberg table.

        Returns:
            pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
        """
        return self.to_arrow().to_pandas(**kwargs)

    def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
        """Shorthand for loading the Iceberg Table in DuckDB.

        Returns:
            DuckDBPyConnection: In memory DuckDB connection with the Iceberg table.
        """
        import duckdb

        con = connection or duckdb.connect(database=":memory:")
        con.register(table_name, self.to_arrow())

        return con

    def to_ray(self) -> ray.data.dataset.Dataset:
        """Read a Ray Dataset eagerly from this Iceberg table.

        Returns:
            ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table
        """
        import ray

        return ray.data.from_arrow(self.to_arrow())

plan_files()

Plans the relevant files by filtering on the PartitionSpecs.

Returns:

Type Description
Iterable[FileScanTask]

List of FileScanTasks that contain both data and delete files.

Source code in pyiceberg/table/__init__.py
def plan_files(self) -> Iterable[FileScanTask]:
    """Plans the relevant files by filtering on the PartitionSpecs.

    Returns:
        List of FileScanTasks that contain both data and delete files.
    """
    snapshot = self.snapshot()
    if not snapshot:
        return iter([])

    # step 1: filter manifests using partition summaries
    # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

    manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

    manifests = [
        manifest_file
        for manifest_file in snapshot.manifests(self.io)
        if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
    ]

    # step 2: filter the data files in each manifest
    # this filter depends on the partition spec used to write the manifest file

    partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
    metrics_evaluator = _InclusiveMetricsEvaluator(
        self.table_metadata.schema(),
        self.row_filter,
        self.case_sensitive,
        strtobool(self.options.get("include_empty_files", "false")),
    ).eval

    min_sequence_number = _min_sequence_number(manifests)

    data_entries: List[ManifestEntry] = []
    positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

    executor = ExecutorFactory.get_or_create()
    for manifest_entry in chain(
        *executor.map(
            lambda args: _open_manifest(*args),
            [
                (
                    self.io,
                    manifest,
                    partition_evaluators[manifest.partition_spec_id],
                    metrics_evaluator,
                )
                for manifest in manifests
                if self._check_sequence_number(min_sequence_number, manifest)
            ],
        )
    ):
        data_file = manifest_entry.data_file
        if data_file.content == DataFileContent.DATA:
            data_entries.append(manifest_entry)
        elif data_file.content == DataFileContent.POSITION_DELETES:
            positional_delete_entries.add(manifest_entry)
        elif data_file.content == DataFileContent.EQUALITY_DELETES:
            raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
        else:
            raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

    return [
        FileScanTask(
            data_entry.data_file,
            delete_files=_match_deletes_to_data_file(
                data_entry,
                positional_delete_entries,
            ),
        )
        for data_entry in data_entries
    ]
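
As a usage sketch (assuming a Table handle tbl loaded from a catalog and a hypothetical id column), the planned tasks can be inspected before any data is read; each FileScanTask pairs a data file with the positional delete files that apply to it:

scan = tbl.scan(row_filter="id >= 100")
for task in scan.plan_files():
    # FileScanTask exposes the data file and its matching delete files
    print(task.file.file_path, task.file.record_count, len(task.delete_files))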

to_arrow()

Read an Arrow table eagerly from this DataScan.

All rows will be loaded into memory at once.

Returns:

Type Description
Table

pa.Table: Materialized Arrow Table from the Iceberg table's DataScan

Source code in pyiceberg/table/__init__.py
def to_arrow(self) -> pa.Table:
    """Read an Arrow table eagerly from this DataScan.

    All rows will be loaded into memory at once.

    Returns:
        pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
    """
    from pyiceberg.io.pyarrow import ArrowScan

    return ArrowScan(
        self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
    ).to_table(self.plan_files())
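
A minimal sketch, assuming a Table handle tbl and hypothetical column names; the scan is configured first and then materialized eagerly into a single Arrow table:

arrow_table = tbl.scan(
    row_filter="id >= 100",
    selected_fields=("id", "name"),
    limit=1_000,
).to_arrow()
print(arrow_table.num_rows)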

to_arrow_batch_reader()

Return an Arrow RecordBatchReader from this DataScan.

For large results, using a RecordBatchReader requires less memory than loading an Arrow Table for the same DataScan, because a RecordBatch is read one at a time.

Returns:

Type Description
RecordBatchReader

pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan which can be used to read a stream of record batches one by one.

Source code in pyiceberg/table/__init__.py
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
    """Return an Arrow RecordBatchReader from this DataScan.

    For large results, using a RecordBatchReader requires less memory than
    loading an Arrow Table for the same DataScan, because a RecordBatch
    is read one at a time.

    Returns:
        pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
            which can be used to read a stream of record batches one by one.
    """
    import pyarrow as pa

    from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

    target_schema = schema_to_pyarrow(self.projection())
    batches = ArrowScan(
        self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
    ).to_record_batches(self.plan_files())

    return pa.RecordBatchReader.from_batches(
        target_schema,
        batches,
    )
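
A sketch of streaming the same scan batch by batch instead of loading everything at once (tbl and the id column are assumptions):

reader = tbl.scan(selected_fields=("id",)).to_arrow_batch_reader()
total_rows = 0
for batch in reader:
    # Each item is a pyarrow.RecordBatch, read lazily from the underlying files
    total_rows += batch.num_rows
print(total_rows)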

to_duckdb(table_name, connection=None)

Shorthand for loading the Iceberg Table in DuckDB.

Returns:

Name Type Description
DuckDBPyConnection DuckDBPyConnection

In memory DuckDB connection with the Iceberg table.

Source code in pyiceberg/table/__init__.py
def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
    """Shorthand for loading the Iceberg Table in DuckDB.

    Returns:
        DuckDBPyConnection: In memory DuckDB connection with the Iceberg table.
    """
    import duckdb

    con = connection or duckdb.connect(database=":memory:")
    con.register(table_name, self.to_arrow())

    return con
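
A sketch, assuming a Table handle tbl; the scan result is registered as an in-memory DuckDB relation named events (the name is arbitrary) and queried with SQL:

con = tbl.scan().to_duckdb(table_name="events")
print(con.execute("SELECT COUNT(*) FROM events").fetchall())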

to_pandas(**kwargs)

Read a Pandas DataFrame eagerly from this Iceberg table.

Returns:

Type Description
DataFrame

pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table

Source code in pyiceberg/table/__init__.py
def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
    """Read a Pandas DataFrame eagerly from this Iceberg table.

    Returns:
        pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
    """
    return self.to_arrow().to_pandas(**kwargs)

to_ray()

Read a Ray Dataset eagerly from this Iceberg table.

Returns:

Type Description
Dataset

ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table

Source code in pyiceberg/table/__init__.py
def to_ray(self) -> ray.data.dataset.Dataset:
    """Read a Ray Dataset eagerly from this Iceberg table.

    Returns:
        ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table
    """
    import ray

    return ray.data.from_arrow(self.to_arrow())
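
A sketch, assuming a Table handle tbl; the scan is materialized into a Ray Dataset, which can then be processed with Ray's data APIs:

ds = tbl.scan(limit=10_000).to_ray()
print(ds.count())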

FileScanTask dataclass

Bases: ScanTask

Task representing a data file and its corresponding delete files.

Source code in pyiceberg/table/__init__.py
@dataclass(init=False)
class FileScanTask(ScanTask):
    """Task representing a data file and its corresponding delete files."""

    file: DataFile
    delete_files: Set[DataFile]
    start: int
    length: int

    def __init__(
        self,
        data_file: DataFile,
        delete_files: Optional[Set[DataFile]] = None,
        start: Optional[int] = None,
        length: Optional[int] = None,
    ) -> None:
        self.file = data_file
        self.delete_files = delete_files or set()
        self.start = start or 0
        self.length = length or data_file.file_size_in_bytes

Namespace

Bases: IcebergRootModel[List[str]]

Reference to one or more levels of a namespace.

Source code in pyiceberg/table/__init__.py
class Namespace(IcebergRootModel[List[str]]):
    """Reference to one or more levels of a namespace."""

    root: List[str] = Field(
        ...,
        description="Reference to one or more levels of a namespace",
    )

StaticTable

Bases: Table

Load a table directly from a metadata file (i.e., without using a catalog).

Source code in pyiceberg/table/__init__.py
class StaticTable(Table):
    """Load a table directly from a metadata file (i.e., without using a catalog)."""

    def refresh(self) -> Table:
        """Refresh the current table metadata."""
        raise NotImplementedError("To be implemented")

    @classmethod
    def from_metadata(cls, metadata_location: str, properties: Properties = EMPTY_DICT) -> StaticTable:
        io = load_file_io(properties=properties, location=metadata_location)
        file = io.new_input(metadata_location)

        from pyiceberg.serializers import FromInputFile

        metadata = FromInputFile.table_metadata(file)

        from pyiceberg.catalog.noop import NoopCatalog

        return cls(
            identifier=("static-table", metadata_location),
            metadata_location=metadata_location,
            metadata=metadata,
            io=load_file_io({**properties, **metadata.properties}),
            catalog=NoopCatalog("static-table"),
        )
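
A sketch with a hypothetical metadata file location; a StaticTable is read-only (it is backed by a NoopCatalog), so it supports scans but not commits or refresh:

from pyiceberg.table import StaticTable

static_table = StaticTable.from_metadata(
    "s3://bucket/warehouse/db/tbl/metadata/00001-abc.metadata.json"  # hypothetical path
)
df = static_table.scan().to_pandas()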

refresh()

Refresh the current table metadata.

Source code in pyiceberg/table/__init__.py
def refresh(self) -> Table:
    """Refresh the current table metadata."""
    raise NotImplementedError("To be implemented")

Table

An Iceberg table.

Source code in pyiceberg/table/__init__.py
class Table:
    """An Iceberg table."""

    _identifier: Identifier = Field()
    metadata: TableMetadata
    metadata_location: str = Field()
    io: FileIO
    catalog: Catalog
    config: Dict[str, str]

    def __init__(
        self,
        identifier: Identifier,
        metadata: TableMetadata,
        metadata_location: str,
        io: FileIO,
        catalog: Catalog,
        config: Dict[str, str] = EMPTY_DICT,
    ) -> None:
        self._identifier = identifier
        self.metadata = metadata
        self.metadata_location = metadata_location
        self.io = io
        self.catalog = catalog
        self.config = config

    def transaction(self) -> Transaction:
        """Create a new transaction object to first stage the changes, and then commit them to the catalog.

        Returns:
            The transaction object
        """
        return Transaction(self)

    @property
    def inspect(self) -> InspectTable:
        """Return the InspectTable object to browse the table metadata.

        Returns:
            InspectTable object based on this Table.
        """
        return InspectTable(self)

    def refresh(self) -> Table:
        """Refresh the current table metadata.

        Returns:
            An updated instance of the same Iceberg table
        """
        fresh = self.catalog.load_table(self._identifier)
        self.metadata = fresh.metadata
        self.io = fresh.io
        self.metadata_location = fresh.metadata_location
        return self

    @property
    def identifier(self) -> Identifier:
        """Return the identifier of this table.

        Returns:
            An Identifier tuple of the table name
        """
        deprecation_message(
            deprecated_in="0.8.0",
            removed_in="0.9.0",
            help_message="Table.identifier property is deprecated. Please use Table.name() function instead.",
        )
        return (self.catalog.name,) + self._identifier

    def name(self) -> Identifier:
        """Return the identifier of this table.

        Returns:
            An Identifier tuple of the table name
        """
        return self.identifier

    def scan(
        self,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ) -> DataScan:
        """Fetch a DataScan based on the table's current metadata.

            The data scan can be used to project the table's data
            that matches the provided row_filter onto the table's
            current schema.

        Args:
            row_filter:
                A string or BooleanExpression that describes the
                desired rows
            selected_fields:
                A tuple of strings representing the column names
                to return in the output dataframe.
            case_sensitive:
                If True column matching is case sensitive
            snapshot_id:
                Optional Snapshot ID to time travel to. If None,
                scans the table as of the current snapshot ID.
            options:
                Additional Table properties as a dictionary of
                string key value pairs to use for this scan.
            limit:
                An integer representing the number of rows to
                return in the scan result. If None, fetches all
                matching rows.

        Returns:
            A DataScan based on the table's current metadata.
        """
        return DataScan(
            table_metadata=self.metadata,
            io=self.io,
            row_filter=row_filter,
            selected_fields=selected_fields,
            case_sensitive=case_sensitive,
            snapshot_id=snapshot_id,
            options=options,
            limit=limit,
        )

    @property
    def format_version(self) -> TableVersion:
        return self.metadata.format_version

    def schema(self) -> Schema:
        """Return the schema for this table."""
        return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

    def schemas(self) -> Dict[int, Schema]:
        """Return a dict of the schema of this table."""
        return {schema.schema_id: schema for schema in self.metadata.schemas}

    def spec(self) -> PartitionSpec:
        """Return the partition spec of this table."""
        return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)

    def specs(self) -> Dict[int, PartitionSpec]:
        """Return a dict the partition specs this table."""
        return {spec.spec_id: spec for spec in self.metadata.partition_specs}

    def sort_order(self) -> SortOrder:
        """Return the sort order of this table."""
        return next(
            sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
        )

    def sort_orders(self) -> Dict[int, SortOrder]:
        """Return a dict of the sort orders of this table."""
        return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}

    def last_partition_id(self) -> int:
        """Return the highest assigned partition field ID across all specs or 999 if only the unpartitioned spec exists."""
        if self.metadata.last_partition_id:
            return self.metadata.last_partition_id
        return PARTITION_FIELD_ID_START - 1

    @property
    def properties(self) -> Dict[str, str]:
        """Properties of the table."""
        return self.metadata.properties

    def location(self) -> str:
        """Return the table's base location."""
        return self.metadata.location

    @property
    def last_sequence_number(self) -> int:
        return self.metadata.last_sequence_number

    def current_snapshot(self) -> Optional[Snapshot]:
        """Get the current snapshot for this table, or None if there is no current snapshot."""
        if self.metadata.current_snapshot_id is not None:
            return self.snapshot_by_id(self.metadata.current_snapshot_id)
        return None

    def snapshots(self) -> List[Snapshot]:
        return self.metadata.snapshots

    def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
        """Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
        return self.metadata.snapshot_by_id(snapshot_id)

    def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
        """Return the snapshot referenced by the given name or null if no such reference exists."""
        if ref := self.metadata.refs.get(name):
            return self.snapshot_by_id(ref.snapshot_id)
        return None

    def snapshot_as_of_timestamp(self, timestamp_ms: int, inclusive: bool = True) -> Optional[Snapshot]:
        """Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

        Args:
            timestamp_ms: Find snapshot that was current at/before this timestamp
            inclusive: Includes timestamp_ms in search when True. Excludes timestamp_ms when False
        """
        for log_entry in reversed(self.history()):
            if (inclusive and log_entry.timestamp_ms <= timestamp_ms) or log_entry.timestamp_ms < timestamp_ms:
                return self.snapshot_by_id(log_entry.snapshot_id)
        return None

    def history(self) -> List[SnapshotLogEntry]:
        """Get the snapshot history of this table."""
        return self.metadata.snapshot_log

    def manage_snapshots(self) -> ManageSnapshots:
        """
        Shorthand to run snapshot management operations like create branch, create tag, etc.

        Use table.manage_snapshots().<operation>().commit() to run a specific operation.
        Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
        Pending changes are applied on commit.

        We can also use context managers to make more changes. For example,

        with table.manage_snapshots() as ms:
           ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
        """
        return ManageSnapshots(transaction=Transaction(self, autocommit=True))

    def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
        """Create a new UpdateSchema to alter the columns of this table.

        Args:
            allow_incompatible_changes: If changes are allowed that might break downstream consumers.
            case_sensitive: If field names are case-sensitive.

        Returns:
            A new UpdateSchema.
        """
        return UpdateSchema(
            transaction=Transaction(self, autocommit=True),
            allow_incompatible_changes=allow_incompatible_changes,
            case_sensitive=case_sensitive,
            name_mapping=self.name_mapping(),
        )

    def name_mapping(self) -> Optional[NameMapping]:
        """Return the table's field-id NameMapping."""
        return self.metadata.name_mapping()

    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """
        Shorthand API for appending a PyArrow table to the table.

        Args:
            df: The Arrow dataframe that will be appended to the table
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        with self.transaction() as tx:
            tx.append(df=df, snapshot_properties=snapshot_properties)

    def overwrite(
        self,
        df: pa.Table,
        overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
    ) -> None:
        """
        Shorthand for overwriting the table with a PyArrow table.

        An overwrite may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten.
            - APPEND: In case new data is being inserted into the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                              or a boolean expression in case of a partial overwrite
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        with self.transaction() as tx:
            tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties)

    def delete(
        self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
    ) -> None:
        """
        Shorthand for deleting rows from the table.

        Args:
            delete_filter: The predicate used to remove rows
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        with self.transaction() as tx:
            tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)

    def add_files(
        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
    ) -> None:
        """
        Shorthand API for adding files as data files to the table.

        Args:
            file_paths: The list of full file paths to be added as data files to the table

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        with self.transaction() as tx:
            tx.add_files(
                file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
            )

    def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
        return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)

    def refs(self) -> Dict[str, SnapshotRef]:
        """Return the snapshot references in the table."""
        return self.metadata.refs

    def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
        response = self.catalog.commit_table(self, requirements, updates)
        self.metadata = response.metadata
        self.metadata_location = response.metadata_location

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the Table class."""
        return (
            self.name() == other.name() and self.metadata == other.metadata and self.metadata_location == other.metadata_location
            if isinstance(other, Table)
            else False
        )

    def __repr__(self) -> str:
        """Return the string representation of the Table class."""
        table_name = self.catalog.table_name_from(self._identifier)
        schema_str = ",\n  ".join(str(column) for column in self.schema().columns if self.schema())
        partition_str = f"partition by: [{', '.join(field.name for field in self.spec().fields if self.spec())}]"
        sort_order_str = f"sort order: [{', '.join(str(field) for field in self.sort_order().fields if self.sort_order())}]"
        snapshot_str = f"snapshot: {str(self.current_snapshot()) if self.current_snapshot() else 'null'}"
        result_str = f"{table_name}(\n  {schema_str}\n),\n{partition_str},\n{sort_order_str},\n{snapshot_str}"
        return result_str

    def to_daft(self) -> daft.DataFrame:
        """Read a Daft DataFrame lazily from this Iceberg table.

        Returns:
            daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table
        """
        import daft

        return daft.read_iceberg(self)

identifier: Identifier property

Return the identifier of this table.

Returns:

Type Description
Identifier

An Identifier tuple of the table name

inspect: InspectTable property

Return the InspectTable object to browse the table metadata.

Returns:

Type Description
InspectTable

InspectTable object based on this Table.

properties: Dict[str, str] property

Properties of the table.

__eq__(other)

Return the equality of two instances of the Table class.

Source code in pyiceberg/table/__init__.py
def __eq__(self, other: Any) -> bool:
    """Return the equality of two instances of the Table class."""
    return (
        self.name() == other.name() and self.metadata == other.metadata and self.metadata_location == other.metadata_location
        if isinstance(other, Table)
        else False
    )

__repr__()

Return the string representation of the Table class.

Source code in pyiceberg/table/__init__.py
def __repr__(self) -> str:
    """Return the string representation of the Table class."""
    table_name = self.catalog.table_name_from(self._identifier)
    schema_str = ",\n  ".join(str(column) for column in self.schema().columns if self.schema())
    partition_str = f"partition by: [{', '.join(field.name for field in self.spec().fields if self.spec())}]"
    sort_order_str = f"sort order: [{', '.join(str(field) for field in self.sort_order().fields if self.sort_order())}]"
    snapshot_str = f"snapshot: {str(self.current_snapshot()) if self.current_snapshot() else 'null'}"
    result_str = f"{table_name}(\n  {schema_str}\n),\n{partition_str},\n{sort_order_str},\n{snapshot_str}"
    return result_str

add_files(file_paths, snapshot_properties=EMPTY_DICT, check_duplicate_files=True)

Shorthand API for adding files as data files to the table.

Parameters:

Name Type Description Default
file_paths List[str]

The list of full file paths to be added as data files to the table

required

Raises:

Type Description
FileNotFoundError

If the file does not exist.

Source code in pyiceberg/table/__init__.py
def add_files(
    self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
) -> None:
    """
    Shorthand API for adding files as data files to the table.

    Args:
        file_paths: The list of full file paths to be added as data files to the table

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    with self.transaction() as tx:
        tx.add_files(
            file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
        )
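
A sketch with hypothetical file paths; the Parquet files must already exist and match the table schema, and they are registered as data files without being rewritten:

tbl.add_files(
    file_paths=[
        "s3://bucket/staging/part-0.parquet",  # hypothetical paths
        "s3://bucket/staging/part-1.parquet",
    ],
    snapshot_properties={"ingested-by": "batch-job"},
)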

append(df, snapshot_properties=EMPTY_DICT)

Shorthand API for appending a PyArrow table to the table.

Parameters:

Name Type Description Default
df Table

The Arrow dataframe that will be appended to the table

required
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
Source code in pyiceberg/table/__init__.py
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
    """
    Shorthand API for appending a PyArrow table to the table.

    Args:
        df: The Arrow dataframe that will be appended to the table
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    with self.transaction() as tx:
        tx.append(df=df, snapshot_properties=snapshot_properties)
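
A sketch, assuming a Table handle tbl whose schema is compatible with the hypothetical id and name columns:

import pyarrow as pa

df = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})
tbl.append(df, snapshot_properties={"source": "example"})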

current_snapshot()

Get the current snapshot for this table, or None if there is no current snapshot.

Source code in pyiceberg/table/__init__.py
def current_snapshot(self) -> Optional[Snapshot]:
    """Get the current snapshot for this table, or None if there is no current snapshot."""
    if self.metadata.current_snapshot_id is not None:
        return self.snapshot_by_id(self.metadata.current_snapshot_id)
    return None

delete(delete_filter=ALWAYS_TRUE, snapshot_properties=EMPTY_DICT)

Shorthand for deleting rows from the table.

Parameters:

Name Type Description Default
delete_filter Union[BooleanExpression, str]

The predicate used to remove rows

ALWAYS_TRUE
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
Source code in pyiceberg/table/__init__.py
def delete(
    self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
) -> None:
    """
    Shorthand for deleting rows from the table.

    Args:
        delete_filter: The predicate used to remove rows
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    with self.transaction() as tx:
        tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
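
A sketch, assuming a hypothetical id column; note that the default delete_filter is ALWAYS_TRUE, which would remove every row:

tbl.delete(delete_filter="id < 100")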

history()

Get the snapshot history of this table.

Source code in pyiceberg/table/__init__.py
def history(self) -> List[SnapshotLogEntry]:
    """Get the snapshot history of this table."""
    return self.metadata.snapshot_log

last_partition_id()

Return the highest assigned partition field ID across all specs or 999 if only the unpartitioned spec exists.

Source code in pyiceberg/table/__init__.py
def last_partition_id(self) -> int:
    """Return the highest assigned partition field ID across all specs or 999 if only the unpartitioned spec exists."""
    if self.metadata.last_partition_id:
        return self.metadata.last_partition_id
    return PARTITION_FIELD_ID_START - 1

location()

Return the table's base location.

Source code in pyiceberg/table/__init__.py
def location(self) -> str:
    """Return the table's base location."""
    return self.metadata.location

manage_snapshots()

Shorthand to run snapshot management operations like create branch, create tag, etc.

Use table.manage_snapshots().&lt;operation&gt;().commit() to run a specific operation. Use table.manage_snapshots().&lt;operation-one&gt;().&lt;operation-two&gt;().commit() to run multiple operations. Pending changes are applied on commit.

We can also use context managers to make more changes. For example,

with table.manage_snapshots() as ms:
    ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")

Source code in pyiceberg/table/__init__.py
def manage_snapshots(self) -> ManageSnapshots:
    """
    Shorthand to run snapshot management operations like create branch, create tag, etc.

    Use table.manage_snapshots().<operation>().commit() to run a specific operation.
    Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.

    We can also use context managers to make more changes. For example,

    with table.manage_snapshots() as ms:
       ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
    """
    return ManageSnapshots(transaction=Transaction(self, autocommit=True))
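
A sketch based on the docstring above; create_tag is shown there, and create_branch is assumed to be available as a sibling operation. Both are staged and applied in a single commit when the context manager exits:

current = tbl.current_snapshot()
if current is not None:
    with tbl.manage_snapshots() as ms:
        ms.create_tag(current.snapshot_id, "v1.0").create_branch(current.snapshot_id, "audit")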

name()

Return the identifier of this table.

Returns:

Type Description
Identifier

An Identifier tuple of the table name

Source code in pyiceberg/table/__init__.py
def name(self) -> Identifier:
    """Return the identifier of this table.

    Returns:
        An Identifier tuple of the table name
    """
    return self.identifier

name_mapping()

Return the table's field-id NameMapping.

Source code in pyiceberg/table/__init__.py
def name_mapping(self) -> Optional[NameMapping]:
    """Return the table's field-id NameMapping."""
    return self.metadata.name_mapping()

overwrite(df, overwrite_filter=ALWAYS_TRUE, snapshot_properties=EMPTY_DICT)

Shorthand for overwriting the table with a PyArrow table.

An overwrite may produce zero or more snapshots based on the operation:

- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten.
- APPEND: In case new data is being inserted into the table.

Parameters:

Name Type Description Default
df Table

The Arrow dataframe that will be used to overwrite the table

required
overwrite_filter Union[BooleanExpression, str]

ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite

ALWAYS_TRUE
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
Source code in pyiceberg/table/__init__.py
def overwrite(
    self,
    df: pa.Table,
    overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
    """
    Shorthand for overwriting the table with a PyArrow table.

    An overwrite may produce zero or more snapshots based on the operation:

        - DELETE: In case existing Parquet files can be dropped completely.
        - REPLACE: In case existing Parquet files need to be rewritten.
        - APPEND: In case new data is being inserted into the table.

    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                          or a boolean expression in case of a partial overwrite
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    with self.transaction() as tx:
        tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties)
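
A sketch of a partial overwrite, assuming hypothetical id and name columns; only the rows matching overwrite_filter are replaced by the new data:

import pyarrow as pa

new_rows = pa.table({"id": [1, 2], "name": ["x", "y"]})
tbl.overwrite(new_rows, overwrite_filter="id <= 2")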

refresh()

Refresh the current table metadata.

Returns:

Type Description
Table

An updated instance of the same Iceberg table

Source code in pyiceberg/table/__init__.py
def refresh(self) -> Table:
    """Refresh the current table metadata.

    Returns:
        An updated instance of the same Iceberg table
    """
    fresh = self.catalog.load_table(self._identifier)
    self.metadata = fresh.metadata
    self.io = fresh.io
    self.metadata_location = fresh.metadata_location
    return self

refs()

Return the snapshot references in the table.

Source code in pyiceberg/table/__init__.py
def refs(self) -> Dict[str, SnapshotRef]:
    """Return the snapshot references in the table."""
    return self.metadata.refs

scan(row_filter=ALWAYS_TRUE, selected_fields=('*',), case_sensitive=True, snapshot_id=None, options=EMPTY_DICT, limit=None)

Fetch a DataScan based on the table's current metadata.

The data scan can be used to project the table's data
that matches the provided row_filter onto the table's
current schema.

Parameters:

Name Type Description Default
row_filter Union[str, BooleanExpression]

A string or BooleanExpression that describes the desired rows

ALWAYS_TRUE
selected_fields Tuple[str, ...]

A tuple of strings representing the column names to return in the output dataframe.

('*',)
case_sensitive bool

If True column matching is case sensitive

True
snapshot_id Optional[int]

Optional Snapshot ID to time travel to. If None, scans the table as of the current snapshot ID.

None
options Properties

Additional Table properties as a dictionary of string key value pairs to use for this scan.

EMPTY_DICT
limit Optional[int]

An integer representing the number of rows to return in the scan result. If None, fetches all matching rows.

None

Returns:

Type Description
DataScan

A DataScan based on the table's current metadata.

Source code in pyiceberg/table/__init__.py
def scan(
    self,
    row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
    selected_fields: Tuple[str, ...] = ("*",),
    case_sensitive: bool = True,
    snapshot_id: Optional[int] = None,
    options: Properties = EMPTY_DICT,
    limit: Optional[int] = None,
) -> DataScan:
    """Fetch a DataScan based on the table's current metadata.

        The data scan can be used to project the table's data
        that matches the provided row_filter onto the table's
        current schema.

    Args:
        row_filter:
            A string or BooleanExpression that describes the
            desired rows
        selected_fields:
            A tuple of strings representing the column names
            to return in the output dataframe.
        case_sensitive:
            If True column matching is case sensitive
        snapshot_id:
            Optional Snapshot ID to time travel to. If None,
            scans the table as of the current snapshot ID.
        options:
            Additional Table properties as a dictionary of
            string key value pairs to use for this scan.
        limit:
            An integer representing the number of rows to
            return in the scan result. If None, fetches all
            matching rows.

    Returns:
        A DataScan based on the table's current metadata.
    """
    return DataScan(
        table_metadata=self.metadata,
        io=self.io,
        row_filter=row_filter,
        selected_fields=selected_fields,
        case_sensitive=case_sensitive,
        snapshot_id=snapshot_id,
        options=options,
        limit=limit,
    )
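
A sketch, assuming a Table handle tbl and hypothetical columns; the scan itself is lazy and no files are read until one of the to_* methods is called:

scan = tbl.scan(
    row_filter="id >= 100",
    selected_fields=("id", "name"),
    limit=500,
)
result = scan.to_pandas()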

schema()

Return the schema for this table.

Source code in pyiceberg/table/__init__.py
def schema(self) -> Schema:
    """Return the schema for this table."""
    return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

schemas()

Return a dict of the schema of this table.

Source code in pyiceberg/table/__init__.py
def schemas(self) -> Dict[int, Schema]:
    """Return a dict of the schema of this table."""
    return {schema.schema_id: schema for schema in self.metadata.schemas}

snapshot_as_of_timestamp(timestamp_ms, inclusive=True)

Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

Parameters:

Name Type Description Default
timestamp_ms int

Find snapshot that was current at/before this timestamp

required
inclusive bool

Includes timestamp_ms in search when True. Excludes timestamp_ms when False

True
Source code in pyiceberg/table/__init__.py
def snapshot_as_of_timestamp(self, timestamp_ms: int, inclusive: bool = True) -> Optional[Snapshot]:
    """Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

    Args:
        timestamp_ms: Find snapshot that was current at/before this timestamp
        inclusive: Includes timestamp_ms in search when True. Excludes timestamp_ms when False
    """
    for log_entry in reversed(self.history()):
        if (inclusive and log_entry.timestamp_ms <= timestamp_ms) or log_entry.timestamp_ms < timestamp_ms:
            return self.snapshot_by_id(log_entry.snapshot_id)
    return None
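
A time-travel sketch: look up the snapshot that was current at a given instant and scan the table as of that snapshot (the date is arbitrary):

from datetime import datetime, timezone

ts_ms = int(datetime(2024, 6, 1, tzinfo=timezone.utc).timestamp() * 1000)
snapshot = tbl.snapshot_as_of_timestamp(ts_ms)
if snapshot is not None:
    old_data = tbl.scan(snapshot_id=snapshot.snapshot_id).to_arrow()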

snapshot_by_id(snapshot_id)

Get the snapshot of this table with the given id, or None if there is no matching snapshot.

Source code in pyiceberg/table/__init__.py
def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
    """Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
    return self.metadata.snapshot_by_id(snapshot_id)

snapshot_by_name(name)

Return the snapshot referenced by the given name or null if no such reference exists.

Source code in pyiceberg/table/__init__.py
def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
    """Return the snapshot referenced by the given name or null if no such reference exists."""
    if ref := self.metadata.refs.get(name):
        return self.snapshot_by_id(ref.snapshot_id)
    return None

sort_order()

Return the sort order of this table.

Source code in pyiceberg/table/__init__.py
def sort_order(self) -> SortOrder:
    """Return the sort order of this table."""
    return next(
        sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
    )

sort_orders()

Return a dict of the sort orders of this table.

Source code in pyiceberg/table/__init__.py
def sort_orders(self) -> Dict[int, SortOrder]:
    """Return a dict of the sort orders of this table."""
    return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}

spec()

Return the partition spec of this table.

Source code in pyiceberg/table/__init__.py
def spec(self) -> PartitionSpec:
    """Return the partition spec of this table."""
    return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)

specs()

Return a dict of the partition specs of this table.

Source code in pyiceberg/table/__init__.py
def specs(self) -> Dict[int, PartitionSpec]:
    """Return a dict the partition specs this table."""
    return {spec.spec_id: spec for spec in self.metadata.partition_specs}

to_daft()

Read a Daft DataFrame lazily from this Iceberg table.

Returns:

Type Description
DataFrame

daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table

Source code in pyiceberg/table/__init__.py
def to_daft(self) -> daft.DataFrame:
    """Read a Daft DataFrame lazily from this Iceberg table.

    Returns:
        daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table
    """
    import daft

    return daft.read_iceberg(self)

transaction()

Create a new transaction object to first stage the changes, and then commit them to the catalog.

Returns:

Type Description
Transaction

The transaction object

Source code in pyiceberg/table/__init__.py
def transaction(self) -> Transaction:
    """Create a new transaction object to first stage the changes, and then commit them to the catalog.

    Returns:
        The transaction object
    """
    return Transaction(self)
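
A sketch of staging several changes and committing them atomically when the context manager exits; set_properties is assumed to be one of the Transaction helpers, and append is the same method used by Table.append above:

import pyarrow as pa

df = pa.table({"id": [10, 11], "name": ["x", "y"]})
with tbl.transaction() as tx:
    tx.set_properties({"owner": "data-team"})  # assumed Transaction helper
    tx.append(df)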

update_schema(allow_incompatible_changes=False, case_sensitive=True)

Create a new UpdateSchema to alter the columns of this table.

Parameters:

Name Type Description Default
allow_incompatible_changes bool

If changes are allowed that might break downstream consumers.

False
case_sensitive bool

If field names are case-sensitive.

True

Returns:

Type Description
UpdateSchema

A new UpdateSchema.

Source code in pyiceberg/table/__init__.py
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
    """Create a new UpdateSchema to alter the columns of this table.

    Args:
        allow_incompatible_changes: If changes are allowed that might break downstream consumers.
        case_sensitive: If field names are case-sensitive.

    Returns:
        A new UpdateSchema.
    """
    return UpdateSchema(
        transaction=Transaction(self, autocommit=True),
        allow_incompatible_changes=allow_incompatible_changes,
        case_sensitive=case_sensitive,
        name_mapping=self.name_mapping(),
    )
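
A sketch of schema evolution; add_column and rename_column are assumed UpdateSchema operations, and the staged changes are committed when the context manager exits:

from pyiceberg.types import StringType

with tbl.update_schema() as update:
    update.add_column("comment", StringType(), doc="free-form note")  # assumed operations
    update.rename_column("name", "full_name")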

TableIdentifier

Bases: IcebergBaseModel

Fully Qualified identifier to a table.

Source code in pyiceberg/table/__init__.py
class TableIdentifier(IcebergBaseModel):
    """Fully Qualified identifier to a table."""

    namespace: Namespace
    name: str

TableScan

Bases: ABC

Source code in pyiceberg/table/__init__.py
class TableScan(ABC):
    table_metadata: TableMetadata
    io: FileIO
    row_filter: BooleanExpression
    selected_fields: Tuple[str, ...]
    case_sensitive: bool
    snapshot_id: Optional[int]
    options: Properties
    limit: Optional[int]

    def __init__(
        self,
        table_metadata: TableMetadata,
        io: FileIO,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ):
        self.table_metadata = table_metadata
        self.io = io
        self.row_filter = _parse_row_filter(row_filter)
        self.selected_fields = selected_fields
        self.case_sensitive = case_sensitive
        self.snapshot_id = snapshot_id
        self.options = options
        self.limit = limit

    def snapshot(self) -> Optional[Snapshot]:
        if self.snapshot_id:
            return self.table_metadata.snapshot_by_id(self.snapshot_id)
        return self.table_metadata.current_snapshot()

    def projection(self) -> Schema:
        current_schema = self.table_metadata.schema()
        if self.snapshot_id is not None:
            snapshot = self.table_metadata.snapshot_by_id(self.snapshot_id)
            if snapshot is not None:
                if snapshot.schema_id is not None:
                    try:
                        current_schema = next(
                            schema for schema in self.table_metadata.schemas if schema.schema_id == snapshot.schema_id
                        )
                    except StopIteration:
                        warnings.warn(f"Metadata does not contain schema with id: {snapshot.schema_id}")
            else:
                raise ValueError(f"Snapshot not found: {self.snapshot_id}")

        if "*" in self.selected_fields:
            return current_schema

        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

    @abstractmethod
    def plan_files(self) -> Iterable[ScanTask]: ...

    @abstractmethod
    def to_arrow(self) -> pa.Table: ...

    @abstractmethod
    def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ...

    def update(self: S, **overrides: Any) -> S:
        """Create a copy of this table scan with updated fields."""
        return type(self)(**{**self.__dict__, **overrides})

    def use_ref(self: S, name: str) -> S:
        if self.snapshot_id:
            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
        if snapshot := self.table_metadata.snapshot_by_name(name):
            return self.update(snapshot_id=snapshot.snapshot_id)

        raise ValueError(f"Cannot scan unknown ref={name}")

    def select(self: S, *field_names: str) -> S:
        if "*" in self.selected_fields:
            return self.update(selected_fields=field_names)
        return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))

    def filter(self: S, expr: Union[str, BooleanExpression]) -> S:
        return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr)))

    def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
        return self.update(case_sensitive=case_sensitive)

update(**overrides)

Create a copy of this table scan with updated fields.

Source code in pyiceberg/table/__init__.py
def update(self: S, **overrides: Any) -> S:
    """Create a copy of this table scan with updated fields."""
    return type(self)(**{**self.__dict__, **overrides})
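Because every refinement goes through update(), a scan is never mutated in place; each call returns a fresh copy. A minimal usage sketch, assuming tbl is a Table loaded from a catalog (the table and field names are illustrative):

scan = tbl.scan()                       # all fields, row_filter=ALWAYS_TRUE
scan = scan.select("id", "name")        # narrow the projection
scan = scan.filter("id > 10")           # AND-ed onto the existing row filter
scan = scan.with_case_sensitive(False)  # each call returns a new scan via update()
df = scan.to_pandas()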

Transaction

Source code in pyiceberg/table/__init__.py
class Transaction:
    _table: Table
    table_metadata: TableMetadata
    _autocommit: bool
    _updates: Tuple[TableUpdate, ...]
    _requirements: Tuple[TableRequirement, ...]

    def __init__(self, table: Table, autocommit: bool = False):
        """Open a transaction to stage and commit changes to a table.

        Args:
            table: The table that will be altered.
            autocommit: Option to automatically commit the changes when they are staged.
        """
        self.table_metadata = table.metadata
        self._table = table
        self._autocommit = autocommit
        self._updates = ()
        self._requirements = ()

    def __enter__(self) -> Transaction:
        """Start a transaction to update the table."""
        return self

    def __exit__(
        self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
    ) -> None:
        """Close and commit the transaction if no exceptions have been raised."""
        if exctype is None and excinst is None and exctb is None:
            self.commit_transaction()

    def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction:
        """Check if the requirements are met, and applies the updates to the metadata."""
        for requirement in requirements:
            requirement.validate(self.table_metadata)

        self._updates += updates

        # For the requirements, it does not make sense to add a requirement more than once
        # For example, you cannot assert that the current schema has two different IDs
        existing_requirements = {type(requirement) for requirement in self._requirements}
        for new_requirement in requirements:
            if type(new_requirement) not in existing_requirements:
                self._requirements = self._requirements + requirements

        self.table_metadata = update_table_metadata(self.table_metadata, updates)

        if self._autocommit:
            self.commit_transaction()
            self._updates = ()
            self._requirements = ()

        return self

    def _scan(self, row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE) -> DataScan:
        """Minimal data scan of the table with the current state of the transaction."""
        return DataScan(
            table_metadata=self.table_metadata,
            io=self._table.io,
            row_filter=row_filter,
        )

    def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
        """Set the table to a certain version.

        Args:
            format_version: The newly set version.

        Returns:
            The alter table builder.
        """
        if format_version not in {1, 2}:
            raise ValueError(f"Unsupported table format version: {format_version}")

        if format_version < self.table_metadata.format_version:
            raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")

        if format_version > self.table_metadata.format_version:
            return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))

        return self

    def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) -> Transaction:
        """Set properties.

        When a property is already set, it will be overwritten.

        Args:
            properties: The properties set on the table.
            kwargs: Properties can also be passed as kwargs.

        Returns:
            The alter table builder.
        """
        if properties and kwargs:
            raise ValueError("Cannot pass both properties and kwargs")
        updates = properties or kwargs
        return self._apply((SetPropertiesUpdate(updates=updates),))

    def _set_ref_snapshot(
        self,
        snapshot_id: int,
        ref_name: str,
        type: str,
        max_ref_age_ms: Optional[int] = None,
        max_snapshot_age_ms: Optional[int] = None,
        min_snapshots_to_keep: Optional[int] = None,
    ) -> UpdatesAndRequirements:
        """Update a ref to a snapshot.

        Returns:
            The updates and requirements for the set-snapshot-ref staged
        """
        updates = (
            SetSnapshotRefUpdate(
                snapshot_id=snapshot_id,
                ref_name=ref_name,
                type=type,
                max_ref_age_ms=max_ref_age_ms,
                max_snapshot_age_ms=max_snapshot_age_ms,
                min_snapshots_to_keep=min_snapshots_to_keep,
            ),
        )
        requirements = (
            AssertRefSnapshotId(
                snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None,
                ref=ref_name,
            ),
        )

        return updates, requirements

    def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
        """Create a new UpdateSchema to alter the columns of this table.

        Args:
            allow_incompatible_changes: If changes are allowed that might break downstream consumers.
            case_sensitive: If field names are case-sensitive.

        Returns:
            A new UpdateSchema.
        """
        return UpdateSchema(
            self,
            allow_incompatible_changes=allow_incompatible_changes,
            case_sensitive=case_sensitive,
            name_mapping=self.table_metadata.name_mapping(),
        )

    def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
        """Create a new UpdateSnapshot to produce a new snapshot for the table.

        Returns:
            A new UpdateSnapshot
        """
        return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)

    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """
        Shorthand API for appending a PyArrow table to a table transaction.

        Args:
            df: The Arrow dataframe that will be appended to the table
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        if unsupported_partitions := [
            field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
        ]:
            raise ValueError(
                f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
            )
        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )

        manifest_merge_enabled = property_as_bool(
            self.table_metadata.properties,
            TableProperties.MANIFEST_MERGE_ENABLED,
            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
        )
        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
        append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append

        with append_method() as append_files:
            # skip writing data files if the dataframe is empty
            if df.shape[0] > 0:
                data_files = _dataframe_to_data_files(
                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                )
                for data_file in data_files:
                    append_files.append_data_file(data_file)

    def overwrite(
        self,
        df: pa.Table,
        overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
    ) -> None:
        """
        Shorthand for adding a table overwrite with a PyArrow table to the transaction.

        An overwrite may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten.
            - APPEND: In case new data is being inserted into the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                              or a boolean expression in case of a partial overwrite
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        if unsupported_partitions := [
            field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
        ]:
            raise ValueError(
                f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
            )
        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )

        self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)

        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
            # skip writing data files if the dataframe is empty
            if df.shape[0] > 0:
                data_files = _dataframe_to_data_files(
                    table_metadata=self.table_metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
                )
                for data_file in data_files:
                    update_snapshot.append_data_file(data_file)

    def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """
        Shorthand for deleting records from a table.

        A delete may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten.

        Args:
            delete_filter: A boolean expression to delete rows from a table
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        from pyiceberg.io.pyarrow import (
            ArrowScan,
            _dataframe_to_data_files,
            _expression_to_complementary_pyarrow,
        )

        if (
            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
            == TableProperties.DELETE_MODE_MERGE_ON_READ
        ):
            warnings.warn("Merge on read is not yet supported, falling back to copy-on-write")

        if isinstance(delete_filter, str):
            delete_filter = _parse_row_filter(delete_filter)

        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
            delete_snapshot.delete_by_predicate(delete_filter)

        # Check if there are any files that require an actual rewrite of a data file
        if delete_snapshot.rewrites_needed is True:
            bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True)
            preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

            files = self._scan(row_filter=delete_filter).plan_files()

            commit_uuid = uuid.uuid4()
            counter = itertools.count(0)

            replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
            # This will load the Parquet file into memory, including:
            #   - Filtering out the rows based on the delete filter
            #   - Projecting it to the current schema
            #   - Applying the positional deletes if they are there
            # When writing:
            #   - Apply the latest partition-spec
            #   - And the sort order, when added
            for original_file in files:
                df = ArrowScan(
                    table_metadata=self.table_metadata,
                    io=self._table.io,
                    projected_schema=self.table_metadata.schema(),
                    row_filter=AlwaysTrue(),
                ).to_table(tasks=[original_file])
                filtered_df = df.filter(preserve_row_filter)

                # Only rewrite if there are records being deleted
                if len(filtered_df) == 0:
                    replaced_files.append((original_file.file, []))
                elif len(df) != len(filtered_df):
                    replaced_files.append((
                        original_file.file,
                        list(
                            _dataframe_to_data_files(
                                io=self._table.io,
                                df=filtered_df,
                                table_metadata=self.table_metadata,
                                write_uuid=commit_uuid,
                                counter=counter,
                            )
                        ),
                    ))

            if len(replaced_files) > 0:
                with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
                    commit_uuid=commit_uuid
                ) as overwrite_snapshot:
                    for original_data_file, replaced_data_files in replaced_files:
                        overwrite_snapshot.delete_data_file(original_data_file)
                        for replaced_data_file in replaced_data_files:
                            overwrite_snapshot.append_data_file(replaced_data_file)

        if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
            warnings.warn("Delete operation did not match any records")

    def add_files(
        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
    ) -> None:
        """
        Shorthand API for adding files as data files to the table transaction.

        Args:
            file_paths: The list of full file paths to be added as data files to the table

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If file_paths contains duplicate files.
            ValueError: If any of the file_paths are already referenced by the table.
        """
        if len(file_paths) != len(set(file_paths)):
            raise ValueError("File paths must be unique")

        if check_duplicate_files:
            import pyarrow.compute as pc

            expr = pc.field("file_path").isin(file_paths)
            referenced_files = [file["file_path"] for file in self._table.inspect.files().filter(expr).to_pylist()]

            if referenced_files:
                raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")

        if self.table_metadata.name_mapping() is None:
            self.set_properties(**{
                TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()
            })
        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
            data_files = _parquet_files_to_data_files(
                table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
            )
            for data_file in data_files:
                update_snapshot.append_data_file(data_file)

    def update_spec(self) -> UpdateSpec:
        """Create a new UpdateSpec to update the partitioning of the table.

        Returns:
            A new UpdateSpec.
        """
        return UpdateSpec(self)

    def remove_properties(self, *removals: str) -> Transaction:
        """Remove properties.

        Args:
            removals: Properties to be removed.

        Returns:
            The alter table builder.
        """
        return self._apply((RemovePropertiesUpdate(removals=removals),))

    def update_location(self, location: str) -> Transaction:
        """Set the new table location.

        Args:
            location: The new location of the table.

        Returns:
            The alter table builder.
        """
        raise NotImplementedError("Not yet implemented")

    def commit_transaction(self) -> Table:
        """Commit the changes to the catalog.

        Returns:
            The table with the updates applied.
        """
        if len(self._updates) > 0:
            self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
            self._table._do_commit(  # pylint: disable=W0212
                updates=self._updates,
                requirements=self._requirements,
            )
            return self._table
        else:
            return self._table

__enter__()

Start a transaction to update the table.

Source code in pyiceberg/table/__init__.py
def __enter__(self) -> Transaction:
    """Start a transaction to update the table."""
    return self

__exit__(exctype, excinst, exctb)

Close and commit the transaction if no exceptions have been raised.

Source code in pyiceberg/table/__init__.py
def __exit__(
    self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
    """Close and commit the transaction if no exceptions have been raised."""
    if exctype is None and excinst is None and exctb is None:
        self.commit_transaction()
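A minimal usage sketch of the context-manager form, assuming tbl is an existing Table (the property key is illustrative):

with tbl.transaction() as tx:
    tx.set_properties(owner="analytics")
# On a clean exit, __exit__ calls commit_transaction() and the staged
# updates are sent to the catalog as a single commit.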

__init__(table, autocommit=False)

Open a transaction to stage and commit changes to a table.

Parameters:

Name Type Description Default
table Table

The table that will be altered.

required
autocommit bool

Option to automatically commit the changes when they are staged.

False
Source code in pyiceberg/table/__init__.py
def __init__(self, table: Table, autocommit: bool = False):
    """Open a transaction to stage and commit changes to a table.

    Args:
        table: The table that will be altered.
        autocommit: Option to automatically commit the changes when they are staged.
    """
    self.table_metadata = table.metadata
    self._table = table
    self._autocommit = autocommit
    self._updates = ()
    self._requirements = ()

add_files(file_paths, snapshot_properties=EMPTY_DICT, check_duplicate_files=True)

Shorthand API for adding files as data files to the table transaction.

Parameters:

Name Type Description Default
file_paths List[str]

The list of full file paths to be added as data files to the table

required

Raises:

Type Description
FileNotFoundError

If the file does not exist.

ValueError

If file_paths contains duplicate files.

ValueError

If any of the file_paths are already referenced by the table.

Source code in pyiceberg/table/__init__.py
def add_files(
    self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
) -> None:
    """
    Shorthand API for adding files as data files to the table transaction.

    Args:
        file_paths: The list of full file paths to be added as data files to the table

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If file_paths contains duplicate files.
        ValueError: If any of the file_paths are already referenced by the table.
    """
    if len(file_paths) != len(set(file_paths)):
        raise ValueError("File paths must be unique")

    if check_duplicate_files:
        import pyarrow.compute as pc

        expr = pc.field("file_path").isin(file_paths)
        referenced_files = [file["file_path"] for file in self._table.inspect.files().filter(expr).to_pylist()]

        if referenced_files:
            raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")

    if self.table_metadata.name_mapping() is None:
        self.set_properties(**{
            TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()
        })
    with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
        data_files = _parquet_files_to_data_files(
            table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
        )
        for data_file in data_files:
            update_snapshot.append_data_file(data_file)
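A hedged sketch of registering existing Parquet files without rewriting them; the paths are illustrative and the files are assumed to already match the table's schema and partition layout:

with tbl.transaction() as tx:
    tx.add_files(
        file_paths=[
            "s3://warehouse/db/tbl/data/part-00000.parquet",
            "s3://warehouse/db/tbl/data/part-00001.parquet",
        ],
        check_duplicate_files=True,  # reject files already referenced by the table
    )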

append(df, snapshot_properties=EMPTY_DICT)

Shorthand API for appending a PyArrow table to a table transaction.

Parameters:

Name Type Description Default
df Table

The Arrow dataframe that will be appended to the table

required
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
Source code in pyiceberg/table/__init__.py
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
    """
    Shorthand API for appending a PyArrow table to a table transaction.

    Args:
        df: The Arrow dataframe that will be appended to the table
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    if unsupported_partitions := [
        field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
    ]:
        raise ValueError(
            f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
        )
    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
    )

    manifest_merge_enabled = property_as_bool(
        self.table_metadata.properties,
        TableProperties.MANIFEST_MERGE_ENABLED,
        TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
    )
    update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
    append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append

    with append_method() as append_files:
        # skip writing data files if the dataframe is empty
        if df.shape[0] > 0:
            data_files = _dataframe_to_data_files(
                table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
            )
            for data_file in data_files:
                append_files.append_data_file(data_file)
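A minimal append sketch, assuming tbl is an existing Table whose schema is compatible with the Arrow data (column names and the snapshot property are illustrative):

import pyarrow as pa

df = pa.Table.from_pylist([
    {"id": 1, "name": "alpha"},
    {"id": 2, "name": "beta"},
])
with tbl.transaction() as tx:
    tx.append(df, snapshot_properties={"ingest-job": "daily-load"})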

commit_transaction()

Commit the changes to the catalog.

Returns:

Type Description
Table

The table with the updates applied.

Source code in pyiceberg/table/__init__.py
def commit_transaction(self) -> Table:
    """Commit the changes to the catalog.

    Returns:
        The table with the updates applied.
    """
    if len(self._updates) > 0:
        self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
        self._table._do_commit(  # pylint: disable=W0212
            updates=self._updates,
            requirements=self._requirements,
        )
        return self._table
    else:
        return self._table
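A sketch of the explicit, non-context-manager form (the property key is illustrative):

tx = tbl.transaction()
tx.set_properties({"commit.retry.num-retries": "10"})
tbl = tx.commit_transaction()  # returns the table unchanged if nothing was staged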

delete(delete_filter, snapshot_properties=EMPTY_DICT)

Shorthand for deleting records from a table.

A delete may produce zero or more snapshots based on the operation:

- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten.

Parameters:

Name Type Description Default
delete_filter Union[str, BooleanExpression]

A boolean expression to delete rows from a table

required
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
Source code in pyiceberg/table/__init__.py
def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
    """
    Shorthand for deleting records from a table.

    A delete may produce zero or more snapshots based on the operation:

        - DELETE: In case existing Parquet files can be dropped completely.
        - REPLACE: In case existing Parquet files need to be rewritten.

    Args:
        delete_filter: A boolean expression to delete rows from a table
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    from pyiceberg.io.pyarrow import (
        ArrowScan,
        _dataframe_to_data_files,
        _expression_to_complementary_pyarrow,
    )

    if (
        self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
        == TableProperties.DELETE_MODE_MERGE_ON_READ
    ):
        warnings.warn("Merge on read is not yet supported, falling back to copy-on-write")

    if isinstance(delete_filter, str):
        delete_filter = _parse_row_filter(delete_filter)

    with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
        delete_snapshot.delete_by_predicate(delete_filter)

    # Check if there are any files that require an actual rewrite of a data file
    if delete_snapshot.rewrites_needed is True:
        bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True)
        preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

        files = self._scan(row_filter=delete_filter).plan_files()

        commit_uuid = uuid.uuid4()
        counter = itertools.count(0)

        replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
        # This will load the Parquet file into memory, including:
        #   - Filtering out the rows based on the delete filter
        #   - Projecting it to the current schema
        #   - Applying the positional deletes if they are there
        # When writing:
        #   - Apply the latest partition-spec
        #   - And the sort order, when added
        for original_file in files:
            df = ArrowScan(
                table_metadata=self.table_metadata,
                io=self._table.io,
                projected_schema=self.table_metadata.schema(),
                row_filter=AlwaysTrue(),
            ).to_table(tasks=[original_file])
            filtered_df = df.filter(preserve_row_filter)

            # Only rewrite if there are records being deleted
            if len(filtered_df) == 0:
                replaced_files.append((original_file.file, []))
            elif len(df) != len(filtered_df):
                replaced_files.append((
                    original_file.file,
                    list(
                        _dataframe_to_data_files(
                            io=self._table.io,
                            df=filtered_df,
                            table_metadata=self.table_metadata,
                            write_uuid=commit_uuid,
                            counter=counter,
                        )
                    ),
                ))

        if len(replaced_files) > 0:
            with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
                commit_uuid=commit_uuid
            ) as overwrite_snapshot:
                for original_data_file, replaced_data_files in replaced_files:
                    overwrite_snapshot.delete_data_file(original_data_file)
                    for replaced_data_file in replaced_data_files:
                        overwrite_snapshot.append_data_file(replaced_data_file)

    if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
        warnings.warn("Delete operation did not match any records")
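A copy-on-write delete sketch; the filter string is illustrative and is parsed into a boolean expression:

with tbl.transaction() as tx:
    tx.delete(delete_filter="id < 100")
# Files fully covered by the filter are dropped; files that match only
# partially are rewritten without the deleted rows.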

overwrite(df, overwrite_filter=ALWAYS_TRUE, snapshot_properties=EMPTY_DICT)

Shorthand for adding a table overwrite with a PyArrow table to the transaction.

An overwrite may produce zero or more snapshots based on the operation:

- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten.
- APPEND: In case new data is being inserted into the table.

Parameters:

Name Type Description Default
df Table

The Arrow dataframe that will be used to overwrite the table

required
overwrite_filter Union[BooleanExpression, str]

ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite

ALWAYS_TRUE
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
Source code in pyiceberg/table/__init__.py
def overwrite(
    self,
    df: pa.Table,
    overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
    """
    Shorthand for adding a table overwrite with a PyArrow table to the transaction.

    An overwrite may produce zero or more snapshots based on the operation:

        - DELETE: In case existing Parquet files can be dropped completely.
        - REPLACE: In case existing Parquet files need to be rewritten.
        - APPEND: In case new data is being inserted into the table.

    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                          or a boolean expression in case of a partial overwrite
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    if unsupported_partitions := [
        field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
    ]:
        raise ValueError(
            f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
        )
    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
    )

    self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)

    with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
        # skip writing data files if the dataframe is empty
        if df.shape[0] > 0:
            data_files = _dataframe_to_data_files(
                table_metadata=self.table_metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
            )
            for data_file in data_files:
                update_snapshot.append_data_file(data_file)
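A partial-overwrite sketch (column names and values are illustrative): rows matching the filter are deleted first, then the new data is appended:

import pyarrow as pa

new_rows = pa.Table.from_pylist([{"id": 1, "name": "alpha-v2"}])
with tbl.transaction() as tx:
    tx.overwrite(new_rows, overwrite_filter="id = 1")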

remove_properties(*removals)

Remove properties.

Parameters:

Name Type Description Default
removals str

Properties to be removed.

()

Returns:

Type Description
Transaction

The alter table builder.

Source code in pyiceberg/table/__init__.py
def remove_properties(self, *removals: str) -> Transaction:
    """Remove properties.

    Args:
        removals: Properties to be removed.

    Returns:
        The alter table builder.
    """
    return self._apply((RemovePropertiesUpdate(removals=removals),))
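A short sketch (the keys are illustrative):

with tbl.transaction() as tx:
    tx.remove_properties("deprecated-key", "another-key")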

set_properties(properties=EMPTY_DICT, **kwargs)

Set properties.

When a property is already set, it will be overwritten.

Parameters:

Name Type Description Default
properties Properties

The properties set on the table.

EMPTY_DICT
kwargs Any

Properties can also be passed as kwargs.

{}

Returns:

Type Description
Transaction

The alter table builder.

Source code in pyiceberg/table/__init__.py
def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) -> Transaction:
    """Set properties.

    When a property is already set, it will be overwritten.

    Args:
        properties: The properties set on the table.
        kwargs: Properties can also be passed as kwargs.

    Returns:
        The alter table builder.
    """
    if properties and kwargs:
        raise ValueError("Cannot pass both properties and kwargs")
    updates = properties or kwargs
    return self._apply((SetPropertiesUpdate(updates=updates),))
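Properties can be passed either as a single dict or as keyword arguments, but not both. A sketch (the property values are illustrative):

with tbl.transaction() as tx:
    # dict form, useful for keys that are not valid Python identifiers
    tx.set_properties({"write.parquet.compression-codec": "zstd"})
    # equivalent kwargs form: tx.set_properties(owner="analytics")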

update_location(location)

Set the new table location.

Parameters:

Name Type Description Default
location str

The new location of the table.

required

Returns:

Type Description
Transaction

The alter table builder.

Source code in pyiceberg/table/__init__.py
def update_location(self, location: str) -> Transaction:
    """Set the new table location.

    Args:
        location: The new location of the table.

    Returns:
        The alter table builder.
    """
    raise NotImplementedError("Not yet implemented")

update_schema(allow_incompatible_changes=False, case_sensitive=True)

Create a new UpdateSchema to alter the columns of this table.

Parameters:

Name Type Description Default
allow_incompatible_changes bool

If changes are allowed that might break downstream consumers.

False
case_sensitive bool

If field names are case-sensitive.

True

Returns:

Type Description
UpdateSchema

A new UpdateSchema.

Source code in pyiceberg/table/__init__.py
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
    """Create a new UpdateSchema to alter the columns of this table.

    Args:
        allow_incompatible_changes: If changes are allowed that might break downstream consumers.
        case_sensitive: If field names are case-sensitive.

    Returns:
        A new UpdateSchema.
    """
    return UpdateSchema(
        self,
        allow_incompatible_changes=allow_incompatible_changes,
        case_sensitive=case_sensitive,
        name_mapping=self.table_metadata.name_mapping(),
    )
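UpdateSchema is itself a context manager, so a schema change is typically staged like this sketch (the column name and type are illustrative):

from pyiceberg.types import LongType

with tbl.transaction() as tx:
    with tx.update_schema() as update:
        update.add_column("event_count", LongType(), doc="hypothetical example column")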

update_snapshot(snapshot_properties=EMPTY_DICT)

Create a new UpdateSnapshot to produce a new snapshot for the table.

Returns:

Type Description
UpdateSnapshot

A new UpdateSnapshot

Source code in pyiceberg/table/__init__.py
def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
    """Create a new UpdateSnapshot to produce a new snapshot for the table.

    Returns:
        A new UpdateSnapshot
    """
    return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)

update_spec()

Create a new UpdateSpec to update the partitioning of the table.

Returns:

Type Description
UpdateSpec

A new UpdateSpec.

Source code in pyiceberg/table/__init__.py
def update_spec(self) -> UpdateSpec:
    """Create a new UpdateSpec to update the partitioning of the table.

    Returns:
        A new UpdateSpec.
    """
    return UpdateSpec(self)
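A partition-evolution sketch, assuming the table has a timestamp column named "ts" (the column and partition-field names are illustrative):

from pyiceberg.transforms import DayTransform

with tbl.transaction() as tx:
    with tx.update_spec() as update:
        update.add_field("ts", DayTransform(), "ts_day")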

upgrade_table_version(format_version)

Set the table to a certain version.

Parameters:

Name Type Description Default
format_version TableVersion

The newly set version.

required

Returns:

Type Description
Transaction

The alter table builder.

Source code in pyiceberg/table/__init__.py
def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
    """Set the table to a certain version.

    Args:
        format_version: The newly set version.

    Returns:
        The alter table builder.
    """
    if format_version not in {1, 2}:
        raise ValueError(f"Unsupported table format version: {format_version}")

    if format_version < self.table_metadata.format_version:
        raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")

    if format_version > self.table_metadata.format_version:
        return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))

    return self
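A sketch; upgrading to the table's current version is a no-op, and downgrading raises a ValueError:

with tbl.transaction() as tx:
    tx.upgrade_table_version(format_version=2)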

WriteTask dataclass

Task with the parameters for writing a DataFile.

Source code in pyiceberg/table/__init__.py
@dataclass(frozen=True)
class WriteTask:
    """Task with the parameters for writing a DataFile."""

    write_uuid: uuid.UUID
    task_id: int
    schema: Schema
    record_batches: List[pa.RecordBatch]
    sort_order_id: Optional[int] = None
    partition_key: Optional[PartitionKey] = None

    def generate_data_file_filename(self, extension: str) -> str:
        # Mimics the behavior in the Java API:
        # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
        return f"00000-{self.task_id}-{self.write_uuid}.{extension}"

    def generate_data_file_path(self, extension: str) -> str:
        if self.partition_key:
            file_path = f"{self.partition_key.to_path()}/{self.generate_data_file_filename(extension)}"
            return file_path
        else:
            return self.generate_data_file_filename(extension)
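WriteTask instances are normally created internally by the write path; the sketch below only illustrates the generated file-name layout (the schema and ids are illustrative):

import uuid

from pyiceberg.schema import Schema
from pyiceberg.table import WriteTask
from pyiceberg.types import LongType, NestedField

task = WriteTask(
    write_uuid=uuid.uuid4(),
    task_id=3,
    schema=Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=True)),
    record_batches=[],
)
print(task.generate_data_file_path("parquet"))
# -> 00000-3-<write_uuid>.parquet (no partition prefix when partition_key is None)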