table

CommitTableRequest

Bases: IcebergBaseModel

A pydantic BaseModel for a table commit request.

Source code in pyiceberg/table/__init__.py
class CommitTableRequest(IcebergBaseModel):
    """A pydantic BaseModel for a table commit request."""

    identifier: TableIdentifier = Field()
    requirements: Tuple[TableRequirement, ...] = Field(default_factory=tuple)
    updates: Tuple[TableUpdate, ...] = Field(default_factory=tuple)

CommitTableResponse

Bases: IcebergBaseModel

A pydantic BaseModel for a table commit response.

Source code in pyiceberg/table/__init__.py
class CommitTableResponse(IcebergBaseModel):
    """A pydantic BaseModel for a table commit response."""

    metadata: TableMetadata
    metadata_location: str = Field(alias="metadata-location")

CreateTableTransaction

Bases: Transaction

A transaction that involves the creation of a new table.

Source code in pyiceberg/table/__init__.py
class CreateTableTransaction(Transaction):
    """A transaction that involves the creation of a new table."""

    def _initial_changes(self, table_metadata: TableMetadata) -> None:
        """Set the initial changes that can reconstruct the initial table metadata when creating the CreateTableTransaction."""
        self._updates += (
            AssignUUIDUpdate(uuid=table_metadata.table_uuid),
            UpgradeFormatVersionUpdate(format_version=table_metadata.format_version),
        )

        schema: Schema = table_metadata.schema()
        self._updates += (
            AddSchemaUpdate(schema_=schema),
            SetCurrentSchemaUpdate(schema_id=-1),
        )

        spec: PartitionSpec = table_metadata.spec()
        if spec.is_unpartitioned():
            self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),)
        else:
            self._updates += (AddPartitionSpecUpdate(spec=spec),)
        self._updates += (SetDefaultSpecUpdate(spec_id=-1),)

        sort_order: Optional[SortOrder] = table_metadata.sort_order_by_id(table_metadata.default_sort_order_id)
        if sort_order is None or sort_order.is_unsorted:
            self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),)
        else:
            self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
        self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),)

        self._updates += (
            SetLocationUpdate(location=table_metadata.location),
            SetPropertiesUpdate(updates=table_metadata.properties),
        )

    def __init__(self, table: StagedTable):
        super().__init__(table, autocommit=False)
        self._initial_changes(table.metadata)

    def commit_transaction(self) -> Table:
        """Commit the changes to the catalog.

        In the case of a CreateTableTransaction, the only requirement is AssertCreate.
        Returns:
            The table with the updates applied.
        """
        if len(self._updates) > 0:
            self._table._do_commit(  # pylint: disable=W0212
                updates=self._updates,
                requirements=(AssertCreate(),),
            )

        self._updates = ()
        self._requirements = ()

        return self._table

commit_transaction()

Commit the changes to the catalog.

In the case of a CreateTableTransaction, the only requirement is AssertCreate.

Returns:

    The table with the updates applied.

Source code in pyiceberg/table/__init__.py
def commit_transaction(self) -> Table:
    """Commit the changes to the catalog.

    In the case of a CreateTableTransaction, the only requirement is AssertCreate.
    Returns:
        The table with the updates applied.
    """
    if len(self._updates) > 0:
        self._table._do_commit(  # pylint: disable=W0212
            updates=self._updates,
            requirements=(AssertCreate(),),
        )

    self._updates = ()
    self._requirements = ()

    return self._table
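
A CreateTableTransaction is normally obtained from a catalog rather than constructed directly. A minimal sketch, assuming a configured catalog named "default" and the hypothetical identifier "default.users" and property below:

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

catalog = load_catalog("default")

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
)

# Stage the table creation together with any other metadata changes,
# then send everything to the catalog in a single commit.
txn = catalog.create_table_transaction("default.users", schema=schema)
txn.set_properties({"owner": "analytics"})
table = txn.commit_transaction()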

DataScan

Bases: TableScan

Source code in pyiceberg/table/__init__.py
class DataScan(TableScan):
    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
        return project(self.row_filter)

    @cached_property
    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
        return KeyDefaultDict(self._build_partition_projection)

    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
        spec = self.table_metadata.specs()[spec_id]
        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
        spec = self.table_metadata.specs()[spec_id]
        partition_type = spec.partition_type(self.table_metadata.schema())
        partition_schema = Schema(*partition_type.fields)
        partition_expr = self.partition_filters[spec_id]

        # The lambda created here is run in multiple threads.
        # So we avoid creating _EvaluatorExpression methods bound to a single
        # shared instance across multiple threads.
        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
        schema = self.table_metadata.schema()
        include_empty_files = strtobool(self.options.get("include_empty_files", "false"))

        # The lambda created here is run in multiple threads.
        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
        # shared instance across multiple threads.
        return lambda data_file: _InclusiveMetricsEvaluator(
            schema,
            self.row_filter,
            self.case_sensitive,
            include_empty_files,
        ).eval(data_file)

    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
        spec = self.table_metadata.specs()[spec_id]

        from pyiceberg.expressions.visitors import residual_evaluator_of

        # The lambda created here is run in multiple threads.
        # So we avoid creating _EvaluatorExpression methods bound to a single
        # shared instance across multiple threads.
        return lambda datafile: (
            residual_evaluator_of(
                spec=spec,
                expr=self.row_filter,
                case_sensitive=self.case_sensitive,
                schema=self.table_metadata.schema(),
            )
        )

    @staticmethod
    def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool:
        """Ensure that no manifests are loaded that contain deletes that are older than the data.

        Args:
            min_sequence_number (int): The minimal sequence number.
            manifest (ManifestFile): A ManifestFile that can be either data or deletes.

        Returns:
            Boolean indicating if it is either a data file, or a relevant delete file.
        """
        return manifest.content == ManifestContent.DATA or (
            # Not interested in deletes that are older than the data
            manifest.content == ManifestContent.DELETES
            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
        )

    def plan_files(self) -> Iterable[FileScanTask]:
        """Plans the relevant files by filtering on the PartitionSpecs.

        Returns:
            List of FileScanTasks that contain both data and delete files.
        """
        snapshot = self.snapshot()
        if not snapshot:
            return iter([])

        # step 1: filter manifests using partition summaries
        # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

        residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

        manifests = [
            manifest_file
            for manifest_file in snapshot.manifests(self.io)
            if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
        ]

        # step 2: filter the data files in each manifest
        # this filter depends on the partition spec used to write the manifest file

        partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

        min_sequence_number = _min_sequence_number(manifests)

        data_entries: List[ManifestEntry] = []
        positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

        executor = ExecutorFactory.get_or_create()
        for manifest_entry in chain(
            *executor.map(
                lambda args: _open_manifest(*args),
                [
                    (
                        self.io,
                        manifest,
                        partition_evaluators[manifest.partition_spec_id],
                        self._build_metrics_evaluator(),
                    )
                    for manifest in manifests
                    if self._check_sequence_number(min_sequence_number, manifest)
                ],
            )
        ):
            data_file = manifest_entry.data_file
            if data_file.content == DataFileContent.DATA:
                data_entries.append(manifest_entry)
            elif data_file.content == DataFileContent.POSITION_DELETES:
                positional_delete_entries.add(manifest_entry)
            elif data_file.content == DataFileContent.EQUALITY_DELETES:
                raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
            else:
                raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

        return [
            FileScanTask(
                data_entry.data_file,
                delete_files=_match_deletes_to_data_file(
                    data_entry,
                    positional_delete_entries,
                ),
                residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
                    data_entry.data_file.partition
                ),
            )
            for data_entry in data_entries
        ]

    def to_arrow(self) -> pa.Table:
        """Read an Arrow table eagerly from this DataScan.

        All rows will be loaded into memory at once.

        Returns:
            pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
        """
        from pyiceberg.io.pyarrow import ArrowScan

        return ArrowScan(
            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
        ).to_table(self.plan_files())

    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
        """Return an Arrow RecordBatchReader from this DataScan.

        For large results, using a RecordBatchReader requires less memory than
        loading an Arrow Table for the same DataScan, because a RecordBatch
        is read one at a time.

        Returns:
            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
                which can be used to read a stream of record batches one by one.
        """
        import pyarrow as pa

        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

        target_schema = schema_to_pyarrow(self.projection())
        batches = ArrowScan(
            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
        ).to_record_batches(self.plan_files())

        return pa.RecordBatchReader.from_batches(
            target_schema,
            batches,
        ).cast(target_schema)

    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
        """Read a Pandas DataFrame eagerly from this Iceberg table.

        Returns:
            pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
        """
        return self.to_arrow().to_pandas(**kwargs)

    def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
        """Shorthand for loading the Iceberg Table in DuckDB.

        Returns:
            DuckDBPyConnection: In memory DuckDB connection with the Iceberg table.
        """
        import duckdb

        con = connection or duckdb.connect(database=":memory:")
        con.register(table_name, self.to_arrow())

        return con

    def to_ray(self) -> ray.data.dataset.Dataset:
        """Read a Ray Dataset eagerly from this Iceberg table.

        Returns:
            ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table
        """
        import ray

        return ray.data.from_arrow(self.to_arrow())

    def to_polars(self) -> pl.DataFrame:
        """Read a Polars DataFrame from this Iceberg table.

        Returns:
            pl.DataFrame: Materialized Polars Dataframe from the Iceberg table
        """
        import polars as pl

        result = pl.from_arrow(self.to_arrow())
        if isinstance(result, pl.Series):
            result = result.to_frame()

        return result

    def count(self) -> int:
        from pyiceberg.io.pyarrow import ArrowScan

        # Compute the total number of records matched by this scan.
        res = 0
        # every task is a FileScanTask
        tasks = self.plan_files()

        for task in tasks:
            # If task.residual is AlwaysTrue, the row filter is fully satisfied by the partition
            # values, and an empty task.delete_files means no positional deletes still need to be
            # applied. In that case the record count from the file metadata can be used directly;
            # otherwise the file has to be read as a pyarrow table with the filter and deletes applied.
            if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
                # Every data file stores its record count in its metadata.
                res += task.file.record_count
            else:
                arrow_scan = ArrowScan(
                    table_metadata=self.table_metadata,
                    io=self.io,
                    projected_schema=self.projection(),
                    row_filter=self.row_filter,
                    case_sensitive=self.case_sensitive,
                )
                tbl = arrow_scan.to_table([task])
                res += len(tbl)
        return res

plan_files()

Plans the relevant files by filtering on the PartitionSpecs.

Returns:

    Iterable[FileScanTask]: List of FileScanTasks that contain both data and delete files.

Source code in pyiceberg/table/__init__.py
def plan_files(self) -> Iterable[FileScanTask]:
    """Plans the relevant files by filtering on the PartitionSpecs.

    Returns:
        List of FileScanTasks that contain both data and delete files.
    """
    snapshot = self.snapshot()
    if not snapshot:
        return iter([])

    # step 1: filter manifests using partition summaries
    # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

    manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

    residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

    manifests = [
        manifest_file
        for manifest_file in snapshot.manifests(self.io)
        if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
    ]

    # step 2: filter the data files in each manifest
    # this filter depends on the partition spec used to write the manifest file

    partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

    min_sequence_number = _min_sequence_number(manifests)

    data_entries: List[ManifestEntry] = []
    positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

    executor = ExecutorFactory.get_or_create()
    for manifest_entry in chain(
        *executor.map(
            lambda args: _open_manifest(*args),
            [
                (
                    self.io,
                    manifest,
                    partition_evaluators[manifest.partition_spec_id],
                    self._build_metrics_evaluator(),
                )
                for manifest in manifests
                if self._check_sequence_number(min_sequence_number, manifest)
            ],
        )
    ):
        data_file = manifest_entry.data_file
        if data_file.content == DataFileContent.DATA:
            data_entries.append(manifest_entry)
        elif data_file.content == DataFileContent.POSITION_DELETES:
            positional_delete_entries.add(manifest_entry)
        elif data_file.content == DataFileContent.EQUALITY_DELETES:
            raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
        else:
            raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

    return [
        FileScanTask(
            data_entry.data_file,
            delete_files=_match_deletes_to_data_file(
                data_entry,
                positional_delete_entries,
            ),
            residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
                data_entry.data_file.partition
            ),
        )
        for data_entry in data_entries
    ]
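
Example (a minimal sketch, assuming a configured catalog named "default" and a hypothetical table "default.events" with an event_ts column):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("default.events")

# Each FileScanTask pairs one data file with the delete files that apply to it.
for task in table.scan(row_filter="event_ts >= '2024-01-01'").plan_files():
    print(task.file.file_path, task.file.record_count, len(task.delete_files))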

to_arrow()

Read an Arrow table eagerly from this DataScan.

All rows will be loaded into memory at once.

Returns:

    pa.Table: Materialized Arrow Table from the Iceberg table's DataScan

Source code in pyiceberg/table/__init__.py
def to_arrow(self) -> pa.Table:
    """Read an Arrow table eagerly from this DataScan.

    All rows will be loaded into memory at once.

    Returns:
        pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
    """
    from pyiceberg.io.pyarrow import ArrowScan

    return ArrowScan(
        self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
    ).to_table(self.plan_files())
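
Example (a minimal sketch; the column names and filter are hypothetical, and `table` is an already-loaded Table as in the plan_files() example):

# Project two columns, filter rows, and materialize the result in memory.
arrow_table = table.scan(
    row_filter="city == 'Amsterdam'",
    selected_fields=("city", "visits"),
    limit=100,
).to_arrow()
print(arrow_table.num_rows)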

to_arrow_batch_reader()

Return an Arrow RecordBatchReader from this DataScan.

For large results, using a RecordBatchReader requires less memory than loading an Arrow Table for the same DataScan, because a RecordBatch is read one at a time.

Returns:

    pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan, which can be used to read a stream of record batches one by one.

Source code in pyiceberg/table/__init__.py
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
    """Return an Arrow RecordBatchReader from this DataScan.

    For large results, using a RecordBatchReader requires less memory than
    loading an Arrow Table for the same DataScan, because a RecordBatch
    is read one at a time.

    Returns:
        pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
            which can be used to read a stream of record batches one by one.
    """
    import pyarrow as pa

    from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

    target_schema = schema_to_pyarrow(self.projection())
    batches = ArrowScan(
        self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
    ).to_record_batches(self.plan_files())

    return pa.RecordBatchReader.from_batches(
        target_schema,
        batches,
    ).cast(target_schema)
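
Example (a minimal sketch, assuming `table` is an already-loaded Table as in the plan_files() example):

# Stream record batches instead of materializing the whole table at once.
reader = table.scan(limit=1_000_000).to_arrow_batch_reader()

total_rows = 0
for batch in reader:  # each item is a pyarrow.RecordBatch
    total_rows += batch.num_rows
print(total_rows)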

to_duckdb(table_name, connection=None)

Shorthand for loading the Iceberg Table in DuckDB.

Returns:

    DuckDBPyConnection: In memory DuckDB connection with the Iceberg table.

Source code in pyiceberg/table/__init__.py
def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
    """Shorthand for loading the Iceberg Table in DuckDB.

    Returns:
        DuckDBPyConnection: In memory DuckDB connection with the Iceberg table.
    """
    import duckdb

    con = connection or duckdb.connect(database=":memory:")
    con.register(table_name, self.to_arrow())

    return con
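
Example (a minimal sketch; the column names are hypothetical and `table` is an already-loaded Table):

# Register the scan result as an in-memory DuckDB relation and query it with SQL.
con = table.scan(selected_fields=("city", "visits")).to_duckdb(table_name="visits")
con.sql("SELECT city, SUM(visits) AS total FROM visits GROUP BY city").show()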

to_pandas(**kwargs)

Read a Pandas DataFrame eagerly from this Iceberg table.

Returns:

    pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table

Source code in pyiceberg/table/__init__.py
def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
    """Read a Pandas DataFrame eagerly from this Iceberg table.

    Returns:
        pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
    """
    return self.to_arrow().to_pandas(**kwargs)
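
Example (a minimal sketch, assuming `table` is an already-loaded Table; the keyword argument is forwarded to pyarrow.Table.to_pandas and requires pandas >= 2.0):

import pandas as pd

# Convert the scan result to pandas, keeping Arrow-backed dtypes.
df = table.scan(limit=1000).to_pandas(types_mapper=pd.ArrowDtype)
print(df.dtypes)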

to_polars()

Read a Polars DataFrame from this Iceberg table.

Returns:

    pl.DataFrame: Materialized Polars Dataframe from the Iceberg table

Source code in pyiceberg/table/__init__.py
def to_polars(self) -> pl.DataFrame:
    """Read a Polars DataFrame from this Iceberg table.

    Returns:
        pl.DataFrame: Materialized Polars Dataframe from the Iceberg table
    """
    import polars as pl

    result = pl.from_arrow(self.to_arrow())
    if isinstance(result, pl.Series):
        result = result.to_frame()

    return result
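
Example (a minimal sketch, assuming `table` is an already-loaded Table with a hypothetical city column):

# Materialize the scan into a Polars DataFrame.
df = table.scan(selected_fields=("city",)).to_polars()
print(df.schema)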

to_ray()

Read a Ray Dataset eagerly from this Iceberg table.

Returns:

    ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table

Source code in pyiceberg/table/__init__.py
def to_ray(self) -> ray.data.dataset.Dataset:
    """Read a Ray Dataset eagerly from this Iceberg table.

    Returns:
        ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table
    """
    import ray

    return ray.data.from_arrow(self.to_arrow())
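
Example (a minimal sketch, assuming `table` is an already-loaded Table and Ray is installed):

# Materialize the scan into a Ray Dataset for distributed processing.
ds = table.scan(limit=10_000).to_ray()
print(ds.count())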

FileScanTask dataclass

Bases: ScanTask

Task representing a data file and its corresponding delete files.

Source code in pyiceberg/table/__init__.py
@dataclass(init=False)
class FileScanTask(ScanTask):
    """Task representing a data file and its corresponding delete files."""

    file: DataFile
    delete_files: Set[DataFile]
    start: int
    length: int
    residual: BooleanExpression

    def __init__(
        self,
        data_file: DataFile,
        delete_files: Optional[Set[DataFile]] = None,
        start: Optional[int] = None,
        length: Optional[int] = None,
        residual: BooleanExpression = ALWAYS_TRUE,
    ) -> None:
        self.file = data_file
        self.delete_files = delete_files or set()
        self.start = start or 0
        self.length = length or data_file.file_size_in_bytes
        self.residual = residual

Namespace

Bases: IcebergRootModel[List[str]]

Reference to one or more levels of a namespace.

Source code in pyiceberg/table/__init__.py
class Namespace(IcebergRootModel[List[str]]):
    """Reference to one or more levels of a namespace."""

    root: List[str] = Field(
        ...,
        description="Reference to one or more levels of a namespace",
    )

StaticTable

Bases: Table

Load a table directly from a metadata file (i.e., without using a catalog).

Source code in pyiceberg/table/__init__.py
class StaticTable(Table):
    """Load a table directly from a metadata file (i.e., without using a catalog)."""

    def refresh(self) -> Table:
        """Refresh the current table metadata."""
        raise NotImplementedError("To be implemented")

    @classmethod
    def _metadata_location_from_version_hint(cls, metadata_location: str, properties: Properties = EMPTY_DICT) -> str:
        version_hint_location = os.path.join(metadata_location, "metadata", "version-hint.text")
        io = load_file_io(properties=properties, location=version_hint_location)
        file = io.new_input(version_hint_location)

        with file.open() as stream:
            content = stream.read().decode("utf-8")

        if content.endswith(".metadata.json"):
            return os.path.join(metadata_location, "metadata", content)
        elif content.isnumeric():
            return os.path.join(metadata_location, "metadata", f"v{content}.metadata.json")
        else:
            return os.path.join(metadata_location, "metadata", f"{content}.metadata.json")

    @classmethod
    def from_metadata(cls, metadata_location: str, properties: Properties = EMPTY_DICT) -> StaticTable:
        if not metadata_location.endswith(".metadata.json"):
            metadata_location = StaticTable._metadata_location_from_version_hint(metadata_location, properties)

        io = load_file_io(properties=properties, location=metadata_location)
        file = io.new_input(metadata_location)

        from pyiceberg.serializers import FromInputFile

        metadata = FromInputFile.table_metadata(file)

        from pyiceberg.catalog.noop import NoopCatalog

        return cls(
            identifier=("static-table", metadata_location),
            metadata_location=metadata_location,
            metadata=metadata,
            io=load_file_io({**properties, **metadata.properties}),
            catalog=NoopCatalog("static-table"),
        )
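
Example (a minimal sketch; the metadata path is hypothetical):

from pyiceberg.table import StaticTable

# Point directly at a metadata file; a table root directory containing
# metadata/version-hint.text also works.
static_table = StaticTable.from_metadata(
    "s3://warehouse/db/events/metadata/v2.metadata.json"
)
static_table.scan().to_arrow()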

refresh()

Refresh the current table metadata.

Source code in pyiceberg/table/__init__.py
def refresh(self) -> Table:
    """Refresh the current table metadata."""
    raise NotImplementedError("To be implemented")

Table

An Iceberg table.

Source code in pyiceberg/table/__init__.py
class Table:
    """An Iceberg table."""

    _identifier: Identifier = Field()
    metadata: TableMetadata
    metadata_location: str = Field()
    io: FileIO
    catalog: Catalog
    config: Dict[str, str]

    def __init__(
        self,
        identifier: Identifier,
        metadata: TableMetadata,
        metadata_location: str,
        io: FileIO,
        catalog: Catalog,
        config: Dict[str, str] = EMPTY_DICT,
    ) -> None:
        self._identifier = identifier
        self.metadata = metadata
        self.metadata_location = metadata_location
        self.io = io
        self.catalog = catalog
        self.config = config

    def transaction(self) -> Transaction:
        """Create a new transaction object to first stage the changes, and then commit them to the catalog.

        Returns:
            The transaction object
        """
        return Transaction(self)

    @property
    def inspect(self) -> InspectTable:
        """Return the InspectTable object to browse the table metadata.

        Returns:
            InspectTable object based on this Table.
        """
        return InspectTable(self)

    @property
    def maintenance(self) -> MaintenanceTable:
        """Return the MaintenanceTable object for maintenance.

        Returns:
            MaintenanceTable object based on this Table.
        """
        return MaintenanceTable(self)

    def refresh(self) -> Table:
        """Refresh the current table metadata.

        Returns:
            An updated instance of the same Iceberg table
        """
        fresh = self.catalog.load_table(self._identifier)
        self.metadata = fresh.metadata
        self.io = fresh.io
        self.metadata_location = fresh.metadata_location
        return self

    def name(self) -> Identifier:
        """Return the identifier of this table.

        Returns:
            An Identifier tuple of the table name
        """
        return self._identifier

    def scan(
        self,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ) -> DataScan:
        """Fetch a DataScan based on the table's current metadata.

            The data scan can be used to project the table's data
            that matches the provided row_filter onto the table's
            current schema.

        Args:
            row_filter:
                A string or BooleanExpression that describes the
                desired rows
            selected_fields:
                A tuple of strings representing the column names
                to return in the output dataframe.
            case_sensitive:
                If True column matching is case sensitive
            snapshot_id:
                Optional Snapshot ID to time travel to. If None,
                scans the table as of the current snapshot ID.
            options:
                Additional Table properties as a dictionary of
                string key value pairs to use for this scan.
            limit:
                An integer representing the number of rows to
                return in the scan result. If None, fetches all
                matching rows.

        Returns:
            A DataScan based on the table's current metadata.
        """
        return DataScan(
            table_metadata=self.metadata,
            io=self.io,
            row_filter=row_filter,
            selected_fields=selected_fields,
            case_sensitive=case_sensitive,
            snapshot_id=snapshot_id,
            options=options,
            limit=limit,
        )

    @property
    def format_version(self) -> TableVersion:
        return self.metadata.format_version

    def schema(self) -> Schema:
        """Return the schema for this table."""
        return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

    def schemas(self) -> Dict[int, Schema]:
        """Return a dict of the schema of this table."""
        return {schema.schema_id: schema for schema in self.metadata.schemas}

    def spec(self) -> PartitionSpec:
        """Return the partition spec of this table."""
        return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)

    def specs(self) -> Dict[int, PartitionSpec]:
        """Return a dict the partition specs this table."""
        return {spec.spec_id: spec for spec in self.metadata.partition_specs}

    def sort_order(self) -> SortOrder:
        """Return the sort order of this table."""
        return next(
            sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
        )

    def sort_orders(self) -> Dict[int, SortOrder]:
        """Return a dict of the sort orders of this table."""
        return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}

    def last_partition_id(self) -> int:
        """Return the highest assigned partition field ID across all specs or 999 if only the unpartitioned spec exists."""
        if self.metadata.last_partition_id:
            return self.metadata.last_partition_id
        return PARTITION_FIELD_ID_START - 1

    @property
    def properties(self) -> Dict[str, str]:
        """Properties of the table."""
        return self.metadata.properties

    def location(self) -> str:
        """Return the table's base location."""
        return self.metadata.location

    def location_provider(self) -> LocationProvider:
        """Return the table's location provider."""
        return load_location_provider(table_location=self.metadata.location, table_properties=self.metadata.properties)

    @property
    def last_sequence_number(self) -> int:
        return self.metadata.last_sequence_number

    def current_snapshot(self) -> Optional[Snapshot]:
        """Get the current snapshot for this table, or None if there is no current snapshot."""
        if self.metadata.current_snapshot_id is not None:
            return self.snapshot_by_id(self.metadata.current_snapshot_id)
        return None

    def snapshots(self) -> List[Snapshot]:
        return self.metadata.snapshots

    def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
        """Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
        return self.metadata.snapshot_by_id(snapshot_id)

    def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
        """Return the snapshot referenced by the given name or null if no such reference exists."""
        if ref := self.metadata.refs.get(name):
            return self.snapshot_by_id(ref.snapshot_id)
        return None

    def snapshot_as_of_timestamp(self, timestamp_ms: int, inclusive: bool = True) -> Optional[Snapshot]:
        """Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

        Args:
            timestamp_ms: Find snapshot that was current at/before this timestamp
            inclusive: Includes timestamp_ms in search when True. Excludes timestamp_ms when False
        """
        for log_entry in reversed(self.history()):
            if (inclusive and log_entry.timestamp_ms <= timestamp_ms) or log_entry.timestamp_ms < timestamp_ms:
                return self.snapshot_by_id(log_entry.snapshot_id)
        return None

    def history(self) -> List[SnapshotLogEntry]:
        """Get the snapshot history of this table."""
        return self.metadata.snapshot_log

    def manage_snapshots(self) -> ManageSnapshots:
        """
        Shorthand to run snapshot management operations like create branch, create tag, etc.

        Use table.manage_snapshots().<operation>().commit() to run a specific operation.
        Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
        Pending changes are applied on commit.

        We can also use context managers to make more changes. For example,

        with table.manage_snapshots() as ms:
           ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
        """
        return ManageSnapshots(transaction=Transaction(self, autocommit=True))

    def update_statistics(self) -> UpdateStatistics:
        """
        Shorthand to run statistics management operations like add statistics and remove statistics.

        Use table.update_statistics().<operation>().commit() to run a specific operation.
        Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.

        Pending changes are applied on commit.

        We can also use context managers to make more changes. For example:

        with table.update_statistics() as update:
            update.set_statistics(statistics_file=statistics_file)
            update.remove_statistics(snapshot_id=2)
        """
        return UpdateStatistics(transaction=Transaction(self, autocommit=True))

    def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
        """Create a new UpdateSchema to alter the columns of this table.

        Args:
            allow_incompatible_changes: If changes are allowed that might break downstream consumers.
            case_sensitive: If field names are case-sensitive.

        Returns:
            A new UpdateSchema.
        """
        return UpdateSchema(
            transaction=Transaction(self, autocommit=True),
            allow_incompatible_changes=allow_incompatible_changes,
            case_sensitive=case_sensitive,
            name_mapping=self.name_mapping(),
        )

    def name_mapping(self) -> Optional[NameMapping]:
        """Return the table's field-id NameMapping."""
        return self.metadata.name_mapping()

    def upsert(
        self,
        df: pa.Table,
        join_cols: Optional[List[str]] = None,
        when_matched_update_all: bool = True,
        when_not_matched_insert_all: bool = True,
        case_sensitive: bool = True,
        branch: Optional[str] = MAIN_BRANCH,
    ) -> UpsertResult:
        """Shorthand API for performing an upsert to an iceberg table.

        Args:

            df: The input dataframe to upsert with the table's data.
            join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
            case_sensitive: Bool indicating if the match should be case-sensitive
            branch: Branch Reference to run the upsert operation

            To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

                Example Use Cases:
                    Case 1: Both Parameters = True (Full Upsert)
                    Existing row found → Update it
                    New row found → Insert it

                    Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
                    Existing row found → Do nothing (no updates)
                    New row found → Insert it

                    Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
                    Existing row found → Update it
                    New row found → Do nothing (no inserts)

                    Case 4: Both Parameters = False (No Merge Effect)
                    Existing row found → Do nothing
                    New row found → Do nothing
                    (Function effectively does nothing)


        Returns:
            An UpsertResult class (contains details of rows updated and inserted)
        """
        with self.transaction() as tx:
            return tx.upsert(
                df=df,
                join_cols=join_cols,
                when_matched_update_all=when_matched_update_all,
                when_not_matched_insert_all=when_not_matched_insert_all,
                case_sensitive=case_sensitive,
                branch=branch,
            )

    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH) -> None:
        """
        Shorthand API for appending a PyArrow table to the table.

        Args:
            df: The Arrow dataframe that will be appended to the table
            snapshot_properties: Custom properties to be added to the snapshot summary
            branch: Branch Reference to run the append operation
        """
        with self.transaction() as tx:
            tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)

    def dynamic_partition_overwrite(
        self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
    ) -> None:
        """Shorthand for dynamic overwriting the table with a PyArrow table.

        Old partitions are auto-detected and replaced with data files created for the input Arrow table.
        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
            branch: Branch Reference to run the dynamic partition overwrite operation
        """
        with self.transaction() as tx:
            tx.dynamic_partition_overwrite(df=df, snapshot_properties=snapshot_properties, branch=branch)

    def overwrite(
        self,
        df: pa.Table,
        overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
        case_sensitive: bool = True,
        branch: Optional[str] = MAIN_BRANCH,
    ) -> None:
        """
        Shorthand for overwriting the table with a PyArrow table.

        An overwrite may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten.
            - APPEND: In case new data is being inserted into the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                              or a boolean expression in case of a partial overwrite
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool determining if the provided `overwrite_filter` is case-sensitive
            branch: Branch Reference to run the overwrite operation
        """
        with self.transaction() as tx:
            tx.overwrite(
                df=df,
                overwrite_filter=overwrite_filter,
                case_sensitive=case_sensitive,
                snapshot_properties=snapshot_properties,
                branch=branch,
            )

    def delete(
        self,
        delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
        case_sensitive: bool = True,
        branch: Optional[str] = MAIN_BRANCH,
    ) -> None:
        """
        Shorthand for deleting rows from the table.

        Args:
            delete_filter: The predicate used to remove rows
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool determining if the provided `delete_filter` is case-sensitive
            branch: Branch Reference to run the delete operation
        """
        with self.transaction() as tx:
            tx.delete(
                delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch
            )

    def add_files(
        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
    ) -> None:
        """
        Shorthand API for adding files as data files to the table.

        Args:
            file_paths: The list of full file paths to be added as data files to the table

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        with self.transaction() as tx:
            tx.add_files(
                file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
            )

    def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
        return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)

    def refs(self) -> Dict[str, SnapshotRef]:
        """Return the snapshot references in the table."""
        return self.metadata.refs

    def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
        response = self.catalog.commit_table(self, requirements, updates)

        # https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527
        # delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true and uses
        # TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many previous versions to keep -
        # everything else will be removed.
        try:
            self.catalog._delete_old_metadata(self.io, self.metadata, response.metadata)
        except Exception as e:
            warnings.warn(f"Failed to delete old metadata after commit: {e}")

        self.metadata = response.metadata
        self.metadata_location = response.metadata_location

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the Table class."""
        return (
            self.name() == other.name() and self.metadata == other.metadata and self.metadata_location == other.metadata_location
            if isinstance(other, Table)
            else False
        )

    def __repr__(self) -> str:
        """Return the string representation of the Table class."""
        table_name = self.catalog.table_name_from(self._identifier)
        schema_str = ",\n  ".join(str(column) for column in self.schema().columns if self.schema())
        partition_str = f"partition by: [{', '.join(field.name for field in self.spec().fields if self.spec())}]"
        sort_order_str = f"sort order: [{', '.join(str(field) for field in self.sort_order().fields if self.sort_order())}]"
        snapshot_str = f"snapshot: {str(self.current_snapshot()) if self.current_snapshot() else 'null'}"
        result_str = f"{table_name}(\n  {schema_str}\n),\n{partition_str},\n{sort_order_str},\n{snapshot_str}"
        return result_str

    def to_daft(self) -> daft.DataFrame:
        """Read a Daft DataFrame lazily from this Iceberg table.

        Returns:
            daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table
        """
        import daft

        return daft.read_iceberg(self)

    def to_bodo(self) -> bd.DataFrame:
        """Read a bodo DataFrame lazily from this Iceberg table.

        Returns:
            bd.DataFrame: Unmaterialized Bodo Dataframe created from the Iceberg table
        """
        import bodo.pandas as bd

        return bd.read_iceberg_table(self)

    def to_polars(self) -> pl.LazyFrame:
        """Lazily read from this Apache Iceberg table.

        Returns:
            pl.LazyFrame: Unmaterialized Polars LazyFrame created from the Iceberg table
        """
        import polars as pl

        return pl.scan_iceberg(self)

    def __datafusion_table_provider__(self) -> "IcebergDataFusionTable":
        """Return the DataFusion table provider PyCapsule interface.

        To support DataFusion features such as push down filtering, this function will return a PyCapsule
        interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective
        you should not need to call this function directly. Instead you can use ``register_table_provider`` in
        the DataFusion SessionContext.

        Returns:
            A PyCapsule DataFusion TableProvider interface.

        Example:
            ```python
            from datafusion import SessionContext
            from pyiceberg.catalog import load_catalog
            import pyarrow as pa
            catalog = load_catalog("catalog", type="in-memory")
            catalog.create_namespace_if_not_exists("default")
            data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
            iceberg_table = catalog.create_table("default.test", schema=data.schema)
            iceberg_table.append(data)
            ctx = SessionContext()
            ctx.register_table_provider("test", iceberg_table)
            ctx.table("test").show()
            ```
            Results in
            ```
            DataFrame()
            +---+---+
            | x | y |
            +---+---+
            | 1 | 4 |
            | 2 | 5 |
            | 3 | 6 |
            +---+---+
            ```
        """
        from pyiceberg_core.datafusion import IcebergDataFusionTable

        return IcebergDataFusionTable(
            identifier=self.name(),
            metadata_location=self.metadata_location,
            file_io_properties=self.io.properties,
        ).__datafusion_table_provider__()

inspect property

Return the InspectTable object to browse the table metadata.

Returns:

    InspectTable: InspectTable object based on this Table.

maintenance property

Return the MaintenanceTable object for maintenance.

Returns:

    MaintenanceTable: MaintenanceTable object based on this Table.

properties property

Properties of the table.

__datafusion_table_provider__()

Return the DataFusion table provider PyCapsule interface.

To support DataFusion features such as push down filtering, this function will return a PyCapsule interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective you should not need to call this function directly. Instead you can use register_table_provider in the DataFusion SessionContext.

Returns:

    IcebergDataFusionTable: A PyCapsule DataFusion TableProvider interface.

Example

from datafusion import SessionContext
from pyiceberg.catalog import load_catalog
import pyarrow as pa
catalog = load_catalog("catalog", type="in-memory")
catalog.create_namespace_if_not_exists("default")
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
iceberg_table = catalog.create_table("default.test", schema=data.schema)
iceberg_table.append(data)
ctx = SessionContext()
ctx.register_table_provider("test", iceberg_table)
ctx.table("test").show()
Results in
DataFrame()
+---+---+
| x | y |
+---+---+
| 1 | 4 |
| 2 | 5 |
| 3 | 6 |
+---+---+

Source code in pyiceberg/table/__init__.py
def __datafusion_table_provider__(self) -> "IcebergDataFusionTable":
    """Return the DataFusion table provider PyCapsule interface.

    To support DataFusion features such as push down filtering, this function will return a PyCapsule
    interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective
    you should not need to call this function directly. Instead you can use ``register_table_provider`` in
    the DataFusion SessionContext.

    Returns:
        A PyCapsule DataFusion TableProvider interface.

    Example:
        ```python
        from datafusion import SessionContext
        from pyiceberg.catalog import load_catalog
        import pyarrow as pa
        catalog = load_catalog("catalog", type="in-memory")
        catalog.create_namespace_if_not_exists("default")
        data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
        iceberg_table = catalog.create_table("default.test", schema=data.schema)
        iceberg_table.append(data)
        ctx = SessionContext()
        ctx.register_table_provider("test", iceberg_table)
        ctx.table("test").show()
        ```
        Results in
        ```
        DataFrame()
        +---+---+
        | x | y |
        +---+---+
        | 1 | 4 |
        | 2 | 5 |
        | 3 | 6 |
        +---+---+
        ```
    """
    from pyiceberg_core.datafusion import IcebergDataFusionTable

    return IcebergDataFusionTable(
        identifier=self.name(),
        metadata_location=self.metadata_location,
        file_io_properties=self.io.properties,
    ).__datafusion_table_provider__()

__eq__(other)

Return the equality of two instances of the Table class.

Source code in pyiceberg/table/__init__.py
def __eq__(self, other: Any) -> bool:
    """Return the equality of two instances of the Table class."""
    return (
        self.name() == other.name() and self.metadata == other.metadata and self.metadata_location == other.metadata_location
        if isinstance(other, Table)
        else False
    )

__repr__()

Return the string representation of the Table class.

Source code in pyiceberg/table/__init__.py
def __repr__(self) -> str:
    """Return the string representation of the Table class."""
    table_name = self.catalog.table_name_from(self._identifier)
    schema_str = ",\n  ".join(str(column) for column in self.schema().columns if self.schema())
    partition_str = f"partition by: [{', '.join(field.name for field in self.spec().fields if self.spec())}]"
    sort_order_str = f"sort order: [{', '.join(str(field) for field in self.sort_order().fields if self.sort_order())}]"
    snapshot_str = f"snapshot: {str(self.current_snapshot()) if self.current_snapshot() else 'null'}"
    result_str = f"{table_name}(\n  {schema_str}\n),\n{partition_str},\n{sort_order_str},\n{snapshot_str}"
    return result_str

add_files(file_paths, snapshot_properties=EMPTY_DICT, check_duplicate_files=True)

Shorthand API for adding files as data files to the table.

Parameters:

    file_paths (List[str], required): The list of full file paths to be added as data files to the table.

Raises:

    FileNotFoundError: If the file does not exist.

Source code in pyiceberg/table/__init__.py
def add_files(
    self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
) -> None:
    """
    Shorthand API for adding files as data files to the table.

    Args:
        file_paths: The list of full file paths to be added as data files to the table

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    with self.transaction() as tx:
        tx.add_files(
            file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
        )
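
Example (a minimal sketch, assuming `table` is an already-loaded Table; the file paths are hypothetical, and the files are assumed to be Parquet files compatible with the table's schema):

# Register existing Parquet files as data files without rewriting them.
table.add_files(
    file_paths=[
        "s3://warehouse/db/events/data/part-000.parquet",
        "s3://warehouse/db/events/data/part-001.parquet",
    ],
    snapshot_properties={"added-by": "backfill-job"},
)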

append(df, snapshot_properties=EMPTY_DICT, branch=MAIN_BRANCH)

Shorthand API for appending a PyArrow table to the table.

Parameters:

    df (pa.Table, required): The Arrow dataframe that will be appended to the table.
    snapshot_properties (Dict[str, str], default EMPTY_DICT): Custom properties to be added to the snapshot summary.
    branch (Optional[str], default MAIN_BRANCH): Branch Reference to run the append operation.
Source code in pyiceberg/table/__init__.py
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH) -> None:
    """
    Shorthand API for appending a PyArrow table to the table.

    Args:
        df: The Arrow dataframe that will be appended to the table
        snapshot_properties: Custom properties to be added to the snapshot summary
        branch: Branch Reference to run the append operation
    """
    with self.transaction() as tx:
        tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)
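
Example (a minimal sketch, assuming `table` is an already-loaded Table whose schema has the hypothetical columns city and visits):

import pyarrow as pa

df = pa.table({"city": ["Amsterdam", "Drachten"], "visits": [42, 7]})
table.append(df, snapshot_properties={"source": "manual-load"})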

current_snapshot()

Get the current snapshot for this table, or None if there is no current snapshot.

Source code in pyiceberg/table/__init__.py
def current_snapshot(self) -> Optional[Snapshot]:
    """Get the current snapshot for this table, or None if there is no current snapshot."""
    if self.metadata.current_snapshot_id is not None:
        return self.snapshot_by_id(self.metadata.current_snapshot_id)
    return None

delete(delete_filter=ALWAYS_TRUE, snapshot_properties=EMPTY_DICT, case_sensitive=True, branch=MAIN_BRANCH)

Shorthand for deleting rows from the table.

Parameters:

Name Type Description Default
delete_filter Union[BooleanExpression, str]

The predicate used to remove rows

ALWAYS_TRUE
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
case_sensitive bool

A bool that determines if the provided delete_filter is case-sensitive

True
branch Optional[str]

Branch Reference to run the delete operation

MAIN_BRANCH
Source code in pyiceberg/table/__init__.py
def delete(
    self,
    delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
    case_sensitive: bool = True,
    branch: Optional[str] = MAIN_BRANCH,
) -> None:
    """
    Shorthand for deleting rows from the table.

    Args:
        delete_filter: The predicate that used to remove rows
        snapshot_properties: Custom properties to be added to the snapshot summary
        case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive
        branch: Branch Reference to run the delete operation
    """
    with self.transaction() as tx:
        tx.delete(
            delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch
        )
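
A brief sketch of both filter styles, assuming tbl is a loaded Table and the column names are placeholders:

from pyiceberg.expressions import EqualTo

# Delete with a string predicate ...
tbl.delete(delete_filter="status = 'cancelled'")

# ... or with an expression object.
tbl.delete(delete_filter=EqualTo("id", 42))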

dynamic_partition_overwrite(df, snapshot_properties=EMPTY_DICT, branch=MAIN_BRANCH)

Shorthand for dynamic overwriting the table with a PyArrow table.

Old partitions are auto-detected and replaced with data files created from the input Arrow table.

Parameters:

Name Type Description Default
df Table

The Arrow dataframe that will be used to overwrite the table

required
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
branch Optional[str]

Branch Reference to run the dynamic partition overwrite operation

MAIN_BRANCH

Source code in pyiceberg/table/__init__.py
def dynamic_partition_overwrite(
    self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
) -> None:
    """Shorthand for dynamic overwriting the table with a PyArrow table.

    Old partitions are auto detected and replaced with data files created for input arrow table.
    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        snapshot_properties: Custom properties to be added to the snapshot summary
        branch: Branch Reference to run the dynamic partition overwrite operation
    """
    with self.transaction() as tx:
        tx.dynamic_partition_overwrite(df=df, snapshot_properties=snapshot_properties, branch=branch)
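
A sketch, assuming tbl is partitioned by an identity transform on a placeholder column region:

import pyarrow as pa

df = pa.table({"region": ["eu", "eu"], "value": [10, 20]})

# Partitions present in df (here region="eu") are replaced; all other partitions stay untouched.
tbl.dynamic_partition_overwrite(df)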

history()

Get the snapshot history of this table.

Source code in pyiceberg/table/__init__.py
def history(self) -> List[SnapshotLogEntry]:
    """Get the snapshot history of this table."""
    return self.metadata.snapshot_log

last_partition_id()

Return the highest assigned partition field ID across all specs or 999 if only the unpartitioned spec exists.

Source code in pyiceberg/table/__init__.py
def last_partition_id(self) -> int:
    """Return the highest assigned partition field ID across all specs or 999 if only the unpartitioned spec exists."""
    if self.metadata.last_partition_id:
        return self.metadata.last_partition_id
    return PARTITION_FIELD_ID_START - 1

location()

Return the table's base location.

Source code in pyiceberg/table/__init__.py
def location(self) -> str:
    """Return the table's base location."""
    return self.metadata.location

location_provider()

Return the table's location provider.

Source code in pyiceberg/table/__init__.py
def location_provider(self) -> LocationProvider:
    """Return the table's location provider."""
    return load_location_provider(table_location=self.metadata.location, table_properties=self.metadata.properties)

manage_snapshots()

Shorthand to run snapshot management operations like create branch, create tag, etc.

Use table.manage_snapshots().<operation>().commit() to run a specific operation. Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations. Pending changes are applied on commit.

We can also use context managers to make more changes. For example,

with table.manage_snapshots() as ms:
    ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")

Source code in pyiceberg/table/__init__.py
def manage_snapshots(self) -> ManageSnapshots:
    """
    Shorthand to run snapshot management operations like create branch, create tag, etc.

    Use table.manage_snapshots().<operation>().commit() to run a specific operation.
    Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.

    We can also use context managers to make more changes. For example,

    with table.manage_snapshots() as ms:
       ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
    """
    return ManageSnapshots(transaction=Transaction(self, autocommit=True))
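
For illustration (assumes the table already has at least one snapshot; the tag and branch names are placeholders):

# Chained style: stage an operation, then commit.
snapshot_id = tbl.current_snapshot().snapshot_id
tbl.manage_snapshots().create_tag(snapshot_id, "v1").commit()

# Context-manager style: pending changes are committed on exit.
with tbl.manage_snapshots() as ms:
    ms.create_branch(snapshot_id, "audit")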

name()

Return the identifier of this table.

Returns:

Type Description
Identifier

An Identifier tuple of the table name

Source code in pyiceberg/table/__init__.py
def name(self) -> Identifier:
    """Return the identifier of this table.

    Returns:
        An Identifier tuple of the table name
    """
    return self._identifier

name_mapping()

Return the table's field-id NameMapping.

Source code in pyiceberg/table/__init__.py
def name_mapping(self) -> Optional[NameMapping]:
    """Return the table's field-id NameMapping."""
    return self.metadata.name_mapping()

overwrite(df, overwrite_filter=ALWAYS_TRUE, snapshot_properties=EMPTY_DICT, case_sensitive=True, branch=MAIN_BRANCH)

Shorthand for overwriting the table with a PyArrow table.

An overwrite may produce zero or more snapshots based on the operation:

- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten.
- APPEND: In case new data is being inserted into the table.

Parameters:

Name Type Description Default
df Table

The Arrow dataframe that will be used to overwrite the table

required
overwrite_filter Union[BooleanExpression, str]

ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite

ALWAYS_TRUE
snapshot_properties Dict[str, str]

Custom properties to be added to the snapshot summary

EMPTY_DICT
case_sensitive bool

A bool that determines if the provided overwrite_filter is case-sensitive

True
branch Optional[str]

Branch Reference to run the overwrite operation

MAIN_BRANCH
Source code in pyiceberg/table/__init__.py
def overwrite(
    self,
    df: pa.Table,
    overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
    case_sensitive: bool = True,
    branch: Optional[str] = MAIN_BRANCH,
) -> None:
    """
    Shorthand for overwriting the table with a PyArrow table.

    An overwrite may produce zero or more snapshots based on the operation:

        - DELETE: In case existing Parquet files can be dropped completely.
        - REPLACE: In case existing Parquet files need to be rewritten.
        - APPEND: In case new data is being inserted into the table.

    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                          or a boolean expression in case of a partial overwrite
        snapshot_properties: Custom properties to be added to the snapshot summary
        case_sensitive: A bool determine if the provided `overwrite_filter` is case-sensitive
        branch: Branch Reference to run the overwrite operation
    """
    with self.transaction() as tx:
        tx.overwrite(
            df=df,
            overwrite_filter=overwrite_filter,
            case_sensitive=case_sensitive,
            snapshot_properties=snapshot_properties,
            branch=branch,
        )
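
A short sketch of a full and a partial overwrite, assuming tbl is a loaded Table with placeholder columns id and name:

import pyarrow as pa
from pyiceberg.expressions import EqualTo

df = pa.table({"id": [1], "name": ["replacement"]})

# Full overwrite: all existing data is replaced by df.
tbl.overwrite(df)

# Partial overwrite: only rows matching the filter are replaced by df.
tbl.overwrite(df, overwrite_filter=EqualTo("id", 1))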

refresh()

Refresh the current table metadata.

Returns:

Type Description
Table

An updated instance of the same Iceberg table

Source code in pyiceberg/table/__init__.py
def refresh(self) -> Table:
    """Refresh the current table metadata.

    Returns:
        An updated instance of the same Iceberg table
    """
    fresh = self.catalog.load_table(self._identifier)
    self.metadata = fresh.metadata
    self.io = fresh.io
    self.metadata_location = fresh.metadata_location
    return self

refs()

Return the snapshot references in the table.

Source code in pyiceberg/table/__init__.py
def refs(self) -> Dict[str, SnapshotRef]:
    """Return the snapshot references in the table."""
    return self.metadata.refs

scan(row_filter=ALWAYS_TRUE, selected_fields=('*',), case_sensitive=True, snapshot_id=None, options=EMPTY_DICT, limit=None)

Fetch a DataScan based on the table's current metadata.

The data scan can be used to project the table's data
that matches the provided row_filter onto the table's
current schema.

Parameters:

Name Type Description Default
row_filter Union[str, BooleanExpression]

A string or BooleanExpression that describes the desired rows

ALWAYS_TRUE
selected_fields Tuple[str, ...]

A tuple of strings representing the column names to return in the output dataframe.

('*',)
case_sensitive bool

If True column matching is case sensitive

True
snapshot_id Optional[int]

Optional Snapshot ID to time travel to. If None, scans the table as of the current snapshot ID.

None
options Properties

Additional Table properties as a dictionary of string key value pairs to use for this scan.

EMPTY_DICT
limit Optional[int]

An integer representing the number of rows to return in the scan result. If None, fetches all matching rows.

None

Returns:

Type Description
DataScan

A DataScan based on the table's current metadata.

Source code in pyiceberg/table/__init__.py
def scan(
    self,
    row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
    selected_fields: Tuple[str, ...] = ("*",),
    case_sensitive: bool = True,
    snapshot_id: Optional[int] = None,
    options: Properties = EMPTY_DICT,
    limit: Optional[int] = None,
) -> DataScan:
    """Fetch a DataScan based on the table's current metadata.

        The data scan can be used to project the table's data
        that matches the provided row_filter onto the table's
        current schema.

    Args:
        row_filter:
            A string or BooleanExpression that describes the
            desired rows
        selected_fields:
            A tuple of strings representing the column names
            to return in the output dataframe.
        case_sensitive:
            If True column matching is case sensitive
        snapshot_id:
            Optional Snapshot ID to time travel to. If None,
            scans the table as of the current snapshot ID.
        options:
            Additional Table properties as a dictionary of
            string key value pairs to use for this scan.
        limit:
            An integer representing the number of rows to
            return in the scan result. If None, fetches all
            matching rows.

    Returns:
        A DataScan based on the table's current metadata.
    """
    return DataScan(
        table_metadata=self.metadata,
        io=self.io,
        row_filter=row_filter,
        selected_fields=selected_fields,
        case_sensitive=case_sensitive,
        snapshot_id=snapshot_id,
        options=options,
        limit=limit,
    )
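
For illustration, a scan sketch with a filter, projection, and limit (column names are placeholders):

from pyiceberg.expressions import GreaterThanOrEqual

scan = tbl.scan(
    row_filter=GreaterThanOrEqual("id", 100),   # or the equivalent string "id >= 100"
    selected_fields=("id", "name"),
    limit=1_000,
)

arrow_table = scan.to_arrow()   # materialize as a PyArrow table
pandas_df = scan.to_pandas()    # or as a pandas DataFrame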

schema()

Return the schema for this table.

Source code in pyiceberg/table/__init__.py
def schema(self) -> Schema:
    """Return the schema for this table."""
    return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

schemas()

Return a dict of the schemas of this table.

Source code in pyiceberg/table/__init__.py
def schemas(self) -> Dict[int, Schema]:
    """Return a dict of the schema of this table."""
    return {schema.schema_id: schema for schema in self.metadata.schemas}

snapshot_as_of_timestamp(timestamp_ms, inclusive=True)

Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

Parameters:

Name Type Description Default
timestamp_ms int

Find snapshot that was current at/before this timestamp

required
inclusive bool

Includes timestamp_ms in search when True. Excludes timestamp_ms when False

True
Source code in pyiceberg/table/__init__.py
def snapshot_as_of_timestamp(self, timestamp_ms: int, inclusive: bool = True) -> Optional[Snapshot]:
    """Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

    Args:
        timestamp_ms: Find snapshot that was current at/before this timestamp
        inclusive: Includes timestamp_ms in search when True. Excludes timestamp_ms when False
    """
    for log_entry in reversed(self.history()):
        if (inclusive and log_entry.timestamp_ms <= timestamp_ms) or log_entry.timestamp_ms < timestamp_ms:
            return self.snapshot_by_id(log_entry.snapshot_id)
    return None
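
A time-travel sketch: locate the snapshot that was current at a given moment and scan it (the date is a placeholder):

from datetime import datetime, timezone

ts_ms = int(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp() * 1000)
snapshot = tbl.snapshot_as_of_timestamp(ts_ms)

if snapshot is not None:
    old_data = tbl.scan(snapshot_id=snapshot.snapshot_id).to_arrow()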

snapshot_by_id(snapshot_id)

Get the snapshot of this table with the given id, or None if there is no matching snapshot.

Source code in pyiceberg/table/__init__.py
def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
    """Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
    return self.metadata.snapshot_by_id(snapshot_id)

snapshot_by_name(name)

Return the snapshot referenced by the given name or null if no such reference exists.

Source code in pyiceberg/table/__init__.py
def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
    """Return the snapshot referenced by the given name or null if no such reference exists."""
    if ref := self.metadata.refs.get(name):
        return self.snapshot_by_id(ref.snapshot_id)
    return None

sort_order()

Return the sort order of this table.

Source code in pyiceberg/table/__init__.py
def sort_order(self) -> SortOrder:
    """Return the sort order of this table."""
    return next(
        sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
    )

sort_orders()

Return a dict of the sort orders of this table.

Source code in pyiceberg/table/__init__.py
def sort_orders(self) -> Dict[int, SortOrder]:
    """Return a dict of the sort orders of this table."""
    return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}

spec()

Return the partition spec of this table.

Source code in pyiceberg/table/__init__.py
def spec(self) -> PartitionSpec:
    """Return the partition spec of this table."""
    return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)

specs()

Return a dict of the partition specs of this table.

Source code in pyiceberg/table/__init__.py
def specs(self) -> Dict[int, PartitionSpec]:
    """Return a dict the partition specs this table."""
    return {spec.spec_id: spec for spec in self.metadata.partition_specs}

to_bodo()

Read a bodo DataFrame lazily from this Iceberg table.

Returns:

Type Description
DataFrame

bd.DataFrame: Unmaterialized Bodo Dataframe created from the Iceberg table

Source code in pyiceberg/table/__init__.py
def to_bodo(self) -> bd.DataFrame:
    """Read a bodo DataFrame lazily from this Iceberg table.

    Returns:
        bd.DataFrame: Unmaterialized Bodo Dataframe created from the Iceberg table
    """
    import bodo.pandas as bd

    return bd.read_iceberg_table(self)

to_daft()

Read a Daft DataFrame lazily from this Iceberg table.

Returns:

Type Description
DataFrame

daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table

Source code in pyiceberg/table/__init__.py
def to_daft(self) -> daft.DataFrame:
    """Read a Daft DataFrame lazily from this Iceberg table.

    Returns:
        daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table
    """
    import daft

    return daft.read_iceberg(self)

to_polars()

Lazily read from this Apache Iceberg table.

Returns:

Type Description
LazyFrame

pl.LazyFrame: Unmaterialized Polars LazyFrame created from the Iceberg table

Source code in pyiceberg/table/__init__.py
def to_polars(self) -> pl.LazyFrame:
    """Lazily read from this Apache Iceberg table.

    Returns:
        pl.LazyFrame: Unmaterialized Polars LazyFrame created from the Iceberg table
    """
    import polars as pl

    return pl.scan_iceberg(self)
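
A minimal sketch (requires polars to be installed; the column name is a placeholder):

import polars as pl

lf = tbl.to_polars()                           # lazy; nothing is read yet
df = lf.filter(pl.col("id") > 10).collect()    # executing the plan reads from the table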

transaction()

Create a new transaction object to first stage the changes, and then commit them to the catalog.

Returns:

Type Description
Transaction

The transaction object

Source code in pyiceberg/table/__init__.py
def transaction(self) -> Transaction:
    """Create a new transaction object to first stage the changes, and then commit them to the catalog.

    Returns:
        The transaction object
    """
    return Transaction(self)
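
For illustration, staging several changes and committing them as a single atomic update (the property key and data below are placeholders):

import pyarrow as pa

df = pa.table({"id": [1], "name": ["a"]})

with tbl.transaction() as tx:
    tx.set_properties(owner="data-eng")
    tx.append(df)
# commit_transaction() runs automatically when the block exits without an exception.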

update_schema(allow_incompatible_changes=False, case_sensitive=True)

Create a new UpdateSchema to alter the columns of this table.

Parameters:

Name Type Description Default
allow_incompatible_changes bool

If changes are allowed that might break downstream consumers.

False
case_sensitive bool

If field names are case-sensitive.

True

Returns:

Type Description
UpdateSchema

A new UpdateSchema.

Source code in pyiceberg/table/__init__.py
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
    """Create a new UpdateSchema to alter the columns of this table.

    Args:
        allow_incompatible_changes: If changes are allowed that might break downstream consumers.
        case_sensitive: If field names are case-sensitive.

    Returns:
        A new UpdateSchema.
    """
    return UpdateSchema(
        transaction=Transaction(self, autocommit=True),
        allow_incompatible_changes=allow_incompatible_changes,
        case_sensitive=case_sensitive,
        name_mapping=self.name_mapping(),
    )
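
A small sketch of evolving the schema through the context manager (column names are placeholders):

from pyiceberg.types import StringType

with tbl.update_schema() as update:
    update.add_column("email", StringType(), doc="contact address")
    update.rename_column("name", "full_name")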

update_statistics()

Shorthand to run statistics management operations like add statistics and remove statistics.

Use table.update_statistics().<operation>().commit() to run a specific operation. Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.

Pending changes are applied on commit.

We can also use context managers to make more changes. For example:

with table.update_statistics() as update:
    update.set_statistics(statistics_file=statistics_file)
    update.remove_statistics(snapshot_id=2)

Source code in pyiceberg/table/__init__.py
def update_statistics(self) -> UpdateStatistics:
    """
    Shorthand to run statistics management operations like add statistics and remove statistics.

    Use table.update_statistics().<operation>().commit() to run a specific operation.
    Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.

    Pending changes are applied on commit.

    We can also use context managers to make more changes. For example:

    with table.update_statistics() as update:
        update.set_statistics(statistics_file=statistics_file)
        update.remove_statistics(snapshot_id=2)
    """
    return UpdateStatistics(transaction=Transaction(self, autocommit=True))

upsert(df, join_cols=None, when_matched_update_all=True, when_not_matched_insert_all=True, case_sensitive=True, branch=MAIN_BRANCH)

Shorthand API for performing an upsert to an iceberg table.

Parameters:

Name Type Description Default
df Table

The input dataframe to upsert with the table's data.

required
join_cols Optional[List[str]]

Columns to join on. If not provided, the identifier-field-ids are used.

None
when_matched_update_all bool

Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing

True
when_not_matched_insert_all bool

Bool indicating new rows to be inserted that do not match any existing rows in the table

True
case_sensitive bool

Bool indicating if the match should be case-sensitive

True
branch Optional[str]

Branch Reference to run the upsert operation

MAIN_BRANCH

To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

    Example Use Cases:
        Case 1: Both Parameters = True (Full Upsert)
        Existing row found → Update it
        New row found → Insert it

        Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
        Existing row found → Do nothing (no updates)
        New row found → Insert it

        Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
        Existing row found → Update it
        New row found → Do nothing (no inserts)

        Case 4: Both Parameters = False (No Merge Effect)
        Existing row found → Do nothing
        New row found → Do nothing
        (Function effectively does nothing)

Returns:

Type Description
UpsertResult

An UpsertResult class (contains details of rows updated and inserted)

Source code in pyiceberg/table/__init__.py
def upsert(
    self,
    df: pa.Table,
    join_cols: Optional[List[str]] = None,
    when_matched_update_all: bool = True,
    when_not_matched_insert_all: bool = True,
    case_sensitive: bool = True,
    branch: Optional[str] = MAIN_BRANCH,
) -> UpsertResult:
    """Shorthand API for performing an upsert to an iceberg table.

    Args:

        df: The input dataframe to upsert with the table's data.
        join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
        when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
        when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
        case_sensitive: Bool indicating if the match should be case-sensitive
        branch: Branch Reference to run the upsert operation

        To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

            Example Use Cases:
                Case 1: Both Parameters = True (Full Upsert)
                Existing row found → Update it
                New row found → Insert it

                Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
                Existing row found → Do nothing (no updates)
                New row found → Insert it

                Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
                Existing row found → Update it
                New row found → Do nothing (no inserts)

                Case 4: Both Parameters = False (No Merge Effect)
                Existing row found → Do nothing
                New row found → Do nothing
                (Function effectively does nothing)


    Returns:
        An UpsertResult class (contains details of rows updated and inserted)
    """
    with self.transaction() as tx:
        return tx.upsert(
            df=df,
            join_cols=join_cols,
            when_matched_update_all=when_matched_update_all,
            when_not_matched_insert_all=when_not_matched_insert_all,
            case_sensitive=case_sensitive,
            branch=branch,
        )
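
For illustration, an upsert keyed on a placeholder id column:

import pyarrow as pa

source = pa.table({"id": [1, 2], "name": ["updated", "new"]})

# Rows matching on "id" with changed values are updated; unmatched rows are inserted.
result = tbl.upsert(source, join_cols=["id"])
print(result.rows_updated, result.rows_inserted)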

TableIdentifier

Bases: IcebergBaseModel

Fully Qualified identifier to a table.

Source code in pyiceberg/table/__init__.py
class TableIdentifier(IcebergBaseModel):
    """Fully Qualified identifier to a table."""

    namespace: Namespace
    name: str

TableScan

Bases: ABC

Source code in pyiceberg/table/__init__.py
class TableScan(ABC):
    table_metadata: TableMetadata
    io: FileIO
    row_filter: BooleanExpression
    selected_fields: Tuple[str, ...]
    case_sensitive: bool
    snapshot_id: Optional[int]
    options: Properties
    limit: Optional[int]

    def __init__(
        self,
        table_metadata: TableMetadata,
        io: FileIO,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ):
        self.table_metadata = table_metadata
        self.io = io
        self.row_filter = _parse_row_filter(row_filter)
        self.selected_fields = selected_fields
        self.case_sensitive = case_sensitive
        self.snapshot_id = snapshot_id
        self.options = options
        self.limit = limit

    def snapshot(self) -> Optional[Snapshot]:
        if self.snapshot_id:
            return self.table_metadata.snapshot_by_id(self.snapshot_id)
        return self.table_metadata.current_snapshot()

    def projection(self) -> Schema:
        current_schema = self.table_metadata.schema()
        if self.snapshot_id is not None:
            snapshot = self.table_metadata.snapshot_by_id(self.snapshot_id)
            if snapshot is not None:
                if snapshot.schema_id is not None:
                    try:
                        current_schema = next(
                            schema for schema in self.table_metadata.schemas if schema.schema_id == snapshot.schema_id
                        )
                    except StopIteration:
                        warnings.warn(f"Metadata does not contain schema with id: {snapshot.schema_id}")
            else:
                raise ValueError(f"Snapshot not found: {self.snapshot_id}")

        if "*" in self.selected_fields:
            return current_schema

        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

    @abstractmethod
    def plan_files(self) -> Iterable[ScanTask]: ...

    @abstractmethod
    def to_arrow(self) -> pa.Table: ...

    @abstractmethod
    def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ...

    @abstractmethod
    def to_polars(self) -> pl.DataFrame: ...

    def update(self: S, **overrides: Any) -> S:
        """Create a copy of this table scan with updated fields."""
        from inspect import signature

        # Extract those attributes that are constructor parameters. We don't use self.__dict__ as the kwargs to the
        # constructors because it may contain additional attributes that are not part of the constructor signature.
        params = signature(type(self).__init__).parameters.keys() - {"self"}  # Skip "self" parameter
        kwargs = {param: getattr(self, param) for param in params}  # Assume parameters are attributes

        return type(self)(**{**kwargs, **overrides})

    def use_ref(self: S, name: str) -> S:
        if self.snapshot_id:
            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
        if snapshot := self.table_metadata.snapshot_by_name(name):
            return self.update(snapshot_id=snapshot.snapshot_id)

        raise ValueError(f"Cannot scan unknown ref={name}")

    def select(self: S, *field_names: str) -> S:
        if "*" in self.selected_fields:
            return self.update(selected_fields=field_names)
        return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))

    def filter(self: S, expr: Union[str, BooleanExpression]) -> S:
        return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr)))

    def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
        return self.update(case_sensitive=case_sensitive)

    @abstractmethod
    def count(self) -> int: ...

update(**overrides)

Create a copy of this table scan with updated fields.

Source code in pyiceberg/table/__init__.py
def update(self: S, **overrides: Any) -> S:
    """Create a copy of this table scan with updated fields."""
    from inspect import signature

    # Extract those attributes that are constructor parameters. We don't use self.__dict__ as the kwargs to the
    # constructors because it may contain additional attributes that are not part of the constructor signature.
    params = signature(type(self).__init__).parameters.keys() - {"self"}  # Skip "self" parameter
    kwargs = {param: getattr(self, param) for param in params}  # Assume parameters are attributes

    return type(self)(**{**kwargs, **overrides})
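
Because each refinement returns a new scan via update(), scans can be built up by chaining (the tag name below is a placeholder):

scan = tbl.scan().select("id", "name").filter("id > 100").with_case_sensitive(False)

# use_ref() pins the scan to a named branch or tag instead of the current snapshot.
tagged_scan = tbl.scan().use_ref("v1")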

Transaction

Source code in pyiceberg/table/__init__.py
class Transaction:
    _table: Table
    _autocommit: bool
    _updates: Tuple[TableUpdate, ...]
    _requirements: Tuple[TableRequirement, ...]

    def __init__(self, table: Table, autocommit: bool = False):
        """Open a transaction to stage and commit changes to a table.

        Args:
            table: The table that will be altered.
            autocommit: Option to automatically commit the changes when they are staged.
        """
        self._table = table
        self._autocommit = autocommit
        self._updates = ()
        self._requirements = ()

    @property
    def table_metadata(self) -> TableMetadata:
        return update_table_metadata(self._table.metadata, self._updates)

    def __enter__(self) -> Transaction:
        """Start a transaction to update the table."""
        return self

    def __exit__(
        self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
    ) -> None:
        """Close and commit the transaction if no exceptions have been raised."""
        if exctype is None and excinst is None and exctb is None:
            self.commit_transaction()

    def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction:
        """Check if the requirements are met, and applies the updates to the metadata."""
        for requirement in requirements:
            requirement.validate(self.table_metadata)

        self._updates += updates

        # For the requirements, it does not make sense to add a requirement more than once
        # For example, you cannot assert that the current schema has two different IDs
        existing_requirements = {type(requirement) for requirement in self._requirements}
        for new_requirement in requirements:
            if type(new_requirement) not in existing_requirements:
                self._requirements = self._requirements + (new_requirement,)

        if self._autocommit:
            self.commit_transaction()

        return self

    def _scan(self, row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, case_sensitive: bool = True) -> DataScan:
        """Minimal data scan of the table with the current state of the transaction."""
        return DataScan(
            table_metadata=self.table_metadata, io=self._table.io, row_filter=row_filter, case_sensitive=case_sensitive
        )

    def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
        """Set the table to a certain version.

        Args:
            format_version: The newly set version.

        Returns:
            The alter table builder.
        """
        if format_version not in {1, 2}:
            raise ValueError(f"Unsupported table format version: {format_version}")

        if format_version < self.table_metadata.format_version:
            raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")

        if format_version > self.table_metadata.format_version:
            return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))

        return self

    def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) -> Transaction:
        """Set properties.

        When a property is already set, it will be overwritten.

        Args:
            properties: The properties set on the table.
            kwargs: properties can also be pass as kwargs.

        Returns:
            The alter table builder.
        """
        if properties and kwargs:
            raise ValueError("Cannot pass both properties and kwargs")
        updates = properties or kwargs
        return self._apply((SetPropertiesUpdate(updates=updates),))

    def _set_ref_snapshot(
        self,
        snapshot_id: int,
        ref_name: str,
        type: str,
        max_ref_age_ms: Optional[int] = None,
        max_snapshot_age_ms: Optional[int] = None,
        min_snapshots_to_keep: Optional[int] = None,
    ) -> UpdatesAndRequirements:
        """Update a ref to a snapshot.

        Returns:
            The updates and requirements for the set-snapshot-ref staged
        """
        updates = (
            SetSnapshotRefUpdate(
                snapshot_id=snapshot_id,
                ref_name=ref_name,
                type=type,
                max_ref_age_ms=max_ref_age_ms,
                max_snapshot_age_ms=max_snapshot_age_ms,
                min_snapshots_to_keep=min_snapshots_to_keep,
            ),
        )
        requirements = (
            AssertRefSnapshotId(
                snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None,
                ref=ref_name,
            ),
        )

        return updates, requirements

    def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
        """Build a filter predicate matching any of the input partition records.

        Args:
            partition_records: A set of partition records to match
        Returns:
            A predicate matching any of the input partition records.
        """
        partition_spec = self.table_metadata.spec()
        schema = self.table_metadata.schema()
        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]

        expr: BooleanExpression = AlwaysFalse()
        for partition_record in partition_records:
            match_partition_expression: BooleanExpression = AlwaysTrue()

            for pos, partition_field in enumerate(partition_fields):
                predicate = (
                    EqualTo(Reference(partition_field), partition_record[pos])
                    if partition_record[pos] is not None
                    else IsNull(Reference(partition_field))
                )
                match_partition_expression = And(match_partition_expression, predicate)
            expr = Or(expr, match_partition_expression)
        return expr

    def _append_snapshot_producer(
        self, snapshot_properties: Dict[str, str], branch: Optional[str] = MAIN_BRANCH
    ) -> _FastAppendFiles:
        """Determine the append type based on table properties.

        Args:
            snapshot_properties: Custom properties to be added to the snapshot summary
        Returns:
            Either a fast-append or a merge-append snapshot producer.
        """
        manifest_merge_enabled = property_as_bool(
            self.table_metadata.properties,
            TableProperties.MANIFEST_MERGE_ENABLED,
            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
        )
        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch)
        return update_snapshot.merge_append() if manifest_merge_enabled else update_snapshot.fast_append()

    def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
        """Create a new UpdateSchema to alter the columns of this table.

        Args:
            allow_incompatible_changes: If changes are allowed that might break downstream consumers.
            case_sensitive: If field names are case-sensitive.

        Returns:
            A new UpdateSchema.
        """
        return UpdateSchema(
            self,
            allow_incompatible_changes=allow_incompatible_changes,
            case_sensitive=case_sensitive,
            name_mapping=self.table_metadata.name_mapping(),
        )

    def update_snapshot(
        self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
    ) -> UpdateSnapshot:
        """Create a new UpdateSnapshot to produce a new snapshot for the table.

        Returns:
            A new UpdateSnapshot
        """
        return UpdateSnapshot(self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties)

    def update_statistics(self) -> UpdateStatistics:
        """
        Create a new UpdateStatistics to update the statistics of the table.

        Returns:
            A new UpdateStatistics
        """
        return UpdateStatistics(transaction=self)

    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH) -> None:
        """
        Shorthand API for appending a PyArrow table to a table transaction.

        Args:
            df: The Arrow dataframe that will be appended to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
            branch: Branch Reference to run the append operation
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(),
            provided_schema=df.schema,
            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
            format_version=self.table_metadata.format_version,
        )

        with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
            # skip writing data files if the dataframe is empty
            if df.shape[0] > 0:
                data_files = list(
                    _dataframe_to_data_files(
                        table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                    )
                )
                for data_file in data_files:
                    append_files.append_data_file(data_file)

    def dynamic_partition_overwrite(
        self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
    ) -> None:
        """
        Shorthand for overwriting existing partitions with a PyArrow table.

        The function detects partition values in the provided arrow table using the current
        partition spec, and deletes existing partitions matching these values. Finally, the
        data in the table is appended to the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
            branch: Branch Reference to run the dynamic partition overwrite operation
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        if self.table_metadata.spec().is_unpartitioned():
            raise ValueError("Cannot apply dynamic overwrite on an unpartitioned table.")

        for field in self.table_metadata.spec().fields:
            if not isinstance(field.transform, IdentityTransform):
                raise ValueError(
                    f"For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: {field}"
                )

        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(),
            provided_schema=df.schema,
            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
            format_version=self.table_metadata.format_version,
        )

        # If dataframe does not have data, there is no need to overwrite
        if df.shape[0] == 0:
            return

        append_snapshot_commit_uuid = uuid.uuid4()
        data_files: List[DataFile] = list(
            _dataframe_to_data_files(
                table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
            )
        )

        partitions_to_overwrite = {data_file.partition for data_file in data_files}
        delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
        self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)

        with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
            append_files.commit_uuid = append_snapshot_commit_uuid
            for data_file in data_files:
                append_files.append_data_file(data_file)

    def overwrite(
        self,
        df: pa.Table,
        overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
        case_sensitive: bool = True,
        branch: Optional[str] = MAIN_BRANCH,
    ) -> None:
        """
        Shorthand for adding a table overwrite with a PyArrow table to the transaction.

        An overwrite may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten.
            - APPEND: In case new data is being inserted into the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                              or a boolean expression in case of a partial overwrite
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool determine if the provided `overwrite_filter` is case-sensitive
            branch: Branch Reference to run the overwrite operation
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(),
            provided_schema=df.schema,
            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
            format_version=self.table_metadata.format_version,
        )

        if overwrite_filter != AlwaysFalse():
            # Only delete when the filter is != AlwaysFalse
            self.delete(
                delete_filter=overwrite_filter,
                case_sensitive=case_sensitive,
                snapshot_properties=snapshot_properties,
                branch=branch,
            )

        with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
            # skip writing data files if the dataframe is empty
            if df.shape[0] > 0:
                data_files = _dataframe_to_data_files(
                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                )
                for data_file in data_files:
                    append_files.append_data_file(data_file)

    def delete(
        self,
        delete_filter: Union[str, BooleanExpression],
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
        case_sensitive: bool = True,
        branch: Optional[str] = MAIN_BRANCH,
    ) -> None:
        """
        Shorthand for deleting record from a table.

        A delete may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten

        Args:
            delete_filter: A boolean expression to delete rows from a table
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive
            branch: Branch Reference to run the delete operation
        """
        from pyiceberg.io.pyarrow import (
            ArrowScan,
            _dataframe_to_data_files,
            _expression_to_complementary_pyarrow,
        )

        if (
            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
            == TableProperties.DELETE_MODE_MERGE_ON_READ
        ):
            warnings.warn("Merge on read is not yet supported, falling back to copy-on-write")

        if isinstance(delete_filter, str):
            delete_filter = _parse_row_filter(delete_filter)

        with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot:
            delete_snapshot.delete_by_predicate(delete_filter, case_sensitive)

        # Check if there are any files that require an actual rewrite of a data file
        if delete_snapshot.rewrites_needed is True:
            bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
            preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

            file_scan = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive)
            if branch is not None:
                file_scan = file_scan.use_ref(branch)
            files = file_scan.plan_files()

            commit_uuid = uuid.uuid4()
            counter = itertools.count(0)

            replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
            # This will load the Parquet file into memory, including:
            #   - Filter out the rows based on the delete filter
            #   - Projecting it to the current schema
            #   - Applying the positional deletes if they are there
            # When writing
            #   - Apply the latest partition-spec
            #   - And sort order when added
            for original_file in files:
                df = ArrowScan(
                    table_metadata=self.table_metadata,
                    io=self._table.io,
                    projected_schema=self.table_metadata.schema(),
                    row_filter=AlwaysTrue(),
                ).to_table(tasks=[original_file])
                filtered_df = df.filter(preserve_row_filter)

                # Only rewrite if there are records being deleted
                if len(filtered_df) == 0:
                    replaced_files.append((original_file.file, []))
                elif len(df) != len(filtered_df):
                    replaced_files.append(
                        (
                            original_file.file,
                            list(
                                _dataframe_to_data_files(
                                    io=self._table.io,
                                    df=filtered_df,
                                    table_metadata=self.table_metadata,
                                    write_uuid=commit_uuid,
                                    counter=counter,
                                )
                            ),
                        )
                    )

            if len(replaced_files) > 0:
                with self.update_snapshot(
                    snapshot_properties=snapshot_properties, branch=branch
                ).overwrite() as overwrite_snapshot:
                    overwrite_snapshot.commit_uuid = commit_uuid
                    for original_data_file, replaced_data_files in replaced_files:
                        overwrite_snapshot.delete_data_file(original_data_file)
                        for replaced_data_file in replaced_data_files:
                            overwrite_snapshot.append_data_file(replaced_data_file)

        if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
            warnings.warn("Delete operation did not match any records")

    def upsert(
        self,
        df: pa.Table,
        join_cols: Optional[List[str]] = None,
        when_matched_update_all: bool = True,
        when_not_matched_insert_all: bool = True,
        case_sensitive: bool = True,
        branch: Optional[str] = MAIN_BRANCH,
    ) -> UpsertResult:
        """Shorthand API for performing an upsert to an iceberg table.

        Args:

            df: The input dataframe to upsert with the table's data.
            join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
            case_sensitive: Bool indicating if the match should be case-sensitive
            branch: Branch Reference to run the upsert operation

            To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

                Example Use Cases:
                    Case 1: Both Parameters = True (Full Upsert)
                    Existing row found → Update it
                    New row found → Insert it

                    Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
                    Existing row found → Do nothing (no updates)
                    New row found → Insert it

                    Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
                    Existing row found → Update it
                    New row found → Do nothing (no inserts)

                    Case 4: Both Parameters = False (No Merge Effect)
                    Existing row found → Do nothing
                    New row found → Do nothing
                    (Function effectively does nothing)


        Returns:
            An UpsertResult class (contains details of rows updated and inserted)
        """
        try:
            import pyarrow as pa  # noqa: F401
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import expression_to_pyarrow
        from pyiceberg.table import upsert_util

        if join_cols is None:
            join_cols = []
            for field_id in self.table_metadata.schema().identifier_field_ids:
                col = self.table_metadata.schema().find_column_name(field_id)
                if col is not None:
                    join_cols.append(col)
                else:
                    raise ValueError(f"Field-ID could not be found: {join_cols}")

        if len(join_cols) == 0:
            raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")

        if not when_matched_update_all and not when_not_matched_insert_all:
            raise ValueError("no upsert options selected...exiting")

        if upsert_util.has_duplicate_rows(df, join_cols):
            raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible

        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(),
            provided_schema=df.schema,
            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
            format_version=self.table_metadata.format_version,
        )

        # get list of rows that exist so we don't have to load the entire target table
        matched_predicate = upsert_util.create_match_filter(df, join_cols)

        # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes.

        matched_iceberg_record_batches_scan = DataScan(
            table_metadata=self.table_metadata,
            io=self._table.io,
            row_filter=matched_predicate,
            case_sensitive=case_sensitive,
        )

        if branch in self.table_metadata.refs:
            matched_iceberg_record_batches_scan = matched_iceberg_record_batches_scan.use_ref(branch)

        matched_iceberg_record_batches = matched_iceberg_record_batches_scan.to_arrow_batch_reader()

        batches_to_overwrite = []
        overwrite_predicates = []
        rows_to_insert = df

        for batch in matched_iceberg_record_batches:
            rows = pa.Table.from_batches([batch])

            if when_matched_update_all:
                # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
                # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
                # this extra step avoids unnecessary IO and writes
                rows_to_update = upsert_util.get_rows_to_update(df, rows, join_cols)

                if len(rows_to_update) > 0:
                    # build the match predicate filter
                    overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)

                    batches_to_overwrite.append(rows_to_update)
                    overwrite_predicates.append(overwrite_mask_predicate)

            if when_not_matched_insert_all:
                expr_match = upsert_util.create_match_filter(rows, join_cols)
                expr_match_bound = bind(self.table_metadata.schema(), expr_match, case_sensitive=case_sensitive)
                expr_match_arrow = expression_to_pyarrow(expr_match_bound)

                # Filter rows per batch.
                rows_to_insert = rows_to_insert.filter(~expr_match_arrow)

        update_row_cnt = 0
        insert_row_cnt = 0

        if batches_to_overwrite:
            rows_to_update = pa.concat_tables(batches_to_overwrite)
            update_row_cnt = len(rows_to_update)
            self.overwrite(
                rows_to_update,
                overwrite_filter=Or(*overwrite_predicates) if len(overwrite_predicates) > 1 else overwrite_predicates[0],
                branch=branch,
            )

        if when_not_matched_insert_all:
            insert_row_cnt = len(rows_to_insert)
            if rows_to_insert:
                self.append(rows_to_insert, branch=branch)

        return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)

    def add_files(
        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
    ) -> None:
        """
        Shorthand API for adding files as data files to the table transaction.

        Args:
            file_paths: The list of full file paths to be added as data files to the table

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: Raises a ValueError given file_paths contains duplicate files
            ValueError: Raises a ValueError given file_paths already referenced by table
        """
        if len(file_paths) != len(set(file_paths)):
            raise ValueError("File paths must be unique")

        if check_duplicate_files:
            import pyarrow.compute as pc

            expr = pc.field("file_path").isin(file_paths)
            referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]

            if referenced_files:
                raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")

        if self.table_metadata.name_mapping() is None:
            self.set_properties(
                **{TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()}
            )
        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
            data_files = _parquet_files_to_data_files(
                table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
            )
            for data_file in data_files:
                update_snapshot.append_data_file(data_file)

    def update_spec(self) -> UpdateSpec:
        """Create a new UpdateSpec to update the partitioning of the table.

        Returns:
            A new UpdateSpec.
        """
        return UpdateSpec(self)

    def remove_properties(self, *removals: str) -> Transaction:
        """Remove properties.

        Args:
            removals: Properties to be removed.

        Returns:
            The alter table builder.
        """
        return self._apply((RemovePropertiesUpdate(removals=removals),))

    def update_location(self, location: str) -> Transaction:
        """Set the new table location.

        Args:
            location: The new location of the table.

        Returns:
            The alter table builder.
        """
        raise NotImplementedError("Not yet implemented")

    def commit_transaction(self) -> Table:
        """Commit the changes to the catalog.

        Returns:
            The table with the updates applied.
        """
        if len(self._updates) > 0:
            self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
            self._table._do_commit(  # pylint: disable=W0212
                updates=self._updates,
                requirements=self._requirements,
            )

        self._updates = ()
        self._requirements = ()

        return self._table

__enter__()

Start a transaction to update the table.

Source code in pyiceberg/table/__init__.py
def __enter__(self) -> Transaction:
    """Start a transaction to update the table."""
    return self

__exit__(exctype, excinst, exctb)

Close and commit the transaction if no exceptions have been raised.

Source code in pyiceberg/table/__init__.py
def __exit__(
    self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
    """Close and commit the transaction if no exceptions have been raised."""
    if exctype is None and excinst is None and exctb is None:
        self.commit_transaction()
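
Because __exit__ only commits when the block exits cleanly, the usual pattern is to stage several changes inside a with block. A minimal usage sketch; the catalog name "default", the table identifier "db.events", and the property value are illustrative assumptions:

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")         # hypothetical catalog name
table = catalog.load_table("db.events")   # hypothetical table identifier

# All updates staged in the block are committed by __exit__ in a single
# commit when no exception is raised; if the block raises, nothing is
# sent to the catalog.
with table.transaction() as txn:
    txn.set_properties(owner="data-eng")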

__init__(table, autocommit=False)

Open a transaction to stage and commit changes to a table.

Parameters:

- table (Table), required: The table that will be altered.
- autocommit (bool), default False: Option to automatically commit the changes when they are staged.
Source code in pyiceberg/table/__init__.py
def __init__(self, table: Table, autocommit: bool = False):
    """Open a transaction to stage and commit changes to a table.

    Args:
        table: The table that will be altered.
        autocommit: Option to automatically commit the changes when they are staged.
    """
    self._table = table
    self._autocommit = autocommit
    self._updates = ()
    self._requirements = ()

add_files(file_paths, snapshot_properties=EMPTY_DICT, check_duplicate_files=True)

Shorthand API for adding files as data files to the table transaction.

Parameters:

- file_paths (List[str]), required: The list of full file paths to be added as data files to the table.

Raises:

- FileNotFoundError: If a file does not exist.
- ValueError: If file_paths contains duplicate files.
- ValueError: If any of the file_paths are already referenced by the table.

Source code in pyiceberg/table/__init__.py
def add_files(
    self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
) -> None:
    """
    Shorthand API for adding files as data files to the table transaction.

    Args:
        file_paths: The list of full file paths to be added as data files to the table

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: Raises a ValueError given file_paths contains duplicate files
        ValueError: Raises a ValueError given file_paths already referenced by table
    """
    if len(file_paths) != len(set(file_paths)):
        raise ValueError("File paths must be unique")

    if check_duplicate_files:
        import pyarrow.compute as pc

        expr = pc.field("file_path").isin(file_paths)
        referenced_files = [file["file_path"] for file in self._table.inspect.data_files().filter(expr).to_pylist()]

        if referenced_files:
            raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")

    if self.table_metadata.name_mapping() is None:
        self.set_properties(
            **{TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()}
        )
    with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
        data_files = _parquet_files_to_data_files(
            table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
        )
        for data_file in data_files:
            update_snapshot.append_data_file(data_file)
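
A minimal sketch of registering existing Parquet files without rewriting them; the catalog name, table identifier, object-store paths, and snapshot property are illustrative assumptions, and the files must already exist and be compatible with the table schema:

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.events")  # hypothetical catalog/table

with table.transaction() as txn:
    # The paths below are placeholders for Parquet files that already live
    # in the table's storage and match the table schema.
    txn.add_files(
        file_paths=[
            "s3://warehouse/db/events/data/part-00000.parquet",
            "s3://warehouse/db/events/data/part-00001.parquet",
        ],
        snapshot_properties={"ingest-job": "backfill"},
    )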

append(df, snapshot_properties=EMPTY_DICT, branch=MAIN_BRANCH)

Shorthand API for appending a PyArrow table to a table transaction.

Parameters:

- df (Table), required: The Arrow dataframe that will be appended to the table.
- snapshot_properties (Dict[str, str]), default EMPTY_DICT: Custom properties to be added to the snapshot summary.
- branch (Optional[str]), default MAIN_BRANCH: Branch Reference to run the append operation.
Source code in pyiceberg/table/__init__.py
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH) -> None:
    """
    Shorthand API for appending a PyArrow table to a table transaction.

    Args:
        df: The Arrow dataframe that will be appended to overwrite the table
        snapshot_properties: Custom properties to be added to the snapshot summary
        branch: Branch Reference to run the append operation
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(),
        provided_schema=df.schema,
        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
        format_version=self.table_metadata.format_version,
    )

    with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
        # skip writing data files if the dataframe is empty
        if df.shape[0] > 0:
            data_files = list(
                _dataframe_to_data_files(
                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                )
            )
            for data_file in data_files:
                append_files.append_data_file(data_file)
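
A minimal sketch of appending Arrow data inside a transaction; the catalog name, table identifier, and the assumed schema (optional id: long and name: string columns) are illustrative:

import pyarrow as pa

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.events")  # hypothetical catalog/table

# The Arrow schema must be compatible with the table schema; here the table
# is assumed to have optional columns id (long) and name (string).
df = pa.table({
    "id": pa.array([1, 2, 3], type=pa.int64()),
    "name": pa.array(["a", "b", "c"], type=pa.string()),
})

with table.transaction() as txn:
    txn.append(df, snapshot_properties={"source": "example"})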

commit_transaction()

Commit the changes to the catalog.

Returns:

- Table: The table with the updates applied.

Source code in pyiceberg/table/__init__.py
def commit_transaction(self) -> Table:
    """Commit the changes to the catalog.

    Returns:
        The table with the updates applied.
    """
    if len(self._updates) > 0:
        self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
        self._table._do_commit(  # pylint: disable=W0212
            updates=self._updates,
            requirements=self._requirements,
        )

    self._updates = ()
    self._requirements = ()

    return self._table
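
When more control is needed than the context-manager form provides, a transaction can also be driven explicitly and finished with commit_transaction. A minimal sketch with a hypothetical catalog and table; the property value is illustrative:

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.events")  # hypothetical catalog/table

txn = table.transaction()
txn.set_properties({"write.parquet.compression-codec": "zstd"})

# Nothing has reached the catalog yet; commit_transaction sends all staged
# updates in one commit, guarded by an AssertTableUUID requirement, and
# returns the refreshed table.
table = txn.commit_transaction()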

delete(delete_filter, snapshot_properties=EMPTY_DICT, case_sensitive=True, branch=MAIN_BRANCH)

Shorthand for deleting records from a table.

A delete may produce zero or more snapshots based on the operation:

- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten.

Parameters:

- delete_filter (Union[str, BooleanExpression]), required: A boolean expression selecting the rows to delete from the table.
- snapshot_properties (Dict[str, str]), default EMPTY_DICT: Custom properties to be added to the snapshot summary.
- case_sensitive (bool), default True: A bool determining if the provided delete_filter is case-sensitive.
- branch (Optional[str]), default MAIN_BRANCH: Branch Reference to run the delete operation.
Source code in pyiceberg/table/__init__.py
def delete(
    self,
    delete_filter: Union[str, BooleanExpression],
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
    case_sensitive: bool = True,
    branch: Optional[str] = MAIN_BRANCH,
) -> None:
    """
    Shorthand for deleting record from a table.

    A delete may produce zero or more snapshots based on the operation:

        - DELETE: In case existing Parquet files can be dropped completely.
        - REPLACE: In case existing Parquet files need to be rewritten

    Args:
        delete_filter: A boolean expression to delete rows from a table
        snapshot_properties: Custom properties to be added to the snapshot summary
        case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive
        branch: Branch Reference to run the delete operation
    """
    from pyiceberg.io.pyarrow import (
        ArrowScan,
        _dataframe_to_data_files,
        _expression_to_complementary_pyarrow,
    )

    if (
        self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
        == TableProperties.DELETE_MODE_MERGE_ON_READ
    ):
        warnings.warn("Merge on read is not yet supported, falling back to copy-on-write")

    if isinstance(delete_filter, str):
        delete_filter = _parse_row_filter(delete_filter)

    with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot:
        delete_snapshot.delete_by_predicate(delete_filter, case_sensitive)

    # Check if there are any files that require an actual rewrite of a data file
    if delete_snapshot.rewrites_needed is True:
        bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
        preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

        file_scan = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive)
        if branch is not None:
            file_scan = file_scan.use_ref(branch)
        files = file_scan.plan_files()

        commit_uuid = uuid.uuid4()
        counter = itertools.count(0)

        replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
        # This will load the Parquet file into memory, including:
        #   - Filter out the rows based on the delete filter
        #   - Projecting it to the current schema
        #   - Applying the positional deletes if they are there
        # When writing
        #   - Apply the latest partition-spec
        #   - And sort order when added
        for original_file in files:
            df = ArrowScan(
                table_metadata=self.table_metadata,
                io=self._table.io,
                projected_schema=self.table_metadata.schema(),
                row_filter=AlwaysTrue(),
            ).to_table(tasks=[original_file])
            filtered_df = df.filter(preserve_row_filter)

            # Only rewrite if there are records being deleted
            if len(filtered_df) == 0:
                replaced_files.append((original_file.file, []))
            elif len(df) != len(filtered_df):
                replaced_files.append(
                    (
                        original_file.file,
                        list(
                            _dataframe_to_data_files(
                                io=self._table.io,
                                df=filtered_df,
                                table_metadata=self.table_metadata,
                                write_uuid=commit_uuid,
                                counter=counter,
                            )
                        ),
                    )
                )

        if len(replaced_files) > 0:
            with self.update_snapshot(
                snapshot_properties=snapshot_properties, branch=branch
            ).overwrite() as overwrite_snapshot:
                overwrite_snapshot.commit_uuid = commit_uuid
                for original_data_file, replaced_data_files in replaced_files:
                    overwrite_snapshot.delete_data_file(original_data_file)
                    for replaced_data_file in replaced_data_files:
                        overwrite_snapshot.append_data_file(replaced_data_file)

    if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
        warnings.warn("Delete operation did not match any records")

dynamic_partition_overwrite(df, snapshot_properties=EMPTY_DICT, branch=MAIN_BRANCH)

Shorthand for overwriting existing partitions with a PyArrow table.

The function detects partition values in the provided Arrow table using the current partition spec, and deletes existing partitions matching these values. Finally, the data in the provided table is appended to the table.

Parameters:

- df (Table), required: The Arrow dataframe that will be used to overwrite the table.
- snapshot_properties (Dict[str, str]), default EMPTY_DICT: Custom properties to be added to the snapshot summary.
- branch (Optional[str]), default MAIN_BRANCH: Branch Reference to run the dynamic partition overwrite operation.
Source code in pyiceberg/table/__init__.py
def dynamic_partition_overwrite(
    self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
) -> None:
    """
    Shorthand for overwriting existing partitions with a PyArrow table.

    The function detects partition values in the provided arrow table using the current
    partition spec, and deletes existing partitions matching these values. Finally, the
    data in the table is appended to the table.

    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        snapshot_properties: Custom properties to be added to the snapshot summary
        branch: Branch Reference to run the dynamic partition overwrite operation
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    if self.table_metadata.spec().is_unpartitioned():
        raise ValueError("Cannot apply dynamic overwrite on an unpartitioned table.")

    for field in self.table_metadata.spec().fields:
        if not isinstance(field.transform, IdentityTransform):
            raise ValueError(
                f"For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: {field}"
            )

    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(),
        provided_schema=df.schema,
        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
        format_version=self.table_metadata.format_version,
    )

    # If dataframe does not have data, there is no need to overwrite
    if df.shape[0] == 0:
        return

    append_snapshot_commit_uuid = uuid.uuid4()
    data_files: List[DataFile] = list(
        _dataframe_to_data_files(
            table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
        )
    )

    partitions_to_overwrite = {data_file.partition for data_file in data_files}
    delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
    self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)

    with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
        append_files.commit_uuid = append_snapshot_commit_uuid
        for data_file in data_files:
            append_files.append_data_file(data_file)
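
A minimal sketch; it assumes a hypothetical table db.daily_metrics partitioned by an identity transform on event_date with an optional value: double column, since dynamic overwrite only supports identity-transform partition fields:

import pyarrow as pa

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.daily_metrics")  # hypothetical catalog/table

df = pa.table({
    "event_date": pa.array(["2024-06-01", "2024-06-02"], type=pa.string()),
    "value": pa.array([10.5, 11.0], type=pa.float64()),
})

with table.transaction() as txn:
    # Deletes the partitions event_date=2024-06-01 and event_date=2024-06-02,
    # then appends the rows from df.
    txn.dynamic_partition_overwrite(df)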

overwrite(df, overwrite_filter=ALWAYS_TRUE, snapshot_properties=EMPTY_DICT, case_sensitive=True, branch=MAIN_BRANCH)

Shorthand for adding a table overwrite with a PyArrow table to the transaction.

An overwrite may produce zero or more snapshots based on the operation:

- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten.
- APPEND: In case new data is being inserted into the table.

Parameters:

- df (Table), required: The Arrow dataframe that will be used to overwrite the table.
- overwrite_filter (Union[BooleanExpression, str]), default ALWAYS_TRUE: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite.
- snapshot_properties (Dict[str, str]), default EMPTY_DICT: Custom properties to be added to the snapshot summary.
- case_sensitive (bool), default True: A bool determining if the provided overwrite_filter is case-sensitive.
- branch (Optional[str]), default MAIN_BRANCH: Branch Reference to run the overwrite operation.
Source code in pyiceberg/table/__init__.py
def overwrite(
    self,
    df: pa.Table,
    overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
    snapshot_properties: Dict[str, str] = EMPTY_DICT,
    case_sensitive: bool = True,
    branch: Optional[str] = MAIN_BRANCH,
) -> None:
    """
    Shorthand for adding a table overwrite with a PyArrow table to the transaction.

    An overwrite may produce zero or more snapshots based on the operation:

        - DELETE: In case existing Parquet files can be dropped completely.
        - REPLACE: In case existing Parquet files need to be rewritten.
        - APPEND: In case new data is being inserted into the table.

    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                          or a boolean expression in case of a partial overwrite
        snapshot_properties: Custom properties to be added to the snapshot summary
        case_sensitive: A bool determine if the provided `overwrite_filter` is case-sensitive
        branch: Branch Reference to run the overwrite operation
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(),
        provided_schema=df.schema,
        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
        format_version=self.table_metadata.format_version,
    )

    if overwrite_filter != AlwaysFalse():
        # Only delete when the filter is != AlwaysFalse
        self.delete(
            delete_filter=overwrite_filter,
            case_sensitive=case_sensitive,
            snapshot_properties=snapshot_properties,
            branch=branch,
        )

    with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
        # skip writing data files if the dataframe is empty
        if df.shape[0] > 0:
            data_files = _dataframe_to_data_files(
                table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
            )
            for data_file in data_files:
                append_files.append_data_file(data_file)
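
A minimal sketch showing a full overwrite and a partial overwrite; the catalog name, table identifier, schema (optional id: long and name: string columns), and filter are illustrative assumptions:

import pyarrow as pa

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.events")  # hypothetical catalog/table

df = pa.table({
    "id": pa.array([1, 2], type=pa.int64()),
    "name": pa.array(["a", "b"], type=pa.string()),
})

with table.transaction() as txn:
    # Default ALWAYS_TRUE filter: replace the entire table contents with df.
    txn.overwrite(df)

with table.transaction() as txn:
    # Partial overwrite: only rows matching the filter are deleted before df
    # is appended.
    txn.overwrite(df, overwrite_filter="id <= 2")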

remove_properties(*removals)

Remove properties.

Parameters:

- removals (str), default (): Properties to be removed.

Returns:

- Transaction: The alter table builder.

Source code in pyiceberg/table/__init__.py
def remove_properties(self, *removals: str) -> Transaction:
    """Remove properties.

    Args:
        removals: Properties to be removed.

    Returns:
        The alter table builder.
    """
    return self._apply((RemovePropertiesUpdate(removals=removals),))

set_properties(properties=EMPTY_DICT, **kwargs)

Set properties.

When a property is already set, it will be overwritten.

Parameters:

- properties (Properties), default EMPTY_DICT: The properties to set on the table.
- kwargs (Any), default {}: Properties can also be passed as kwargs.

Returns:

- Transaction: The alter table builder.

Source code in pyiceberg/table/__init__.py
def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) -> Transaction:
    """Set properties.

    When a property is already set, it will be overwritten.

    Args:
        properties: The properties set on the table.
        kwargs: properties can also be pass as kwargs.

    Returns:
        The alter table builder.
    """
    if properties and kwargs:
        raise ValueError("Cannot pass both properties and kwargs")
    updates = properties or kwargs
    return self._apply((SetPropertiesUpdate(updates=updates),))
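
A minimal sketch; the catalog name, table identifier, property keys, and values are illustrative assumptions. Properties can be passed as a dict or as keyword arguments, but not both in the same call:

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.events")  # hypothetical catalog/table

with table.transaction() as txn:
    txn.set_properties({"commit.retry.num-retries": "10"})
    txn.set_properties(owner="data-eng")
    # Assumes "deprecated-key" was previously set on this hypothetical table.
    txn.remove_properties("deprecated-key")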

update_location(location)

Set the new table location.

Parameters:

- location (str), required: The new location of the table.

Returns:

- Transaction: The alter table builder.

Source code in pyiceberg/table/__init__.py
def update_location(self, location: str) -> Transaction:
    """Set the new table location.

    Args:
        location: The new location of the table.

    Returns:
        The alter table builder.
    """
    raise NotImplementedError("Not yet implemented")

update_schema(allow_incompatible_changes=False, case_sensitive=True)

Create a new UpdateSchema to alter the columns of this table.

Parameters:

- allow_incompatible_changes (bool), default False: If changes are allowed that might break downstream consumers.
- case_sensitive (bool), default True: If field names are case-sensitive.

Returns:

- UpdateSchema: A new UpdateSchema.

Source code in pyiceberg/table/__init__.py
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
    """Create a new UpdateSchema to alter the columns of this table.

    Args:
        allow_incompatible_changes: If changes are allowed that might break downstream consumers.
        case_sensitive: If field names are case-sensitive.

    Returns:
        A new UpdateSchema.
    """
    return UpdateSchema(
        self,
        allow_incompatible_changes=allow_incompatible_changes,
        case_sensitive=case_sensitive,
        name_mapping=self.table_metadata.name_mapping(),
    )
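
A minimal sketch of staging a schema change inside a transaction; the column names and documentation string are illustrative assumptions:

from pyiceberg.catalog import load_catalog
from pyiceberg.types import StringType

table = load_catalog("default").load_table("db.events")  # hypothetical catalog/table

with table.transaction() as txn:
    # UpdateSchema is itself a context manager: the staged schema change is
    # applied to the transaction when the inner block exits.
    with txn.update_schema() as update:
        update.add_column("comment", StringType(), doc="free-form note")
        update.rename_column("name", "event_name")  # assumes a column "name" exists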

update_snapshot(snapshot_properties=EMPTY_DICT, branch=MAIN_BRANCH)

Create a new UpdateSnapshot to produce a new snapshot for the table.

Returns:

- UpdateSnapshot: A new UpdateSnapshot.

Source code in pyiceberg/table/__init__.py
def update_snapshot(
    self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
) -> UpdateSnapshot:
    """Create a new UpdateSnapshot to produce a new snapshot for the table.

    Returns:
        A new UpdateSnapshot
    """
    return UpdateSnapshot(self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties)

update_spec()

Create a new UpdateSpec to update the partitioning of the table.

Returns:

- UpdateSpec: A new UpdateSpec.

Source code in pyiceberg/table/__init__.py
def update_spec(self) -> UpdateSpec:
    """Create a new UpdateSpec to update the partitioning of the table.

    Returns:
        A new UpdateSpec.
    """
    return UpdateSpec(self)
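
A minimal sketch of evolving the partition spec inside a transaction; it assumes the hypothetical table has a timestamp column ts:

from pyiceberg.catalog import load_catalog
from pyiceberg.transforms import DayTransform

table = load_catalog("default").load_table("db.events")  # hypothetical catalog/table

with table.transaction() as txn:
    # UpdateSpec also works as a context manager and stages the spec change
    # on the transaction when the inner block exits.
    with txn.update_spec() as update:
        update.add_field("ts", DayTransform(), "ts_day")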

update_statistics()

Create a new UpdateStatistics to update the statistics of the table.

Returns:

- UpdateStatistics: A new UpdateStatistics.

Source code in pyiceberg/table/__init__.py
def update_statistics(self) -> UpdateStatistics:
    """
    Create a new UpdateStatistics to update the statistics of the table.

    Returns:
        A new UpdateStatistics
    """
    return UpdateStatistics(transaction=self)

upgrade_table_version(format_version)

Set the table to a certain version.

Parameters:

- format_version (TableVersion), required: The newly set version.

Returns:

- Transaction: The alter table builder.

Source code in pyiceberg/table/__init__.py
def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
    """Set the table to a certain version.

    Args:
        format_version: The newly set version.

    Returns:
        The alter table builder.
    """
    if format_version not in {1, 2}:
        raise ValueError(f"Unsupported table format version: {format_version}")

    if format_version < self.table_metadata.format_version:
        raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")

    if format_version > self.table_metadata.format_version:
        return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))

    return self
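
A minimal sketch; the catalog name and table identifier are illustrative assumptions:

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.events")  # hypothetical catalog/table

with table.transaction() as txn:
    # No-op if the table is already on format version 2; raises ValueError
    # for unsupported versions or attempted downgrades.
    txn.upgrade_table_version(2)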

upsert(df, join_cols=None, when_matched_update_all=True, when_not_matched_insert_all=True, case_sensitive=True, branch=MAIN_BRANCH)

Shorthand API for performing an upsert to an iceberg table.

Parameters:

- df (Table), required: The input dataframe to upsert with the table's data.
- join_cols (Optional[List[str]]), default None: Columns to join on; if not provided, the identifier-field-ids are used.
- when_matched_update_all (bool), default True: Bool indicating whether to update matched rows when a value in a non-key column has changed.
- when_not_matched_insert_all (bool), default True: Bool indicating whether to insert new rows that do not match any existing rows in the table.
- case_sensitive (bool), default True: Bool indicating if the match should be case-sensitive.
- branch (Optional[str]), default MAIN_BRANCH: Branch Reference to run the upsert operation.

To learn more about identifier-field-ids, see https://iceberg.apache.org/spec/#identifier-field-ids

Example use cases:

- Case 1: both parameters True (full upsert). Existing row found → update it; new row found → insert it.
- Case 2: when_matched_update_all=False, when_not_matched_insert_all=True. Existing row found → do nothing (no updates); new row found → insert it.
- Case 3: when_matched_update_all=True, when_not_matched_insert_all=False. Existing row found → update it; new row found → do nothing (no inserts).
- Case 4: both parameters False (no merge effect). Existing row found → do nothing; new row found → do nothing (the call effectively does nothing).

Returns:

- UpsertResult: Contains details of the rows updated and inserted.

Source code in pyiceberg/table/__init__.py
def upsert(
    self,
    df: pa.Table,
    join_cols: Optional[List[str]] = None,
    when_matched_update_all: bool = True,
    when_not_matched_insert_all: bool = True,
    case_sensitive: bool = True,
    branch: Optional[str] = MAIN_BRANCH,
) -> UpsertResult:
    """Shorthand API for performing an upsert to an iceberg table.

    Args:

        df: The input dataframe to upsert with the table's data.
        join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
        when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
        when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
        case_sensitive: Bool indicating if the match should be case-sensitive
        branch: Branch Reference to run the upsert operation

        To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

            Example Use Cases:
                Case 1: Both Parameters = True (Full Upsert)
                Existing row found → Update it
                New row found → Insert it

                Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
                Existing row found → Do nothing (no updates)
                New row found → Insert it

                Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
                Existing row found → Update it
                New row found → Do nothing (no inserts)

                Case 4: Both Parameters = False (No Merge Effect)
                Existing row found → Do nothing
                New row found → Do nothing
                (Function effectively does nothing)


    Returns:
        An UpsertResult class (contains details of rows updated and inserted)
    """
    try:
        import pyarrow as pa  # noqa: F401
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import expression_to_pyarrow
    from pyiceberg.table import upsert_util

    if join_cols is None:
        join_cols = []
        for field_id in self.table_metadata.schema().identifier_field_ids:
            col = self.table_metadata.schema().find_column_name(field_id)
            if col is not None:
                join_cols.append(col)
            else:
                raise ValueError(f"Field-ID could not be found: {join_cols}")

    if len(join_cols) == 0:
        raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")

    if not when_matched_update_all and not when_not_matched_insert_all:
        raise ValueError("no upsert options selected...exiting")

    if upsert_util.has_duplicate_rows(df, join_cols):
        raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible

    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(),
        provided_schema=df.schema,
        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
        format_version=self.table_metadata.format_version,
    )

    # get list of rows that exist so we don't have to load the entire target table
    matched_predicate = upsert_util.create_match_filter(df, join_cols)

    # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes.

    matched_iceberg_record_batches_scan = DataScan(
        table_metadata=self.table_metadata,
        io=self._table.io,
        row_filter=matched_predicate,
        case_sensitive=case_sensitive,
    )

    if branch in self.table_metadata.refs:
        matched_iceberg_record_batches_scan = matched_iceberg_record_batches_scan.use_ref(branch)

    matched_iceberg_record_batches = matched_iceberg_record_batches_scan.to_arrow_batch_reader()

    batches_to_overwrite = []
    overwrite_predicates = []
    rows_to_insert = df

    for batch in matched_iceberg_record_batches:
        rows = pa.Table.from_batches([batch])

        if when_matched_update_all:
            # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
            # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
            # this extra step avoids unnecessary IO and writes
            rows_to_update = upsert_util.get_rows_to_update(df, rows, join_cols)

            if len(rows_to_update) > 0:
                # build the match predicate filter
                overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)

                batches_to_overwrite.append(rows_to_update)
                overwrite_predicates.append(overwrite_mask_predicate)

        if when_not_matched_insert_all:
            expr_match = upsert_util.create_match_filter(rows, join_cols)
            expr_match_bound = bind(self.table_metadata.schema(), expr_match, case_sensitive=case_sensitive)
            expr_match_arrow = expression_to_pyarrow(expr_match_bound)

            # Filter rows per batch.
            rows_to_insert = rows_to_insert.filter(~expr_match_arrow)

    update_row_cnt = 0
    insert_row_cnt = 0

    if batches_to_overwrite:
        rows_to_update = pa.concat_tables(batches_to_overwrite)
        update_row_cnt = len(rows_to_update)
        self.overwrite(
            rows_to_update,
            overwrite_filter=Or(*overwrite_predicates) if len(overwrite_predicates) > 1 else overwrite_predicates[0],
            branch=branch,
        )

    if when_not_matched_insert_all:
        insert_row_cnt = len(rows_to_insert)
        if rows_to_insert:
            self.append(rows_to_insert, branch=branch)

    return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
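
A minimal sketch; the catalog name, table identifier, and schema (an optional id: long key column and a name: string column) are illustrative assumptions:

import pyarrow as pa

from pyiceberg.catalog import load_catalog

table = load_catalog("default").load_table("db.events")  # hypothetical catalog/table

df = pa.table({
    "id": pa.array([1, 2], type=pa.int64()),
    "name": pa.array(["updated", "brand new"], type=pa.string()),
})

with table.transaction() as txn:
    # Rows whose "id" matches an existing row are rewritten only if a
    # non-key column changed; unmatched rows are appended.
    result = txn.upsert(df, join_cols=["id"])

print(result.rows_updated, result.rows_inserted)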

UpsertResult dataclass

Summary of the upsert operation.

Source code in pyiceberg/table/__init__.py
@dataclass()
class UpsertResult:
    """Summary the upsert operation."""

    rows_updated: int = 0
    rows_inserted: int = 0

WriteTask dataclass

Task with the parameters for writing a DataFile.

Source code in pyiceberg/table/__init__.py
@dataclass(frozen=True)
class WriteTask:
    """Task with the parameters for writing a DataFile."""

    write_uuid: uuid.UUID
    task_id: int
    schema: Schema
    record_batches: List[pa.RecordBatch]
    sort_order_id: Optional[int] = None
    partition_key: Optional[PartitionKey] = None

    def generate_data_file_filename(self, extension: str) -> str:
        # Mimics the behavior in the Java API:
        # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
        return f"00000-{self.task_id}-{self.write_uuid}.{extension}"