table

AddFileTask dataclass

Task with the parameters for adding a Parquet file as a DataFile.

Source code in pyiceberg/table/__init__.py
@dataclass(frozen=True)
class AddFileTask:
    """Task with the parameters for adding a Parquet file as a DataFile."""

    file_path: str
    partition_field_value: Record

CommitTableRequest

Bases: IcebergBaseModel

A pydantic BaseModel for a table commit request.

Source code in pyiceberg/table/__init__.py
class CommitTableRequest(IcebergBaseModel):
    """A pydantic BaseModel for a table commit request."""

    identifier: TableIdentifier = Field()
    requirements: Tuple[TableRequirement, ...] = Field(default_factory=tuple)
    updates: Tuple[TableUpdate, ...] = Field(default_factory=tuple)

CommitTableResponse

Bases: IcebergBaseModel

A pydantic BaseModel for a table commit response.

Source code in pyiceberg/table/__init__.py
class CommitTableResponse(IcebergBaseModel):
    """A pydantic BaseModel for a table commit response."""

    metadata: TableMetadata
    metadata_location: str = Field(alias="metadata-location")

CreateTableTransaction

Bases: Transaction

A transaction that involves the creation of a new table.

Source code in pyiceberg/table/__init__.py
class CreateTableTransaction(Transaction):
    """A transaction that involves the creation of a a new table."""

    def _initial_changes(self, table_metadata: TableMetadata) -> None:
        """Set the initial changes that can reconstruct the initial table metadata when creating the CreateTableTransaction."""
        self._updates += (
            AssignUUIDUpdate(uuid=table_metadata.table_uuid),
            UpgradeFormatVersionUpdate(format_version=table_metadata.format_version),
        )

        schema: Schema = table_metadata.schema()
        self._updates += (
            AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id),
            SetCurrentSchemaUpdate(schema_id=-1),
        )

        spec: PartitionSpec = table_metadata.spec()
        if spec.is_unpartitioned():
            self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),)
        else:
            self._updates += (AddPartitionSpecUpdate(spec=spec),)
        self._updates += (SetDefaultSpecUpdate(spec_id=-1),)

        sort_order: Optional[SortOrder] = table_metadata.sort_order_by_id(table_metadata.default_sort_order_id)
        if sort_order is None or sort_order.is_unsorted:
            self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),)
        else:
            self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
        self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),)

        self._updates += (
            SetLocationUpdate(location=table_metadata.location),
            SetPropertiesUpdate(updates=table_metadata.properties),
        )

    def __init__(self, table: StagedTable):
        super().__init__(table, autocommit=False)
        self._initial_changes(table.metadata)

    def commit_transaction(self) -> Table:
        """Commit the changes to the catalog.

        In the case of a CreateTableTransaction, the only requirement is AssertCreate.
        Returns:
            The table with the updates applied.
        """
        self._requirements = (AssertCreate(),)
        self._table._do_commit(  # pylint: disable=W0212
            updates=self._updates,
            requirements=self._requirements,
        )
        return self._table

_initial_changes(table_metadata)

Set the initial changes that can reconstruct the initial table metadata when creating the CreateTableTransaction.

Source code in pyiceberg/table/__init__.py
def _initial_changes(self, table_metadata: TableMetadata) -> None:
    """Set the initial changes that can reconstruct the initial table metadata when creating the CreateTableTransaction."""
    self._updates += (
        AssignUUIDUpdate(uuid=table_metadata.table_uuid),
        UpgradeFormatVersionUpdate(format_version=table_metadata.format_version),
    )

    schema: Schema = table_metadata.schema()
    self._updates += (
        AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id),
        SetCurrentSchemaUpdate(schema_id=-1),
    )

    spec: PartitionSpec = table_metadata.spec()
    if spec.is_unpartitioned():
        self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),)
    else:
        self._updates += (AddPartitionSpecUpdate(spec=spec),)
    self._updates += (SetDefaultSpecUpdate(spec_id=-1),)

    sort_order: Optional[SortOrder] = table_metadata.sort_order_by_id(table_metadata.default_sort_order_id)
    if sort_order is None or sort_order.is_unsorted:
        self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),)
    else:
        self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
    self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),)

    self._updates += (
        SetLocationUpdate(location=table_metadata.location),
        SetPropertiesUpdate(updates=table_metadata.properties),
    )

commit_transaction()

Commit the changes to the catalog.

In the case of a CreateTableTransaction, the only requirement is AssertCreate.

Returns:

    Table: The table with the updates applied.

Source code in pyiceberg/table/__init__.py
def commit_transaction(self) -> Table:
    """Commit the changes to the catalog.

    In the case of a CreateTableTransaction, the only requirement is AssertCreate.
    Returns:
        The table with the updates applied.
    """
    self._requirements = (AssertCreate(),)
    self._table._do_commit(  # pylint: disable=W0212
        updates=self._updates,
        requirements=self._requirements,
    )
    return self._table
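
A minimal usage sketch: a CreateTableTransaction is normally obtained from a catalog rather than constructed directly. The catalog name, table identifier, and schema below are illustrative assumptions.

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
txn = catalog.create_table_transaction("docs_db.events", schema=schema)
txn.set_properties(owner="data-eng")    # changes are staged on the transaction
table = txn.commit_transaction()        # committed with a single AssertCreate requirement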

DataScan

Bases: TableScan

Source code in pyiceberg/table/__init__.py
class DataScan(TableScan):
    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
        return project(self.row_filter)

    @cached_property
    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
        return KeyDefaultDict(self._build_partition_projection)

    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
        spec = self.table_metadata.specs()[spec_id]
        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
        spec = self.table_metadata.specs()[spec_id]
        partition_type = spec.partition_type(self.table_metadata.schema())
        partition_schema = Schema(*partition_type.fields)
        partition_expr = self.partition_filters[spec_id]

        # The lambda created here is run in multiple threads.
        # So we avoid creating _EvaluatorExpression methods bound to a single
        # shared instance across multiple threads.
        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
        schema = self.table_metadata.schema()
        include_empty_files = strtobool(self.options.get("include_empty_files", "false"))

        # The lambda created here is run in multiple threads.
        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
        # shared instance across multiple threads.
        return lambda data_file: _InclusiveMetricsEvaluator(
            schema,
            self.row_filter,
            self.case_sensitive,
            include_empty_files,
        ).eval(data_file)

    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
        spec = self.table_metadata.specs()[spec_id]

        # The lambda created here is run in multiple threads.
        # So we avoid creating _EvaluatorExpression methods bound to a single
        # shared instance across multiple threads.
        # return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
        from pyiceberg.expressions.visitors import residual_evaluator_of

        # assert self.row_filter == False
        return lambda datafile: (
            residual_evaluator_of(
                spec=spec,
                expr=self.row_filter,
                case_sensitive=self.case_sensitive,
                schema=self.table_metadata.schema(),
            )
        )

    def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
        """Ensure that no manifests are loaded that contain deletes that are older than the data.

        Args:
            min_sequence_number (int): The minimal sequence number.
            manifest (ManifestFile): A ManifestFile that can be either data or deletes.

        Returns:
            Boolean indicating if it is either a data file, or a relevant delete file.
        """
        return manifest.content == ManifestContent.DATA or (
            # Not interested in deletes that are older than the data
            manifest.content == ManifestContent.DELETES
            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
        )

    def plan_files(self) -> Iterable[FileScanTask]:
        """Plans the relevant files by filtering on the PartitionSpecs.

        Returns:
            List of FileScanTasks that contain both data and delete files.
        """
        snapshot = self.snapshot()
        if not snapshot:
            return iter([])

        # step 1: filter manifests using partition summaries
        # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

        residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

        manifests = [
            manifest_file
            for manifest_file in snapshot.manifests(self.io)
            if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
        ]

        # step 2: filter the data files in each manifest
        # this filter depends on the partition spec used to write the manifest file

        partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

        min_sequence_number = _min_sequence_number(manifests)

        data_entries: List[ManifestEntry] = []
        positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

        executor = ExecutorFactory.get_or_create()
        for manifest_entry in chain(
            *executor.map(
                lambda args: _open_manifest(*args),
                [
                    (
                        self.io,
                        manifest,
                        partition_evaluators[manifest.partition_spec_id],
                        residual_evaluators[manifest.partition_spec_id],
                        self._build_metrics_evaluator(),
                    )
                    for manifest in manifests
                    if self._check_sequence_number(min_sequence_number, manifest)
                ],
            )
        ):
            data_file = manifest_entry.data_file
            if data_file.content == DataFileContent.DATA:
                data_entries.append(manifest_entry)
            elif data_file.content == DataFileContent.POSITION_DELETES:
                positional_delete_entries.add(manifest_entry)
            elif data_file.content == DataFileContent.EQUALITY_DELETES:
                raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
            else:
                raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

        return [
            FileScanTask(
                data_entry.data_file,
                delete_files=_match_deletes_to_data_file(
                    data_entry,
                    positional_delete_entries,
                ),
                residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
                    data_entry.data_file.partition
                ),
            )
            for data_entry in data_entries
        ]

    def to_arrow(self) -> pa.Table:
        """Read an Arrow table eagerly from this DataScan.

        All rows will be loaded into memory at once.

        Returns:
            pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
        """
        from pyiceberg.io.pyarrow import ArrowScan

        return ArrowScan(
            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
        ).to_table(self.plan_files())

    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
        """Return an Arrow RecordBatchReader from this DataScan.

        For large results, using a RecordBatchReader requires less memory than
        loading an Arrow Table for the same DataScan, because a RecordBatch
        is read one at a time.

        Returns:
            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
                which can be used to read a stream of record batches one by one.
        """
        import pyarrow as pa

        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

        target_schema = schema_to_pyarrow(self.projection())
        batches = ArrowScan(
            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
        ).to_record_batches(self.plan_files())

        return pa.RecordBatchReader.from_batches(
            target_schema,
            batches,
        )

    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
        """Read a Pandas DataFrame eagerly from this Iceberg table.

        Returns:
            pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
        """
        return self.to_arrow().to_pandas(**kwargs)

    def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
        """Shorthand for loading the Iceberg Table in DuckDB.

        Returns:
            DuckDBPyConnection: In memory DuckDB connection with the Iceberg table.
        """
        import duckdb

        con = connection or duckdb.connect(database=":memory:")
        con.register(table_name, self.to_arrow())

        return con

    def to_ray(self) -> ray.data.dataset.Dataset:
        """Read a Ray Dataset eagerly from this Iceberg table.

        Returns:
            ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table
        """
        import ray

        return ray.data.from_arrow(self.to_arrow())

    def to_polars(self) -> pl.DataFrame:
        """Read a Polars DataFrame from this Iceberg table.

        Returns:
            pl.DataFrame: Materialized Polars Dataframe from the Iceberg table
        """
        import polars as pl

        result = pl.from_arrow(self.to_arrow())
        if isinstance(result, pl.Series):
            result = result.to_frame()

        return result

    def count(self) -> int:
        from pyiceberg.io.pyarrow import ArrowScan

        # Calculate the total number of records in the scan, applying the row filter and positional deletes.
        res = 0
        # every task is a FileScanTask
        tasks = self.plan_files()

        for task in tasks:
            # If the residual filter is AlwaysTrue, the partition values alone satisfy the row filter.
            # When, in addition, there are no positional delete files to merge, the record count stored
            # in the file metadata can be used directly. Otherwise the file has to be read as a PyArrow
            # table with the filter and the deletes applied.
            if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
                # Every data file stores its record count in the file metadata
                res += task.file.record_count
            else:
                arrow_scan = ArrowScan(
                    table_metadata=self.table_metadata,
                    io=self.io,
                    projected_schema=self.projection(),
                    row_filter=self.row_filter,
                    case_sensitive=self.case_sensitive,
                )
                tbl = arrow_scan.to_table([task])
                res += len(tbl)
        return res
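
For example, a filtered count can often be answered from file metadata alone (a sketch assuming a loaded table with an id column):

scan = table.scan(row_filter="id >= 100")
print(scan.count())  # uses record counts from metadata where possible, reads Parquet otherwise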

_check_sequence_number(min_sequence_number, manifest)

Ensure that no manifests are loaded that contain deletes that are older than the data.

Parameters:

    min_sequence_number (int): The minimal sequence number. Required.
    manifest (ManifestFile): A ManifestFile that can be either data or deletes. Required.

Returns:

    bool: Boolean indicating if it is either a data file, or a relevant delete file.

Source code in pyiceberg/table/__init__.py
def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
    """Ensure that no manifests are loaded that contain deletes that are older than the data.

    Args:
        min_sequence_number (int): The minimal sequence number.
        manifest (ManifestFile): A ManifestFile that can be either data or deletes.

    Returns:
        Boolean indicating if it is either a data file, or a relevant delete file.
    """
    return manifest.content == ManifestContent.DATA or (
        # Not interested in deletes that are older than the data
        manifest.content == ManifestContent.DELETES
        and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
    )

plan_files()

Plans the relevant files by filtering on the PartitionSpecs.

Returns:

    Iterable[FileScanTask]: List of FileScanTasks that contain both data and delete files.

Source code in pyiceberg/table/__init__.py
def plan_files(self) -> Iterable[FileScanTask]:
    """Plans the relevant files by filtering on the PartitionSpecs.

    Returns:
        List of FileScanTasks that contain both data and delete files.
    """
    snapshot = self.snapshot()
    if not snapshot:
        return iter([])

    # step 1: filter manifests using partition summaries
    # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

    manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

    residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

    manifests = [
        manifest_file
        for manifest_file in snapshot.manifests(self.io)
        if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
    ]

    # step 2: filter the data files in each manifest
    # this filter depends on the partition spec used to write the manifest file

    partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

    min_sequence_number = _min_sequence_number(manifests)

    data_entries: List[ManifestEntry] = []
    positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

    executor = ExecutorFactory.get_or_create()
    for manifest_entry in chain(
        *executor.map(
            lambda args: _open_manifest(*args),
            [
                (
                    self.io,
                    manifest,
                    partition_evaluators[manifest.partition_spec_id],
                    residual_evaluators[manifest.partition_spec_id],
                    self._build_metrics_evaluator(),
                )
                for manifest in manifests
                if self._check_sequence_number(min_sequence_number, manifest)
            ],
        )
    ):
        data_file = manifest_entry.data_file
        if data_file.content == DataFileContent.DATA:
            data_entries.append(manifest_entry)
        elif data_file.content == DataFileContent.POSITION_DELETES:
            positional_delete_entries.add(manifest_entry)
        elif data_file.content == DataFileContent.EQUALITY_DELETES:
            raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
        else:
            raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

    return [
        FileScanTask(
            data_entry.data_file,
            delete_files=_match_deletes_to_data_file(
                data_entry,
                positional_delete_entries,
            ),
            residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
                data_entry.data_file.partition
            ),
        )
        for data_entry in data_entries
    ]
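
A short sketch of inspecting the planned tasks (assumes a loaded table):

for task in table.scan(row_filter="id >= 100").plan_files():
    print(task.file.file_path, task.file.record_count, len(task.delete_files))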

to_arrow()

Read an Arrow table eagerly from this DataScan.

All rows will be loaded into memory at once.

Returns:

    pa.Table: Materialized Arrow Table from the Iceberg table's DataScan.

Source code in pyiceberg/table/__init__.py
def to_arrow(self) -> pa.Table:
    """Read an Arrow table eagerly from this DataScan.

    All rows will be loaded into memory at once.

    Returns:
        pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
    """
    from pyiceberg.io.pyarrow import ArrowScan

    return ArrowScan(
        self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
    ).to_table(self.plan_files())
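
For example (a sketch assuming a loaded table with id and name columns):

arrow_table = table.scan(
    row_filter="id >= 100",
    selected_fields=("id", "name"),
    limit=1_000,
).to_arrow()
print(arrow_table.num_rows)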

to_arrow_batch_reader()

Return an Arrow RecordBatchReader from this DataScan.

For large results, using a RecordBatchReader requires less memory than loading an Arrow Table for the same DataScan, because a RecordBatch is read one at a time.

Returns:

    pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan, which can be used to read a stream of record batches one by one.

Source code in pyiceberg/table/__init__.py
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
    """Return an Arrow RecordBatchReader from this DataScan.

    For large results, using a RecordBatchReader requires less memory than
    loading an Arrow Table for the same DataScan, because a RecordBatch
    is read one at a time.

    Returns:
        pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
            which can be used to read a stream of record batches one by one.
    """
    import pyarrow as pa

    from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

    target_schema = schema_to_pyarrow(self.projection())
    batches = ArrowScan(
        self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
    ).to_record_batches(self.plan_files())

    return pa.RecordBatchReader.from_batches(
        target_schema,
        batches,
    )
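
For example, streaming batches keeps memory usage bounded (a sketch assuming a loaded table):

reader = table.scan(row_filter="id >= 100").to_arrow_batch_reader()
total = 0
for batch in reader:      # one RecordBatch is materialized at a time
    total += batch.num_rows
print(total)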

to_duckdb(table_name, connection=None)

Shorthand for loading the Iceberg Table in DuckDB.

Returns:

    DuckDBPyConnection: In-memory DuckDB connection with the Iceberg table.

Source code in pyiceberg/table/__init__.py
def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
    """Shorthand for loading the Iceberg Table in DuckDB.

    Returns:
        DuckDBPyConnection: In memory DuckDB connection with the Iceberg table.
    """
    import duckdb

    con = connection or duckdb.connect(database=":memory:")
    con.register(table_name, self.to_arrow())

    return con
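
For example (a sketch assuming a loaded table; the registered name "events" is arbitrary):

con = table.scan().to_duckdb(table_name="events")
print(con.execute("SELECT COUNT(*) FROM events").fetchone())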

to_pandas(**kwargs)

Read a Pandas DataFrame eagerly from this Iceberg table.

Returns:

    pd.DataFrame: Materialized Pandas DataFrame from the Iceberg table.

Source code in pyiceberg/table/__init__.py
def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
    """Read a Pandas DataFrame eagerly from this Iceberg table.

    Returns:
        pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
    """
    return self.to_arrow().to_pandas(**kwargs)
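
For example (a sketch assuming a loaded table with id and name columns):

df = table.scan(selected_fields=("id", "name"), limit=100).to_pandas()
print(df.head())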

to_polars()

Read a Polars DataFrame from this Iceberg table.

Returns:

    pl.DataFrame: Materialized Polars DataFrame from the Iceberg table.

Source code in pyiceberg/table/__init__.py
def to_polars(self) -> pl.DataFrame:
    """Read a Polars DataFrame from this Iceberg table.

    Returns:
        pl.DataFrame: Materialized Polars Dataframe from the Iceberg table
    """
    import polars as pl

    result = pl.from_arrow(self.to_arrow())
    if isinstance(result, pl.Series):
        result = result.to_frame()

    return result
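
For example (a sketch assuming a loaded table):

pl_df = table.scan(limit=100).to_polars()
print(pl_df.schema)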

to_ray()

Read a Ray Dataset eagerly from this Iceberg table.

Returns:

    ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table.

Source code in pyiceberg/table/__init__.py
def to_ray(self) -> ray.data.dataset.Dataset:
    """Read a Ray Dataset eagerly from this Iceberg table.

    Returns:
        ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table
    """
    import ray

    return ray.data.from_arrow(self.to_arrow())
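
For example (a sketch assuming a loaded table and the ray extra installed):

ds = table.scan(limit=1_000).to_ray()
print(ds.count())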

FileScanTask dataclass

Bases: ScanTask

Task representing a data file and its corresponding delete files.

Source code in pyiceberg/table/__init__.py
@dataclass(init=False)
class FileScanTask(ScanTask):
    """Task representing a data file and its corresponding delete files."""

    file: DataFile
    delete_files: Set[DataFile]
    start: int
    length: int
    residual: BooleanExpression

    def __init__(
        self,
        data_file: DataFile,
        delete_files: Optional[Set[DataFile]] = None,
        start: Optional[int] = None,
        length: Optional[int] = None,
        residual: BooleanExpression = ALWAYS_TRUE,
    ) -> None:
        self.file = data_file
        self.delete_files = delete_files or set()
        self.start = start or 0
        self.length = length or data_file.file_size_in_bytes
        self.residual = residual

Namespace

Bases: IcebergRootModel[List[str]]

Reference to one or more levels of a namespace.

Source code in pyiceberg/table/__init__.py
class Namespace(IcebergRootModel[List[str]]):
    """Reference to one or more levels of a namespace."""

    root: List[str] = Field(
        ...,
        description="Reference to one or more levels of a namespace",
    )

StaticTable

Bases: Table

Load a table directly from a metadata file (i.e., without using a catalog).

Source code in pyiceberg/table/__init__.py
class StaticTable(Table):
    """Load a table directly from a metadata file (i.e., without using a catalog)."""

    def refresh(self) -> Table:
        """Refresh the current table metadata."""
        raise NotImplementedError("To be implemented")

    @classmethod
    def from_metadata(cls, metadata_location: str, properties: Properties = EMPTY_DICT) -> StaticTable:
        io = load_file_io(properties=properties, location=metadata_location)
        file = io.new_input(metadata_location)

        from pyiceberg.serializers import FromInputFile

        metadata = FromInputFile.table_metadata(file)

        from pyiceberg.catalog.noop import NoopCatalog

        return cls(
            identifier=("static-table", metadata_location),
            metadata_location=metadata_location,
            metadata=metadata,
            io=load_file_io({**properties, **metadata.properties}),
            catalog=NoopCatalog("static-table"),
        )
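
A minimal sketch of loading a table directly from a metadata file; the metadata location below is hypothetical:

from pyiceberg.table import StaticTable

static_table = StaticTable.from_metadata(
    "s3://warehouse/db/events/metadata/v2.metadata.json"  # hypothetical metadata location
)
print(static_table.schema())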

refresh()

Refresh the current table metadata.

Source code in pyiceberg/table/__init__.py
def refresh(self) -> Table:
    """Refresh the current table metadata."""
    raise NotImplementedError("To be implemented")

Table

An Iceberg table.

Source code in pyiceberg/table/__init__.py
class Table:
    """An Iceberg table."""

    _identifier: Identifier = Field()
    metadata: TableMetadata
    metadata_location: str = Field()
    io: FileIO
    catalog: Catalog
    config: Dict[str, str]

    def __init__(
        self,
        identifier: Identifier,
        metadata: TableMetadata,
        metadata_location: str,
        io: FileIO,
        catalog: Catalog,
        config: Dict[str, str] = EMPTY_DICT,
    ) -> None:
        self._identifier = identifier
        self.metadata = metadata
        self.metadata_location = metadata_location
        self.io = io
        self.catalog = catalog
        self.config = config

    def transaction(self) -> Transaction:
        """Create a new transaction object to first stage the changes, and then commit them to the catalog.

        Returns:
            The transaction object
        """
        return Transaction(self)

    @property
    def inspect(self) -> InspectTable:
        """Return the InspectTable object to browse the table metadata.

        Returns:
            InspectTable object based on this Table.
        """
        return InspectTable(self)

    def refresh(self) -> Table:
        """Refresh the current table metadata.

        Returns:
            An updated instance of the same Iceberg table
        """
        fresh = self.catalog.load_table(self._identifier)
        self.metadata = fresh.metadata
        self.io = fresh.io
        self.metadata_location = fresh.metadata_location
        return self

    def name(self) -> Identifier:
        """Return the identifier of this table.

        Returns:
            An Identifier tuple of the table name
        """
        return self._identifier

    def scan(
        self,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ) -> DataScan:
        """Fetch a DataScan based on the table's current metadata.

            The data scan can be used to project the table's data
            that matches the provided row_filter onto the table's
            current schema.

        Args:
            row_filter:
                A string or BooleanExpression that describes the
                desired rows
            selected_fields:
                A tuple of strings representing the column names
                to return in the output dataframe.
            case_sensitive:
                If True column matching is case sensitive
            snapshot_id:
                Optional Snapshot ID to time travel to. If None,
                scans the table as of the current snapshot ID.
            options:
                Additional Table properties as a dictionary of
                string key value pairs to use for this scan.
            limit:
                An integer representing the number of rows to
                return in the scan result. If None, fetches all
                matching rows.

        Returns:
            A DataScan based on the table's current metadata.
        """
        return DataScan(
            table_metadata=self.metadata,
            io=self.io,
            row_filter=row_filter,
            selected_fields=selected_fields,
            case_sensitive=case_sensitive,
            snapshot_id=snapshot_id,
            options=options,
            limit=limit,
        )

    @property
    def format_version(self) -> TableVersion:
        return self.metadata.format_version

    def schema(self) -> Schema:
        """Return the schema for this table."""
        return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

    def schemas(self) -> Dict[int, Schema]:
        """Return a dict of the schema of this table."""
        return {schema.schema_id: schema for schema in self.metadata.schemas}

    def spec(self) -> PartitionSpec:
        """Return the partition spec of this table."""
        return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)

    def specs(self) -> Dict[int, PartitionSpec]:
        """Return a dict the partition specs this table."""
        return {spec.spec_id: spec for spec in self.metadata.partition_specs}

    def sort_order(self) -> SortOrder:
        """Return the sort order of this table."""
        return next(
            sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
        )

    def sort_orders(self) -> Dict[int, SortOrder]:
        """Return a dict of the sort orders of this table."""
        return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}

    def last_partition_id(self) -> int:
        """Return the highest assigned partition field ID across all specs or 999 if only the unpartitioned spec exists."""
        if self.metadata.last_partition_id:
            return self.metadata.last_partition_id
        return PARTITION_FIELD_ID_START - 1

    @property
    def properties(self) -> Dict[str, str]:
        """Properties of the table."""
        return self.metadata.properties

    def location(self) -> str:
        """Return the table's base location."""
        return self.metadata.location

    def location_provider(self) -> LocationProvider:
        """Return the table's location provider."""
        return load_location_provider(table_location=self.metadata.location, table_properties=self.metadata.properties)

    @property
    def last_sequence_number(self) -> int:
        return self.metadata.last_sequence_number

    def current_snapshot(self) -> Optional[Snapshot]:
        """Get the current snapshot for this table, or None if there is no current snapshot."""
        if self.metadata.current_snapshot_id is not None:
            return self.snapshot_by_id(self.metadata.current_snapshot_id)
        return None

    def snapshots(self) -> List[Snapshot]:
        return self.metadata.snapshots

    def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
        """Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
        return self.metadata.snapshot_by_id(snapshot_id)

    def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
        """Return the snapshot referenced by the given name or null if no such reference exists."""
        if ref := self.metadata.refs.get(name):
            return self.snapshot_by_id(ref.snapshot_id)
        return None

    def snapshot_as_of_timestamp(self, timestamp_ms: int, inclusive: bool = True) -> Optional[Snapshot]:
        """Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

        Args:
            timestamp_ms: Find snapshot that was current at/before this timestamp
            inclusive: Includes timestamp_ms in search when True. Excludes timestamp_ms when False
        """
        for log_entry in reversed(self.history()):
            if (inclusive and log_entry.timestamp_ms <= timestamp_ms) or log_entry.timestamp_ms < timestamp_ms:
                return self.snapshot_by_id(log_entry.snapshot_id)
        return None

    def history(self) -> List[SnapshotLogEntry]:
        """Get the snapshot history of this table."""
        return self.metadata.snapshot_log

    def manage_snapshots(self) -> ManageSnapshots:
        """
        Shorthand to run snapshot management operations like create branch, create tag, etc.

        Use table.manage_snapshots().<operation>().commit() to run a specific operation.
        Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
        Pending changes are applied on commit.

        We can also use context managers to make more changes. For example,

        with table.manage_snapshots() as ms:
           ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
        """
        return ManageSnapshots(transaction=Transaction(self, autocommit=True))

    def update_statistics(self) -> UpdateStatistics:
        """
        Shorthand to run statistics management operations like add statistics and remove statistics.

        Use table.update_statistics().<operation>().commit() to run a specific operation.
        Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.

        Pending changes are applied on commit.

        We can also use context managers to make more changes. For example:

        with table.update_statistics() as update:
            update.set_statistics(statistics_file=statistics_file)
            update.remove_statistics(snapshot_id=2)
        """
        return UpdateStatistics(transaction=Transaction(self, autocommit=True))

    def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
        """Create a new UpdateSchema to alter the columns of this table.

        Args:
            allow_incompatible_changes: If changes are allowed that might break downstream consumers.
            case_sensitive: If field names are case-sensitive.

        Returns:
            A new UpdateSchema.
        """
        return UpdateSchema(
            transaction=Transaction(self, autocommit=True),
            allow_incompatible_changes=allow_incompatible_changes,
            case_sensitive=case_sensitive,
            name_mapping=self.name_mapping(),
        )

    def name_mapping(self) -> Optional[NameMapping]:
        """Return the table's field-id NameMapping."""
        return self.metadata.name_mapping()

    def upsert(
        self,
        df: pa.Table,
        join_cols: Optional[List[str]] = None,
        when_matched_update_all: bool = True,
        when_not_matched_insert_all: bool = True,
        case_sensitive: bool = True,
    ) -> UpsertResult:
        """Shorthand API for performing an upsert to an iceberg table.

        Args:

            df: The input dataframe to upsert with the table's data.
            join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
            case_sensitive: Bool indicating if the match should be case-sensitive

            To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

                Example Use Cases:
                    Case 1: Both Parameters = True (Full Upsert)
                    Existing row found → Update it
                    New row found → Insert it

                    Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
                    Existing row found → Do nothing (no updates)
                    New row found → Insert it

                    Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
                    Existing row found → Update it
                    New row found → Do nothing (no inserts)

                    Case 4: Both Parameters = False (No Merge Effect)
                    Existing row found → Do nothing
                    New row found → Do nothing
                    (Function effectively does nothing)


        Returns:
            An UpsertResult class (contains details of rows updated and inserted)
        """
        try:
            import pyarrow as pa  # noqa: F401
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import expression_to_pyarrow
        from pyiceberg.table import upsert_util

        if join_cols is None:
            join_cols = []
            for field_id in self.schema().identifier_field_ids:
                col = self.schema().find_column_name(field_id)
                if col is not None:
                    join_cols.append(col)
                else:
                    raise ValueError(f"Field-ID could not be found: {join_cols}")

        if len(join_cols) == 0:
            raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")

        if not when_matched_update_all and not when_not_matched_insert_all:
            raise ValueError("no upsert options selected...exiting")

        if upsert_util.has_duplicate_rows(df, join_cols):
            raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible

        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )

        # get list of rows that exist so we don't have to load the entire target table
        matched_predicate = upsert_util.create_match_filter(df, join_cols)
        matched_iceberg_table = self.scan(row_filter=matched_predicate, case_sensitive=case_sensitive).to_arrow()

        update_row_cnt = 0
        insert_row_cnt = 0

        with self.transaction() as tx:
            if when_matched_update_all:
                # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
                # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
                # this extra step avoids unnecessary IO and writes
                rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)

                update_row_cnt = len(rows_to_update)

                # build the match predicate filter
                overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)

                tx.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)

            if when_not_matched_insert_all:
                expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
                expr_match_bound = bind(self.schema(), expr_match, case_sensitive=case_sensitive)
                expr_match_arrow = expression_to_pyarrow(expr_match_bound)
                rows_to_insert = df.filter(~expr_match_arrow)

                insert_row_cnt = len(rows_to_insert)

                tx.append(rows_to_insert)

        return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)

    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """
        Shorthand API for appending a PyArrow table to the table.

        Args:
            df: The Arrow dataframe that will be appended to the table
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        with self.transaction() as tx:
            tx.append(df=df, snapshot_properties=snapshot_properties)

    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """Shorthand for dynamic overwriting the table with a PyArrow table.

        Old partitions are auto detected and replaced with data files created for input arrow table.
        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        with self.transaction() as tx:
            tx.dynamic_partition_overwrite(df=df, snapshot_properties=snapshot_properties)

    def overwrite(
        self,
        df: pa.Table,
        overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
        case_sensitive: bool = True,
    ) -> None:
        """
        Shorthand for overwriting the table with a PyArrow table.

        An overwrite may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten.
            - APPEND: In case new data is being inserted into the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                              or a boolean expression in case of a partial overwrite
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool that determines whether the provided `overwrite_filter` is case-sensitive
        """
        with self.transaction() as tx:
            tx.overwrite(
                df=df, overwrite_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties
            )

    def delete(
        self,
        delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
        case_sensitive: bool = True,
    ) -> None:
        """
        Shorthand for deleting rows from the table.

        Args:
            delete_filter: The predicate used to remove rows
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool that determines whether the provided `delete_filter` is case-sensitive
        """
        with self.transaction() as tx:
            tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)

    def add_files(
        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
    ) -> None:
        """
        Shorthand API for adding files as data files to the table.

        Args:
            file_paths: The list of full file paths to be added as data files to the table

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        with self.transaction() as tx:
            tx.add_files(
                file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
            )

    def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
        return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)

    def refs(self) -> Dict[str, SnapshotRef]:
        """Return the snapshot references in the table."""
        return self.metadata.refs

    def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
        response = self.catalog.commit_table(self, requirements, updates)

        # https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527
        # delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true and uses
        # TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many previous versions to keep -
        # everything else will be removed.
        try:
            self.catalog._delete_old_metadata(self.io, self.metadata, response.metadata)
        except Exception as e:
            warnings.warn(f"Failed to delete old metadata after commit: {e}")

        self.metadata = response.metadata
        self.metadata_location = response.metadata_location

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the Table class."""
        return (
            self.name() == other.name() and self.metadata == other.metadata and self.metadata_location == other.metadata_location
            if isinstance(other, Table)
            else False
        )

    def __repr__(self) -> str:
        """Return the string representation of the Table class."""
        table_name = self.catalog.table_name_from(self._identifier)
        schema_str = ",\n  ".join(str(column) for column in self.schema().columns if self.schema())
        partition_str = f"partition by: [{', '.join(field.name for field in self.spec().fields if self.spec())}]"
        sort_order_str = f"sort order: [{', '.join(str(field) for field in self.sort_order().fields if self.sort_order())}]"
        snapshot_str = f"snapshot: {str(self.current_snapshot()) if self.current_snapshot() else 'null'}"
        result_str = f"{table_name}(\n  {schema_str}\n),\n{partition_str},\n{sort_order_str},\n{snapshot_str}"
        return result_str

    def to_daft(self) -> daft.DataFrame:
        """Read a Daft DataFrame lazily from this Iceberg table.

        Returns:
            daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table
        """
        import daft

        return daft.read_iceberg(self)

    def to_polars(self) -> pl.LazyFrame:
        """Lazily read from this Apache Iceberg table.

        Returns:
            pl.LazyFrame: Unmaterialized Polars LazyFrame created from the Iceberg table
        """
        import polars as pl

        return pl.scan_iceberg(self)
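
A typical workflow sketch, assuming a configured catalog named "default" and an existing table "docs_db.events" (both names are illustrative):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("docs_db.events")

print(table.schema())
print(table.current_snapshot())

arrow_table = table.scan(row_filter="id >= 100").to_arrow()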

inspect property

Return the InspectTable object to browse the table metadata.

Returns:

    InspectTable: InspectTable object based on this Table.

properties property

Properties of the table.

__eq__(other)

Return the equality of two instances of the Table class.

Source code in pyiceberg/table/__init__.py
def __eq__(self, other: Any) -> bool:
    """Return the equality of two instances of the Table class."""
    return (
        self.name() == other.name() and self.metadata == other.metadata and self.metadata_location == other.metadata_location
        if isinstance(other, Table)
        else False
    )

__repr__()

Return the string representation of the Table class.

Source code in pyiceberg/table/__init__.py
def __repr__(self) -> str:
    """Return the string representation of the Table class."""
    table_name = self.catalog.table_name_from(self._identifier)
    schema_str = ",\n  ".join(str(column) for column in self.schema().columns if self.schema())
    partition_str = f"partition by: [{', '.join(field.name for field in self.spec().fields if self.spec())}]"
    sort_order_str = f"sort order: [{', '.join(str(field) for field in self.sort_order().fields if self.sort_order())}]"
    snapshot_str = f"snapshot: {str(self.current_snapshot()) if self.current_snapshot() else 'null'}"
    result_str = f"{table_name}(\n  {schema_str}\n),\n{partition_str},\n{sort_order_str},\n{snapshot_str}"
    return result_str

add_files(file_paths, snapshot_properties=EMPTY_DICT, check_duplicate_files=True)

Shorthand API for adding files as data files to the table.

Parameters:

    file_paths (List[str]): The list of full file paths to be added as data files to the table. Required.

Raises:

    FileNotFoundError: If the file does not exist.

Source code in pyiceberg/table/__init__.py
def add_files(
    self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
) -> None:
    """
    Shorthand API for adding files as data files to the table.

    Args:
        file_paths: The list of full file paths to be added as data files to the table

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    with self.transaction() as tx:
        tx.add_files(
            file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
        )
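
For example (a sketch; the Parquet paths are hypothetical and must match the table's schema):

table.add_files(
    file_paths=[
        "s3://warehouse/db/events/data/part-0001.parquet",
        "s3://warehouse/db/events/data/part-0002.parquet",
    ],
)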

append(df, snapshot_properties=EMPTY_DICT)

Shorthand API for appending a PyArrow table to the table.

Parameters:

    df (pa.Table): The Arrow dataframe that will be appended to the table. Required.
    snapshot_properties (Dict[str, str]): Custom properties to be added to the snapshot summary. Default: EMPTY_DICT.
Source code in pyiceberg/table/__init__.py
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
    """
    Shorthand API for appending a PyArrow table to the table.

    Args:
        df: The Arrow dataframe that will be appended to the table
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    with self.transaction() as tx:
        tx.append(df=df, snapshot_properties=snapshot_properties)
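
For example (a sketch assuming the table has id and name columns):

import pyarrow as pa

df = pa.Table.from_pylist([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
table.append(df)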

current_snapshot()

Get the current snapshot for this table, or None if there is no current snapshot.

Source code in pyiceberg/table/__init__.py
def current_snapshot(self) -> Optional[Snapshot]:
    """Get the current snapshot for this table, or None if there is no current snapshot."""
    if self.metadata.current_snapshot_id is not None:
        return self.snapshot_by_id(self.metadata.current_snapshot_id)
    return None

delete(delete_filter=ALWAYS_TRUE, snapshot_properties=EMPTY_DICT, case_sensitive=True)

Shorthand for deleting rows from the table.

Parameters:

    delete_filter (Union[BooleanExpression, str]): The predicate used to remove rows. Default: ALWAYS_TRUE.
    snapshot_properties (Dict[str, str]): Custom properties to be added to the snapshot summary. Default: EMPTY_DICT.
    case_sensitive (bool): Whether the provided delete_filter is case-sensitive. Default: True.
Source code in pyiceberg/table/__init__.py
def delete(
    self,
    delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
    case_sensitive: bool = True,
) -> None:
    """
    Shorthand for deleting rows from the table.

    Args:
        delete_filter: The predicate used to remove rows
        snapshot_properties: Custom properties to be added to the snapshot summary
        case_sensitive: A bool that determines whether the provided `delete_filter` is case-sensitive
    """
    with self.transaction() as tx:
        tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)
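
For example (a sketch assuming the table has an id column):

from pyiceberg.expressions import EqualTo

table.delete(delete_filter="id = 2")          # string predicate
table.delete(delete_filter=EqualTo("id", 2))  # or an expression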

dynamic_partition_overwrite(df, snapshot_properties=EMPTY_DICT)

Shorthand for dynamically overwriting the table with a PyArrow table.

Old partitions are auto-detected and replaced with data files created from the input Arrow table.

Parameters:

    df (Table): The Arrow dataframe that will be used to overwrite the table. Required.
    snapshot_properties (Dict[str, str]): Custom properties to be added to the snapshot summary. Default: EMPTY_DICT.

Source code in pyiceberg/table/__init__.py
def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
    """Shorthand for dynamic overwriting the table with a PyArrow table.

    Old partitions are auto detected and replaced with data files created for input arrow table.
    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    with self.transaction() as tx:
        tx.dynamic_partition_overwrite(df=df, snapshot_properties=snapshot_properties)
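
A minimal sketch, assuming a hypothetical table partitioned by an identity transform on "region"; only the partitions present in the dataframe are replaced:

import pyarrow as pa
from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("docs.sales")

# Rows for region "eu" replace the existing "eu" partition; other partitions are untouched.
df = pa.table({"region": ["eu", "eu"], "amount": [10, 20]})
tbl.dynamic_partition_overwrite(df)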

history()

Get the snapshot history of this table.

Source code in pyiceberg/table/__init__.py
def history(self) -> List[SnapshotLogEntry]:
    """Get the snapshot history of this table."""
    return self.metadata.snapshot_log

last_partition_id()

Return the highest assigned partition field ID across all specs or 999 if only the unpartitioned spec exists.

Source code in pyiceberg/table/__init__.py
def last_partition_id(self) -> int:
    """Return the highest assigned partition field ID across all specs or 999 if only the unpartitioned spec exists."""
    if self.metadata.last_partition_id:
        return self.metadata.last_partition_id
    return PARTITION_FIELD_ID_START - 1

location()

Return the table's base location.

Source code in pyiceberg/table/__init__.py
def location(self) -> str:
    """Return the table's base location."""
    return self.metadata.location

location_provider()

Return the table's location provider.

Source code in pyiceberg/table/__init__.py
def location_provider(self) -> LocationProvider:
    """Return the table's location provider."""
    return load_location_provider(table_location=self.metadata.location, table_properties=self.metadata.properties)

manage_snapshots()

Shorthand to run snapshot management operations like create branch, create tag, etc.

Use table.manage_snapshots().<operation>().commit() to run a specific operation. Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations. Pending changes are applied on commit.

We can also use context managers to make more changes. For example,

with table.manage_snapshots() as ms:
    ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")

Source code in pyiceberg/table/__init__.py
def manage_snapshots(self) -> ManageSnapshots:
    """
    Shorthand to run snapshot management operations like create branch, create tag, etc.

    Use table.manage_snapshots().<operation>().commit() to run a specific operation.
    Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.

    We can also use context managers to make more changes. For example,

    with table.manage_snapshots() as ms:
       ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
    """
    return ManageSnapshots(transaction=Transaction(self, autocommit=True))
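
For example, tagging the current snapshot and creating a branch from it in one chained call (catalog and table names are hypothetical):

from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("docs.events")
current_id = tbl.current_snapshot().snapshot_id

# Both operations are staged and applied together on commit().
tbl.manage_snapshots().create_tag(current_id, "v1.0").create_branch(current_id, "audit").commit()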

name()

Return the identifier of this table.

Returns:

    Identifier: An Identifier tuple of the table name.

Source code in pyiceberg/table/__init__.py
def name(self) -> Identifier:
    """Return the identifier of this table.

    Returns:
        An Identifier tuple of the table name
    """
    return self._identifier

name_mapping()

Return the table's field-id NameMapping.

Source code in pyiceberg/table/__init__.py
def name_mapping(self) -> Optional[NameMapping]:
    """Return the table's field-id NameMapping."""
    return self.metadata.name_mapping()

overwrite(df, overwrite_filter=ALWAYS_TRUE, snapshot_properties=EMPTY_DICT, case_sensitive=True)

Shorthand for overwriting the table with a PyArrow table.

An overwrite may produce zero or more snapshots based on the operation:

- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten.
- APPEND: In case new data is being inserted into the table.

Parameters:

    df (Table): The Arrow dataframe that will be used to overwrite the table. Required.
    overwrite_filter (Union[BooleanExpression, str]): ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite. Default: ALWAYS_TRUE.
    snapshot_properties (Dict[str, str]): Custom properties to be added to the snapshot summary. Default: EMPTY_DICT.
    case_sensitive (bool): Whether the provided overwrite_filter is evaluated case-sensitively. Default: True.
Source code in pyiceberg/table/__init__.py
def overwrite(
    self,
    df: pa.Table,
    overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
    case_sensitive: bool = True,
) -> None:
    """
    Shorthand for overwriting the table with a PyArrow table.

    An overwrite may produce zero or more snapshots based on the operation:

        - DELETE: In case existing Parquet files can be dropped completely.
        - REPLACE: In case existing Parquet files need to be rewritten.
        - APPEND: In case new data is being inserted into the table.

    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                          or a boolean expression in case of a partial overwrite
        snapshot_properties: Custom properties to be added to the snapshot summary
        case_sensitive: A bool determine if the provided `overwrite_filter` is case-sensitive
    """
    with self.transaction() as tx:
        tx.overwrite(
            df=df, overwrite_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties
        )
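
A sketch of a full and a partial overwrite, assuming a hypothetical "docs.cities" table:

import pyarrow as pa
from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("docs.cities")

# Full overwrite: all existing data is replaced by the new dataframe.
tbl.overwrite(pa.table({"city": ["Drachten"], "population": [45019]}))

# Partial overwrite: only rows matching the filter are replaced.
tbl.overwrite(
    pa.table({"city": ["Groningen"], "population": [233218]}),
    overwrite_filter="city == 'Groningen'",
)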

refresh()

Refresh the current table metadata.

Returns:

    Table: An updated instance of the same Iceberg table.

Source code in pyiceberg/table/__init__.py
def refresh(self) -> Table:
    """Refresh the current table metadata.

    Returns:
        An updated instance of the same Iceberg table
    """
    fresh = self.catalog.load_table(self._identifier)
    self.metadata = fresh.metadata
    self.io = fresh.io
    self.metadata_location = fresh.metadata_location
    return self

refs()

Return the snapshot references in the table.

Source code in pyiceberg/table/__init__.py
def refs(self) -> Dict[str, SnapshotRef]:
    """Return the snapshot references in the table."""
    return self.metadata.refs

scan(row_filter=ALWAYS_TRUE, selected_fields=('*',), case_sensitive=True, snapshot_id=None, options=EMPTY_DICT, limit=None)

Fetch a DataScan based on the table's current metadata.

The data scan can be used to project the table's data
that matches the provided row_filter onto the table's
current schema.

Parameters:

    row_filter (Union[str, BooleanExpression]): A string or BooleanExpression that describes the desired rows. Default: ALWAYS_TRUE.
    selected_fields (Tuple[str, ...]): A tuple of strings representing the column names to return in the output dataframe. Default: ('*',).
    case_sensitive (bool): If True, column matching is case-sensitive. Default: True.
    snapshot_id (Optional[int]): Optional snapshot ID to time travel to. If None, scans the table as of the current snapshot ID. Default: None.
    options (Properties): Additional table properties, as a dictionary of string key-value pairs, to use for this scan. Default: EMPTY_DICT.
    limit (Optional[int]): An integer representing the number of rows to return in the scan result. If None, fetches all matching rows. Default: None.

Returns:

    DataScan: A DataScan based on the table's current metadata.

Source code in pyiceberg/table/__init__.py
def scan(
    self,
    row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
    selected_fields: Tuple[str, ...] = ("*",),
    case_sensitive: bool = True,
    snapshot_id: Optional[int] = None,
    options: Properties = EMPTY_DICT,
    limit: Optional[int] = None,
) -> DataScan:
    """Fetch a DataScan based on the table's current metadata.

        The data scan can be used to project the table's data
        that matches the provided row_filter onto the table's
        current schema.

    Args:
        row_filter:
            A string or BooleanExpression that describes the
            desired rows
        selected_fields:
            A tuple of strings representing the column names
            to return in the output dataframe.
        case_sensitive:
            If True column matching is case sensitive
        snapshot_id:
            Optional Snapshot ID to time travel to. If None,
            scans the table as of the current snapshot ID.
        options:
            Additional Table properties as a dictionary of
            string key value pairs to use for this scan.
        limit:
            An integer representing the number of rows to
            return in the scan result. If None, fetches all
            matching rows.

    Returns:
        A DataScan based on the table's current metadata.
    """
    return DataScan(
        table_metadata=self.metadata,
        io=self.io,
        row_filter=row_filter,
        selected_fields=selected_fields,
        case_sensitive=case_sensitive,
        snapshot_id=snapshot_id,
        options=options,
        limit=limit,
    )
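
A usage sketch, assuming a hypothetical "docs.taxis" table with "vendor_id" and "trip_distance" columns:

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

tbl = load_catalog("default").load_table("docs.taxis")

scan = tbl.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("vendor_id", "trip_distance"),
    limit=100,
)
arrow_table = scan.to_arrow()  # materialize as a PyArrow table
pandas_df = scan.to_pandas()   # or as a pandas DataFrame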

schema()

Return the schema for this table.

Source code in pyiceberg/table/__init__.py
def schema(self) -> Schema:
    """Return the schema for this table."""
    return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

schemas()

Return a dict of the schemas of this table, keyed by schema ID.

Source code in pyiceberg/table/__init__.py
def schemas(self) -> Dict[int, Schema]:
    """Return a dict of the schema of this table."""
    return {schema.schema_id: schema for schema in self.metadata.schemas}

snapshot_as_of_timestamp(timestamp_ms, inclusive=True)

Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

Parameters:

    timestamp_ms (int): Find the snapshot that was current at/before this timestamp. Required.
    inclusive (bool): Includes timestamp_ms in the search when True; excludes timestamp_ms when False. Default: True.
Source code in pyiceberg/table/__init__.py
def snapshot_as_of_timestamp(self, timestamp_ms: int, inclusive: bool = True) -> Optional[Snapshot]:
    """Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

    Args:
        timestamp_ms: Find snapshot that was current at/before this timestamp
        inclusive: Includes timestamp_ms in search when True. Excludes timestamp_ms when False
    """
    for log_entry in reversed(self.history()):
        if (inclusive and log_entry.timestamp_ms <= timestamp_ms) or log_entry.timestamp_ms < timestamp_ms:
            return self.snapshot_by_id(log_entry.snapshot_id)
    return None
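
Combined with scan(snapshot_id=...), this enables time travel. A sketch with hypothetical names:

from datetime import datetime, timedelta
from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("docs.events")

# Read the table as it looked one day ago, if such a snapshot exists.
yesterday_ms = int((datetime.now() - timedelta(days=1)).timestamp() * 1000)
if snapshot := tbl.snapshot_as_of_timestamp(yesterday_ms):
    df = tbl.scan(snapshot_id=snapshot.snapshot_id).to_arrow()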

snapshot_by_id(snapshot_id)

Get the snapshot of this table with the given id, or None if there is no matching snapshot.

Source code in pyiceberg/table/__init__.py
def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
    """Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
    return self.metadata.snapshot_by_id(snapshot_id)

snapshot_by_name(name)

Return the snapshot referenced by the given name, or None if no such reference exists.

Source code in pyiceberg/table/__init__.py
def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
    """Return the snapshot referenced by the given name or null if no such reference exists."""
    if ref := self.metadata.refs.get(name):
        return self.snapshot_by_id(ref.snapshot_id)
    return None

sort_order()

Return the sort order of this table.

Source code in pyiceberg/table/__init__.py
def sort_order(self) -> SortOrder:
    """Return the sort order of this table."""
    return next(
        sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
    )

sort_orders()

Return a dict of the sort orders of this table.

Source code in pyiceberg/table/__init__.py
def sort_orders(self) -> Dict[int, SortOrder]:
    """Return a dict of the sort orders of this table."""
    return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}

spec()

Return the partition spec of this table.

Source code in pyiceberg/table/__init__.py
def spec(self) -> PartitionSpec:
    """Return the partition spec of this table."""
    return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)

specs()

Return a dict of the partition specs of this table.

Source code in pyiceberg/table/__init__.py
def specs(self) -> Dict[int, PartitionSpec]:
    """Return a dict the partition specs this table."""
    return {spec.spec_id: spec for spec in self.metadata.partition_specs}

to_daft()

Read a Daft DataFrame lazily from this Iceberg table.

Returns:

    daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table.

Source code in pyiceberg/table/__init__.py
def to_daft(self) -> daft.DataFrame:
    """Read a Daft DataFrame lazily from this Iceberg table.

    Returns:
        daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table
    """
    import daft

    return daft.read_iceberg(self)

to_polars()

Lazily read from this Apache Iceberg table.

Returns:

    pl.LazyFrame: Unmaterialized Polars LazyFrame created from the Iceberg table.

Source code in pyiceberg/table/__init__.py
def to_polars(self) -> pl.LazyFrame:
    """Lazily read from this Apache Iceberg table.

    Returns:
        pl.LazyFrame: Unmaterialized Polars LazyFrame created from the Iceberg table
    """
    import polars as pl

    return pl.scan_iceberg(self)
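
A sketch, assuming a hypothetical "docs.events" table with "id" and "name" columns; nothing is read until collect() is called, and Polars may push filters and projections down into the scan:

import polars as pl
from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("docs.events")

lf = tbl.to_polars()
result = lf.filter(pl.col("id") > 10).select("id", "name").collect()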

transaction()

Create a new transaction object to first stage the changes, and then commit them to the catalog.

Returns:

    Transaction: The transaction object.

Source code in pyiceberg/table/__init__.py
def transaction(self) -> Transaction:
    """Create a new transaction object to first stage the changes, and then commit them to the catalog.

    Returns:
        The transaction object
    """
    return Transaction(self)
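
A sketch of staging multiple changes and committing them atomically (names are hypothetical):

import pyarrow as pa
from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("docs.events")

# Both the property change and the append are committed together on exit.
with tbl.transaction() as tx:
    tx.set_properties(owner="data-eng")
    tx.append(pa.table({"id": [4], "name": ["d"]}))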

update_schema(allow_incompatible_changes=False, case_sensitive=True)

Create a new UpdateSchema to alter the columns of this table.

Parameters:

    allow_incompatible_changes (bool): If changes are allowed that might break downstream consumers. Default: False.
    case_sensitive (bool): If field names are case-sensitive. Default: True.

Returns:

    UpdateSchema: A new UpdateSchema.

Source code in pyiceberg/table/__init__.py
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
    """Create a new UpdateSchema to alter the columns of this table.

    Args:
        allow_incompatible_changes: If changes are allowed that might break downstream consumers.
        case_sensitive: If field names are case-sensitive.

    Returns:
        A new UpdateSchema.
    """
    return UpdateSchema(
        transaction=Transaction(self, autocommit=True),
        allow_incompatible_changes=allow_incompatible_changes,
        case_sensitive=case_sensitive,
        name_mapping=self.name_mapping(),
    )
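
A sketch of a couple of schema changes, assuming a hypothetical "docs.events" table; pending changes are applied when the context manager exits:

from pyiceberg.catalog import load_catalog
from pyiceberg.types import StringType

tbl = load_catalog("default").load_table("docs.events")

with tbl.update_schema() as update:
    update.add_column("comment", StringType(), doc="free-form note")
    update.rename_column("name", "event_name")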

update_statistics()

Shorthand to run statistics management operations like add statistics and remove statistics.

Use table.update_statistics().<operation>().commit() to run a specific operation. Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.

Pending changes are applied on commit.

We can also use context managers to make more changes. For example:

with table.update_statistics() as update:
    update.set_statistics(statistics_file=statistics_file)
    update.remove_statistics(snapshot_id=2)

Source code in pyiceberg/table/__init__.py
def update_statistics(self) -> UpdateStatistics:
    """
    Shorthand to run statistics management operations like add statistics and remove statistics.

    Use table.update_statistics().<operation>().commit() to run a specific operation.
    Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.

    Pending changes are applied on commit.

    We can also use context managers to make more changes. For example:

    with table.update_statistics() as update:
        update.set_statistics(statistics_file=statistics_file)
        update.remove_statistics(snapshot_id=2)
    """
    return UpdateStatistics(transaction=Transaction(self, autocommit=True))

upsert(df, join_cols=None, when_matched_update_all=True, when_not_matched_insert_all=True, case_sensitive=True)

Shorthand API for performing an upsert against an Iceberg table.

Parameters:

    df: The input dataframe to upsert with the table's data.
    join_cols: Columns to join on; if not provided, the identifier-field-ids are used.
    when_matched_update_all: Bool indicating whether to update rows that are matched but require an update because a value in a non-key column has changed.
    when_not_matched_insert_all: Bool indicating whether to insert new rows that do not match any existing rows in the table.
    case_sensitive: Bool indicating whether the match should be case-sensitive.

To learn more about identifier-field-ids, see: https://iceberg.apache.org/spec/#identifier-field-ids

    Example Use Cases:
        Case 1: Both Parameters = True (Full Upsert)
        Existing row found → Update it
        New row found → Insert it

        Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
        Existing row found → Do nothing (no updates)
        New row found → Insert it

        Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
        Existing row found → Update it
        New row found → Do nothing (no inserts)

        Case 4: Both Parameters = False (No Merge Effect)
        Existing row found → Do nothing
        New row found → Do nothing
        (Function effectively does nothing)

Returns:

    UpsertResult: Contains details of the rows updated and inserted.

Source code in pyiceberg/table/__init__.py
def upsert(
    self,
    df: pa.Table,
    join_cols: Optional[List[str]] = None,
    when_matched_update_all: bool = True,
    when_not_matched_insert_all: bool = True,
    case_sensitive: bool = True,
) -> UpsertResult:
    """Shorthand API for performing an upsert to an iceberg table.

    Args:

        df: The input dataframe to upsert with the table's data.
        join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
        when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
        when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
        case_sensitive: Bool indicating if the match should be case-sensitive

        To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

            Example Use Cases:
                Case 1: Both Parameters = True (Full Upsert)
                Existing row found → Update it
                New row found → Insert it

                Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
                Existing row found → Do nothing (no updates)
                New row found → Insert it

                Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
                Existing row found → Update it
                New row found → Do nothing (no inserts)

                Case 4: Both Parameters = False (No Merge Effect)
                Existing row found → Do nothing
                New row found → Do nothing
                (Function effectively does nothing)


    Returns:
        An UpsertResult class (contains details of rows updated and inserted)
    """
    try:
        import pyarrow as pa  # noqa: F401
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import expression_to_pyarrow
    from pyiceberg.table import upsert_util

    if join_cols is None:
        join_cols = []
        for field_id in self.schema().identifier_field_ids:
            col = self.schema().find_column_name(field_id)
            if col is not None:
                join_cols.append(col)
            else:
                raise ValueError(f"Field-ID could not be found: {join_cols}")

    if len(join_cols) == 0:
        raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")

    if not when_matched_update_all and not when_not_matched_insert_all:
        raise ValueError("no upsert options selected...exiting")

    if upsert_util.has_duplicate_rows(df, join_cols):
        raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible

    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
    )

    # get list of rows that exist so we don't have to load the entire target table
    matched_predicate = upsert_util.create_match_filter(df, join_cols)
    matched_iceberg_table = self.scan(row_filter=matched_predicate, case_sensitive=case_sensitive).to_arrow()

    update_row_cnt = 0
    insert_row_cnt = 0

    with self.transaction() as tx:
        if when_matched_update_all:
            # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
            # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
            # this extra step avoids unnecessary IO and writes
            rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)

            update_row_cnt = len(rows_to_update)

            # build the match predicate filter
            overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)

            tx.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)

        if when_not_matched_insert_all:
            expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
            expr_match_bound = bind(self.schema(), expr_match, case_sensitive=case_sensitive)
            expr_match_arrow = expression_to_pyarrow(expr_match_bound)
            rows_to_insert = df.filter(~expr_match_arrow)

            insert_row_cnt = len(rows_to_insert)

            tx.append(rows_to_insert)

    return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
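
A minimal sketch, assuming a hypothetical "docs.customers" table keyed on an "id" column:

import pyarrow as pa
from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("docs.customers")

# Rows matched on "id" with changed values are updated; unmatched rows are inserted.
source = pa.table({"id": [1, 2], "name": ["updated", "new"]})
result = tbl.upsert(source, join_cols=["id"])
print(result.rows_updated, result.rows_inserted)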

TableIdentifier

Bases: IcebergBaseModel

Fully Qualified identifier to a table.

Source code in pyiceberg/table/__init__.py
class TableIdentifier(IcebergBaseModel):
    """Fully Qualified identifier to a table."""

    namespace: Namespace
    name: str

TableScan

Bases: ABC

Source code in pyiceberg/table/__init__.py
class TableScan(ABC):
    table_metadata: TableMetadata
    io: FileIO
    row_filter: BooleanExpression
    selected_fields: Tuple[str, ...]
    case_sensitive: bool
    snapshot_id: Optional[int]
    options: Properties
    limit: Optional[int]

    def __init__(
        self,
        table_metadata: TableMetadata,
        io: FileIO,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ):
        self.table_metadata = table_metadata
        self.io = io
        self.row_filter = _parse_row_filter(row_filter)
        self.selected_fields = selected_fields
        self.case_sensitive = case_sensitive
        self.snapshot_id = snapshot_id
        self.options = options
        self.limit = limit

    def snapshot(self) -> Optional[Snapshot]:
        if self.snapshot_id:
            return self.table_metadata.snapshot_by_id(self.snapshot_id)
        return self.table_metadata.current_snapshot()

    def projection(self) -> Schema:
        current_schema = self.table_metadata.schema()
        if self.snapshot_id is not None:
            snapshot = self.table_metadata.snapshot_by_id(self.snapshot_id)
            if snapshot is not None:
                if snapshot.schema_id is not None:
                    try:
                        current_schema = next(
                            schema for schema in self.table_metadata.schemas if schema.schema_id == snapshot.schema_id
                        )
                    except StopIteration:
                        warnings.warn(f"Metadata does not contain schema with id: {snapshot.schema_id}")
            else:
                raise ValueError(f"Snapshot not found: {self.snapshot_id}")

        if "*" in self.selected_fields:
            return current_schema

        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

    @abstractmethod
    def plan_files(self) -> Iterable[ScanTask]: ...

    @abstractmethod
    def to_arrow(self) -> pa.Table: ...

    @abstractmethod
    def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ...

    @abstractmethod
    def to_polars(self) -> pl.DataFrame: ...

    def update(self: S, **overrides: Any) -> S:
        """Create a copy of this table scan with updated fields."""
        return type(self)(**{**self.__dict__, **overrides})

    def use_ref(self: S, name: str) -> S:
        if self.snapshot_id:
            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
        if snapshot := self.table_metadata.snapshot_by_name(name):
            return self.update(snapshot_id=snapshot.snapshot_id)

        raise ValueError(f"Cannot scan unknown ref={name}")

    def select(self: S, *field_names: str) -> S:
        if "*" in self.selected_fields:
            return self.update(selected_fields=field_names)
        return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))

    def filter(self: S, expr: Union[str, BooleanExpression]) -> S:
        return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr)))

    def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
        return self.update(case_sensitive=case_sensitive)

    @abstractmethod
    def count(self) -> int: ...

update(**overrides)

Create a copy of this table scan with updated fields.

Source code in pyiceberg/table/__init__.py
def update(self: S, **overrides: Any) -> S:
    """Create a copy of this table scan with updated fields."""
    return type(self)(**{**self.__dict__, **overrides})
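
update() is what the chaining helpers (use_ref, select, filter, with_case_sensitive) build on: each returns a new scan instead of mutating the existing one. A sketch with a hypothetical table and branch name:

from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("docs.events")

scan = tbl.scan()
branch_scan = scan.use_ref("audit")             # pin the scan to a branch or tag
narrow_scan = branch_scan.select("id", "name")  # keep only two columns
filtered = narrow_scan.filter("id > 10")        # AND-ed onto the existing row filter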

Transaction

Source code in pyiceberg/table/__init__.py
class Transaction:
    _table: Table
    table_metadata: TableMetadata
    _autocommit: bool
    _updates: Tuple[TableUpdate, ...]
    _requirements: Tuple[TableRequirement, ...]

    def __init__(self, table: Table, autocommit: bool = False):
        """Open a transaction to stage and commit changes to a table.

        Args:
            table: The table that will be altered.
            autocommit: Option to automatically commit the changes when they are staged.
        """
        self.table_metadata = table.metadata
        self._table = table
        self._autocommit = autocommit
        self._updates = ()
        self._requirements = ()

    def __enter__(self) -> Transaction:
        """Start a transaction to update the table."""
        return self

    def __exit__(
        self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
    ) -> None:
        """Close and commit the transaction if no exceptions have been raised."""
        if exctype is None and excinst is None and exctb is None:
            self.commit_transaction()

    def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction:
        """Check if the requirements are met, and applies the updates to the metadata."""
        for requirement in requirements:
            requirement.validate(self.table_metadata)

        self._updates += updates

        # For the requirements, it does not make sense to add a requirement more than once
        # For example, you cannot assert that the current schema has two different IDs
        existing_requirements = {type(requirement) for requirement in self._requirements}
        for new_requirement in requirements:
            if type(new_requirement) not in existing_requirements:
                self._requirements = self._requirements + (new_requirement,)

        self.table_metadata = update_table_metadata(self.table_metadata, updates)

        if self._autocommit:
            self.commit_transaction()
            self._updates = ()
            self._requirements = ()

        return self

    def _scan(self, row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, case_sensitive: bool = True) -> DataScan:
        """Minimal data scan of the table with the current state of the transaction."""
        return DataScan(
            table_metadata=self.table_metadata, io=self._table.io, row_filter=row_filter, case_sensitive=case_sensitive
        )

    def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
        """Set the table to a certain version.

        Args:
            format_version: The newly set version.

        Returns:
            The alter table builder.
        """
        if format_version not in {1, 2}:
            raise ValueError(f"Unsupported table format version: {format_version}")

        if format_version < self.table_metadata.format_version:
            raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")

        if format_version > self.table_metadata.format_version:
            return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))

        return self

    def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) -> Transaction:
        """Set properties.

        When a property is already set, it will be overwritten.

        Args:
            properties: The properties set on the table.
            kwargs: properties can also be pass as kwargs.

        Returns:
            The alter table builder.
        """
        if properties and kwargs:
            raise ValueError("Cannot pass both properties and kwargs")
        updates = properties or kwargs
        return self._apply((SetPropertiesUpdate(updates=updates),))

    def _set_ref_snapshot(
        self,
        snapshot_id: int,
        ref_name: str,
        type: str,
        max_ref_age_ms: Optional[int] = None,
        max_snapshot_age_ms: Optional[int] = None,
        min_snapshots_to_keep: Optional[int] = None,
    ) -> UpdatesAndRequirements:
        """Update a ref to a snapshot.

        Returns:
            The updates and requirements for the set-snapshot-ref staged
        """
        updates = (
            SetSnapshotRefUpdate(
                snapshot_id=snapshot_id,
                ref_name=ref_name,
                type=type,
                max_ref_age_ms=max_ref_age_ms,
                max_snapshot_age_ms=max_snapshot_age_ms,
                min_snapshots_to_keep=min_snapshots_to_keep,
            ),
        )
        requirements = (
            AssertRefSnapshotId(
                snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None,
                ref=ref_name,
            ),
        )

        return updates, requirements

    def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
        """Build a filter predicate matching any of the input partition records.

        Args:
            partition_records: A set of partition records to match
        Returns:
            A predicate matching any of the input partition records.
        """
        partition_spec = self.table_metadata.spec()
        schema = self.table_metadata.schema()
        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]

        expr: BooleanExpression = AlwaysFalse()
        for partition_record in partition_records:
            match_partition_expression: BooleanExpression = AlwaysTrue()

            for pos, partition_field in enumerate(partition_fields):
                predicate = (
                    EqualTo(Reference(partition_field), partition_record[pos])
                    if partition_record[pos] is not None
                    else IsNull(Reference(partition_field))
                )
                match_partition_expression = And(match_partition_expression, predicate)
            expr = Or(expr, match_partition_expression)
        return expr

    def _append_snapshot_producer(self, snapshot_properties: Dict[str, str]) -> _FastAppendFiles:
        """Determine the append type based on table properties.

        Args:
            snapshot_properties: Custom properties to be added to the snapshot summary
        Returns:
            Either a fast-append or a merge-append snapshot producer.
        """
        manifest_merge_enabled = property_as_bool(
            self.table_metadata.properties,
            TableProperties.MANIFEST_MERGE_ENABLED,
            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
        )
        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
        return update_snapshot.merge_append() if manifest_merge_enabled else update_snapshot.fast_append()

    def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
        """Create a new UpdateSchema to alter the columns of this table.

        Args:
            allow_incompatible_changes: If changes are allowed that might break downstream consumers.
            case_sensitive: If field names are case-sensitive.

        Returns:
            A new UpdateSchema.
        """
        return UpdateSchema(
            self,
            allow_incompatible_changes=allow_incompatible_changes,
            case_sensitive=case_sensitive,
            name_mapping=self.table_metadata.name_mapping(),
        )

    def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
        """Create a new UpdateSnapshot to produce a new snapshot for the table.

        Returns:
            A new UpdateSnapshot
        """
        return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)

    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """
        Shorthand API for appending a PyArrow table to a table transaction.

        Args:
            df: The Arrow dataframe that will be appended to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        if unsupported_partitions := [
            field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
        ]:
            raise ValueError(
                f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
            )
        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )

        with self._append_snapshot_producer(snapshot_properties) as append_files:
            # skip writing data files if the dataframe is empty
            if df.shape[0] > 0:
                data_files = list(
                    _dataframe_to_data_files(
                        table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                    )
                )
                for data_file in data_files:
                    append_files.append_data_file(data_file)

    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """
        Shorthand for overwriting existing partitions with a PyArrow table.

        The function detects partition values in the provided arrow table using the current
        partition spec, and deletes existing partitions matching these values. Finally, the
        data in the table is appended to the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        if self.table_metadata.spec().is_unpartitioned():
            raise ValueError("Cannot apply dynamic overwrite on an unpartitioned table.")

        for field in self.table_metadata.spec().fields:
            if not isinstance(field.transform, IdentityTransform):
                raise ValueError(
                    f"For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: {field}"
                )

        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )

        # If dataframe does not have data, there is no need to overwrite
        if df.shape[0] == 0:
            return

        append_snapshot_commit_uuid = uuid.uuid4()
        data_files: List[DataFile] = list(
            _dataframe_to_data_files(
                table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
            )
        )

        partitions_to_overwrite = {data_file.partition for data_file in data_files}
        delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
        self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)

        with self._append_snapshot_producer(snapshot_properties) as append_files:
            append_files.commit_uuid = append_snapshot_commit_uuid
            for data_file in data_files:
                append_files.append_data_file(data_file)

    def overwrite(
        self,
        df: pa.Table,
        overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
        case_sensitive: bool = True,
    ) -> None:
        """
        Shorthand for adding a table overwrite with a PyArrow table to the transaction.

        An overwrite may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten.
            - APPEND: In case new data is being inserted into the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                              or a boolean expression in case of a partial overwrite
            case_sensitive: A bool determine if the provided `overwrite_filter` is case-sensitive
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        if unsupported_partitions := [
            field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
        ]:
            raise ValueError(
                f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
            )
        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )

        if overwrite_filter != AlwaysFalse():
            # Only delete when the filter is != AlwaysFalse
            self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)

        with self._append_snapshot_producer(snapshot_properties) as append_files:
            # skip writing data files if the dataframe is empty
            if df.shape[0] > 0:
                data_files = _dataframe_to_data_files(
                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                )
                for data_file in data_files:
                    append_files.append_data_file(data_file)

    def delete(
        self,
        delete_filter: Union[str, BooleanExpression],
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
        case_sensitive: bool = True,
    ) -> None:
        """
        Shorthand for deleting record from a table.

        A delete may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten

        Args:
            delete_filter: A boolean expression to delete rows from a table
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive
        """
        from pyiceberg.io.pyarrow import (
            ArrowScan,
            _dataframe_to_data_files,
            _expression_to_complementary_pyarrow,
        )

        if (
            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
            == TableProperties.DELETE_MODE_MERGE_ON_READ
        ):
            warnings.warn("Merge on read is not yet supported, falling back to copy-on-write")

        if isinstance(delete_filter, str):
            delete_filter = _parse_row_filter(delete_filter)

        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
            delete_snapshot.delete_by_predicate(delete_filter, case_sensitive)

        # Check if there are any files that require an actual rewrite of a data file
        if delete_snapshot.rewrites_needed is True:
            bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
            preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

            files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).plan_files()

            commit_uuid = uuid.uuid4()
            counter = itertools.count(0)

            replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
            # This will load the Parquet file into memory, including:
            #   - Filter out the rows based on the delete filter
            #   - Projecting it to the current schema
            #   - Applying the positional deletes if they are there
            # When writing
            #   - Apply the latest partition-spec
            #   - And sort order when added
            for original_file in files:
                df = ArrowScan(
                    table_metadata=self.table_metadata,
                    io=self._table.io,
                    projected_schema=self.table_metadata.schema(),
                    row_filter=AlwaysTrue(),
                ).to_table(tasks=[original_file])
                filtered_df = df.filter(preserve_row_filter)

                # Only rewrite if there are records being deleted
                if len(filtered_df) == 0:
                    replaced_files.append((original_file.file, []))
                elif len(df) != len(filtered_df):
                    replaced_files.append(
                        (
                            original_file.file,
                            list(
                                _dataframe_to_data_files(
                                    io=self._table.io,
                                    df=filtered_df,
                                    table_metadata=self.table_metadata,
                                    write_uuid=commit_uuid,
                                    counter=counter,
                                )
                            ),
                        )
                    )

            if len(replaced_files) > 0:
                with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot:
                    overwrite_snapshot.commit_uuid = commit_uuid
                    for original_data_file, replaced_data_files in replaced_files:
                        overwrite_snapshot.delete_data_file(original_data_file)
                        for replaced_data_file in replaced_data_files:
                            overwrite_snapshot.append_data_file(replaced_data_file)

        if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
            warnings.warn("Delete operation did not match any records")

    def add_files(
        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
    ) -> None:
        """
        Shorthand API for adding files as data files to the table transaction.

        Args:
            file_paths: The list of full file paths to be added as data files to the table

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: Raises a ValueError given file_paths contains duplicate files
            ValueError: Raises a ValueError given file_paths already referenced by table
        """
        if len(file_paths) != len(set(file_paths)):
            raise ValueError("File paths must be unique")

        if check_duplicate_files:
            import pyarrow.compute as pc

            expr = pc.field("file_path").isin(file_paths)
            referenced_files = [file["file_path"] for file in self._table.inspect.files().filter(expr).to_pylist()]

            if referenced_files:
                raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")

        if self.table_metadata.name_mapping() is None:
            self.set_properties(
                **{TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()}
            )
        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
            data_files = _parquet_files_to_data_files(
                table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
            )
            for data_file in data_files:
                update_snapshot.append_data_file(data_file)

    def update_spec(self) -> UpdateSpec:
        """Create a new UpdateSpec to update the partitioning of the table.

        Returns:
            A new UpdateSpec.
        """
        return UpdateSpec(self)

    def remove_properties(self, *removals: str) -> Transaction:
        """Remove properties.

        Args:
            removals: Properties to be removed.

        Returns:
            The alter table builder.
        """
        return self._apply((RemovePropertiesUpdate(removals=removals),))

    def update_location(self, location: str) -> Transaction:
        """Set the new table location.

        Args:
            location: The new location of the table.

        Returns:
            The alter table builder.
        """
        raise NotImplementedError("Not yet implemented")

    def commit_transaction(self) -> Table:
        """Commit the changes to the catalog.

        Returns:
            The table with the updates applied.
        """
        if len(self._updates) > 0:
            self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
            self._table._do_commit(  # pylint: disable=W0212
                updates=self._updates,
                requirements=self._requirements,
            )
            return self._table
        else:
            return self._table
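
A sketch of driving a Transaction explicitly rather than through a context manager (names are hypothetical); nothing is sent to the catalog until commit_transaction():

import pyarrow as pa
from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("docs.events")

tx = tbl.transaction()
tx.set_properties(commit_source="batch-job")
tx.append(pa.table({"id": [5], "name": ["e"]}))
tx.commit_transaction()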

__enter__()

Start a transaction to update the table.

Source code in pyiceberg/table/__init__.py
def __enter__(self) -> Transaction:
    """Start a transaction to update the table."""
    return self

__exit__(exctype, excinst, exctb)

Close and commit the transaction if no exceptions have been raised.

Source code in pyiceberg/table/__init__.py
def __exit__(
    self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
    """Close and commit the transaction if no exceptions have been raised."""
    if exctype is None and excinst is None and exctb is None:
        self.commit_transaction()

__init__(table, autocommit=False)

Open a transaction to stage and commit changes to a table.

Parameters:

    table (Table): The table that will be altered. Required.
    autocommit (bool): Option to automatically commit the changes when they are staged. Default: False.
Source code in pyiceberg/table/__init__.py
def __init__(self, table: Table, autocommit: bool = False):
    """Open a transaction to stage and commit changes to a table.

    Args:
        table: The table that will be altered.
        autocommit: Option to automatically commit the changes when they are staged.
    """
    self.table_metadata = table.metadata
    self._table = table
    self._autocommit = autocommit
    self._updates = ()
    self._requirements = ()

_append_snapshot_producer(snapshot_properties)

Determine the append type based on table properties.

Parameters:

    snapshot_properties (Dict[str, str]): Custom properties to be added to the snapshot summary. Required.

Returns:

    Either a fast-append or a merge-append snapshot producer.

Source code in pyiceberg/table/__init__.py
def _append_snapshot_producer(self, snapshot_properties: Dict[str, str]) -> _FastAppendFiles:
    """Determine the append type based on table properties.

    Args:
        snapshot_properties: Custom properties to be added to the snapshot summary
    Returns:
        Either a fast-append or a merge-append snapshot producer.
    """
    manifest_merge_enabled = property_as_bool(
        self.table_metadata.properties,
        TableProperties.MANIFEST_MERGE_ENABLED,
        TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
    )
    update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
    return update_snapshot.merge_append() if manifest_merge_enabled else update_snapshot.fast_append()

_apply(updates, requirements=())

Check if the requirements are met, and applies the updates to the metadata.

Source code in pyiceberg/table/__init__.py
def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction:
    """Check if the requirements are met, and applies the updates to the metadata."""
    for requirement in requirements:
        requirement.validate(self.table_metadata)

    self._updates += updates

    # For the requirements, it does not make sense to add a requirement more than once
    # For example, you cannot assert that the current schema has two different IDs
    existing_requirements = {type(requirement) for requirement in self._requirements}
    for new_requirement in requirements:
        if type(new_requirement) not in existing_requirements:
            self._requirements = self._requirements + (new_requirement,)

    self.table_metadata = update_table_metadata(self.table_metadata, updates)

    if self._autocommit:
        self.commit_transaction()
        self._updates = ()
        self._requirements = ()

    return self

_build_partition_predicate(partition_records)

Build a filter predicate matching any of the input partition records.

Parameters:

    partition_records (Set[Record]): A set of partition records to match. Required.

Returns:

    A predicate matching any of the input partition records.

Source code in pyiceberg/table/__init__.py
def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
    """Build a filter predicate matching any of the input partition records.

    Args:
        partition_records: A set of partition records to match
    Returns:
        A predicate matching any of the input partition records.
    """
    partition_spec = self.table_metadata.spec()
    schema = self.table_metadata.schema()
    partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]

    expr: BooleanExpression = AlwaysFalse()
    for partition_record in partition_records:
        match_partition_expression: BooleanExpression = AlwaysTrue()

        for pos, partition_field in enumerate(partition_fields):
            predicate = (
                EqualTo(Reference(partition_field), partition_record[pos])
                if partition_record[pos] is not None
                else IsNull(Reference(partition_field))
            )
            match_partition_expression = And(match_partition_expression, predicate)
        expr = Or(expr, match_partition_expression)
    return expr
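
To illustrate, a hedged sketch of the expression this method produces for two hypothetical identity partitions (dt, region), where the second record has a null region value:

from pyiceberg.expressions import AlwaysFalse, AlwaysTrue, And, EqualTo, IsNull, Or, Reference

# Matches (dt='2024-01-01', region='eu') OR (dt='2024-01-02', region IS NULL)
expr = Or(
    Or(
        AlwaysFalse(),
        And(And(AlwaysTrue(), EqualTo(Reference("dt"), "2024-01-01")), EqualTo(Reference("region"), "eu")),
    ),
    And(And(AlwaysTrue(), EqualTo(Reference("dt"), "2024-01-02")), IsNull(Reference("region"))),
)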

_scan(row_filter=ALWAYS_TRUE, case_sensitive=True)

Minimal data scan of the table with the current state of the transaction.

Source code in pyiceberg/table/__init__.py
def _scan(self, row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, case_sensitive: bool = True) -> DataScan:
    """Minimal data scan of the table with the current state of the transaction."""
    return DataScan(
        table_metadata=self.table_metadata, io=self._table.io, row_filter=row_filter, case_sensitive=case_sensitive
    )

_set_ref_snapshot(snapshot_id, ref_name, type, max_ref_age_ms=None, max_snapshot_age_ms=None, min_snapshots_to_keep=None)

Update a ref to a snapshot.

Returns:

Type Description
UpdatesAndRequirements

The staged updates and requirements for the set-snapshot-ref.

Source code in pyiceberg/table/__init__.py
def _set_ref_snapshot(
    self,
    snapshot_id: int,
    ref_name: str,
    type: str,
    max_ref_age_ms: Optional[int] = None,
    max_snapshot_age_ms: Optional[int] = None,
    min_snapshots_to_keep: Optional[int] = None,
) -> UpdatesAndRequirements:
    """Update a ref to a snapshot.

    Returns:
        The staged updates and requirements for the set-snapshot-ref.
    """
    updates = (
        SetSnapshotRefUpdate(
            snapshot_id=snapshot_id,
            ref_name=ref_name,
            type=type,
            max_ref_age_ms=max_ref_age_ms,
            max_snapshot_age_ms=max_snapshot_age_ms,
            min_snapshots_to_keep=min_snapshots_to_keep,
        ),
    )
    requirements = (
        AssertRefSnapshotId(
            snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None,
            ref=ref_name,
        ),
    )

    return updates, requirements

add_files(file_paths, snapshot_properties=EMPTY_DICT, check_duplicate_files=True)

Shorthand API for adding files as data files to the table transaction.

Parameters:

Name Type Description Default
file_paths List[str]

The list of full file paths to be added as data files to the table

required

Raises:

Type Description
FileNotFoundError

If the file does not exist.

ValueError

Raised if file_paths contains duplicate files.

ValueError

Raised if any of the file_paths are already referenced by the table.

Source code in pyiceberg/table/__init__.py
def add_files(
    self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
) -> None:
    """
    Shorthand API for adding files as data files to the table transaction.

    Args:
        file_paths: The list of full file paths to be added as data files to the table

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If file_paths contains duplicate files
        ValueError: If any of the file_paths are already referenced by the table
    """
    if len(file_paths) != len(set(file_paths)):
        raise ValueError("File paths must be unique")

    if check_duplicate_files:
        import pyarrow.compute as pc

        expr = pc.field("file_path").isin(file_paths)
        referenced_files = [file["file_path"] for file in self._table.inspect.files().filter(expr).to_pylist()]

        if referenced_files:
            raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")

    if self.table_metadata.name_mapping() is None:
        self.set_properties(
            **{TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()}
        )
    with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
        data_files = _parquet_files_to_data_files(
            table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
        )
        for data_file in data_files:
            update_snapshot.append_data_file(data_file)
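
For example, a minimal sketch (the catalog name, table identifier, and file paths are hypothetical; the Parquet files must already match the table's schema):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")          # assumes a catalog named "default" is configured
tbl = catalog.load_table("db.events")      # hypothetical table identifier

# Register existing Parquet files as data files without rewriting them
with tbl.transaction() as txn:
    txn.add_files(file_paths=[
        "s3://bucket/data/file-1.parquet",   # hypothetical paths
        "s3://bucket/data/file-2.parquet",
    ])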

append(df, snapshot_properties=EMPTY_DICT)

Shorthand API for appending a PyArrow table to a table transaction.

Parameters:

Name Type Description Default
df Table

The Arrow dataframe that will be appended to the table

required
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
Source code in pyiceberg/table/__init__.py
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
    """
    Shorthand API for appending a PyArrow table to a table transaction.

    Args:
        df: The Arrow dataframe that will be appended to the table
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    if unsupported_partitions := [
        field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
    ]:
        raise ValueError(
            f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
        )
    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
    )

    with self._append_snapshot_producer(snapshot_properties) as append_files:
        # skip writing data files if the dataframe is empty
        if df.shape[0] > 0:
            data_files = list(
                _dataframe_to_data_files(
                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                )
            )
            for data_file in data_files:
                append_files.append_data_file(data_file)
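
For example, a minimal sketch (assuming tbl is an already-loaded Table whose schema has an int64 id and a string name column):

import pyarrow as pa

df = pa.table({
    "id": pa.array([1, 2], type=pa.int64()),
    "name": pa.array(["a", "b"]),
})
with tbl.transaction() as txn:
    txn.append(df)   # staged; the append snapshot is committed when the block exits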

commit_transaction()

Commit the changes to the catalog.

Returns:

Type Description
Table

The table with the updates applied.

Source code in pyiceberg/table/__init__.py
def commit_transaction(self) -> Table:
    """Commit the changes to the catalog.

    Returns:
        The table with the updates applied.
    """
    if len(self._updates) > 0:
        self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
        self._table._do_commit(  # pylint: disable=W0212
            updates=self._updates,
            requirements=self._requirements,
        )
        return self._table
    else:
        return self._table
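
For example, a hedged sketch of batching several updates into a single commit (tbl is an already-loaded Table; the owner property is hypothetical):

txn = tbl.transaction()
txn.set_properties(owner="data-eng")    # staged, not yet visible to other readers
txn.upgrade_table_version(2)            # no-op if the table is already on format version 2
tbl = txn.commit_transaction()          # one commit applies all staged updates and requirements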

delete(delete_filter, snapshot_properties=EMPTY_DICT, case_sensitive=True)

Shorthand for deleting records from a table.

A delete may produce zero or more snapshots based on the operation:

- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten

Parameters:

Name Type Description Default
delete_filter Union[str, BooleanExpression]

A boolean expression to delete rows from a table

required
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
case_sensitive bool

A bool that determines whether the provided delete_filter is case-sensitive

True
Source code in pyiceberg/table/__init__.py
def delete(
    self,
    delete_filter: Union[str, BooleanExpression],
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
    case_sensitive: bool = True,
) -> None:
    """
    Shorthand for deleting records from a table.

    A delete may produce zero or more snapshots based on the operation:

        - DELETE: In case existing Parquet files can be dropped completely.
        - REPLACE: In case existing Parquet files need to be rewritten

    Args:
        delete_filter: A boolean expression to delete rows from a table
        snapshot_properties: Custom properties to be added to the snapshot summary
        case_sensitive: A bool that determines whether the provided `delete_filter` is case-sensitive
    """
    from pyiceberg.io.pyarrow import (
        ArrowScan,
        _dataframe_to_data_files,
        _expression_to_complementary_pyarrow,
    )

    if (
        self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
        == TableProperties.DELETE_MODE_MERGE_ON_READ
    ):
        warnings.warn("Merge on read is not yet supported, falling back to copy-on-write")

    if isinstance(delete_filter, str):
        delete_filter = _parse_row_filter(delete_filter)

    with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
        delete_snapshot.delete_by_predicate(delete_filter, case_sensitive)

    # Check if there are any files that require an actual rewrite of a data file
    if delete_snapshot.rewrites_needed is True:
        bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
        preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

        files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).plan_files()

        commit_uuid = uuid.uuid4()
        counter = itertools.count(0)

        replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
        # This will load the Parquet file into memory, including:
        #   - Filter out the rows based on the delete filter
        #   - Projecting it to the current schema
        #   - Applying the positional deletes if they are there
        # When writing
        #   - Apply the latest partition-spec
        #   - And sort order when added
        for original_file in files:
            df = ArrowScan(
                table_metadata=self.table_metadata,
                io=self._table.io,
                projected_schema=self.table_metadata.schema(),
                row_filter=AlwaysTrue(),
            ).to_table(tasks=[original_file])
            filtered_df = df.filter(preserve_row_filter)

            # Only rewrite if there are records being deleted
            if len(filtered_df) == 0:
                replaced_files.append((original_file.file, []))
            elif len(df) != len(filtered_df):
                replaced_files.append(
                    (
                        original_file.file,
                        list(
                            _dataframe_to_data_files(
                                io=self._table.io,
                                df=filtered_df,
                                table_metadata=self.table_metadata,
                                write_uuid=commit_uuid,
                                counter=counter,
                            )
                        ),
                    )
                )

        if len(replaced_files) > 0:
            with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot:
                overwrite_snapshot.commit_uuid = commit_uuid
                for original_data_file, replaced_data_files in replaced_files:
                    overwrite_snapshot.delete_data_file(original_data_file)
                    for replaced_data_file in replaced_data_files:
                        overwrite_snapshot.append_data_file(replaced_data_file)

    if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
        warnings.warn("Delete operation did not match any records")

dynamic_partition_overwrite(df, snapshot_properties=EMPTY_DICT)

Shorthand for overwriting existing partitions with a PyArrow table.

The function detects partition values in the provided arrow table using the current partition spec, and deletes existing partitions matching these values. Finally, the provided data is appended to the table.

Parameters:

Name Type Description Default
df Table

The Arrow dataframe that will be used to overwrite the table

required
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
Source code in pyiceberg/table/__init__.py
def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
    """
    Shorthand for overwriting existing partitions with a PyArrow table.

    The function detects partition values in the provided arrow table using the current
    partition spec, and deletes existing partitions matching these values. Finally, the
    provided data is appended to the table.

    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    if self.table_metadata.spec().is_unpartitioned():
        raise ValueError("Cannot apply dynamic overwrite on an unpartitioned table.")

    for field in self.table_metadata.spec().fields:
        if not isinstance(field.transform, IdentityTransform):
            raise ValueError(
                f"For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: {field}"
            )

    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
    )

    # If dataframe does not have data, there is no need to overwrite
    if df.shape[0] == 0:
        return

    append_snapshot_commit_uuid = uuid.uuid4()
    data_files: List[DataFile] = list(
        _dataframe_to_data_files(
            table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
        )
    )

    partitions_to_overwrite = {data_file.partition for data_file in data_files}
    delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
    self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)

    with self._append_snapshot_producer(snapshot_properties) as append_files:
        append_files.commit_uuid = append_snapshot_commit_uuid
        for data_file in data_files:
            append_files.append_data_file(data_file)
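
For example, a minimal sketch (assuming tbl is identity-partitioned on a dt column; the column names and values are hypothetical):

import pyarrow as pa

df = pa.table({
    "dt": pa.array(["2024-01-01", "2024-01-01"]),
    "value": pa.array([10, 20], type=pa.int64()),
})
with tbl.transaction() as txn:
    # Deletes only the partitions present in df (here dt='2024-01-01'), then appends df
    txn.dynamic_partition_overwrite(df)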

overwrite(df, overwrite_filter=ALWAYS_TRUE, snapshot_properties=EMPTY_DICT, case_sensitive=True)

Shorthand for adding a table overwrite with a PyArrow table to the transaction.

An overwrite may produce zero or more snapshots based on the operation:

- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten.
- APPEND: In case new data is being inserted into the table.

Parameters:

Name Type Description Default
df Table

The Arrow dataframe that will be used to overwrite the table

required
overwrite_filter Union[BooleanExpression, str]

ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite

ALWAYS_TRUE
case_sensitive bool

A bool that determines whether the provided overwrite_filter is case-sensitive

True
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
Source code in pyiceberg/table/__init__.py
def overwrite(
    self,
    df: pa.Table,
    overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
    case_sensitive: bool = True,
) -> None:
    """
    Shorthand for adding a table overwrite with a PyArrow table to the transaction.

    An overwrite may produce zero or more snapshots based on the operation:

        - DELETE: In case existing Parquet files can be dropped completely.
        - REPLACE: In case existing Parquet files need to be rewritten.
        - APPEND: In case new data is being inserted into the table.

    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                          or a boolean expression in case of a partial overwrite
        case_sensitive: A bool that determines whether the provided `overwrite_filter` is case-sensitive
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    if unsupported_partitions := [
        field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
    ]:
        raise ValueError(
            f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
        )
    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
    )

    if overwrite_filter != AlwaysFalse():
        # Only delete when the filter is != AlwaysFalse
        self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)

    with self._append_snapshot_producer(snapshot_properties) as append_files:
        # skip writing data files if the dataframe is empty
        if df.shape[0] > 0:
            data_files = _dataframe_to_data_files(
                table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
            )
            for data_file in data_files:
                append_files.append_data_file(data_file)
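
For example, a minimal sketch (tbl is an already-loaded Table; the dt column and values are hypothetical):

import pyarrow as pa

df = pa.table({"dt": pa.array(["2024-01-01"]), "value": pa.array([42], type=pa.int64())})

tbl.overwrite(df)                                          # full overwrite: all existing rows are deleted, then df is appended
tbl.overwrite(df, overwrite_filter="dt == '2024-01-01'")   # partial overwrite: only rows matching the filter are deleted first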

remove_properties(*removals)

Remove properties.

Parameters:

Name Type Description Default
removals str

Properties to be removed.

()

Returns:

Type Description
Transaction

The alter table builder.

Source code in pyiceberg/table/__init__.py
def remove_properties(self, *removals: str) -> Transaction:
    """Remove properties.

    Args:
        removals: Properties to be removed.

    Returns:
        The alter table builder.
    """
    return self._apply((RemovePropertiesUpdate(removals=removals),))

set_properties(properties=EMPTY_DICT, **kwargs)

Set properties.

When a property is already set, it will be overwritten.

Parameters:

Name Type Description Default
properties Properties

The properties set on the table.

EMPTY_DICT
kwargs Any

Properties can also be passed as kwargs.

{}

Returns:

Type Description
Transaction

The alter table builder.

Source code in pyiceberg/table/__init__.py
def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) -> Transaction:
    """Set properties.

    When a property is already set, it will be overwritten.

    Args:
        properties: The properties set on the table.
        kwargs: Properties can also be passed as kwargs.

    Returns:
        The alter table builder.
    """
    if properties and kwargs:
        raise ValueError("Cannot pass both properties and kwargs")
    updates = properties or kwargs
    return self._apply((SetPropertiesUpdate(updates=updates),))
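
For example, a minimal sketch of staging property changes in a transaction (the property values shown are illustrative; "comment" is a hypothetical key to drop):

with tbl.transaction() as txn:
    txn.set_properties({"write.parquet.compression-codec": "zstd"})
    # or equivalently as kwargs, but not both at once: txn.set_properties(owner="data-eng")
    txn.remove_properties("comment")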

update_location(location)

Set the new table location.

Parameters:

Name Type Description Default
location str

The new location of the table.

required

Returns:

Type Description
Transaction

The alter table builder.

Source code in pyiceberg/table/__init__.py
def update_location(self, location: str) -> Transaction:
    """Set the new table location.

    Args:
        location: The new location of the table.

    Returns:
        The alter table builder.
    """
    raise NotImplementedError("Not yet implemented")

update_schema(allow_incompatible_changes=False, case_sensitive=True)

Create a new UpdateSchema to alter the columns of this table.

Parameters:

Name Type Description Default
allow_incompatible_changes bool

If changes are allowed that might break downstream consumers.

False
case_sensitive bool

If field names are case-sensitive.

True

Returns:

Type Description
UpdateSchema

A new UpdateSchema.

Source code in pyiceberg/table/__init__.py
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
    """Create a new UpdateSchema to alter the columns of this table.

    Args:
        allow_incompatible_changes: If changes are allowed that might break downstream consumers.
        case_sensitive: If field names are case-sensitive.

    Returns:
        A new UpdateSchema.
    """
    return UpdateSchema(
        self,
        allow_incompatible_changes=allow_incompatible_changes,
        case_sensitive=case_sensitive,
        name_mapping=self.table_metadata.name_mapping(),
    )
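
For example, a minimal sketch of evolving the schema through the context manager (assuming tbl is an already-loaded Table; the notes column is hypothetical, and the same method is available on an open Transaction):

from pyiceberg.types import StringType

with tbl.update_schema() as update:
    update.add_column("notes", StringType(), doc="free-form comment")   # committed when the block exits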

update_snapshot(snapshot_properties=EMPTY_DICT)

Create a new UpdateSnapshot to produce a new snapshot for the table.

Returns:

Type Description
UpdateSnapshot

A new UpdateSnapshot

Source code in pyiceberg/table/__init__.py
def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
    """Create a new UpdateSnapshot to produce a new snapshot for the table.

    Returns:
        A new UpdateSnapshot
    """
    return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)

update_spec()

Create a new UpdateSpec to update the partitioning of the table.

Returns:

Type Description
UpdateSpec

A new UpdateSpec.

Source code in pyiceberg/table/__init__.py
def update_spec(self) -> UpdateSpec:
    """Create a new UpdateSpec to update the partitioning of the table.

    Returns:
        A new UpdateSpec.
    """
    return UpdateSpec(self)
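
For example, a minimal sketch of adding a partition field (assuming tbl is an already-loaded Table with a timestamp column event_ts; the field names are hypothetical):

from pyiceberg.transforms import DayTransform

with tbl.update_spec() as update:
    update.add_field("event_ts", DayTransform(), "event_ts_day")   # partitions by day(event_ts)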

upgrade_table_version(format_version)

Set the table to a certain version.

Parameters:

Name Type Description Default
format_version TableVersion

The newly set version.

required

Returns:

Type Description
Transaction

The alter table builder.

Source code in pyiceberg/table/__init__.py
def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
    """Set the table to a certain version.

    Args:
        format_version: The newly set version.

    Returns:
        The alter table builder.
    """
    if format_version not in {1, 2}:
        raise ValueError(f"Unsupported table format version: {format_version}")

    if format_version < self.table_metadata.format_version:
        raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")

    if format_version > self.table_metadata.format_version:
        return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))

    return self

UpsertResult dataclass

Summary of the upsert operation.

Source code in pyiceberg/table/__init__.py
@dataclass()
class UpsertResult:
    """Summary the upsert operation."""

    rows_updated: int = 0
    rows_inserted: int = 0

WriteTask dataclass

Task with the parameters for writing a DataFile.

Source code in pyiceberg/table/__init__.py
@dataclass(frozen=True)
class WriteTask:
    """Task with the parameters for writing a DataFile."""

    write_uuid: uuid.UUID
    task_id: int
    schema: Schema
    record_batches: List[pa.RecordBatch]
    sort_order_id: Optional[int] = None
    partition_key: Optional[PartitionKey] = None

    def generate_data_file_filename(self, extension: str) -> str:
        # Mimics the behavior in the Java API:
        # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
        return f"00000-{self.task_id}-{self.write_uuid}.{extension}"

_match_deletes_to_data_file(data_entry, positional_delete_entries)

Check if the delete file is relevant for the data file.

Uses the column metrics to see whether the file path falls within the lower and upper bounds.

Parameters:

Name Type Description Default
data_entry ManifestEntry

The manifest entry path of the datafile.

required
positional_delete_entries SortedList[ManifestEntry]

All the candidate positional delete manifest entries.

required

Returns:

Type Description
Set[DataFile]

A set of files that are relevant for the data file.

Source code in pyiceberg/table/__init__.py
def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> Set[DataFile]:
    """Check if the delete file is relevant for the data file.

    Uses the column metrics to see whether the file path falls within the lower and upper bounds.

    Args:
        data_entry (ManifestEntry): The manifest entry path of the datafile.
        positional_delete_entries (SortedList[ManifestEntry]): All the candidate positional delete manifest entries.

    Returns:
        A set of files that are relevant for the data file.
    """
    relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :]

    if len(relevant_entries) > 0:
        evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path))
        return {
            positional_delete_entry.data_file
            for positional_delete_entry in relevant_entries
            if evaluator.eval(positional_delete_entry.data_file)
        }
    else:
        return set()

_open_manifest(io, manifest, partition_filter, residual_evaluator, metrics_evaluator)

Open a manifest file and return matching manifest entries.

Returns:

Type Description
List[ManifestEntry]

A list of ManifestEntry that matches the provided filters.

Source code in pyiceberg/table/__init__.py
def _open_manifest(
    io: FileIO,
    manifest: ManifestFile,
    partition_filter: Callable[[DataFile], bool],
    residual_evaluator: Callable[[Record], BooleanExpression],
    metrics_evaluator: Callable[[DataFile], bool],
) -> List[ManifestEntry]:
    """Open a manifest file and return matching manifest entries.

    Returns:
        A list of ManifestEntry that matches the provided filters.
    """
    return [
        manifest_entry
        for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=True)
        if partition_filter(manifest_entry.data_file) and metrics_evaluator(manifest_entry.data_file)
    ]

_parquet_files_to_data_files(table_metadata, file_paths, io)

Convert a list of files into DataFiles.

Returns:

Type Description
Iterable[DataFile]

An iterable that supplies DataFiles that describe the parquet files.

Source code in pyiceberg/table/__init__.py
def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
    """Convert a list files into DataFiles.

    Returns:
        An iterable that supplies DataFiles that describe the parquet files.
    """
    from pyiceberg.io.pyarrow import parquet_files_to_data_files

    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))

_parse_row_filter(expr)

Accept an expression in the form of a BooleanExpression or a string.

In the case of a string, it will be converted into an unbound BooleanExpression.

Parameters:

Name Type Description Default
expr Union[str, BooleanExpression]

Expression as a BooleanExpression or a string.

required

Returns: An unbound BooleanExpression.

Source code in pyiceberg/table/__init__.py
def _parse_row_filter(expr: Union[str, BooleanExpression]) -> BooleanExpression:
    """Accept an expression in the form of a BooleanExpression or a string.

    In the case of a string, it will be converted into an unbound BooleanExpression.

    Args:
        expr: Expression as a BooleanExpression or a string.

    Returns: An unbound BooleanExpression.
    """
    return parser.parse(expr) if isinstance(expr, str) else expr
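
For example, a minimal sketch (the column names are hypothetical):

from pyiceberg.expressions import parser

unbound = parser.parse("city == 'Amsterdam' and population >= 100000")
# Roughly equivalent to And(EqualTo("city", "Amsterdam"), GreaterThanOrEqual("population", 100000)),
# still unbound until it is bound to a schema.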