table

AssertCreate

Bases: TableRequirement

The table must not already exist; used for create transactions.

Source code in pyiceberg/table/__init__.py
class AssertCreate(TableRequirement):
    """The table must not already exist; used for create transactions."""

    type: Literal["assert-create"] = Field(default="assert-create")

    def validate(self, base_metadata: Optional[TableMetadata]) -> None:
        if base_metadata is not None:
            raise CommitFailedException("Table already exists")
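A minimal sketch of how such a requirement is checked, assuming only that pyiceberg is installed; passing None stands in for a create transaction where no table metadata exists yet.

from pyiceberg.table import AssertCreate

requirement = AssertCreate()
requirement.validate(None)  # passes: there is no existing metadata, so the create may proceed

# Against an existing table's metadata the same call raises CommitFailedException:
# requirement.validate(existing_table.metadata)  # existing_table is hypothetical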

AssertCurrentSchemaId

Bases: TableRequirement

The table's current schema id must match the requirement's current-schema-id.

Source code in pyiceberg/table/__init__.py
class AssertCurrentSchemaId(TableRequirement):
    """The table's current schema id must match the requirement's `current-schema-id`."""

    type: Literal["assert-current-schema-id"] = Field(default="assert-current-schema-id")
    current_schema_id: int = Field(..., alias="current-schema-id")

    def validate(self, base_metadata: Optional[TableMetadata]) -> None:
        if base_metadata is None:
            raise CommitFailedException("Requirement failed: current table metadata is missing")
        elif self.current_schema_id != base_metadata.current_schema_id:
            raise CommitFailedException(
                f"Requirement failed: current schema id has changed: expected {self.current_schema_id}, found {base_metadata.current_schema_id}"
            )

AssertDefaultSortOrderId

Bases: TableRequirement

The table's default sort order id must match the requirement's default-sort-order-id.

Source code in pyiceberg/table/__init__.py
class AssertDefaultSortOrderId(TableRequirement):
    """The table's default sort order id must match the requirement's `default-sort-order-id`."""

    type: Literal["assert-default-sort-order-id"] = Field(default="assert-default-sort-order-id")
    default_sort_order_id: int = Field(..., alias="default-sort-order-id")

    def validate(self, base_metadata: Optional[TableMetadata]) -> None:
        if base_metadata is None:
            raise CommitFailedException("Requirement failed: current table metadata is missing")
        elif self.default_sort_order_id != base_metadata.default_sort_order_id:
            raise CommitFailedException(
                f"Requirement failed: default sort order id has changed: expected {self.default_sort_order_id}, found {base_metadata.default_sort_order_id}"
            )

AssertDefaultSpecId

Bases: TableRequirement

The table's default spec id must match the requirement's default-spec-id.

Source code in pyiceberg/table/__init__.py
class AssertDefaultSpecId(TableRequirement):
    """The table's default spec id must match the requirement's `default-spec-id`."""

    type: Literal["assert-default-spec-id"] = Field(default="assert-default-spec-id")
    default_spec_id: int = Field(..., alias="default-spec-id")

    def validate(self, base_metadata: Optional[TableMetadata]) -> None:
        if base_metadata is None:
            raise CommitFailedException("Requirement failed: current table metadata is missing")
        elif self.default_spec_id != base_metadata.default_spec_id:
            raise CommitFailedException(
                f"Requirement failed: default spec id has changed: expected {self.default_spec_id}, found {base_metadata.default_spec_id}"
            )

AssertLastAssignedFieldId

Bases: TableRequirement

The table's last assigned column id must match the requirement's last-assigned-field-id.

Source code in pyiceberg/table/__init__.py
class AssertLastAssignedFieldId(TableRequirement):
    """The table's last assigned column id must match the requirement's `last-assigned-field-id`."""

    type: Literal["assert-last-assigned-field-id"] = Field(default="assert-last-assigned-field-id")
    last_assigned_field_id: int = Field(..., alias="last-assigned-field-id")

    def validate(self, base_metadata: Optional[TableMetadata]) -> None:
        if base_metadata is None:
            raise CommitFailedException("Requirement failed: current table metadata is missing")
        elif base_metadata.last_column_id != self.last_assigned_field_id:
            raise CommitFailedException(
                f"Requirement failed: last assigned field id has changed: expected {self.last_assigned_field_id}, found {base_metadata.last_column_id}"
            )

AssertLastAssignedPartitionId

Bases: TableRequirement

The table's last assigned partition id must match the requirement's last-assigned-partition-id.

Source code in pyiceberg/table/__init__.py
class AssertLastAssignedPartitionId(TableRequirement):
    """The table's last assigned partition id must match the requirement's `last-assigned-partition-id`."""

    type: Literal["assert-last-assigned-partition-id"] = Field(default="assert-last-assigned-partition-id")
    last_assigned_partition_id: int = Field(..., alias="last-assigned-partition-id")

    def validate(self, base_metadata: Optional[TableMetadata]) -> None:
        if base_metadata is None:
            raise CommitFailedException("Requirement failed: current table metadata is missing")
        elif base_metadata.last_partition_id != self.last_assigned_partition_id:
            raise CommitFailedException(
                f"Requirement failed: last assigned partition id has changed: expected {self.last_assigned_partition_id}, found {base_metadata.last_partition_id}"
            )

AssertRefSnapshotId

Bases: TableRequirement

The table branch or tag identified by the requirement's ref must reference the requirement's snapshot-id.

If snapshot-id is null or missing, the ref must not already exist.

Source code in pyiceberg/table/__init__.py
class AssertRefSnapshotId(TableRequirement):
    """The table branch or tag identified by the requirement's `ref` must reference the requirement's `snapshot-id`.

    If `snapshot-id` is `null` or missing, the ref must not already exist.
    """

    type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id")
    ref: str = Field(...)
    snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id")

    def validate(self, base_metadata: Optional[TableMetadata]) -> None:
        if base_metadata is None:
            raise CommitFailedException("Requirement failed: current table metadata is missing")
        elif snapshot_ref := base_metadata.refs.get(self.ref):
            ref_type = snapshot_ref.snapshot_ref_type
            if self.snapshot_id is None:
                raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} was created concurrently")
            elif self.snapshot_id != snapshot_ref.snapshot_id:
                raise CommitFailedException(
                    f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}"
                )
        elif self.snapshot_id is not None:
            raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}")

AssertTableUUID

Bases: TableRequirement

The table UUID must match the requirement's uuid.

Source code in pyiceberg/table/__init__.py
class AssertTableUUID(TableRequirement):
    """The table UUID must match the requirement's `uuid`."""

    type: Literal["assert-table-uuid"] = Field(default="assert-table-uuid")
    uuid: uuid.UUID

    def validate(self, base_metadata: Optional[TableMetadata]) -> None:
        if base_metadata is None:
            raise CommitFailedException("Requirement failed: current table metadata is missing")
        elif self.uuid != base_metadata.table_uuid:
            raise CommitFailedException(f"Table UUID does not match: {self.uuid} != {base_metadata.table_uuid}")

DataScan

Bases: TableScan

Source code in pyiceberg/table/__init__.py
class DataScan(TableScan):
    def __init__(
        self,
        table: Table,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ):
        super().__init__(table, row_filter, selected_fields, case_sensitive, snapshot_id, options, limit)

    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
        project = inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
        return project(self.row_filter)

    @cached_property
    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
        return KeyDefaultDict(self._build_partition_projection)

    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
        spec = self.table.specs()[spec_id]
        return visitors.manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)

    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
        spec = self.table.specs()[spec_id]
        partition_type = spec.partition_type(self.table.schema())
        partition_schema = Schema(*partition_type.fields)
        partition_expr = self.partition_filters[spec_id]

        # The lambda created here is run in multiple threads.
        # So we avoid creating _EvaluatorExpression methods bound to a single
        # shared instance across multiple threads.
        return lambda data_file: visitors.expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(
            data_file.partition
        )

    def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
        """Ensure that no manifests are loaded that contain deletes that are older than the data.

        Args:
            min_data_sequence_number (int): The minimal sequence number.
            manifest (ManifestFile): A ManifestFile that can be either data or deletes.

        Returns:
            Boolean indicating if it is either a data file, or a relevant delete file.
        """
        return manifest.content == ManifestContent.DATA or (
            # Not interested in deletes that are older than the data
            manifest.content == ManifestContent.DELETES
            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
        )

    def plan_files(self) -> Iterable[FileScanTask]:
        """Plans the relevant files by filtering on the PartitionSpecs.

        Returns:
            List of FileScanTasks that contain both data and delete files.
        """
        snapshot = self.snapshot()
        if not snapshot:
            return iter([])

        io = self.table.io

        # step 1: filter manifests using partition summaries
        # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

        manifests = [
            manifest_file
            for manifest_file in snapshot.manifests(io)
            if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
        ]

        # step 2: filter the data files in each manifest
        # this filter depends on the partition spec used to write the manifest file

        partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
        metrics_evaluator = _InclusiveMetricsEvaluator(
            self.table.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
        ).eval

        min_data_sequence_number = _min_data_file_sequence_number(manifests)

        data_entries: List[ManifestEntry] = []
        positional_delete_entries = SortedList(key=lambda entry: entry.data_sequence_number or INITIAL_SEQUENCE_NUMBER)

        executor = ExecutorFactory.get_or_create()
        for manifest_entry in chain(
            *executor.map(
                lambda args: _open_manifest(*args),
                [
                    (
                        io,
                        manifest,
                        partition_evaluators[manifest.partition_spec_id],
                        metrics_evaluator,
                    )
                    for manifest in manifests
                    if self._check_sequence_number(min_data_sequence_number, manifest)
                ],
            )
        ):
            data_file = manifest_entry.data_file
            if data_file.content == DataFileContent.DATA:
                data_entries.append(manifest_entry)
            elif data_file.content == DataFileContent.POSITION_DELETES:
                positional_delete_entries.add(manifest_entry)
            elif data_file.content == DataFileContent.EQUALITY_DELETES:
                raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
            else:
                raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

        return [
            FileScanTask(
                data_entry.data_file,
                delete_files=_match_deletes_to_data_file(
                    data_entry,
                    positional_delete_entries,
                ),
            )
            for data_entry in data_entries
        ]

    def to_arrow(self) -> pa.Table:
        from pyiceberg.io.pyarrow import project_table

        return project_table(
            self.plan_files(),
            self.table,
            self.row_filter,
            self.projection(),
            case_sensitive=self.case_sensitive,
            limit=self.limit,
        )

    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
        return self.to_arrow().to_pandas(**kwargs)

    def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
        import duckdb

        con = connection or duckdb.connect(database=":memory:")
        con.register(table_name, self.to_arrow())

        return con

    def to_ray(self) -> ray.data.dataset.Dataset:
        import ray

        return ray.data.from_arrow(self.to_arrow())
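A minimal sketch of building and materializing a scan; the catalog name, table identifier, and column names below are assumptions and not part of the source above.

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")              # assumes a configured catalog named "default"
tbl = catalog.load_table("examples.events")    # hypothetical table identifier

scan = tbl.scan(
    row_filter="event_ts >= '2023-01-01'",
    selected_fields=("event_id", "event_ts"),
    limit=100,
)

arrow_table = scan.to_arrow()   # materialize as a PyArrow table
df = scan.to_pandas()           # or as a pandas DataFrame
con = scan.to_duckdb("events")  # or register the result in an in-memory DuckDB connection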

plan_files()

Plans the relevant files by filtering on the PartitionSpecs.

Returns:

    Iterable[FileScanTask]: List of FileScanTasks that contain both data and delete files.

Source code in pyiceberg/table/__init__.py
def plan_files(self) -> Iterable[FileScanTask]:
    """Plans the relevant files by filtering on the PartitionSpecs.

    Returns:
        List of FileScanTasks that contain both data and delete files.
    """
    snapshot = self.snapshot()
    if not snapshot:
        return iter([])

    io = self.table.io

    # step 1: filter manifests using partition summaries
    # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

    manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

    manifests = [
        manifest_file
        for manifest_file in snapshot.manifests(io)
        if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
    ]

    # step 2: filter the data files in each manifest
    # this filter depends on the partition spec used to write the manifest file

    partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
    metrics_evaluator = _InclusiveMetricsEvaluator(
        self.table.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
    ).eval

    min_data_sequence_number = _min_data_file_sequence_number(manifests)

    data_entries: List[ManifestEntry] = []
    positional_delete_entries = SortedList(key=lambda entry: entry.data_sequence_number or INITIAL_SEQUENCE_NUMBER)

    executor = ExecutorFactory.get_or_create()
    for manifest_entry in chain(
        *executor.map(
            lambda args: _open_manifest(*args),
            [
                (
                    io,
                    manifest,
                    partition_evaluators[manifest.partition_spec_id],
                    metrics_evaluator,
                )
                for manifest in manifests
                if self._check_sequence_number(min_data_sequence_number, manifest)
            ],
        )
    ):
        data_file = manifest_entry.data_file
        if data_file.content == DataFileContent.DATA:
            data_entries.append(manifest_entry)
        elif data_file.content == DataFileContent.POSITION_DELETES:
            positional_delete_entries.add(manifest_entry)
        elif data_file.content == DataFileContent.EQUALITY_DELETES:
            raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
        else:
            raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

    return [
        FileScanTask(
            data_entry.data_file,
            delete_files=_match_deletes_to_data_file(
                data_entry,
                positional_delete_entries,
            ),
        )
        for data_entry in data_entries
    ]
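A small sketch of inspecting the planned tasks directly, reusing the hypothetical tbl from the scan example above and assuming each FileScanTask exposes its data file as task.file.

for task in tbl.scan(row_filter="event_ts >= '2023-01-01'").plan_files():
    # each task pairs a data file with the positional delete files that apply to it
    print(task.file.file_path, len(task.delete_files))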

Namespace

Bases: IcebergRootModel[List[str]]

Reference to one or more levels of a namespace.

Source code in pyiceberg/table/__init__.py
class Namespace(IcebergRootModel[List[str]]):
    """Reference to one or more levels of a namespace."""

    root: List[str] = Field(
        ...,
        description='Reference to one or more levels of a namespace',
    )

StaticTable

Bases: Table

Load a table directly from a metadata file (i.e., without using a catalog).

Source code in pyiceberg/table/__init__.py
class StaticTable(Table):
    """Load a table directly from a metadata file (i.e., without using a catalog)."""

    def refresh(self) -> Table:
        """Refresh the current table metadata."""
        raise NotImplementedError("To be implemented")

    @classmethod
    def from_metadata(cls, metadata_location: str, properties: Properties = EMPTY_DICT) -> StaticTable:
        io = load_file_io(properties=properties, location=metadata_location)
        file = io.new_input(metadata_location)

        from pyiceberg.serializers import FromInputFile

        metadata = FromInputFile.table_metadata(file)

        from pyiceberg.catalog.noop import NoopCatalog

        return cls(
            identifier=("static-table", metadata_location),
            metadata_location=metadata_location,
            metadata=metadata,
            io=load_file_io({**properties, **metadata.properties}),
            catalog=NoopCatalog("static-table"),
        )

refresh()

Refresh the current table metadata.

Source code in pyiceberg/table/__init__.py
def refresh(self) -> Table:
    """Refresh the current table metadata."""
    raise NotImplementedError("To be implemented")
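A minimal sketch, assuming a reachable metadata file; the location below is hypothetical.

from pyiceberg.table import StaticTable

static_table = StaticTable.from_metadata(
    "s3://warehouse/db/events/metadata/00002-6a91b2c1.metadata.json"  # hypothetical location
)
print(static_table.schema())
print(static_table.current_snapshot())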

Table

Source code in pyiceberg/table/__init__.py
class Table:
    identifier: Identifier = Field()
    metadata: TableMetadata
    metadata_location: str = Field()
    io: FileIO
    catalog: Catalog

    def __init__(
        self, identifier: Identifier, metadata: TableMetadata, metadata_location: str, io: FileIO, catalog: Catalog
    ) -> None:
        self.identifier = identifier
        self.metadata = metadata
        self.metadata_location = metadata_location
        self.io = io
        self.catalog = catalog

    def transaction(self) -> Transaction:
        return Transaction(self)

    def refresh(self) -> Table:
        """Refresh the current table metadata."""
        fresh = self.catalog.load_table(self.identifier[1:])
        self.metadata = fresh.metadata
        self.io = fresh.io
        self.metadata_location = fresh.metadata_location
        return self

    def name(self) -> Identifier:
        """Return the identifier of this table."""
        return self.identifier

    def scan(
        self,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ) -> DataScan:
        return DataScan(
            table=self,
            row_filter=row_filter,
            selected_fields=selected_fields,
            case_sensitive=case_sensitive,
            snapshot_id=snapshot_id,
            options=options,
            limit=limit,
        )

    @property
    def format_version(self) -> Literal[1, 2]:
        return self.metadata.format_version

    def schema(self) -> Schema:
        """Return the schema for this table."""
        return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

    def schemas(self) -> Dict[int, Schema]:
        """Return a dict of the schema of this table."""
        return {schema.schema_id: schema for schema in self.metadata.schemas}

    def spec(self) -> PartitionSpec:
        """Return the partition spec of this table."""
        return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)

    def specs(self) -> Dict[int, PartitionSpec]:
        """Return a dict the partition specs this table."""
        return {spec.spec_id: spec for spec in self.metadata.partition_specs}

    def sort_order(self) -> SortOrder:
        """Return the sort order of this table."""
        return next(
            sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
        )

    def sort_orders(self) -> Dict[int, SortOrder]:
        """Return a dict of the sort orders of this table."""
        return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}

    @property
    def properties(self) -> Dict[str, str]:
        """Properties of the table."""
        return self.metadata.properties

    def location(self) -> str:
        """Return the table's base location."""
        return self.metadata.location

    @property
    def last_sequence_number(self) -> int:
        return self.metadata.last_sequence_number

    def next_sequence_number(self) -> int:
        return self.last_sequence_number + 1 if self.metadata.format_version > 1 else INITIAL_SEQUENCE_NUMBER

    def new_snapshot_id(self) -> int:
        """Generate a new snapshot-id that's not in use."""
        snapshot_id = _generate_snapshot_id()
        while self.snapshot_by_id(snapshot_id) is not None:
            snapshot_id = _generate_snapshot_id()

        return snapshot_id

    def current_snapshot(self) -> Optional[Snapshot]:
        """Get the current snapshot for this table, or None if there is no current snapshot."""
        if self.metadata.current_snapshot_id is not None:
            return self.snapshot_by_id(self.metadata.current_snapshot_id)
        return None

    def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
        """Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
        return self.metadata.snapshot_by_id(snapshot_id)

    def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
        """Return the snapshot referenced by the given name or null if no such reference exists."""
        if ref := self.metadata.refs.get(name):
            return self.snapshot_by_id(ref.snapshot_id)
        return None

    def history(self) -> List[SnapshotLogEntry]:
        """Get the snapshot history of this table."""
        return self.metadata.snapshot_log

    def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
        return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive)

    def name_mapping(self) -> NameMapping:
        """Return the table's field-id NameMapping."""
        if name_mapping_json := self.properties.get(TableProperties.DEFAULT_NAME_MAPPING):
            return parse_mapping_from_json(name_mapping_json)
        else:
            return create_mapping_from_schema(self.schema())

    def append(self, df: pa.Table) -> None:
        """
        Append data to the table.

        Args:
            df: The Arrow dataframe that will be appended to the table
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        if len(self.spec().fields) > 0:
            raise ValueError("Cannot write to partitioned tables")

        merge = _MergingSnapshotProducer(operation=Operation.APPEND, table=self)

        # skip writing data files if the dataframe is empty
        if df.shape[0] > 0:
            data_files = _dataframe_to_data_files(self, df=df)
            for data_file in data_files:
                merge.append_data_file(data_file)

        merge.commit()

    def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None:
        """
        Overwrite all the data in the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                              or a boolean expression in case of a partial overwrite
        """
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        if overwrite_filter != AlwaysTrue():
            raise NotImplementedError("Cannot overwrite a subset of a table")

        if len(self.spec().fields) > 0:
            raise ValueError("Cannot write to partitioned tables")

        merge = _MergingSnapshotProducer(
            operation=Operation.OVERWRITE if self.current_snapshot() is not None else Operation.APPEND,
            table=self,
        )

        # skip writing data files if the dataframe is empty
        if df.shape[0] > 0:
            data_files = _dataframe_to_data_files(self, df=df)
            for data_file in data_files:
                merge.append_data_file(data_file)

        merge.commit()

    def refs(self) -> Dict[str, SnapshotRef]:
        """Return the snapshot references in the table."""
        return self.metadata.refs

    def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
        response = self.catalog._commit_table(  # pylint: disable=W0212
            CommitTableRequest(
                identifier=TableIdentifier(namespace=self.identifier[:-1], name=self.identifier[-1]),
                updates=updates,
                requirements=requirements,
            )
        )  # pylint: disable=W0212
        self.metadata = response.metadata
        self.metadata_location = response.metadata_location

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the Table class."""
        return (
            self.identifier == other.identifier
            and self.metadata == other.metadata
            and self.metadata_location == other.metadata_location
            if isinstance(other, Table)
            else False
        )

    def __repr__(self) -> str:
        """Return the string representation of the Table class."""
        table_name = self.catalog.table_name_from(self.identifier)
        schema_str = ",\n  ".join(str(column) for column in self.schema().columns if self.schema())
        partition_str = f"partition by: [{', '.join(field.name for field in self.spec().fields if self.spec())}]"
        sort_order_str = f"sort order: [{', '.join(str(field) for field in self.sort_order().fields if self.sort_order())}]"
        snapshot_str = f"snapshot: {str(self.current_snapshot()) if self.current_snapshot() else 'null'}"
        result_str = f"{table_name}(\n  {schema_str}\n),\n{partition_str},\n{sort_order_str},\n{snapshot_str}"
        return result_str

    def to_daft(self) -> daft.DataFrame:
        """Read a Daft DataFrame lazily from this Iceberg table.

        Returns:
            daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table
        """
        import daft

        return daft.read_iceberg(self)

properties: Dict[str, str] property

Properties of the table.

__eq__(other)

Return the equality of two instances of the Table class.

Source code in pyiceberg/table/__init__.py
def __eq__(self, other: Any) -> bool:
    """Return the equality of two instances of the Table class."""
    return (
        self.identifier == other.identifier
        and self.metadata == other.metadata
        and self.metadata_location == other.metadata_location
        if isinstance(other, Table)
        else False
    )

__repr__()

Return the string representation of the Table class.

Source code in pyiceberg/table/__init__.py
def __repr__(self) -> str:
    """Return the string representation of the Table class."""
    table_name = self.catalog.table_name_from(self.identifier)
    schema_str = ",\n  ".join(str(column) for column in self.schema().columns if self.schema())
    partition_str = f"partition by: [{', '.join(field.name for field in self.spec().fields if self.spec())}]"
    sort_order_str = f"sort order: [{', '.join(str(field) for field in self.sort_order().fields if self.sort_order())}]"
    snapshot_str = f"snapshot: {str(self.current_snapshot()) if self.current_snapshot() else 'null'}"
    result_str = f"{table_name}(\n  {schema_str}\n),\n{partition_str},\n{sort_order_str},\n{snapshot_str}"
    return result_str

append(df)

Append data to the table.

Parameters:

    df (Table): The Arrow dataframe that will be appended to the table. Required.
Source code in pyiceberg/table/__init__.py
def append(self, df: pa.Table) -> None:
    """
    Append data to the table.

    Args:
        df: The Arrow dataframe that will be appended to the table
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    if len(self.spec().fields) > 0:
        raise ValueError("Cannot write to partitioned tables")

    merge = _MergingSnapshotProducer(operation=Operation.APPEND, table=self)

    # skip writing data files if the dataframe is empty
    if df.shape[0] > 0:
        data_files = _dataframe_to_data_files(self, df=df)
        for data_file in data_files:
            merge.append_data_file(data_file)

    merge.commit()
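A short sketch of appending rows, assuming an unpartitioned table tbl whose schema matches the Arrow table below; the column names are illustrative.

import pyarrow as pa

df = pa.table({
    "event_id": pa.array([1, 2, 3], type=pa.int64()),
    "event_ts": pa.array(["2023-01-01", "2023-01-02", "2023-01-03"], type=pa.string()),
})

tbl.append(df)  # writes the data files and commits a new append snapshot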

current_snapshot()

Get the current snapshot for this table, or None if there is no current snapshot.

Source code in pyiceberg/table/__init__.py
def current_snapshot(self) -> Optional[Snapshot]:
    """Get the current snapshot for this table, or None if there is no current snapshot."""
    if self.metadata.current_snapshot_id is not None:
        return self.snapshot_by_id(self.metadata.current_snapshot_id)
    return None

history()

Get the snapshot history of this table.

Source code in pyiceberg/table/__init__.py
def history(self) -> List[SnapshotLogEntry]:
    """Get the snapshot history of this table."""
    return self.metadata.snapshot_log

location()

Return the table's base location.

Source code in pyiceberg/table/__init__.py
def location(self) -> str:
    """Return the table's base location."""
    return self.metadata.location

name()

Return the identifier of this table.

Source code in pyiceberg/table/__init__.py
def name(self) -> Identifier:
    """Return the identifier of this table."""
    return self.identifier

name_mapping()

Return the table's field-id NameMapping.

Source code in pyiceberg/table/__init__.py
def name_mapping(self) -> NameMapping:
    """Return the table's field-id NameMapping."""
    if name_mapping_json := self.properties.get(TableProperties.DEFAULT_NAME_MAPPING):
        return parse_mapping_from_json(name_mapping_json)
    else:
        return create_mapping_from_schema(self.schema())

new_snapshot_id()

Generate a new snapshot-id that's not in use.

Source code in pyiceberg/table/__init__.py
def new_snapshot_id(self) -> int:
    """Generate a new snapshot-id that's not in use."""
    snapshot_id = _generate_snapshot_id()
    while self.snapshot_by_id(snapshot_id) is not None:
        snapshot_id = _generate_snapshot_id()

    return snapshot_id

overwrite(df, overwrite_filter=ALWAYS_TRUE)

Overwrite all the data in the table.

Parameters:

    df (Table): The Arrow dataframe that will be used to overwrite the table. Required.
    overwrite_filter (BooleanExpression): ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite. Default: ALWAYS_TRUE.
Source code in pyiceberg/table/__init__.py
def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None:
    """
    Overwrite all the data in the table.

    Args:
        df: The Arrow dataframe that will be used to overwrite the table
        overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                          or a boolean expression in case of a partial overwrite
    """
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    if overwrite_filter != AlwaysTrue():
        raise NotImplementedError("Cannot overwrite a subset of a table")

    if len(self.spec().fields) > 0:
        raise ValueError("Cannot write to partitioned tables")

    merge = _MergingSnapshotProducer(
        operation=Operation.OVERWRITE if self.current_snapshot() is not None else Operation.APPEND,
        table=self,
    )

    # skip writing data files if the dataframe is empty
    if df.shape[0] > 0:
        data_files = _dataframe_to_data_files(self, df=df)
        for data_file in data_files:
            merge.append_data_file(data_file)

    merge.commit()
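Reusing the same hypothetical Arrow table df, a full overwrite is a single call; partial overwrites (a non-trivial overwrite_filter) raise NotImplementedError in this version.

tbl.overwrite(df)  # replaces all existing data with the contents of df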

refresh()

Refresh the current table metadata.

Source code in pyiceberg/table/__init__.py
def refresh(self) -> Table:
    """Refresh the current table metadata."""
    fresh = self.catalog.load_table(self.identifier[1:])
    self.metadata = fresh.metadata
    self.io = fresh.io
    self.metadata_location = fresh.metadata_location
    return self

refs()

Return the snapshot references in the table.

Source code in pyiceberg/table/__init__.py
def refs(self) -> Dict[str, SnapshotRef]:
    """Return the snapshot references in the table."""
    return self.metadata.refs

schema()

Return the schema for this table.

Source code in pyiceberg/table/__init__.py
def schema(self) -> Schema:
    """Return the schema for this table."""
    return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

schemas()

Return a dict of the schemas of this table.

Source code in pyiceberg/table/__init__.py
def schemas(self) -> Dict[int, Schema]:
    """Return a dict of the schema of this table."""
    return {schema.schema_id: schema for schema in self.metadata.schemas}

snapshot_by_id(snapshot_id)

Get the snapshot of this table with the given id, or None if there is no matching snapshot.

Source code in pyiceberg/table/__init__.py
def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
    """Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
    return self.metadata.snapshot_by_id(snapshot_id)

snapshot_by_name(name)

Return the snapshot referenced by the given name or null if no such reference exists.

Source code in pyiceberg/table/__init__.py
def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
    """Return the snapshot referenced by the given name or null if no such reference exists."""
    if ref := self.metadata.refs.get(name):
        return self.snapshot_by_id(ref.snapshot_id)
    return None

sort_order()

Return the sort order of this table.

Source code in pyiceberg/table/__init__.py
def sort_order(self) -> SortOrder:
    """Return the sort order of this table."""
    return next(
        sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
    )

sort_orders()

Return a dict of the sort orders of this table.

Source code in pyiceberg/table/__init__.py
def sort_orders(self) -> Dict[int, SortOrder]:
    """Return a dict of the sort orders of this table."""
    return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}

spec()

Return the partition spec of this table.

Source code in pyiceberg/table/__init__.py
def spec(self) -> PartitionSpec:
    """Return the partition spec of this table."""
    return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)

specs()

Return a dict of the partition specs of this table.

Source code in pyiceberg/table/__init__.py
def specs(self) -> Dict[int, PartitionSpec]:
    """Return a dict the partition specs this table."""
    return {spec.spec_id: spec for spec in self.metadata.partition_specs}

to_daft()

Read a Daft DataFrame lazily from this Iceberg table.

Returns:

    daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table.

Source code in pyiceberg/table/__init__.py
def to_daft(self) -> daft.DataFrame:
    """Read a Daft DataFrame lazily from this Iceberg table.

    Returns:
        daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table
    """
    import daft

    return daft.read_iceberg(self)

TableIdentifier

Bases: IcebergBaseModel

Fully qualified identifier of a table.

Source code in pyiceberg/table/__init__.py
class TableIdentifier(IcebergBaseModel):
    """Fully Qualified identifier to a table."""

    namespace: Namespace
    name: str

TableRequirement

Bases: IcebergBaseModel

Source code in pyiceberg/table/__init__.py
class TableRequirement(IcebergBaseModel):
    type: str

    @abstractmethod
    def validate(self, base_metadata: Optional[TableMetadata]) -> None:
        """Validate the requirement against the base metadata.

        Args:
            base_metadata: The base metadata to be validated against.

        Raises:
            CommitFailedException: When the requirement is not met.
        """
        ...

validate(base_metadata) abstractmethod

Validate the requirement against the base metadata.

Parameters:

    base_metadata (Optional[TableMetadata]): The base metadata to be validated against. Required.

Raises:

    CommitFailedException: When the requirement is not met.

Source code in pyiceberg/table/__init__.py
@abstractmethod
def validate(self, base_metadata: Optional[TableMetadata]) -> None:
    """Validate the requirement against the base metadata.

    Args:
        base_metadata: The base metadata to be validated against.

    Raises:
        CommitFailedException: When the requirement is not met.
    """
    ...

TableScan

Bases: ABC

Source code in pyiceberg/table/__init__.py
class TableScan(ABC):
    table: Table
    row_filter: BooleanExpression
    selected_fields: Tuple[str, ...]
    case_sensitive: bool
    snapshot_id: Optional[int]
    options: Properties
    limit: Optional[int]

    def __init__(
        self,
        table: Table,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ):
        self.table = table
        self.row_filter = _parse_row_filter(row_filter)
        self.selected_fields = selected_fields
        self.case_sensitive = case_sensitive
        self.snapshot_id = snapshot_id
        self.options = options
        self.limit = limit

    def snapshot(self) -> Optional[Snapshot]:
        if self.snapshot_id:
            return self.table.snapshot_by_id(self.snapshot_id)
        return self.table.current_snapshot()

    def projection(self) -> Schema:
        current_schema = self.table.schema()
        if self.snapshot_id is not None:
            snapshot = self.table.snapshot_by_id(self.snapshot_id)
            if snapshot is not None:
                if snapshot.schema_id is not None:
                    snapshot_schema = self.table.schemas().get(snapshot.schema_id)
                    if snapshot_schema is not None:
                        current_schema = snapshot_schema
                    else:
                        warnings.warn(f"Metadata does not contain schema with id: {snapshot.schema_id}")
            else:
                raise ValueError(f"Snapshot not found: {self.snapshot_id}")

        if "*" in self.selected_fields:
            return current_schema

        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

    @abstractmethod
    def plan_files(self) -> Iterable[ScanTask]: ...

    @abstractmethod
    def to_arrow(self) -> pa.Table: ...

    @abstractmethod
    def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ...

    def update(self: S, **overrides: Any) -> S:
        """Create a copy of this table scan with updated fields."""
        return type(self)(**{**self.__dict__, **overrides})

    def use_ref(self: S, name: str) -> S:
        if self.snapshot_id:
            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
        if snapshot := self.table.snapshot_by_name(name):
            return self.update(snapshot_id=snapshot.snapshot_id)

        raise ValueError(f"Cannot scan unknown ref={name}")

    def select(self: S, *field_names: str) -> S:
        if "*" in self.selected_fields:
            return self.update(selected_fields=field_names)
        return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))

    def filter(self: S, expr: Union[str, BooleanExpression]) -> S:
        return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr)))

    def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
        return self.update(case_sensitive=case_sensitive)
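A sketch of refining a scan through the fluent methods above, reusing the hypothetical tbl from earlier examples; each call returns a new scan, so the original is left untouched, and the tag name is hypothetical.

base_scan = tbl.scan()

refined = (
    base_scan
    .filter("event_id > 10")
    .select("event_id", "event_ts")
    .with_case_sensitive(False)
)

tagged = tbl.scan().use_ref("v1-release")  # scan a named branch or tag instead of the current snapshot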

update(**overrides)

Create a copy of this table scan with updated fields.

Source code in pyiceberg/table/__init__.py
def update(self: S, **overrides: Any) -> S:
    """Create a copy of this table scan with updated fields."""
    return type(self)(**{**self.__dict__, **overrides})

Transaction

Source code in pyiceberg/table/__init__.py
class Transaction:
    _table: Table
    _updates: Tuple[TableUpdate, ...]
    _requirements: Tuple[TableRequirement, ...]

    def __init__(
        self,
        table: Table,
        actions: Optional[Tuple[TableUpdate, ...]] = None,
        requirements: Optional[Tuple[TableRequirement, ...]] = None,
    ):
        self._table = table
        self._updates = actions or ()
        self._requirements = requirements or ()

    def __enter__(self) -> Transaction:
        """Start a transaction to update the table."""
        return self

    def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
        """Close and commit the transaction."""
        fresh_table = self.commit_transaction()
        # Update the new data in place
        self._table.metadata = fresh_table.metadata
        self._table.metadata_location = fresh_table.metadata_location

    def _append_updates(self, *new_updates: TableUpdate) -> Transaction:
        """Append updates to the set of staged updates.

        Args:
            *new_updates: Any new updates.

        Raises:
            ValueError: When the type of update is not unique.

        Returns:
            Transaction object with the new updates appended.
        """
        for new_update in new_updates:
            # explicitly get type of new_update as new_update is an instantiated class
            type_new_update = type(new_update)
            if any(isinstance(update, type_new_update) for update in self._updates):
                raise ValueError(f"Updates in a single commit need to be unique, duplicate: {type_new_update}")
        self._updates = self._updates + new_updates
        return self

    def _append_requirements(self, *new_requirements: TableRequirement) -> Transaction:
        """Append requirements to the set of staged requirements.

        Args:
            *new_requirements: Any new requirements.

        Raises:
            ValueError: When the type of requirement is not unique.

        Returns:
            Transaction object with the new requirements appended.
        """
        for new_requirement in new_requirements:
            # explicitly get type of new_requirement as new_requirement is an instantiated class
            type_new_requirement = type(new_requirement)
            if any(isinstance(requirement, type_new_requirement) for requirement in self._requirements):
                raise ValueError(f"Requirements in a single commit need to be unique, duplicate: {type_new_requirement}")
        self._requirements = self._requirements + new_requirements
        return self

    def upgrade_table_version(self, format_version: Literal[1, 2]) -> Transaction:
        """Set the table to a certain version.

        Args:
            format_version: The newly set version.

        Returns:
            The alter table builder.
        """
        if format_version not in {1, 2}:
            raise ValueError(f"Unsupported table format version: {format_version}")

        if format_version < self._table.metadata.format_version:
            raise ValueError(f"Cannot downgrade v{self._table.metadata.format_version} table to v{format_version}")
        if format_version > self._table.metadata.format_version:
            return self._append_updates(UpgradeFormatVersionUpdate(format_version=format_version))
        else:
            return self

    def set_properties(self, **updates: str) -> Transaction:
        """Set properties.

        When a property is already set, it will be overwritten.

        Args:
            updates: The properties set on the table.

        Returns:
            The alter table builder.
        """
        return self._append_updates(SetPropertiesUpdate(updates=updates))

    def add_snapshot(self, snapshot: Snapshot) -> Transaction:
        """Add a new snapshot to the table.

        Returns:
            The transaction with the add-snapshot staged.
        """
        self._append_updates(AddSnapshotUpdate(snapshot=snapshot))
        self._append_requirements(AssertTableUUID(uuid=self._table.metadata.table_uuid))

        return self

    def set_ref_snapshot(
        self,
        snapshot_id: int,
        parent_snapshot_id: Optional[int],
        ref_name: str,
        type: str,
        max_age_ref_ms: Optional[int] = None,
        max_snapshot_age_ms: Optional[int] = None,
        min_snapshots_to_keep: Optional[int] = None,
    ) -> Transaction:
        """Update a ref to a snapshot.

        Returns:
            The transaction with the set-snapshot-ref staged
        """
        self._append_updates(
            SetSnapshotRefUpdate(
                snapshot_id=snapshot_id,
                parent_snapshot_id=parent_snapshot_id,
                ref_name=ref_name,
                type=type,
                max_age_ref_ms=max_age_ref_ms,
                max_snapshot_age_ms=max_snapshot_age_ms,
                min_snapshots_to_keep=min_snapshots_to_keep,
            )
        )

        self._append_requirements(AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"))
        return self

    def update_schema(self) -> UpdateSchema:
        """Create a new UpdateSchema to alter the columns of this table.

        Returns:
            A new UpdateSchema.
        """
        return UpdateSchema(self._table, self)

    def remove_properties(self, *removals: str) -> Transaction:
        """Remove properties.

        Args:
            removals: Properties to be removed.

        Returns:
            The alter table builder.
        """
        return self._append_updates(RemovePropertiesUpdate(removals=removals))

    def update_location(self, location: str) -> Transaction:
        """Set the new table location.

        Args:
            location: The new location of the table.

        Returns:
            The alter table builder.
        """
        raise NotImplementedError("Not yet implemented")

    def commit_transaction(self) -> Table:
        """Commit the changes to the catalog.

        Returns:
            The table with the updates applied.
        """
        # Strip the catalog name
        if len(self._updates) > 0:
            self._table._do_commit(  # pylint: disable=W0212
                updates=self._updates,
                requirements=self._requirements,
            )
            return self._table
        else:
            return self._table
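A minimal sketch of staging several updates and committing them atomically when the context manager exits, again assuming the hypothetical tbl; the property names are illustrative.

with tbl.transaction() as txn:
    txn.set_properties(owner="data-eng", retention="30d")
    txn.remove_properties("deprecated-flag")
# on exit the staged updates are committed and tbl is refreshed in place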

__enter__()

Start a transaction to update the table.

Source code in pyiceberg/table/__init__.py
def __enter__(self) -> Transaction:
    """Start a transaction to update the table."""
    return self

__exit__(_, value, traceback)

Close and commit the transaction.

Source code in pyiceberg/table/__init__.py
def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
    """Close and commit the transaction."""
    fresh_table = self.commit_transaction()
    # Update the new data in place
    self._table.metadata = fresh_table.metadata
    self._table.metadata_location = fresh_table.metadata_location

add_snapshot(snapshot)

Add a new snapshot to the table.

Returns:

    Transaction: The transaction with the add-snapshot staged.

Source code in pyiceberg/table/__init__.py
def add_snapshot(self, snapshot: Snapshot) -> Transaction:
    """Add a new snapshot to the table.

    Returns:
        The transaction with the add-snapshot staged.
    """
    self._append_updates(AddSnapshotUpdate(snapshot=snapshot))
    self._append_requirements(AssertTableUUID(uuid=self._table.metadata.table_uuid))

    return self

commit_transaction()

Commit the changes to the catalog.

Returns:

    Table: The table with the updates applied.

Source code in pyiceberg/table/__init__.py
def commit_transaction(self) -> Table:
    """Commit the changes to the catalog.

    Returns:
        The table with the updates applied.
    """
    # Strip the catalog name
    if len(self._updates) > 0:
        self._table._do_commit(  # pylint: disable=W0212
            updates=self._updates,
            requirements=self._requirements,
        )
        return self._table
    else:
        return self._table

remove_properties(*removals)

Remove properties.

Parameters:

    removals (str): Properties to be removed. Default: ().

Returns:

    Transaction: The alter table builder.

Source code in pyiceberg/table/__init__.py
def remove_properties(self, *removals: str) -> Transaction:
    """Remove properties.

    Args:
        removals: Properties to be removed.

    Returns:
        The alter table builder.
    """
    return self._append_updates(RemovePropertiesUpdate(removals=removals))

set_properties(**updates)

Set properties.

When a property is already set, it will be overwritten.

Parameters:

    updates (str): The properties set on the table. Default: {}.

Returns:

    Transaction: The alter table builder.

Source code in pyiceberg/table/__init__.py
def set_properties(self, **updates: str) -> Transaction:
    """Set properties.

    When a property is already set, it will be overwritten.

    Args:
        updates: The properties set on the table.

    Returns:
        The alter table builder.
    """
    return self._append_updates(SetPropertiesUpdate(updates=updates))

set_ref_snapshot(snapshot_id, parent_snapshot_id, ref_name, type, max_age_ref_ms=None, max_snapshot_age_ms=None, min_snapshots_to_keep=None)

Update a ref to a snapshot.

Returns:

    Transaction: The transaction with the set-snapshot-ref staged.

Source code in pyiceberg/table/__init__.py
def set_ref_snapshot(
    self,
    snapshot_id: int,
    parent_snapshot_id: Optional[int],
    ref_name: str,
    type: str,
    max_age_ref_ms: Optional[int] = None,
    max_snapshot_age_ms: Optional[int] = None,
    min_snapshots_to_keep: Optional[int] = None,
) -> Transaction:
    """Update a ref to a snapshot.

    Returns:
        The transaction with the set-snapshot-ref staged
    """
    self._append_updates(
        SetSnapshotRefUpdate(
            snapshot_id=snapshot_id,
            parent_snapshot_id=parent_snapshot_id,
            ref_name=ref_name,
            type=type,
            max_age_ref_ms=max_age_ref_ms,
            max_snapshot_age_ms=max_snapshot_age_ms,
            min_snapshots_to_keep=min_snapshots_to_keep,
        )
    )

    self._append_requirements(AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"))
    return self

update_location(location)

Set the new table location.

Parameters:

    location (str): The new location of the table. Required.

Returns:

    Transaction: The alter table builder.

Source code in pyiceberg/table/__init__.py
def update_location(self, location: str) -> Transaction:
    """Set the new table location.

    Args:
        location: The new location of the table.

    Returns:
        The alter table builder.
    """
    raise NotImplementedError("Not yet implemented")

update_schema()

Create a new UpdateSchema to alter the columns of this table.

Returns:

    UpdateSchema: A new UpdateSchema.

Source code in pyiceberg/table/__init__.py
def update_schema(self) -> UpdateSchema:
    """Create a new UpdateSchema to alter the columns of this table.

    Returns:
        A new UpdateSchema.
    """
    return UpdateSchema(self._table, self)

upgrade_table_version(format_version)

Set the table to a certain version.

Parameters:

    format_version (Literal[1, 2]): The newly set version. Required.

Returns:

    Transaction: The alter table builder.

Source code in pyiceberg/table/__init__.py
def upgrade_table_version(self, format_version: Literal[1, 2]) -> Transaction:
    """Set the table to a certain version.

    Args:
        format_version: The newly set version.

    Returns:
        The alter table builder.
    """
    if format_version not in {1, 2}:
        raise ValueError(f"Unsupported table format version: {format_version}")

    if format_version < self._table.metadata.format_version:
        raise ValueError(f"Cannot downgrade v{self._table.metadata.format_version} table to v{format_version}")
    if format_version > self._table.metadata.format_version:
        return self._append_updates(UpgradeFormatVersionUpdate(format_version=format_version))
    else:
        return self
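For example, a short sketch of upgrading the hypothetical tbl to the v2 format inside a transaction; the call is a no-op if the table is already on v2.

with tbl.transaction() as txn:
    txn.upgrade_table_version(2)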

UpdateSchema
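
A short sketch of evolving a schema through this class, assuming the hypothetical tbl from earlier examples; the column name and doc string are illustrative.

from pyiceberg.types import StringType

with tbl.update_schema() as update:
    update.add_column("country", StringType(), doc="ISO country code")  # adds an optional column
# the change is committed when the context manager exits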

Source code in pyiceberg/table/__init__.py
class UpdateSchema:
    _table: Optional[Table]
    _schema: Schema
    _last_column_id: itertools.count[int]
    _identifier_field_names: Set[str]

    _adds: Dict[int, List[NestedField]] = {}
    _updates: Dict[int, NestedField] = {}
    _deletes: Set[int] = set()
    _moves: Dict[int, List[Move]] = {}

    _added_name_to_id: Dict[str, int] = {}
    # Part of https://github.com/apache/iceberg/pull/8393
    _id_to_parent: Dict[int, str] = {}
    _allow_incompatible_changes: bool
    _case_sensitive: bool
    _transaction: Optional[Transaction]

    def __init__(
        self,
        table: Optional[Table],
        transaction: Optional[Transaction] = None,
        allow_incompatible_changes: bool = False,
        case_sensitive: bool = True,
        schema: Optional[Schema] = None,
    ) -> None:
        self._table = table

        if isinstance(schema, Schema):
            self._schema = schema
            self._last_column_id = itertools.count(1 + schema.highest_field_id)
        elif table is not None:
            self._schema = table.schema()
            self._last_column_id = itertools.count(1 + table.metadata.last_column_id)
        else:
            raise ValueError("Either provide a table or a schema")

        self._identifier_field_names = self._schema.identifier_field_names()

        self._adds = {}
        self._updates = {}
        self._deletes = set()
        self._moves = {}

        self._added_name_to_id = {}

        def get_column_name(field_id: int) -> str:
            column_name = self._schema.find_column_name(column_id=field_id)
            if column_name is None:
                raise ValueError(f"Could not find field-id: {field_id}")
            return column_name

        self._id_to_parent = {
            field_id: get_column_name(parent_field_id) for field_id, parent_field_id in self._schema._lazy_id_to_parent.items()
        }

        self._allow_incompatible_changes = allow_incompatible_changes
        self._case_sensitive = case_sensitive
        self._transaction = transaction

    def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
        """Close and commit the change."""
        return self.commit()

    def __enter__(self) -> UpdateSchema:
        """Update the table."""
        return self

    def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
        """Determine if the case of schema needs to be considered when comparing column names.

        Args:
            case_sensitive: When false case is not considered in column name comparisons.

        Returns:
            This for method chaining
        """
        self._case_sensitive = case_sensitive
        return self

    def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
        from pyiceberg.catalog import Catalog

        visit_with_partner(
            Catalog._convert_schema_if_needed(new_schema),
            -1,
            UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),  # type: ignore
            PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
        )
        return self

    def add_column(
        self, path: Union[str, Tuple[str, ...]], field_type: IcebergType, doc: Optional[str] = None, required: bool = False
    ) -> UpdateSchema:
        """Add a new column to a nested struct or Add a new top-level column.

        Because "." may be interpreted as a column path separator or may be used in field names, it
        is not allowed to add nested column by passing in a string. To add to nested structures or
        to add fields with names that contain "." use a tuple instead to indicate the path.

        If type is a nested type, its field IDs are reassigned when added to the existing schema.

        Args:
            path: Name for the new column.
            field_type: Type for the new column.
            doc: Documentation string for the new column.
            required: Whether the new column is required.

        Returns:
            This for method chaining.
        """
        if isinstance(path, str):
            if "." in path:
                raise ValueError(f"Cannot add column with ambiguous name: {path}, provide a tuple instead")
            path = (path,)

        if required and not self._allow_incompatible_changes:
            # Table format version 1 and 2 cannot add required column because there is no initial value
            raise ValueError(f'Incompatible change: cannot add required column: {".".join(path)}')

        name = path[-1]
        parent = path[:-1]

        full_name = ".".join(path)
        parent_full_path = ".".join(parent)
        parent_id: int = TABLE_ROOT_ID

        if len(parent) > 0:
            parent_field = self._schema.find_field(parent_full_path, self._case_sensitive)
            parent_type = parent_field.field_type
            if isinstance(parent_type, MapType):
                parent_field = parent_type.value_field
            elif isinstance(parent_type, ListType):
                parent_field = parent_type.element_field

            if not parent_field.field_type.is_struct:
                raise ValueError(f"Cannot add column '{name}' to non-struct type: {parent_full_path}")

            parent_id = parent_field.field_id

        existing_field = None
        try:
            existing_field = self._schema.find_field(full_name, self._case_sensitive)
        except ValueError:
            pass

        if existing_field is not None and existing_field.field_id not in self._deletes:
            raise ValueError(f"Cannot add column, name already exists: {full_name}")

        # assign new IDs in order
        new_id = self.assign_new_column_id()

        # update tracking for moves
        self._added_name_to_id[full_name] = new_id
        self._id_to_parent[new_id] = parent_full_path

        new_type = assign_fresh_schema_ids(field_type, self.assign_new_column_id)
        field = NestedField(field_id=new_id, name=name, field_type=new_type, required=required, doc=doc)

        if parent_id in self._adds:
            self._adds[parent_id].append(field)
        else:
            self._adds[parent_id] = [field]

        return self

    def delete_column(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
        """Delete a column from a table.

        Args:
            path: The path to the column.

        Returns:
            The UpdateSchema with the delete operation staged.
        """
        name = (path,) if isinstance(path, str) else path
        full_name = ".".join(name)

        field = self._schema.find_field(full_name, case_sensitive=self._case_sensitive)

        if field.field_id in self._adds:
            raise ValueError(f"Cannot delete a column that has additions: {full_name}")
        if field.field_id in self._updates:
            raise ValueError(f"Cannot delete a column that has updates: {full_name}")

        self._deletes.add(field.field_id)

        return self

    def rename_column(self, path_from: Union[str, Tuple[str, ...]], new_name: str) -> UpdateSchema:
        """Update the name of a column.

        Args:
            path_from: The path to the column to be renamed.
            new_name: The new path of the column.

        Returns:
            The UpdateSchema with the rename operation staged.
        """
        path_from = ".".join(path_from) if isinstance(path_from, tuple) else path_from
        field_from = self._schema.find_field(path_from, self._case_sensitive)

        if field_from.field_id in self._deletes:
            raise ValueError(f"Cannot rename a column that will be deleted: {path_from}")

        if updated := self._updates.get(field_from.field_id):
            self._updates[field_from.field_id] = NestedField(
                field_id=updated.field_id,
                name=new_name,
                field_type=updated.field_type,
                doc=updated.doc,
                required=updated.required,
            )
        else:
            self._updates[field_from.field_id] = NestedField(
                field_id=field_from.field_id,
                name=new_name,
                field_type=field_from.field_type,
                doc=field_from.doc,
                required=field_from.required,
            )

        # Lookup the field because of casing
        from_field_correct_casing = self._schema.find_column_name(field_from.field_id)
        if from_field_correct_casing in self._identifier_field_names:
            self._identifier_field_names.remove(from_field_correct_casing)
            new_identifier_path = f"{from_field_correct_casing[:-len(field_from.name)]}{new_name}"
            self._identifier_field_names.add(new_identifier_path)

        return self

    def make_column_optional(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
        """Make a column optional.

        Args:
            path: The path to the field.

        Returns:
            The UpdateSchema with the requirement change staged.
        """
        self._set_column_requirement(path, required=False)
        return self

    def set_identifier_fields(self, *fields: str) -> None:
        self._identifier_field_names = set(fields)

    def _set_column_requirement(self, path: Union[str, Tuple[str, ...]], required: bool) -> None:
        path = (path,) if isinstance(path, str) else path
        name = ".".join(path)

        field = self._schema.find_field(name, self._case_sensitive)

        if (field.required and required) or (field.optional and not required):
            # if the change is a noop, allow it even if allowIncompatibleChanges is false
            return

        if not self._allow_incompatible_changes and required:
            raise ValueError(f"Cannot change column nullability: {name}: optional -> required")

        if field.field_id in self._deletes:
            raise ValueError(f"Cannot update a column that will be deleted: {name}")

        if updated := self._updates.get(field.field_id):
            self._updates[field.field_id] = NestedField(
                field_id=updated.field_id,
                name=updated.name,
                field_type=updated.field_type,
                doc=updated.doc,
                required=required,
            )
        else:
            self._updates[field.field_id] = NestedField(
                field_id=field.field_id,
                name=field.name,
                field_type=field.field_type,
                doc=field.doc,
                required=required,
            )

    def update_column(
        self,
        path: Union[str, Tuple[str, ...]],
        field_type: Optional[IcebergType] = None,
        required: Optional[bool] = None,
        doc: Optional[str] = None,
    ) -> UpdateSchema:
        """Update the type of column.

        Args:
            path: The path to the field.
            field_type: The new type
            required: If the field should be required
            doc: Documentation describing the column

        Returns:
            The UpdateSchema with the type update staged.
        """
        path = (path,) if isinstance(path, str) else path
        full_name = ".".join(path)

        if field_type is None and required is None and doc is None:
            return self

        field = self._schema.find_field(full_name, self._case_sensitive)

        if field.field_id in self._deletes:
            raise ValueError(f"Cannot update a column that will be deleted: {full_name}")

        if field_type is not None:
            if not field.field_type.is_primitive:
                raise ValidationError(f"Cannot change column type: {field.field_type} is not a primitive")

            if not self._allow_incompatible_changes and field.field_type != field_type:
                try:
                    promote(field.field_type, field_type)
                except ResolveError as e:
                    raise ValidationError(f"Cannot change column type: {full_name}: {field.field_type} -> {field_type}") from e

        if updated := self._updates.get(field.field_id):
            self._updates[field.field_id] = NestedField(
                field_id=updated.field_id,
                name=updated.name,
                field_type=field_type or updated.field_type,
                doc=doc or updated.doc,
                required=updated.required,
            )
        else:
            self._updates[field.field_id] = NestedField(
                field_id=field.field_id,
                name=field.name,
                field_type=field_type or field.field_type,
                doc=doc or field.doc,
                required=field.required,
            )

        if required is not None:
            self._set_column_requirement(path, required=required)

        return self

    def _find_for_move(self, name: str) -> Optional[int]:
        try:
            return self._schema.find_field(name, self._case_sensitive).field_id
        except ValueError:
            pass

        return self._added_name_to_id.get(name)

    def _move(self, move: Move) -> None:
        if parent_name := self._id_to_parent.get(move.field_id):
            parent_field = self._schema.find_field(parent_name, case_sensitive=self._case_sensitive)
            if not parent_field.field_type.is_struct:
                raise ValueError(f"Cannot move fields in non-struct type: {parent_field.field_type}")

            if move.op == MoveOperation.After or move.op == MoveOperation.Before:
                if move.other_field_id is None:
                    raise ValueError("Expected other field when performing before/after move")

                if self._id_to_parent.get(move.field_id) != self._id_to_parent.get(move.other_field_id):
                    raise ValueError(f"Cannot move field {move.full_name} to a different struct")

            self._moves[parent_field.field_id] = self._moves.get(parent_field.field_id, []) + [move]
        else:
            # In the top level field
            if move.op == MoveOperation.After or move.op == MoveOperation.Before:
                if move.other_field_id is None:
                    raise ValueError("Expected other field when performing before/after move")

                if other_struct := self._id_to_parent.get(move.other_field_id):
                    raise ValueError(f"Cannot move field {move.full_name} to a different struct: {other_struct}")

            self._moves[TABLE_ROOT_ID] = self._moves.get(TABLE_ROOT_ID, []) + [move]

    def move_first(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
        """Move the field to the first position of the parent struct.

        Args:
            path: The path to the field.

        Returns:
            The UpdateSchema with the move operation staged.
        """
        full_name = ".".join(path) if isinstance(path, tuple) else path

        field_id = self._find_for_move(full_name)

        if field_id is None:
            raise ValueError(f"Cannot move missing column: {full_name}")

        self._move(Move(field_id=field_id, full_name=full_name, op=MoveOperation.First))

        return self

    def move_before(self, path: Union[str, Tuple[str, ...]], before_path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
        """Move the field to before another field.

        Args:
            path: The path to the field.

        Returns:
            The UpdateSchema with the move operation staged.
        """
        full_name = ".".join(path) if isinstance(path, tuple) else path
        field_id = self._find_for_move(full_name)

        if field_id is None:
            raise ValueError(f"Cannot move missing column: {full_name}")

        before_full_name = (
            ".".join(
                before_path,
            )
            if isinstance(before_path, tuple)
            else before_path
        )
        before_field_id = self._find_for_move(before_full_name)

        if before_field_id is None:
            raise ValueError(f"Cannot move {full_name} before missing column: {before_full_name}")

        if field_id == before_field_id:
            raise ValueError(f"Cannot move {full_name} before itself")

        self._move(Move(field_id=field_id, full_name=full_name, other_field_id=before_field_id, op=MoveOperation.Before))

        return self

    def move_after(self, path: Union[str, Tuple[str, ...]], after_name: Union[str, Tuple[str, ...]]) -> UpdateSchema:
        """Move the field to after another field.

        Args:
            path: The path to the field.

        Returns:
            The UpdateSchema with the move operation staged.
        """
        full_name = ".".join(path) if isinstance(path, tuple) else path

        field_id = self._find_for_move(full_name)

        if field_id is None:
            raise ValueError(f"Cannot move missing column: {full_name}")

        after_path = ".".join(after_name) if isinstance(after_name, tuple) else after_name
        after_field_id = self._find_for_move(after_path)

        if after_field_id is None:
            raise ValueError(f"Cannot move {full_name} after missing column: {after_path}")

        if field_id == after_field_id:
            raise ValueError(f"Cannot move {full_name} after itself")

        self._move(Move(field_id=field_id, full_name=full_name, other_field_id=after_field_id, op=MoveOperation.After))

        return self

    def commit(self) -> None:
        """Apply the pending changes and commit."""
        if self._table is None:
            raise ValueError("Requires a table to commit to")

        new_schema = self._apply()

        existing_schema_id = next((schema.schema_id for schema in self._table.metadata.schemas if schema == new_schema), None)

        # Check if it is different current schema ID
        if existing_schema_id != self._table.schema().schema_id:
            requirements = (AssertCurrentSchemaId(current_schema_id=self._schema.schema_id),)
            if existing_schema_id is None:
                last_column_id = max(self._table.metadata.last_column_id, new_schema.highest_field_id)
                updates = (
                    AddSchemaUpdate(schema=new_schema, last_column_id=last_column_id),
                    SetCurrentSchemaUpdate(schema_id=-1),
                )
            else:
                updates = (SetCurrentSchemaUpdate(schema_id=existing_schema_id),)  # type: ignore

            if self._transaction is not None:
                self._transaction._append_updates(*updates)  # pylint: disable=W0212
                self._transaction._append_requirements(*requirements)  # pylint: disable=W0212
            else:
                self._table._do_commit(updates=updates, requirements=requirements)  # pylint: disable=W0212

    def _apply(self) -> Schema:
        """Apply the pending changes to the original schema and returns the result.

        Returns:
            the result Schema when all pending updates are applied
        """
        struct = visit(self._schema, _ApplyChanges(self._adds, self._updates, self._deletes, self._moves))
        if struct is None:
            # Should never happen
            raise ValueError("Could not apply changes")

        # Check the field-ids
        new_schema = Schema(*struct.fields)
        field_ids = set()
        for name in self._identifier_field_names:
            try:
                field = new_schema.find_field(name, case_sensitive=self._case_sensitive)
            except ValueError as e:
                raise ValueError(
                    f"Cannot find identifier field {name}. In case of deletion, update the identifier fields first."
                ) from e

            field_ids.add(field.field_id)

        next_schema_id = 1 + (max(self._table.schemas().keys()) if self._table is not None else self._schema.schema_id)
        return Schema(*struct.fields, schema_id=next_schema_id, identifier_field_ids=field_ids)

    def assign_new_column_id(self) -> int:
        return next(self._last_column_id)
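
To illustrate the typical flow, here is a minimal sketch that builds an UpdateSchema directly from a loaded table (as the constructor above allows), stages a few changes, and commits them when the context manager exits; the table and column names are placeholders:

from pyiceberg.types import LongType

# `table` is assumed to have been loaded from a catalog elsewhere.
with UpdateSchema(table) as update:
    update.add_column("severity", LongType(), doc="Numeric severity level")
    update.rename_column("msg", "message")
    update.move_first("severity")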

__enter__()

Update the table.

Source code in pyiceberg/table/__init__.py
def __enter__(self) -> UpdateSchema:
    """Update the table."""
    return self

__exit__(_, value, traceback)

Close and commit the change.

Source code in pyiceberg/table/__init__.py
def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
    """Close and commit the change."""
    return self.commit()

add_column(path, field_type, doc=None, required=False)

Add a new column to a nested struct, or add a new top-level column.

Because "." may be interpreted either as a column path separator or as part of a field name, adding a nested column by passing in a string is not allowed. To add to nested structures, or to add fields whose names contain ".", use a tuple to indicate the path instead.

If type is a nested type, its field IDs are reassigned when added to the existing schema.

Parameters:

    path (Union[str, Tuple[str, ...]]): Name for the new column. Required.
    field_type (IcebergType): Type for the new column. Required.
    doc (Optional[str]): Documentation string for the new column. Default: None.
    required (bool): Whether the new column is required. Default: False.

Returns:

    UpdateSchema: This for method chaining.

Source code in pyiceberg/table/__init__.py
def add_column(
    self, path: Union[str, Tuple[str, ...]], field_type: IcebergType, doc: Optional[str] = None, required: bool = False
) -> UpdateSchema:
    """Add a new column to a nested struct or Add a new top-level column.

    Because "." may be interpreted as a column path separator or may be used in field names, it
    is not allowed to add nested column by passing in a string. To add to nested structures or
    to add fields with names that contain "." use a tuple instead to indicate the path.

    If type is a nested type, its field IDs are reassigned when added to the existing schema.

    Args:
        path: Name for the new column.
        field_type: Type for the new column.
        doc: Documentation string for the new column.
        required: Whether the new column is required.

    Returns:
        This for method chaining.
    """
    if isinstance(path, str):
        if "." in path:
            raise ValueError(f"Cannot add column with ambiguous name: {path}, provide a tuple instead")
        path = (path,)

    if required and not self._allow_incompatible_changes:
        # Table format version 1 and 2 cannot add required column because there is no initial value
        raise ValueError(f'Incompatible change: cannot add required column: {".".join(path)}')

    name = path[-1]
    parent = path[:-1]

    full_name = ".".join(path)
    parent_full_path = ".".join(parent)
    parent_id: int = TABLE_ROOT_ID

    if len(parent) > 0:
        parent_field = self._schema.find_field(parent_full_path, self._case_sensitive)
        parent_type = parent_field.field_type
        if isinstance(parent_type, MapType):
            parent_field = parent_type.value_field
        elif isinstance(parent_type, ListType):
            parent_field = parent_type.element_field

        if not parent_field.field_type.is_struct:
            raise ValueError(f"Cannot add column '{name}' to non-struct type: {parent_full_path}")

        parent_id = parent_field.field_id

    existing_field = None
    try:
        existing_field = self._schema.find_field(full_name, self._case_sensitive)
    except ValueError:
        pass

    if existing_field is not None and existing_field.field_id not in self._deletes:
        raise ValueError(f"Cannot add column, name already exists: {full_name}")

    # assign new IDs in order
    new_id = self.assign_new_column_id()

    # update tracking for moves
    self._added_name_to_id[full_name] = new_id
    self._id_to_parent[new_id] = parent_full_path

    new_type = assign_fresh_schema_ids(field_type, self.assign_new_column_id)
    field = NestedField(field_id=new_id, name=name, field_type=new_type, required=required, doc=doc)

    if parent_id in self._adds:
        self._adds[parent_id].append(field)
    else:
        self._adds[parent_id] = [field]

    return self
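
A small usage sketch, assuming update is an UpdateSchema and that address is an existing struct column: a plain string adds a top-level column, while a tuple path targets a nested struct.

from pyiceberg.types import StringType

update.add_column("comment", StringType(), doc="Free-form notes")
update.add_column(("address", "zip_code"), StringType())  # nested: "address" must resolve to a struct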

case_sensitive(case_sensitive)

Determine whether the case of the schema needs to be considered when comparing column names.

Parameters:

    case_sensitive (bool): When false, case is not considered in column name comparisons. Required.

Returns:

    UpdateSchema: This for method chaining.

Source code in pyiceberg/table/__init__.py
def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
    """Determine if the case of schema needs to be considered when comparing column names.

    Args:
        case_sensitive: When false case is not considered in column name comparisons.

    Returns:
        This for method chaining
    """
    self._case_sensitive = case_sensitive
    return self

commit()

Apply the pending changes and commit.

Source code in pyiceberg/table/__init__.py
def commit(self) -> None:
    """Apply the pending changes and commit."""
    if self._table is None:
        raise ValueError("Requires a table to commit to")

    new_schema = self._apply()

    existing_schema_id = next((schema.schema_id for schema in self._table.metadata.schemas if schema == new_schema), None)

    # Check if it is different current schema ID
    if existing_schema_id != self._table.schema().schema_id:
        requirements = (AssertCurrentSchemaId(current_schema_id=self._schema.schema_id),)
        if existing_schema_id is None:
            last_column_id = max(self._table.metadata.last_column_id, new_schema.highest_field_id)
            updates = (
                AddSchemaUpdate(schema=new_schema, last_column_id=last_column_id),
                SetCurrentSchemaUpdate(schema_id=-1),
            )
        else:
            updates = (SetCurrentSchemaUpdate(schema_id=existing_schema_id),)  # type: ignore

        if self._transaction is not None:
            self._transaction._append_updates(*updates)  # pylint: disable=W0212
            self._transaction._append_requirements(*requirements)  # pylint: disable=W0212
        else:
            self._table._do_commit(updates=updates, requirements=requirements)  # pylint: disable=W0212
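
When no transaction is attached, commit() applies the staged change directly against the table; when the UpdateSchema was created from a Transaction, the updates and requirements are appended to that transaction instead. A minimal sketch of the explicit, non-context-manager flow, assuming table is a loaded Table:

from pyiceberg.types import StringType

update = UpdateSchema(table)
update.add_column("source", StringType())
update.commit()  # for a brand-new schema, stages AddSchemaUpdate/SetCurrentSchemaUpdate and commits them to the table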

delete_column(path)

Delete a column from a table.

Parameters:

    path (Union[str, Tuple[str, ...]]): The path to the column. Required.

Returns:

    UpdateSchema: The UpdateSchema with the delete operation staged.

Source code in pyiceberg/table/__init__.py
def delete_column(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
    """Delete a column from a table.

    Args:
        path: The path to the column.

    Returns:
        The UpdateSchema with the delete operation staged.
    """
    name = (path,) if isinstance(path, str) else path
    full_name = ".".join(name)

    field = self._schema.find_field(full_name, case_sensitive=self._case_sensitive)

    if field.field_id in self._adds:
        raise ValueError(f"Cannot delete a column that has additions: {full_name}")
    if field.field_id in self._updates:
        raise ValueError(f"Cannot delete a column that has updates: {full_name}")

    self._deletes.add(field.field_id)

    return self
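
For instance, assuming update is an UpdateSchema and the column names below exist in the schema, both top-level and nested columns can be dropped:

update.delete_column("legacy_id")
update.delete_column(("address", "zip_code"))  # nested field addressed with a tuple path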

make_column_optional(path)

Make a column optional.

Parameters:

    path (Union[str, Tuple[str, ...]]): The path to the field. Required.

Returns:

    UpdateSchema: The UpdateSchema with the requirement change staged.

Source code in pyiceberg/table/__init__.py
def make_column_optional(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
    """Make a column optional.

    Args:
        path: The path to the field.

    Returns:
        The UpdateSchema with the requirement change staged.
    """
    self._set_column_requirement(path, required=False)
    return self
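
For example, assuming update is an UpdateSchema and created_by is currently a required column:

update.make_column_optional("created_by")

The reverse change (optional to required) goes through update_column(required=True) and is rejected unless allow_incompatible_changes is set.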

move_after(path, after_name)

Move the field to after another field.

Parameters:

    path (Union[str, Tuple[str, ...]]): The path to the field. Required.
    after_name (Union[str, Tuple[str, ...]]): The path to the field to move after. Required.

Returns:

    UpdateSchema: The UpdateSchema with the move operation staged.

Source code in pyiceberg/table/__init__.py
def move_after(self, path: Union[str, Tuple[str, ...]], after_name: Union[str, Tuple[str, ...]]) -> UpdateSchema:
    """Move the field to after another field.

    Args:
        path: The path to the field.

    Returns:
        The UpdateSchema with the move operation staged.
    """
    full_name = ".".join(path) if isinstance(path, tuple) else path

    field_id = self._find_for_move(full_name)

    if field_id is None:
        raise ValueError(f"Cannot move missing column: {full_name}")

    after_path = ".".join(after_name) if isinstance(after_name, tuple) else after_name
    after_field_id = self._find_for_move(after_path)

    if after_field_id is None:
        raise ValueError(f"Cannot move {full_name} after missing column: {after_path}")

    if field_id == after_field_id:
        raise ValueError(f"Cannot move {full_name} after itself")

    self._move(Move(field_id=field_id, full_name=full_name, other_field_id=after_field_id, op=MoveOperation.After))

    return self

move_before(path, before_path)

Move the field to before another field.

Parameters:

    path (Union[str, Tuple[str, ...]]): The path to the field. Required.
    before_path (Union[str, Tuple[str, ...]]): The path to the field to move before. Required.

Returns:

    UpdateSchema: The UpdateSchema with the move operation staged.

Source code in pyiceberg/table/__init__.py
def move_before(self, path: Union[str, Tuple[str, ...]], before_path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
    """Move the field to before another field.

    Args:
        path: The path to the field.

    Returns:
        The UpdateSchema with the move operation staged.
    """
    full_name = ".".join(path) if isinstance(path, tuple) else path
    field_id = self._find_for_move(full_name)

    if field_id is None:
        raise ValueError(f"Cannot move missing column: {full_name}")

    before_full_name = (
        ".".join(
            before_path,
        )
        if isinstance(before_path, tuple)
        else before_path
    )
    before_field_id = self._find_for_move(before_full_name)

    if before_field_id is None:
        raise ValueError(f"Cannot move {full_name} before missing column: {before_full_name}")

    if field_id == before_field_id:
        raise ValueError(f"Cannot move {full_name} before itself")

    self._move(Move(field_id=field_id, full_name=full_name, other_field_id=before_field_id, op=MoveOperation.Before))

    return self

move_first(path)

Move the field to the first position of the parent struct.

Parameters:

    path (Union[str, Tuple[str, ...]]): The path to the field. Required.

Returns:

    UpdateSchema: The UpdateSchema with the move operation staged.

Source code in pyiceberg/table/__init__.py
def move_first(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
    """Move the field to the first position of the parent struct.

    Args:
        path: The path to the field.

    Returns:
        The UpdateSchema with the move operation staged.
    """
    full_name = ".".join(path) if isinstance(path, tuple) else path

    field_id = self._find_for_move(full_name)

    if field_id is None:
        raise ValueError(f"Cannot move missing column: {full_name}")

    self._move(Move(field_id=field_id, full_name=full_name, op=MoveOperation.First))

    return self
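
Together with move_before and move_after above, a reordering sketch (assuming update is an UpdateSchema, the named columns already exist, and the nested pair shares the same parent struct):

update.move_first("id")
update.move_before("created_at", "updated_at")
update.move_after(("address", "city"), ("address", "street"))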

rename_column(path_from, new_name)

Update the name of a column.

Parameters:

    path_from (Union[str, Tuple[str, ...]]): The path to the column to be renamed. Required.
    new_name (str): The new name of the column. Required.

Returns:

    UpdateSchema: The UpdateSchema with the rename operation staged.

Source code in pyiceberg/table/__init__.py
def rename_column(self, path_from: Union[str, Tuple[str, ...]], new_name: str) -> UpdateSchema:
    """Update the name of a column.

    Args:
        path_from: The path to the column to be renamed.
        new_name: The new path of the column.

    Returns:
        The UpdateSchema with the rename operation staged.
    """
    path_from = ".".join(path_from) if isinstance(path_from, tuple) else path_from
    field_from = self._schema.find_field(path_from, self._case_sensitive)

    if field_from.field_id in self._deletes:
        raise ValueError(f"Cannot rename a column that will be deleted: {path_from}")

    if updated := self._updates.get(field_from.field_id):
        self._updates[field_from.field_id] = NestedField(
            field_id=updated.field_id,
            name=new_name,
            field_type=updated.field_type,
            doc=updated.doc,
            required=updated.required,
        )
    else:
        self._updates[field_from.field_id] = NestedField(
            field_id=field_from.field_id,
            name=new_name,
            field_type=field_from.field_type,
            doc=field_from.doc,
            required=field_from.required,
        )

    # Lookup the field because of casing
    from_field_correct_casing = self._schema.find_column_name(field_from.field_id)
    if from_field_correct_casing in self._identifier_field_names:
        self._identifier_field_names.remove(from_field_correct_casing)
        new_identifier_path = f"{from_field_correct_casing[:-len(field_from.name)]}{new_name}"
        self._identifier_field_names.add(new_identifier_path)

    return self
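
For example, assuming update is an UpdateSchema and the columns below exist; only the final name segment changes, and identifier fields that point at the renamed column are updated to the new name:

update.rename_column("details", "properties")
update.rename_column(("location", "zip"), "zip_code")  # the nested field becomes location.zip_code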

update_column(path, field_type=None, required=None, doc=None)

Update the type of a column.

Parameters:

    path (Union[str, Tuple[str, ...]]): The path to the field. Required.
    field_type (Optional[IcebergType]): The new type. Default: None.
    required (Optional[bool]): Whether the field should be required. Default: None.
    doc (Optional[str]): Documentation describing the column. Default: None.

Returns:

    UpdateSchema: The UpdateSchema with the type update staged.

Source code in pyiceberg/table/__init__.py
def update_column(
    self,
    path: Union[str, Tuple[str, ...]],
    field_type: Optional[IcebergType] = None,
    required: Optional[bool] = None,
    doc: Optional[str] = None,
) -> UpdateSchema:
    """Update the type of column.

    Args:
        path: The path to the field.
        field_type: The new type
        required: If the field should be required
        doc: Documentation describing the column

    Returns:
        The UpdateSchema with the type update staged.
    """
    path = (path,) if isinstance(path, str) else path
    full_name = ".".join(path)

    if field_type is None and required is None and doc is None:
        return self

    field = self._schema.find_field(full_name, self._case_sensitive)

    if field.field_id in self._deletes:
        raise ValueError(f"Cannot update a column that will be deleted: {full_name}")

    if field_type is not None:
        if not field.field_type.is_primitive:
            raise ValidationError(f"Cannot change column type: {field.field_type} is not a primitive")

        if not self._allow_incompatible_changes and field.field_type != field_type:
            try:
                promote(field.field_type, field_type)
            except ResolveError as e:
                raise ValidationError(f"Cannot change column type: {full_name}: {field.field_type} -> {field_type}") from e

    if updated := self._updates.get(field.field_id):
        self._updates[field.field_id] = NestedField(
            field_id=updated.field_id,
            name=updated.name,
            field_type=field_type or updated.field_type,
            doc=doc or updated.doc,
            required=updated.required,
        )
    else:
        self._updates[field.field_id] = NestedField(
            field_id=field.field_id,
            name=field.name,
            field_type=field_type or field.field_type,
            doc=doc or field.doc,
            required=field.required,
        )

    if required is not None:
        self._set_column_requirement(path, required=required)

    return self
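
A brief sketch, assuming update is an UpdateSchema and id is currently an int column; widening int to long is an allowed promotion, so it does not require allow_incompatible_changes:

from pyiceberg.types import LongType

update.update_column("id", field_type=LongType(), doc="Surrogate key")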

update_table_metadata(base_metadata, updates)

Update the table metadata with the given updates in one transaction.

Parameters:

    base_metadata (TableMetadata): The base metadata to be updated. Required.
    updates (Tuple[TableUpdate, ...]): The updates in one transaction. Required.

Returns:

    TableMetadata: The metadata with the updates applied.

Source code in pyiceberg/table/__init__.py
def update_table_metadata(base_metadata: TableMetadata, updates: Tuple[TableUpdate, ...]) -> TableMetadata:
    """Update the table metadata with the given updates in one transaction.

    Args:
        base_metadata: The base metadata to be updated.
        updates: The updates in one transaction.

    Returns:
        The metadata with the updates applied.
    """
    context = _TableMetadataUpdateContext()
    new_metadata = base_metadata

    for update in updates:
        new_metadata = _apply_table_update(update, new_metadata, context)

    return new_metadata.model_copy(deep=True)
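
This helper does not touch the catalog; it only produces a new TableMetadata object. A minimal sketch that mirrors the updates staged by UpdateSchema.commit(), assuming table is a loaded Table and new_schema is a Schema built elsewhere (names and the choice of updates are illustrative):

new_metadata = update_table_metadata(
    base_metadata=table.metadata,
    updates=(
        AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id),
        SetCurrentSchemaUpdate(schema_id=-1),  # -1 refers to the schema added in the same set of updates
    ),
)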