Skip to content


Accessor dataclass

An accessor for a specific position in a container that implements the StructProtocol.

Source code in pyiceberg/
@dataclass(init=True, eq=True, frozen=True)
class Accessor:
    """An accessor for a specific position in a container that implements the StructProtocol."""

    # Index to read from the container handed to `get`.
    position: int
    # Optional accessor applied to the value found at `position` (for nested values).
    inner: Optional[Accessor] = None

    def __str__(self) -> str:
        """Return the string representation of the Accessor class."""
        return f"Accessor(position={self.position},inner={self.inner})"

    def __repr__(self) -> str:
        """Return the string representation of the Accessor class."""
        return self.__str__()

    def get(self, container: StructProtocol) -> Any:
        """Return the value this accessor points at in `container`.

        The value at `self.position` is read first; every chained `inner`
        accessor then indexes into the value read by its parent.

        Args:
            container (StructProtocol): A container to access at position `self.position`.

        Returns:
            Any: The value at position `self.position` in the container, after
                following the chain of inner accessors (if any).
        """
        value = container[self.position]
        accessor = self.inner
        while accessor:
            value = value[accessor.position]
            accessor = accessor.inner
        return value


Return the string representation of the Accessor class.

Source code in pyiceberg/
def __repr__(self) -> str:
    """Return the string representation of the Accessor class.

    Delegates to `__str__`, so `repr()` and `str()` produce the same text.
    """
    return self.__str__()


Return the string representation of the Accessor class.

Source code in pyiceberg/
def __str__(self) -> str:
    """Return the string representation of the Accessor class.

    The text shows the accessor's position followed by its inner accessor.
    """
    template = "Accessor(position={},inner={})"
    return template.format(self.position, self.inner)


Return the value at self.position in container.


Name Type Description Default
container StructProtocol

A container to access at position self.position.



Name Type Description
Any Any

The value at position self.position in the container.

Source code in pyiceberg/
def get(self, container: StructProtocol) -> Any:
    """Return the value this accessor points at in `container`.

    The value at `self.position` is read first; every chained `inner`
    accessor then indexes into the value read by its parent.

    Args:
        container (StructProtocol): A container to access at position `self.position`.

    Returns:
        Any: The value at position `self.position` in the container, after
            following the chain of inner accessors (if any).
    """
    value = container[self.position]
    accessor = self.inner
    while accessor:
        value = value[accessor.position]
        accessor = accessor.inner
    return value


Bases: Generic[P], ABC

Source code in pyiceberg/
class PartnerAccessor(Generic[P], ABC):
    """Interface for resolving the partner counterpart of each part of a schema.

    Every method receives an optional partner value of type `P` and returns an
    optional `P`.
    """

    # NOTE(review): the rendered API reference labels every method below as an
    # abstractmethod, but no decorator appears in this listing — confirm
    # against the module source.
    def schema_partner(self, partner: Optional[P]) -> Optional[P]:
        """Return the equivalent of the schema as a struct.

        Args:
            partner (Optional[P]): The partner of the schema.
        """

    def field_partner(self, partner_struct: Optional[P], field_id: int, field_name: str) -> Optional[P]:
        """Return the equivalent struct field by name or id in the partner struct.

        Args:
            partner_struct (Optional[P]): The partner struct to search.
            field_id (int): The ID of the field.
            field_name (str): The name of the field.
        """

    def list_element_partner(self, partner_list: Optional[P]) -> Optional[P]:
        """Return the equivalent list element in the partner list.

        Args:
            partner_list (Optional[P]): The partner list.
        """

    def map_key_partner(self, partner_map: Optional[P]) -> Optional[P]:
        """Return the equivalent map key in the partner map.

        Args:
            partner_map (Optional[P]): The partner map.
        """

    def map_value_partner(self, partner_map: Optional[P]) -> Optional[P]:
        """Return the equivalent map value in the partner map.

        Args:
            partner_map (Optional[P]): The partner map.
        """

field_partner(partner_struct, field_id, field_name) abstractmethod

Return the equivalent struct field by name or id in the partner struct.

Source code in pyiceberg/
def field_partner(self, partner_struct: Optional[P], field_id: int, field_name: str) -> Optional[P]:
    """Return the equivalent struct field by name or id in the partner struct.

    Args:
        partner_struct (Optional[P]): The partner struct to search.
        field_id (int): The ID of the field.
        field_name (str): The name of the field.

    Returns:
        Optional[P]: The equivalent field in the partner struct.
    """

list_element_partner(partner_list) abstractmethod

Return the equivalent list element in the partner list.

Source code in pyiceberg/
def list_element_partner(self, partner_list: Optional[P]) -> Optional[P]:
    """Return the equivalent list element in the partner list.

    Args:
        partner_list (Optional[P]): The partner list.

    Returns:
        Optional[P]: The equivalent element in the partner list.
    """

map_key_partner(partner_map) abstractmethod

Return the equivalent map key in the partner map.

Source code in pyiceberg/
def map_key_partner(self, partner_map: Optional[P]) -> Optional[P]:
    """Return the equivalent map key in the partner map.

    Args:
        partner_map (Optional[P]): The partner map.

    Returns:
        Optional[P]: The equivalent key in the partner map.
    """

map_value_partner(partner_map) abstractmethod

Return the equivalent map value in the partner map.

Source code in pyiceberg/
def map_value_partner(self, partner_map: Optional[P]) -> Optional[P]:
    """Return the equivalent map value in the partner map.

    Args:
        partner_map (Optional[P]): The partner map.

    Returns:
        Optional[P]: The equivalent value in the partner map.
    """

schema_partner(partner) abstractmethod

Return the equivalent of the schema as a struct.

Source code in pyiceberg/
def schema_partner(self, partner: Optional[P]) -> Optional[P]:
    """Return the equivalent of the schema as a struct.

    Args:
        partner (Optional[P]): The partner of the schema.

    Returns:
        Optional[P]: The partner equivalent of the schema, as a struct.
    """


Bases: Generic[T], ABC

Source code in pyiceberg/
class PreOrderSchemaVisitor(Generic[T], ABC):
    """Visitor interface whose child results are passed in as zero-argument callables.

    Each method receives the node being visited plus, for container nodes, one
    callable per child that returns that child's result of type `T`.
    """

    # NOTE(review): the rendered API reference labels every method below as an
    # abstractmethod, but no decorator appears in this listing — confirm
    # against the module source.
    def schema(self, schema: Schema, struct_result: Callable[[], T]) -> T:
        """Visit a Schema.

        Args:
            schema (Schema): The schema being visited.
            struct_result (Callable[[], T]): Callable that returns the result for the schema's struct.
        """

    def struct(self, struct: StructType, field_results: List[Callable[[], T]]) -> T:
        """Visit a StructType.

        Args:
            struct (StructType): The struct being visited.
            field_results (List[Callable[[], T]]): One callable per field, each returning that field's result.
        """

    def field(self, field: NestedField, field_result: Callable[[], T]) -> T:
        """Visit a NestedField.

        Args:
            field (NestedField): The field being visited.
            field_result (Callable[[], T]): Callable that returns the result for the field.
        """

    def list(self, list_type: ListType, element_result: Callable[[], T]) -> T:
        """Visit a ListType.

        Args:
            list_type (ListType): The list type being visited.
            element_result (Callable[[], T]): Callable that returns the result for the element.
        """

    def map(self, map_type: MapType, key_result: Callable[[], T], value_result: Callable[[], T]) -> T:
        """Visit a MapType.

        Args:
            map_type (MapType): The map type being visited.
            key_result (Callable[[], T]): Callable that returns the result for the key.
            value_result (Callable[[], T]): Callable that returns the result for the value.
        """

    def primitive(self, primitive: PrimitiveType) -> T:
        """Visit a PrimitiveType.

        Args:
            primitive (PrimitiveType): The primitive type being visited.
        """

field(field, field_result) abstractmethod

Visit a NestedField.

Source code in pyiceberg/
def field(self, field: NestedField, field_result: Callable[[], T]) -> T:
    """Visit a NestedField.

    Args:
        field (NestedField): The field being visited.
        field_result (Callable[[], T]): Callable that returns the result for the field.

    Returns:
        T: The visitor's result for the field.
    """

list(list_type, element_result) abstractmethod

Visit a ListType.

Source code in pyiceberg/
def list(self, list_type: ListType, element_result: Callable[[], T]) -> T:
    """Visit a ListType.

    Args:
        list_type (ListType): The list type being visited.
        element_result (Callable[[], T]): Callable that returns the result for the element.

    Returns:
        T: The visitor's result for the list type.
    """

map(map_type, key_result, value_result) abstractmethod

Visit a MapType.

Source code in pyiceberg/
def map(self, map_type: MapType, key_result: Callable[[], T], value_result: Callable[[], T]) -> T:
    """Visit a MapType.

    Args:
        map_type (MapType): The map type being visited.
        key_result (Callable[[], T]): Callable that returns the result for the key.
        value_result (Callable[[], T]): Callable that returns the result for the value.

    Returns:
        T: The visitor's result for the map type.
    """

primitive(primitive) abstractmethod

Visit a PrimitiveType.

Source code in pyiceberg/
def primitive(self, primitive: PrimitiveType) -> T:
    """Visit a PrimitiveType.

    Args:
        primitive (PrimitiveType): The primitive type being visited.

    Returns:
        T: The visitor's result for the primitive type.
    """

schema(schema, struct_result) abstractmethod

Visit a Schema.

Source code in pyiceberg/
def schema(self, schema: Schema, struct_result: Callable[[], T]) -> T:
    """Visit a Schema.

    Args:
        schema (Schema): The schema being visited.
        struct_result (Callable[[], T]): Callable that returns the result for the schema's struct.

    Returns:
        T: The visitor's result for the schema.
    """

struct(struct, field_results) abstractmethod

Visit a StructType.

Source code in pyiceberg/
def struct(self, struct: StructType, field_results: List[Callable[[], T]]) -> T:
    """Visit a StructType.

    Args:
        struct (StructType): The struct being visited.
        field_results (List[Callable[[], T]]): One callable per field, each returning that field's result.

    Returns:
        T: The visitor's result for the struct.
    """


Bases: SchemaWithPartnerVisitor[P, T]

Source code in pyiceberg/
class PrimitiveWithPartnerVisitor(SchemaWithPartnerVisitor[P, T]):
    """Partner visitor that dispatches primitive types to one method per concrete type."""

    def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) -> T:
        """Visit a PrimitiveType by delegating to the matching `visit_*` method.

        Args:
            primitive (PrimitiveType): The primitive type being visited.
            primitive_partner (Optional[P]): The partner that accompanies the type, if any.

        Returns:
            T: Whatever the selected `visit_*` method returns.

        Raises:
            ValueError: If `primitive` is none of the handled primitive types.
        """
        if isinstance(primitive, BooleanType):
            return self.visit_boolean(primitive, primitive_partner)
        elif isinstance(primitive, IntegerType):
            return self.visit_integer(primitive, primitive_partner)
        elif isinstance(primitive, LongType):
            return self.visit_long(primitive, primitive_partner)
        elif isinstance(primitive, FloatType):
            return self.visit_float(primitive, primitive_partner)
        elif isinstance(primitive, DoubleType):
            return self.visit_double(primitive, primitive_partner)
        elif isinstance(primitive, DecimalType):
            return self.visit_decimal(primitive, primitive_partner)
        elif isinstance(primitive, DateType):
            return self.visit_date(primitive, primitive_partner)
        elif isinstance(primitive, TimeType):
            return self.visit_time(primitive, primitive_partner)
        elif isinstance(primitive, TimestampType):
            return self.visit_timestamp(primitive, primitive_partner)
        elif isinstance(primitive, TimestamptzType):
            return self.visit_timestamptz(primitive, primitive_partner)
        elif isinstance(primitive, StringType):
            return self.visit_string(primitive, primitive_partner)
        elif isinstance(primitive, UUIDType):
            return self.visit_uuid(primitive, primitive_partner)
        elif isinstance(primitive, FixedType):
            return self.visit_fixed(primitive, primitive_partner)
        elif isinstance(primitive, BinaryType):
            return self.visit_binary(primitive, primitive_partner)
        else:
            # Fail loudly instead of silently returning None for an unhandled type.
            raise ValueError(f"Unknown type: {primitive}")

    def visit_boolean(self, boolean_type: BooleanType, partner: Optional[P]) -> T:
        """Visit a BooleanType."""

    def visit_integer(self, integer_type: IntegerType, partner: Optional[P]) -> T:
        """Visit an IntegerType."""

    def visit_long(self, long_type: LongType, partner: Optional[P]) -> T:
        """Visit a LongType."""

    def visit_float(self, float_type: FloatType, partner: Optional[P]) -> T:
        """Visit a FloatType."""

    def visit_double(self, double_type: DoubleType, partner: Optional[P]) -> T:
        """Visit a DoubleType."""

    def visit_decimal(self, decimal_type: DecimalType, partner: Optional[P]) -> T:
        """Visit a DecimalType."""

    def visit_date(self, date_type: DateType, partner: Optional[P]) -> T:
        """Visit a DateType."""

    def visit_time(self, time_type: TimeType, partner: Optional[P]) -> T:
        """Visit a TimeType."""

    def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[P]) -> T:
        """Visit a TimestampType."""

    def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[P]) -> T:
        """Visit a TimestamptzType."""

    def visit_string(self, string_type: StringType, partner: Optional[P]) -> T:
        """Visit a StringType."""

    def visit_uuid(self, uuid_type: UUIDType, partner: Optional[P]) -> T:
        """Visit a UUIDType."""

    def visit_fixed(self, fixed_type: FixedType, partner: Optional[P]) -> T:
        """Visit a FixedType."""

    def visit_binary(self, binary_type: BinaryType, partner: Optional[P]) -> T:
        """Visit a BinaryType."""

primitive(primitive, primitive_partner)

Visit a PrimitiveType.

Source code in pyiceberg/
def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) -> T:
    """Visit a PrimitiveType by delegating to the matching `visit_*` method.

    Args:
        primitive (PrimitiveType): The primitive type being visited.
        primitive_partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: Whatever the selected `visit_*` method returns.

    Raises:
        ValueError: If `primitive` is none of the handled primitive types.
    """
    if isinstance(primitive, BooleanType):
        return self.visit_boolean(primitive, primitive_partner)
    elif isinstance(primitive, IntegerType):
        return self.visit_integer(primitive, primitive_partner)
    elif isinstance(primitive, LongType):
        return self.visit_long(primitive, primitive_partner)
    elif isinstance(primitive, FloatType):
        return self.visit_float(primitive, primitive_partner)
    elif isinstance(primitive, DoubleType):
        return self.visit_double(primitive, primitive_partner)
    elif isinstance(primitive, DecimalType):
        return self.visit_decimal(primitive, primitive_partner)
    elif isinstance(primitive, DateType):
        return self.visit_date(primitive, primitive_partner)
    elif isinstance(primitive, TimeType):
        return self.visit_time(primitive, primitive_partner)
    elif isinstance(primitive, TimestampType):
        return self.visit_timestamp(primitive, primitive_partner)
    elif isinstance(primitive, TimestamptzType):
        return self.visit_timestamptz(primitive, primitive_partner)
    elif isinstance(primitive, StringType):
        return self.visit_string(primitive, primitive_partner)
    elif isinstance(primitive, UUIDType):
        return self.visit_uuid(primitive, primitive_partner)
    elif isinstance(primitive, FixedType):
        return self.visit_fixed(primitive, primitive_partner)
    elif isinstance(primitive, BinaryType):
        return self.visit_binary(primitive, primitive_partner)
    else:
        # Fail loudly instead of silently returning None for an unhandled type.
        raise ValueError(f"Unknown type: {primitive}")

visit_binary(binary_type, partner) abstractmethod

Visit a BinaryType.

Source code in pyiceberg/
def visit_binary(self, binary_type: BinaryType, partner: Optional[P]) -> T:
    """Visit a BinaryType.

    Args:
        binary_type (BinaryType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_boolean(boolean_type, partner) abstractmethod

Visit a BooleanType.

Source code in pyiceberg/
def visit_boolean(self, boolean_type: BooleanType, partner: Optional[P]) -> T:
    """Visit a BooleanType.

    Args:
        boolean_type (BooleanType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_date(date_type, partner) abstractmethod

Visit a DateType.

Source code in pyiceberg/
def visit_date(self, date_type: DateType, partner: Optional[P]) -> T:
    """Visit a DateType.

    Args:
        date_type (DateType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_decimal(decimal_type, partner) abstractmethod

Visit a DecimalType.

Source code in pyiceberg/
def visit_decimal(self, decimal_type: DecimalType, partner: Optional[P]) -> T:
    """Visit a DecimalType.

    Args:
        decimal_type (DecimalType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_double(double_type, partner) abstractmethod

Visit a DoubleType.

Source code in pyiceberg/
def visit_double(self, double_type: DoubleType, partner: Optional[P]) -> T:
    """Visit a DoubleType.

    Args:
        double_type (DoubleType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_fixed(fixed_type, partner) abstractmethod

Visit a FixedType.

Source code in pyiceberg/
def visit_fixed(self, fixed_type: FixedType, partner: Optional[P]) -> T:
    """Visit a FixedType.

    Args:
        fixed_type (FixedType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_float(float_type, partner) abstractmethod

Visit a FloatType.

Source code in pyiceberg/
def visit_float(self, float_type: FloatType, partner: Optional[P]) -> T:
    """Visit a FloatType.

    Args:
        float_type (FloatType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_integer(integer_type, partner) abstractmethod

Visit an IntegerType.

Source code in pyiceberg/
def visit_integer(self, integer_type: IntegerType, partner: Optional[P]) -> T:
    """Visit an IntegerType.

    Args:
        integer_type (IntegerType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_long(long_type, partner) abstractmethod

Visit a LongType.

Source code in pyiceberg/
def visit_long(self, long_type: LongType, partner: Optional[P]) -> T:
    """Visit a LongType.

    Args:
        long_type (LongType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_string(string_type, partner) abstractmethod

Visit a StringType.

Source code in pyiceberg/
def visit_string(self, string_type: StringType, partner: Optional[P]) -> T:
    """Visit a StringType.

    Args:
        string_type (StringType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_time(time_type, partner) abstractmethod

Visit a TimeType.

Source code in pyiceberg/
def visit_time(self, time_type: TimeType, partner: Optional[P]) -> T:
    """Visit a TimeType.

    Args:
        time_type (TimeType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_timestamp(timestamp_type, partner) abstractmethod

Visit a TimestampType.

Source code in pyiceberg/
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[P]) -> T:
    """Visit a TimestampType.

    Args:
        timestamp_type (TimestampType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_timestamptz(timestamptz_type, partner) abstractmethod

Visit a TimestamptzType.

Source code in pyiceberg/
def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[P]) -> T:
    """Visit a TimestamptzType.

    Args:
        timestamptz_type (TimestamptzType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """

visit_uuid(uuid_type, partner) abstractmethod

Visit a UUIDType.

Source code in pyiceberg/
def visit_uuid(self, uuid_type: UUIDType, partner: Optional[P]) -> T:
    """Visit a UUIDType.

    Args:
        uuid_type (UUIDType): The type being visited.
        partner (Optional[P]): The partner that accompanies the type, if any.

    Returns:
        T: The visitor's result for this type.
    """


Bases: IcebergBaseModel

A table Schema.


>>> from pyiceberg import schema
>>> from pyiceberg import types

Source code in pyiceberg/
class Schema(IcebergBaseModel):
    """A table Schema.

    Example:
        >>> from pyiceberg import schema
        >>> from pyiceberg import types
    """

    type: Literal["struct"] = "struct"
    fields: Tuple[NestedField, ...] = Field(default_factory=tuple)
    schema_id: int = Field(alias="schema-id", default=INITIAL_SCHEMA_ID)
    identifier_field_ids: List[int] = Field(alias="identifier-field-ids", default_factory=list)

    # Index of field name to field ID; assigned by __init__ once the fields are set.
    _name_to_id: Dict[str, int] = PrivateAttr()

    def __init__(self, *fields: NestedField, **data: Any):
        """Create a schema from positional fields and/or keyword data.

        Args:
            *fields (NestedField): The top-level fields; when given they replace any `fields` entry in `data`.
            **data (Any): Remaining model attributes, forwarded to the base model.
        """
        if fields:
            data["fields"] = fields
        # Initialise the base model with the collected data before indexing the populated fields.
        super().__init__(**data)
        self._name_to_id = index_by_name(self)

    def __str__(self) -> str:
        """Return the string representation of the Schema class."""
        return "table {\n" + "\n".join(["  " + str(field) for field in self.columns]) + "\n}"

    def __repr__(self) -> str:
        """Return the string representation of the Schema class."""
        return f"Schema({', '.join(repr(column) for column in self.columns)}, schema_id={self.schema_id}, identifier_field_ids={self.identifier_field_ids})"

    def __len__(self) -> int:
        """Return the number of top-level fields in the Schema."""
        return len(self.fields)

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the Schema class.

        Two schemas are equal when their top-level columns and their identifier
        field IDs are equal; the schema ID is not compared.
        """
        # Check the type rather than truthiness: an empty Schema has length 0 and is
        # therefore falsy, which would make two empty schemas compare unequal.
        if not isinstance(other, Schema):
            return False

        if len(self.columns) != len(other.columns):
            return False

        identifier_field_ids_is_equal = self.identifier_field_ids == other.identifier_field_ids
        schema_is_equal = all(lhs == rhs for lhs, rhs in zip(self.columns, other.columns))

        return identifier_field_ids_is_equal and schema_is_equal

    # NOTE(review): presumably registered as a model validator in the full module — confirm the decorator.
    def check_schema(self) -> Schema:
        """Validate every identifier field and return the schema itself.

        Raises:
            ValueError: If one of the identifier fields is not valid.
        """
        for field_id in self.identifier_field_ids:
            self._validate_identifier_field(field_id)

        return self

    @property
    def columns(self) -> Tuple[NestedField, ...]:
        """A tuple of the top-level fields."""
        return self.fields

    @cached_property
    def _lazy_id_to_field(self) -> Dict[int, NestedField]:
        """Return an index of field ID to NestedField instance.

        This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.
        """
        return index_by_id(self)

    @cached_property
    def _lazy_id_to_parent(self) -> Dict[int, int]:
        """Return an index of field ID to parent field ID.

        This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.
        """
        return _index_parents(self)

    @cached_property
    def _lazy_name_to_id_lower(self) -> Dict[str, int]:
        """Return an index of lower-case field names to field IDs.

        This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.
        """
        return {name.lower(): field_id for name, field_id in self._name_to_id.items()}

    @cached_property
    def _lazy_id_to_name(self) -> Dict[int, str]:
        """Return an index of field ID to full name.

        This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.
        """
        return index_name_by_id(self)

    @cached_property
    def _lazy_id_to_accessor(self) -> Dict[int, Accessor]:
        """Return an index of field ID to accessor.

        This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.
        """
        return build_position_accessors(self)

    def as_struct(self) -> StructType:
        """Return the schema as a struct."""
        return StructType(*self.fields)

    def as_arrow(self) -> "pa.Schema":
        """Return the schema as an Arrow schema."""
        # Imported at call time rather than at module import time.
        from pyiceberg.io.pyarrow import schema_to_pyarrow

        return schema_to_pyarrow(self)

    def find_field(self, name_or_id: Union[str, int], case_sensitive: bool = True) -> NestedField:
        """Find a field using a field name or field ID.

        Args:
            name_or_id (Union[str, int]): Either a field name or a field ID.
            case_sensitive (bool, optional): Whether to perform a case-sensitive lookup using a field name. Defaults to True.

        Raises:
            ValueError: When the value cannot be found.

        Returns:
            NestedField: The matched NestedField.
        """
        if isinstance(name_or_id, int):
            if name_or_id not in self._lazy_id_to_field:
                raise ValueError(f"Could not find field with id: {name_or_id}")
            return self._lazy_id_to_field[name_or_id]

        if case_sensitive:
            field_id = self._name_to_id.get(name_or_id)
        else:
            field_id = self._lazy_name_to_id_lower.get(name_or_id.lower())

        if field_id is None:
            raise ValueError(f"Could not find field with name {name_or_id}, case_sensitive={case_sensitive}")

        return self._lazy_id_to_field[field_id]

    def find_type(self, name_or_id: Union[str, int], case_sensitive: bool = True) -> IcebergType:
        """Find a field type using a field name or field ID.

        Args:
            name_or_id (Union[str, int]): Either a field name or a field ID.
            case_sensitive (bool, optional): Whether to perform a case-sensitive lookup using a field name. Defaults to True.

        Returns:
            IcebergType: The type of the matched NestedField.
        """
        field = self.find_field(name_or_id=name_or_id, case_sensitive=case_sensitive)
        if not field:
            raise ValueError(f"Could not find field with name or id {name_or_id}, case_sensitive={case_sensitive}")
        return field.field_type

    @property
    def highest_field_id(self) -> int:
        """Return the largest field ID in the schema, or 0 when the schema has no fields."""
        return max(self._lazy_id_to_name.keys(), default=0)

    @property
    def name_mapping(self) -> NameMapping:
        """Return the name mapping created from this schema."""
        from pyiceberg.table.name_mapping import create_mapping_from_schema

        return create_mapping_from_schema(self)

    def find_column_name(self, column_id: int) -> Optional[str]:
        """Find a column name given a column ID.

        Args:
            column_id (int): The ID of the column.

        Returns:
            str: The column name (or None if the column ID cannot be found).
        """
        return self._lazy_id_to_name.get(column_id)

    @property
    def column_names(self) -> List[str]:
        """Return a list of all the column names, including nested fields.

        Excludes short names.

        Returns:
            List[str]: The column names.
        """
        return list(self._lazy_id_to_name.values())

    def accessor_for_field(self, field_id: int) -> Accessor:
        """Find a schema position accessor given a field ID.

        Args:
            field_id (int): The ID of the field.

        Raises:
            ValueError: When the value cannot be found.

        Returns:
            Accessor: An accessor for the given field ID.
        """
        if field_id not in self._lazy_id_to_accessor:
            raise ValueError(f"Could not find accessor for field with id: {field_id}")

        return self._lazy_id_to_accessor[field_id]

    def identifier_field_names(self) -> Set[str]:
        """Return the names of the identifier fields.

        Returns:
            Set of names of the identifier fields

        Raises:
            ValueError: If an identifier field ID has no column name.
        """
        ids = set()
        for field_id in self.identifier_field_ids:
            column_name = self.find_column_name(field_id)
            if column_name is None:
                raise ValueError(f"Could not find identifier column id: {field_id}")
            ids.add(column_name)

        return ids

    def select(self, *names: str, case_sensitive: bool = True) -> Schema:
        """Return a new schema instance pruned to a subset of columns.

        Args:
            names (List[str]): A list of column names.
            case_sensitive (bool, optional): Whether to perform a case-sensitive lookup for each column name. Defaults to True.

        Returns:
            Schema: A new schema with pruned columns.

        Raises:
            ValueError: If a column is selected that doesn't exist.
        """
        try:
            if case_sensitive:
                ids = {self._name_to_id[name] for name in names}
            else:
                ids = {self._lazy_name_to_id_lower[name.lower()] for name in names}
        except KeyError as e:
            raise ValueError(f"Could not find column: {e}") from e

        return prune_columns(self, ids)

    @property
    def field_ids(self) -> Set[int]:
        """Return the IDs of the current schema."""
        return set(self._name_to_id.values())

    def _validate_identifier_field(self, field_id: int) -> None:
        """Validate that the field with the given ID is a valid identifier field.

        Args:
          field_id: The ID of the field to validate.

        Raises:
          ValueError: If the field is not valid.
        """
        field = self.find_field(field_id)
        if not field.field_type.is_primitive:
            raise ValueError(f"Identifier field {field_id} invalid: not a primitive type field")

        if not field.required:
            raise ValueError(f"Identifier field {field_id} invalid: not a required field")

        if isinstance(field.field_type, (DoubleType, FloatType)):
            raise ValueError(f"Identifier field {field_id} invalid: must not be float or double field")

        # Check whether the nested field is in a chain of required struct fields
        # Exploring from root for better error message for list and map types
        parent_id = self._lazy_id_to_parent.get(field.field_id)
        fields: List[int] = []
        while parent_id is not None:
            fields.append(parent_id)
            parent_id = self._lazy_id_to_parent.get(parent_id)

        # Ancestors were collected child-to-root, so popping walks them root-first.
        while fields:
            parent = self.find_field(fields.pop())
            if not parent.field_type.is_struct:
                raise ValueError(f"Cannot add field {field.name} as an identifier field: must not be nested in {parent}")

            if not parent.required:
                raise ValueError(
                    f"Cannot add field {field.name} as an identifier field: must not be nested in an optional field {parent}"
                )
_lazy_id_to_accessor cached property

Return an index of field ID to accessor.

This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.

_lazy_id_to_field cached property

Return an index of field ID to NestedField instance.

This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.

_lazy_id_to_name cached property

Return an index of field ID to full name.

This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.

_lazy_id_to_parent cached property

Return an index of field ID to parent field ID.

This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.

_lazy_name_to_id_lower cached property

Return an index of lower-case field names to field IDs.

This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.

column_names property

Return a list of all the column names, including nested fields.

Excludes short names.


Type Description

List[str]: The column names.

columns property

A tuple of the top-level fields.

field_ids property

Return the IDs of the current schema.


Return the equality of two instances of the Schema class.

Source code in pyiceberg/
def __eq__(self, other: Any) -> bool:
    """Return the equality of two instances of the Schema class.

    Two schemas are equal when their top-level columns and their identifier
    field IDs are equal; the schema ID is not compared.

    Args:
        other (Any): The object to compare against.

    Returns:
        bool: True when `other` is a Schema with equal columns and identifier field IDs.
    """
    # Check the type rather than truthiness: an empty Schema has length 0 and is
    # therefore falsy, which would make two empty schemas compare unequal.
    if not isinstance(other, Schema):
        return False

    if len(self.columns) != len(other.columns):
        return False

    if self.identifier_field_ids != other.identifier_field_ids:
        return False

    return all(lhs == rhs for lhs, rhs in zip(self.columns, other.columns))


Return the number of top-level fields in the Schema.

Source code in pyiceberg/
def __len__(self) -> int:
    """Return the number of fields held in `self.fields` of the Schema."""
    return len(self.fields)


Return the string representation of the Schema class.

Source code in pyiceberg/
def __repr__(self) -> str:
    """Return the string representation of the Schema class.

    The text lists the repr of every column, followed by the schema ID and
    the identifier field IDs.
    """
    rendered_columns = ", ".join([repr(column) for column in self.columns])
    return f"Schema({rendered_columns}, schema_id={self.schema_id}, identifier_field_ids={self.identifier_field_ids})"


Return the string representation of the Schema class.

Source code in pyiceberg/
def __str__(self) -> str:
    """Return the string representation of the Schema class.

    Each column is printed on its own line, indented by two spaces, inside
    a `table { ... }` wrapper.
    """
    indented_columns = ("  " + str(column) for column in self.columns)
    body = "\n".join(indented_columns)
    return "table {\n" + body + "\n}"


Validate that the field with the given ID is a valid identifier field.


Name Type Description Default
field_id int

The ID of the field to validate.



Type Description

If the field is not valid.

Source code in pyiceberg/
def _validate_identifier_field(self, field_id: int) -> None:
    """Validate that the field with the given ID is a valid identifier field.

    The field must be a required primitive that is neither float nor double, and
    every ancestor it is nested in must be a required struct.

    Args:
      field_id: The ID of the field to validate.

    Raises:
      ValueError: If the field is not valid.
    """
    field = self.find_field(field_id)
    if not field.field_type.is_primitive:
        raise ValueError(f"Identifier field {field_id} invalid: not a primitive type field")

    if not field.required:
        raise ValueError(f"Identifier field {field_id} invalid: not a required field")

    if isinstance(field.field_type, (DoubleType, FloatType)):
        raise ValueError(f"Identifier field {field_id} invalid: must not be float or double field")

    # Check whether the nested field is in a chain of required struct fields
    # Exploring from root for better error message for list and map types
    parent_id = self._lazy_id_to_parent.get(field.field_id)
    fields: List[int] = []
    while parent_id is not None:
        fields.append(parent_id)
        parent_id = self._lazy_id_to_parent.get(parent_id)

    # Ancestors were collected child-to-root, so popping walks them root-first.
    while fields:
        parent = self.find_field(fields.pop())
        if not parent.field_type.is_struct:
            raise ValueError(f"Cannot add field {field.name} as an identifier field: must not be nested in {parent}")

        if not parent.required:
            raise ValueError(
                f"Cannot add field {field.name} as an identifier field: must not be nested in an optional field {parent}"
            )


Find a schema position accessor given a field ID.


Name Type Description Default
field_id int

The ID of the field.



Type Description

When the value cannot be found.


Name Type Description
Accessor Accessor

An accessor for the given field ID.

Source code in pyiceberg/
def accessor_for_field(self, field_id: int) -> Accessor:
    """Find a schema position accessor given a field ID.

    Args:
        field_id (int): The ID of the field.

    Raises:
        ValueError: When the value cannot be found.

    Returns:
        Accessor: An accessor for the given field ID.
    """
    accessors = self._lazy_id_to_accessor
    if field_id not in accessors:
        raise ValueError(f"Could not find accessor for field with id: {field_id}")

    return accessors[field_id]


Return the schema as an Arrow schema.

Source code in pyiceberg/
def as_arrow(self) -> "pa.Schema":
    """Return the schema as an Arrow schema.

    Returns:
        pa.Schema: The result of `schema_to_pyarrow` applied to this schema.
    """
    # Imported at call time rather than at module import time.
    from pyiceberg.io.pyarrow import schema_to_pyarrow

    return schema_to_pyarrow(self)


Return the schema as a struct.

Source code in pyiceberg/
def as_struct(self) -> StructType:
    """Return the schema as a struct.

    Returns:
        StructType: A struct built from this schema's fields.
    """
    return StructType(*self.fields)


Find a column name given a column ID.


Name Type Description Default
column_id int

The ID of the column.



Name Type Description
str Optional[str]

The column name (or None if the column ID cannot be found).

Source code in pyiceberg/
def find_column_name(self, column_id: int) -> Optional[str]:
    """Find a column name given a column ID.

    Args:
        column_id (int): The ID of the column.

    Returns:
        str: The column name (or None if the column ID cannot be found).
    """
    return self._lazy_id_to_name.get(column_id)

find_field(name_or_id, case_sensitive=True)

Find a field using a field name or field ID.


Name Type Description Default
name_or_id Union[str, int]

Either a field name or a field ID.

case_sensitive bool

Whether to perform a case-sensitive lookup using a field name. Defaults to True.



Type Description

When the value cannot be found.


Name Type Description
NestedField NestedField

The matched NestedField.

Source code in pyiceberg/
def find_field(self, name_or_id: Union[str, int], case_sensitive: bool = True) -> NestedField:
    """Find a field using a field name or field ID.

    Args:
        name_or_id (Union[str, int]): Either a field name or a field ID.
        case_sensitive (bool, optional): Whether to perform a case-sensitive lookup using a field name. Defaults to True.

    Raises:
        ValueError: When the value cannot be found.

    Returns:
        NestedField: The matched NestedField.
    """
    if isinstance(name_or_id, int):
        if name_or_id not in self._lazy_id_to_field:
            raise ValueError(f"Could not find field with id: {name_or_id}")
        return self._lazy_id_to_field[name_or_id]

    # Exactly one index is consulted: exact names, or lower-cased names for a
    # case-insensitive lookup.
    if case_sensitive:
        field_id = self._name_to_id.get(name_or_id)
    else:
        field_id = self._lazy_name_to_id_lower.get(name_or_id.lower())

    if field_id is None:
        raise ValueError(f"Could not find field with name {name_or_id}, case_sensitive={case_sensitive}")

    return self._lazy_id_to_field[field_id]

find_type(name_or_id, case_sensitive=True)

Find a field type using a field name or field ID.


Name Type Description Default
name_or_id Union[str, int]

Either a field name or a field ID.

case_sensitive bool

Whether to perform a case-sensitive lookup using a field name. Defaults to True.



Name Type Description
NestedField IcebergType

The type of the matched NestedField.

Source code in pyiceberg/
def find_type(self, name_or_id: Union[str, int], case_sensitive: bool = True) -> IcebergType:
    """Find a field type using a field name or field ID.

    Args:
        name_or_id (Union[str, int]): Either a field name or a field ID.
        case_sensitive (bool, optional): Whether to perform a case-sensitive lookup using a field name. Defaults to True.

    Raises:
        ValueError: When no matching field is returned by the lookup.

    Returns:
        IcebergType: The type of the matched NestedField.
    """
    field = self.find_field(name_or_id=name_or_id, case_sensitive=case_sensitive)
    # Defensive guard: kept so a falsy lookup result still surfaces as a ValueError.
    if not field:
        raise ValueError(f"Could not find field with name or id {name_or_id}, case_sensitive={case_sensitive}")
    return field.field_type


Return the names of the identifier fields.


Type Description

Set of names of the identifier fields

Source code in pyiceberg/
def identifier_field_names(self) -> Set[str]:
    """Return the names of the identifier fields.

    Returns:
        Set of names of the identifier fields

    Raises:
        ValueError: If an identifier field ID has no column name.
    """
    ids: Set[str] = set()
    for field_id in self.identifier_field_ids:
        column_name = self.find_column_name(field_id)
        if column_name is None:
            raise ValueError(f"Could not find identifier column id: {field_id}")
        # Collect the resolved name; without this the result would always be empty.
        ids.add(column_name)

    return ids

select(*names, case_sensitive=True)

Return a new schema instance pruned to a subset of columns.


Name Type Description Default
names List[str]

A list of column names.

case_sensitive bool

Whether to perform a case-sensitive lookup for each column name. Defaults to True.



Name Type Description
Schema Schema

A new schema with pruned columns.


Type Description

If a column is selected that doesn't exist.

Source code in pyiceberg/
def select(self, *names: str, case_sensitive: bool = True) -> Schema:
    """Return a new schema instance pruned to a subset of columns.

    Args:
        names (List[str]): A list of column names.
        case_sensitive (bool, optional): Whether to perform a case-sensitive lookup for each column name. Defaults to True.

    Returns:
        Schema: A new schema with pruned columns.

    Raises:
        ValueError: If a column is selected that doesn't exist.
    """
    try:
        # Resolve every requested name to its field ID using exactly one of the two name indexes.
        if case_sensitive:
            ids = {self._name_to_id[name] for name in names}
        else:
            ids = {self._lazy_name_to_id_lower[name.lower()] for name in names}
    except KeyError as e:
        # Translate the dict miss into the documented ValueError while keeping the cause chained.
        raise ValueError(f"Could not find column: {e}") from e

    return prune_columns(self, ids)


Bases: Generic[T], ABC

Source code in pyiceberg/
class SchemaVisitor(Generic[T], ABC):
    """Base class for visitors that produce a result of type T for a schema and its nested types.

    The before_*/after_* hooks have empty bodies here, so they do nothing unless overridden.
    The schema/struct/field/list/map/primitive methods receive the results already computed
    for their children and return the result for the visited node.
    """

    def before_field(self, field: NestedField) -> None:
        """Override this method to perform an action immediately before visiting a field."""

    def after_field(self, field: NestedField) -> None:
        """Override this method to perform an action immediately after visiting a field."""

    def before_list_element(self, element: NestedField) -> None:
        """Override this method to perform an action immediately before visiting an element within a ListType."""

    def after_list_element(self, element: NestedField) -> None:
        """Override this method to perform an action immediately after visiting an element within a ListType."""

    def before_map_key(self, key: NestedField) -> None:
        """Override this method to perform an action immediately before visiting a key within a MapType."""

    def after_map_key(self, key: NestedField) -> None:
        """Override this method to perform an action immediately after visiting a key within a MapType."""

    def before_map_value(self, value: NestedField) -> None:
        """Override this method to perform an action immediately before visiting a value within a MapType."""

    def after_map_value(self, value: NestedField) -> None:
        """Override this method to perform an action immediately after visiting a value within a MapType."""

    # NOTE(review): the API reference labels the six methods below as "abstractmethod";
    # the decorators are not visible in this listing -- confirm against the module source.
    def schema(self, schema: Schema, struct_result: T) -> T:
        """Visit a Schema."""

    def struct(self, struct: StructType, field_results: List[T]) -> T:
        """Visit a StructType."""

    def field(self, field: NestedField, field_result: T) -> T:
        """Visit a NestedField."""

    def list(self, list_type: ListType, element_result: T) -> T:
        """Visit a ListType."""

    def map(self, map_type: MapType, key_result: T, value_result: T) -> T:
        """Visit a MapType."""

    def primitive(self, primitive: PrimitiveType) -> T:
        """Visit a PrimitiveType."""


Override this method to perform an action immediately after visiting a field.

Source code in pyiceberg/
def after_field(self, field: NestedField) -> None:
    """Override this method to perform an action immediately after visiting a field."""


Override this method to perform an action immediately after visiting an element within a ListType.

Source code in pyiceberg/
def after_list_element(self, element: NestedField) -> None:
    """Override this method to perform an action immediately after visiting an element within a ListType."""


Override this method to perform an action immediately after visiting a key within a MapType.

Source code in pyiceberg/
def after_map_key(self, key: NestedField) -> None:
    """Override this method to perform an action immediately after visiting a key within a MapType."""


Override this method to perform an action immediately after visiting a value within a MapType.

Source code in pyiceberg/
def after_map_value(self, value: NestedField) -> None:
    """Override this method to perform an action immediately after visiting a value within a MapType."""


Override this method to perform an action immediately before visiting a field.

Source code in pyiceberg/
def before_field(self, field: NestedField) -> None:
    """Override this method to perform an action immediately before visiting a field."""


Override this method to perform an action immediately before visiting an element within a ListType.

Source code in pyiceberg/
def before_list_element(self, element: NestedField) -> None:
    """Override this method to perform an action immediately before visiting an element within a ListType."""


Override this method to perform an action immediately before visiting a key within a MapType.

Source code in pyiceberg/
def before_map_key(self, key: NestedField) -> None:
    """Override this method to perform an action immediately before visiting a key within a MapType."""


Override this method to perform an action immediately before visiting a value within a MapType.

Source code in pyiceberg/
def before_map_value(self, value: NestedField) -> None:
    """Override this method to perform an action immediately before visiting a value within a MapType."""

field(field, field_result) abstractmethod

Visit a NestedField.

Source code in pyiceberg/
def field(self, field: NestedField, field_result: T) -> T:
    """Visit a NestedField."""

list(list_type, element_result) abstractmethod

Visit a ListType.

Source code in pyiceberg/
def list(self, list_type: ListType, element_result: T) -> T:
    """Visit a ListType."""

map(map_type, key_result, value_result) abstractmethod

Visit a MapType.

Source code in pyiceberg/
def map(self, map_type: MapType, key_result: T, value_result: T) -> T:
    """Visit a MapType."""

primitive(primitive) abstractmethod

Visit a PrimitiveType.

Source code in pyiceberg/
def primitive(self, primitive: PrimitiveType) -> T:
    """Visit a PrimitiveType."""

schema(schema, struct_result) abstractmethod

Visit a Schema.

Source code in pyiceberg/
def schema(self, schema: Schema, struct_result: T) -> T:
    """Visit a Schema."""

struct(struct, field_results) abstractmethod

Visit a StructType.

Source code in pyiceberg/
def struct(self, struct: StructType, field_results: List[T]) -> T:
    """Visit a StructType."""


Bases: SchemaVisitor[T], ABC

Source code in pyiceberg/
class SchemaVisitorPerPrimitiveType(SchemaVisitor[T], ABC):
    """Schema visitor that routes each primitive type to a dedicated visit_* method."""

    def primitive(self, primitive: PrimitiveType) -> T:
        """Visit a PrimitiveType.

        Args:
            primitive (PrimitiveType): The primitive type to dispatch on.

        Raises:
            ValueError: If the primitive is not one of the handled types.

        Returns:
            T: The result of the matching visit_* method.
        """
        if isinstance(primitive, FixedType):
            return self.visit_fixed(primitive)
        elif isinstance(primitive, DecimalType):
            return self.visit_decimal(primitive)
        elif isinstance(primitive, BooleanType):
            return self.visit_boolean(primitive)
        elif isinstance(primitive, IntegerType):
            return self.visit_integer(primitive)
        elif isinstance(primitive, LongType):
            return self.visit_long(primitive)
        elif isinstance(primitive, FloatType):
            return self.visit_float(primitive)
        elif isinstance(primitive, DoubleType):
            return self.visit_double(primitive)
        elif isinstance(primitive, DateType):
            return self.visit_date(primitive)
        elif isinstance(primitive, TimeType):
            return self.visit_time(primitive)
        elif isinstance(primitive, TimestampType):
            return self.visit_timestamp(primitive)
        elif isinstance(primitive, TimestamptzType):
            return self.visit_timestamptz(primitive)
        elif isinstance(primitive, StringType):
            return self.visit_string(primitive)
        elif isinstance(primitive, UUIDType):
            return self.visit_uuid(primitive)
        elif isinstance(primitive, BinaryType):
            return self.visit_binary(primitive)
        else:
            # Fail loudly on unhandled types instead of silently returning None.
            raise ValueError(f"Unknown type: {primitive}")

    def visit_fixed(self, fixed_type: FixedType) -> T:
        """Visit a FixedType."""

    def visit_decimal(self, decimal_type: DecimalType) -> T:
        """Visit a DecimalType."""

    def visit_boolean(self, boolean_type: BooleanType) -> T:
        """Visit a BooleanType."""

    def visit_integer(self, integer_type: IntegerType) -> T:
        """Visit an IntegerType."""

    def visit_long(self, long_type: LongType) -> T:
        """Visit a LongType."""

    def visit_float(self, float_type: FloatType) -> T:
        """Visit a FloatType."""

    def visit_double(self, double_type: DoubleType) -> T:
        """Visit a DoubleType."""

    def visit_date(self, date_type: DateType) -> T:
        """Visit a DateType."""

    def visit_time(self, time_type: TimeType) -> T:
        """Visit a TimeType."""

    def visit_timestamp(self, timestamp_type: TimestampType) -> T:
        """Visit a TimestampType."""

    def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> T:
        """Visit a TimestamptzType."""

    def visit_string(self, string_type: StringType) -> T:
        """Visit a StringType."""

    def visit_uuid(self, uuid_type: UUIDType) -> T:
        """Visit a UUIDType."""

    def visit_binary(self, binary_type: BinaryType) -> T:
        """Visit a BinaryType."""


Visit a PrimitiveType.

Source code in pyiceberg/
def primitive(self, primitive: PrimitiveType) -> T:
    """Visit a PrimitiveType.

    Args:
        primitive (PrimitiveType): The primitive type to dispatch on.

    Raises:
        ValueError: If the primitive is not one of the handled types.

    Returns:
        T: The result of the matching visit_* method.
    """
    if isinstance(primitive, FixedType):
        return self.visit_fixed(primitive)
    elif isinstance(primitive, DecimalType):
        return self.visit_decimal(primitive)
    elif isinstance(primitive, BooleanType):
        return self.visit_boolean(primitive)
    elif isinstance(primitive, IntegerType):
        return self.visit_integer(primitive)
    elif isinstance(primitive, LongType):
        return self.visit_long(primitive)
    elif isinstance(primitive, FloatType):
        return self.visit_float(primitive)
    elif isinstance(primitive, DoubleType):
        return self.visit_double(primitive)
    elif isinstance(primitive, DateType):
        return self.visit_date(primitive)
    elif isinstance(primitive, TimeType):
        return self.visit_time(primitive)
    elif isinstance(primitive, TimestampType):
        return self.visit_timestamp(primitive)
    elif isinstance(primitive, TimestamptzType):
        return self.visit_timestamptz(primitive)
    elif isinstance(primitive, StringType):
        return self.visit_string(primitive)
    elif isinstance(primitive, UUIDType):
        return self.visit_uuid(primitive)
    elif isinstance(primitive, BinaryType):
        return self.visit_binary(primitive)
    else:
        # Fail loudly on unhandled types instead of silently returning None.
        raise ValueError(f"Unknown type: {primitive}")

visit_binary(binary_type) abstractmethod

Visit a BinaryType.

Source code in pyiceberg/
def visit_binary(self, binary_type: BinaryType) -> T:
    """Visit a BinaryType."""

visit_boolean(boolean_type) abstractmethod

Visit a BooleanType.

Source code in pyiceberg/
def visit_boolean(self, boolean_type: BooleanType) -> T:
    """Visit a BooleanType."""

visit_date(date_type) abstractmethod

Visit a DateType.

Source code in pyiceberg/
def visit_date(self, date_type: DateType) -> T:
    """Visit a DateType."""

visit_decimal(decimal_type) abstractmethod

Visit a DecimalType.

Source code in pyiceberg/
def visit_decimal(self, decimal_type: DecimalType) -> T:
    """Visit a DecimalType."""

visit_double(double_type) abstractmethod

Visit a DoubleType.

Source code in pyiceberg/
def visit_double(self, double_type: DoubleType) -> T:
    """Visit a DoubleType."""

visit_fixed(fixed_type) abstractmethod

Visit a FixedType.

Source code in pyiceberg/
def visit_fixed(self, fixed_type: FixedType) -> T:
    """Visit a FixedType."""

visit_float(float_type) abstractmethod

Visit a FloatType.

Source code in pyiceberg/
def visit_float(self, float_type: FloatType) -> T:
    """Visit a FloatType."""

visit_integer(integer_type) abstractmethod

Visit a IntegerType.

Source code in pyiceberg/
def visit_integer(self, integer_type: IntegerType) -> T:
    """Visit a IntegerType."""

visit_long(long_type) abstractmethod

Visit a LongType.

Source code in pyiceberg/
def visit_long(self, long_type: LongType) -> T:
    """Visit a LongType."""

visit_string(string_type) abstractmethod

Visit a StringType.

Source code in pyiceberg/
def visit_string(self, string_type: StringType) -> T:
    """Visit a StringType."""

visit_time(time_type) abstractmethod

Visit a TimeType.

Source code in pyiceberg/
def visit_time(self, time_type: TimeType) -> T:
    """Visit a TimeType."""

visit_timestamp(timestamp_type) abstractmethod

Visit a TimestampType.

Source code in pyiceberg/
def visit_timestamp(self, timestamp_type: TimestampType) -> T:
    """Visit a TimestampType."""

visit_timestamptz(timestamptz_type) abstractmethod

Visit a TimestamptzType.

Source code in pyiceberg/
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> T:
    """Visit a TimestamptzType."""

visit_uuid(uuid_type) abstractmethod

Visit a UUIDType.

Source code in pyiceberg/
def visit_uuid(self, uuid_type: UUIDType) -> T:
    """Visit a UUIDType."""


Bases: Generic[P, T], ABC

Source code in pyiceberg/
class SchemaWithPartnerVisitor(Generic[P, T], ABC):
    """Base class for visitors that walk a schema alongside an optional partner of type P.

    Every method receives the schema node plus its partner (which may be None) and,
    for the non-hook methods, the results of type T already computed for the children.
    """

    def before_field(self, field: NestedField, field_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately before visiting a field."""

    def after_field(self, field: NestedField, field_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately after visiting a field."""

    # The list-element, map-key and map-value hooks below delegate to
    # before_field/after_field, so overriding those two covers every kind of field.
    def before_list_element(self, element: NestedField, element_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately before visiting an element within a ListType."""
        self.before_field(element, element_partner)

    def after_list_element(self, element: NestedField, element_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately after visiting an element within a ListType."""
        self.after_field(element, element_partner)

    def before_map_key(self, key: NestedField, key_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately before visiting a key within a MapType."""
        self.before_field(key, key_partner)

    def after_map_key(self, key: NestedField, key_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately after visiting a key within a MapType."""
        self.after_field(key, key_partner)

    def before_map_value(self, value: NestedField, value_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately before visiting a value within a MapType."""
        self.before_field(value, value_partner)

    def after_map_value(self, value: NestedField, value_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately after visiting a value within a MapType."""
        self.after_field(value, value_partner)

    # NOTE(review): the API reference labels the six methods below as "abstractmethod";
    # the decorators are not visible in this listing -- confirm against the module source.
    def schema(self, schema: Schema, schema_partner: Optional[P], struct_result: T) -> T:
        """Visit a schema with a partner."""

    def struct(self, struct: StructType, struct_partner: Optional[P], field_results: List[T]) -> T:
        """Visit a struct type with a partner."""

    def field(self, field: NestedField, field_partner: Optional[P], field_result: T) -> T:
        """Visit a nested field with a partner."""

    def list(self, list_type: ListType, list_partner: Optional[P], element_result: T) -> T:
        """Visit a list type with a partner."""

    def map(self, map_type: MapType, map_partner: Optional[P], key_result: T, value_result: T) -> T:
        """Visit a map type with a partner."""

    def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) -> T:
        """Visit a primitive type with a partner."""

after_field(field, field_partner)

Override this method to perform an action immediately after visiting a field.

Source code in pyiceberg/
def after_field(self, field: NestedField, field_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately after visiting a field."""

after_list_element(element, element_partner)

Override this method to perform an action immediately after visiting an element within a ListType.

Source code in pyiceberg/
def after_list_element(self, element: NestedField, element_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately after visiting an element within a ListType."""
    self.after_field(element, element_partner)

after_map_key(key, key_partner)

Override this method to perform an action immediately after visiting a key within a MapType.

Source code in pyiceberg/
def after_map_key(self, key: NestedField, key_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately after visiting a key within a MapType."""
    self.after_field(key, key_partner)

after_map_value(value, value_partner)

Override this method to perform an action immediately after visiting a value within a MapType.

Source code in pyiceberg/
def after_map_value(self, value: NestedField, value_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately after visiting a value within a MapType."""
    self.after_field(value, value_partner)

before_field(field, field_partner)

Override this method to perform an action immediately before visiting a field.

Source code in pyiceberg/
def before_field(self, field: NestedField, field_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately before visiting a field."""

before_list_element(element, element_partner)

Override this method to perform an action immediately before visiting an element within a ListType.

Source code in pyiceberg/
def before_list_element(self, element: NestedField, element_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately before visiting an element within a ListType."""
    self.before_field(element, element_partner)

before_map_key(key, key_partner)

Override this method to perform an action immediately before visiting a key within a MapType.

Source code in pyiceberg/
def before_map_key(self, key: NestedField, key_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately before visiting a key within a MapType."""
    self.before_field(key, key_partner)

before_map_value(value, value_partner)

Override this method to perform an action immediately before visiting a value within a MapType.

Source code in pyiceberg/
def before_map_value(self, value: NestedField, value_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately before visiting a value within a MapType."""
    self.before_field(value, value_partner)

field(field, field_partner, field_result) abstractmethod

Visit a nested field with a partner.

Source code in pyiceberg/
def field(self, field: NestedField, field_partner: Optional[P], field_result: T) -> T:
    """Visit a nested field with a partner."""

list(list_type, list_partner, element_result) abstractmethod

Visit a list type with a partner.

Source code in pyiceberg/
def list(self, list_type: ListType, list_partner: Optional[P], element_result: T) -> T:
    """Visit a list type with a partner."""

map(map_type, map_partner, key_result, value_result) abstractmethod

Visit a map type with a partner.

Source code in pyiceberg/
def map(self, map_type: MapType, map_partner: Optional[P], key_result: T, value_result: T) -> T:
    """Visit a map type with a partner."""

primitive(primitive, primitive_partner) abstractmethod

Visit a primitive type with a partner.

Source code in pyiceberg/
def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) -> T:
    """Visit a primitive type with a partner."""

schema(schema, schema_partner, struct_result) abstractmethod

Visit a schema with a partner.

Source code in pyiceberg/
def schema(self, schema: Schema, schema_partner: Optional[P], struct_result: T) -> T:
    """Visit a schema with a partner."""

struct(struct, struct_partner, field_results) abstractmethod

Visit a struct type with a partner.

Source code in pyiceberg/
def struct(self, struct: StructType, struct_partner: Optional[P], field_results: List[T]) -> T:
    """Visit a struct type with a partner."""


Bases: SchemaVisitor[Dict[Position, Accessor]]

A schema visitor for generating a field ID to accessor index.


Example:

    >>> from pyiceberg.schema import Schema
    >>> from pyiceberg.types import *
    >>> schema = Schema(
    ...     NestedField(field_id=2, name="id", field_type=IntegerType(), required=False),
    ...     NestedField(field_id=1, name="data", field_type=StringType(), required=True),
    ...     NestedField(
    ...         field_id=3,
    ...         name="location",
    ...         field_type=StructType(
    ...             NestedField(field_id=5, name="latitude", field_type=FloatType(), required=False),
    ...             NestedField(field_id=6, name="longitude", field_type=FloatType(), required=False),
    ...         ),
    ...         required=True,
    ...     ),
    ...     schema_id=1,
    ...     identifier_field_ids=[1],
    ... )
    >>> result = build_position_accessors(schema)
    >>> expected = {
    ...     2: Accessor(position=0, inner=None),
    ...     1: Accessor(position=1, inner=None),
    ...     5: Accessor(position=2, inner=Accessor(position=0, inner=None)),
    ...     6: Accessor(position=2, inner=Accessor(position=1, inner=None))
    ... }
    >>> result == expected
    True

Source code in pyiceberg/
class _BuildPositionAccessors(SchemaVisitor[Dict[Position, Accessor]]):
    """A schema visitor for generating a field ID to accessor index.

    Example:
        >>> from pyiceberg.schema import Schema
        >>> from pyiceberg.types import *
        >>> schema = Schema(
        ...     NestedField(field_id=2, name="id", field_type=IntegerType(), required=False),
        ...     NestedField(field_id=1, name="data", field_type=StringType(), required=True),
        ...     NestedField(
        ...         field_id=3,
        ...         name="location",
        ...         field_type=StructType(
        ...             NestedField(field_id=5, name="latitude", field_type=FloatType(), required=False),
        ...             NestedField(field_id=6, name="longitude", field_type=FloatType(), required=False),
        ...         ),
        ...         required=True,
        ...     ),
        ...     schema_id=1,
        ...     identifier_field_ids=[1],
        ... )
        >>> result = build_position_accessors(schema)
        >>> expected = {
        ...     2: Accessor(position=0, inner=None),
        ...     1: Accessor(position=1, inner=None),
        ...     5: Accessor(position=2, inner=Accessor(position=0, inner=None)),
        ...     6: Accessor(position=2, inner=Accessor(position=1, inner=None))
        ... }
        >>> result == expected
        True
    """

    def schema(self, schema: Schema, struct_result: Dict[Position, Accessor]) -> Dict[Position, Accessor]:
        """Return the accessors built for the schema's top-level struct unchanged."""
        return struct_result

    def struct(self, struct: StructType, field_results: List[Dict[Position, Accessor]]) -> Dict[Position, Accessor]:
        """Build the accessor index for a struct from the results of its fields.

        A field with a non-empty result contributes its inner accessors, each wrapped
        with this field's position. A field with an empty result is addressed directly
        by its own position.
        """
        result = {}

        for position, field in enumerate(struct.fields):
            if field_results[position]:
                for inner_field_id, acc in field_results[position].items():
                    result[inner_field_id] = Accessor(position, inner=acc)
            else:
                result[field.field_id] = Accessor(position)

        return result

    def field(self, field: NestedField, field_result: Dict[Position, Accessor]) -> Dict[Position, Accessor]:
        """Pass the result of the field's type through unchanged."""
        return field_result

    def list(self, list_type: ListType, element_result: Dict[Position, Accessor]) -> Dict[Position, Accessor]:
        """Return an empty index: no accessors are produced for anything inside a list."""
        return {}

    def map(
        self, map_type: MapType, key_result: Dict[Position, Accessor], value_result: Dict[Position, Accessor]
    ) -> Dict[Position, Accessor]:
        """Return an empty index: no accessors are produced for anything inside a map."""
        return {}

    def primitive(self, primitive: PrimitiveType) -> Dict[Position, Accessor]:
        """Return an empty index; the enclosing struct creates the accessor for a primitive field."""
        return {}


Bases: SchemaVisitor[Dict[int, NestedField]]

A schema visitor for generating a field ID to NestedField index.

Source code in pyiceberg/
class _IndexById(SchemaVisitor[Dict[int, NestedField]]):
    """Visitor that gathers every NestedField it sees into one dict keyed by field ID.

    All callbacks return the same shared dict, which is filled in as fields,
    list elements, and map keys/values are visited.
    """

    def __init__(self) -> None:
        self._index: Dict[int, NestedField] = {}

    def schema(self, schema: Schema, struct_result: Dict[int, NestedField]) -> Dict[int, NestedField]:
        return self._index

    def struct(self, struct: StructType, field_results: List[Dict[int, NestedField]]) -> Dict[int, NestedField]:
        return self._index

    def field(self, field: NestedField, field_result: Dict[int, NestedField]) -> Dict[int, NestedField]:
        """Record the field under its own ID."""
        index = self._index
        index[field.field_id] = field
        return index

    def list(self, list_type: ListType, element_result: Dict[int, NestedField]) -> Dict[int, NestedField]:
        """Record the list's element field under the element ID."""
        element = list_type.element_field
        self._index[element.field_id] = element
        return self._index

    def map(
        self, map_type: MapType, key_result: Dict[int, NestedField], value_result: Dict[int, NestedField]
    ) -> Dict[int, NestedField]:
        """Record the map's key field and value field as two separate entries."""
        for entry in (map_type.key_field, map_type.value_field):
            self._index[entry.field_id] = entry
        return self._index

    def primitive(self, primitive: PrimitiveType) -> Dict[int, NestedField]:
        return self._index

field(field, field_result)

Add the field ID to the index.

Source code in pyiceberg/
def field(self, field: NestedField, field_result: Dict[int, NestedField]) -> Dict[int, NestedField]:
    """Add the field ID to the index."""
    self._index[field.field_id] = field
    return self._index

list(list_type, element_result)

Add the list element ID to the index.

Source code in pyiceberg/
def list(self, list_type: ListType, element_result: Dict[int, NestedField]) -> Dict[int, NestedField]:
    """Add the list element ID to the index."""
    self._index[list_type.element_field.field_id] = list_type.element_field
    return self._index

map(map_type, key_result, value_result)

Add the key ID and value ID as individual items in the index.

Source code in pyiceberg/
def map(
    self, map_type: MapType, key_result: Dict[int, NestedField], value_result: Dict[int, NestedField]
) -> Dict[int, NestedField]:
    """Add the key ID and value ID as individual items in the index."""
    self._index[map_type.key_field.field_id] = map_type.key_field
    self._index[map_type.value_field.field_id] = map_type.value_field
    return self._index


Bases: SchemaVisitor[Dict[str, int]]

A schema visitor for generating a field name to field ID index.

Source code in pyiceberg/
class _IndexByName(SchemaVisitor[Dict[str, int]]):
    """A schema visitor for generating a field name to field ID index.

    Two name stacks are maintained while visiting: `_field_names` holds the full path,
    and `_short_field_names` holds the path with the element/value segment left out
    when that element or value is a StructType.
    """

    def __init__(self) -> None:
        self._index: Dict[str, int] = {}
        self._short_name_to_id: Dict[str, int] = {}
        self._combined_index: Dict[str, int] = {}
        self._field_names: List[str] = []
        self._short_field_names: List[str] = []

    def before_map_value(self, value: NestedField) -> None:
        """Short field names omit value when the value is a StructType."""
        if not isinstance(value.field_type, StructType):
            self._short_field_names.append(value.name)
        self._field_names.append(value.name)

    def after_map_value(self, value: NestedField) -> None:
        """Undo what before_map_value pushed."""
        if not isinstance(value.field_type, StructType):
            self._short_field_names.pop()
        self._field_names.pop()

    def before_list_element(self, element: NestedField) -> None:
        """Short field names omit element when the element is a StructType."""
        if not isinstance(element.field_type, StructType):
            self._short_field_names.append(element.name)
        self._field_names.append(element.name)

    def after_list_element(self, element: NestedField) -> None:
        """Undo what before_list_element pushed."""
        if not isinstance(element.field_type, StructType):
            self._short_field_names.pop()
        self._field_names.pop()

    def before_field(self, field: NestedField) -> None:
        """Store the field name."""
        self._field_names.append(field.name)
        self._short_field_names.append(field.name)

    def after_field(self, field: NestedField) -> None:
        """Remove the last field name stored."""
        self._field_names.pop()
        self._short_field_names.pop()

    def schema(self, schema: Schema, struct_result: Dict[str, int]) -> Dict[str, int]:
        return self._index

    def struct(self, struct: StructType, field_results: List[Dict[str, int]]) -> Dict[str, int]:
        return self._index

    def field(self, field: NestedField, field_result: Dict[str, int]) -> Dict[str, int]:
        """Add the field name to the index."""
        self._add_field(field.name, field.field_id)
        return self._index

    def list(self, list_type: ListType, element_result: Dict[str, int]) -> Dict[str, int]:
        """Add the list element name to the index."""
        self._add_field(list_type.element_field.name, list_type.element_field.field_id)
        return self._index

    def map(self, map_type: MapType, key_result: Dict[str, int], value_result: Dict[str, int]) -> Dict[str, int]:
        """Add the key name and value name as individual items in the index."""
        self._add_field(map_type.key_field.name, map_type.key_field.field_id)
        self._add_field(map_type.value_field.name, map_type.value_field.field_id)
        return self._index

    def _add_field(self, name: str, field_id: int) -> None:
        """Add a field name to the index, mapping its full name to its field ID.

        Args:
            name (str): The field name.
            field_id (int): The field ID.

        Raises:
            ValueError: If the field name is already contained in the index.
        """
        full_name = name

        if self._field_names:
            full_name = ".".join([".".join(self._field_names), name])

        if full_name in self._index:
            raise ValueError(f"Invalid schema, multiple fields for name {full_name}: {self._index[full_name]} and {field_id}")
        self._index[full_name] = field_id

        if self._short_field_names:
            short_name = ".".join([".".join(self._short_field_names), name])
            self._short_name_to_id[short_name] = field_id

    def primitive(self, primitive: PrimitiveType) -> Dict[str, int]:
        return self._index

    def by_name(self) -> Dict[str, int]:
        """Return an index of combined full and short names.

        Note: Only short names that do not conflict with full names are included.
        """
        combined_index = self._short_name_to_id.copy()
        # Full names are applied last so they win over any short name with the same key.
        combined_index.update(self._index)
        return combined_index

    def by_id(self) -> Dict[int, str]:
        """Return an index of ID to full names."""
        id_to_full_name = {value: key for key, value in self._index.items()}
        return id_to_full_name

_add_field(name, field_id)

Add a field name to the index, mapping its full name to its field ID.


Name Type Description Default
name str

The field name.

field_id int

The field ID.



Type Description

If the field name is already contained in the index.

Source code in pyiceberg/
def _add_field(self, name: str, field_id: int) -> None:
    """Add a field name to the index, mapping its full name to its field ID.

        name (str): The field name.
        field_id (int): The field ID.

        ValueError: If the field name is already contained in the index.
    full_name = name

    if self._field_names:
        full_name = ".".join([".".join(self._field_names), name])

    if full_name in self._index:
        raise ValueError(f"Invalid schema, multiple fields for name {full_name}: {self._index[full_name]} and {field_id}")
    self._index[full_name] = field_id

    if self._short_field_names:
        short_name = ".".join([".".join(self._short_field_names), name])
        self._short_name_to_id[short_name] = field_id


Remove the last field name stored.

Source code in pyiceberg/
def after_field(self, field: NestedField) -> None:
    """Remove the last field name stored."""
    # NOTE(review): no statements follow the docstring in this listing, so nothing is
    # removed here as written — confirm against the full implementation.


Store the field name.

Source code in pyiceberg/
def before_field(self, field: NestedField) -> None:
    """Store the field name."""
    # NOTE(review): no statements follow the docstring in this listing, so nothing is
    # stored here as written — confirm against the full implementation.


Short field names omit element when the element is a StructType.

Source code in pyiceberg/
def before_list_element(self, element: NestedField) -> None:
    """Short field names omit element when the element is a StructType."""
    # Only elements whose type is not a StructType take the branch below.
    # NOTE(review): the branch body is missing from this listing — confirm what is
    # recorded for non-struct elements against the full implementation.
    if not isinstance(element.field_type, StructType):


Return an index of ID to full names.

Source code in pyiceberg/
def by_id(self) -> Dict[int, str]:
    """Return the full-name index inverted, i.e. field ID mapped to full name."""
    return {field_id: full_name for full_name, field_id in self._index.items()}


Return an index of combined full and short names.

Note: Only short names that do not conflict with full names are included.

Source code in pyiceberg/
def by_name(self) -> Dict[str, int]:
    """Return an index of combined full and short names.

    Note: Only short names that do not conflict with full names are included.

    Returns:
        Dict[str, int]: A new dict of names to field IDs; the stored indexes are
            not modified.
    """
    combined_index = self._short_name_to_id.copy()
    # Full names are applied last so that they win over any conflicting short name.
    combined_index.update(self._index)
    return combined_index

field(field, field_result)

Add the field name to the index.

Source code in pyiceberg/
def field(self, field: NestedField, field_result: Dict[str, int]) -> Dict[str, int]:
    """Add the field name to the index.

    Args:
        field (NestedField): The field to register under its name and field ID.
        field_result (Dict[str, int]): Result of visiting the field's type (not used here).

    Returns:
        Dict[str, int]: The index of full field names to field IDs.
    """
    # `_add_field` takes (name, field_id); the field is registered under its own name.
    self._add_field(field.name, field.field_id)
    return self._index

list(list_type, element_result)

Add the list element name to the index.

Source code in pyiceberg/
def list(self, list_type: ListType, element_result: Dict[str, int]) -> Dict[str, int]:
    """Add the list element name to the index.

    Args:
        list_type (ListType): The list type whose element field is indexed.
        element_result (Dict[str, int]): Result of visiting the element type (not used here).

    Returns:
        Dict[str, int]: The index of full field names to field IDs.
    """
    element_field = list_type.element_field
    # `_add_field` takes (name, field_id); the element is registered under its own name.
    self._add_field(element_field.name, element_field.field_id)
    return self._index

map(map_type, key_result, value_result)

Add the key name and value name as individual items in the index.

Source code in pyiceberg/
def map(self, map_type: MapType, key_result: Dict[str, int], value_result: Dict[str, int]) -> Dict[str, int]:
    """Add the key name and value name as individual items in the index.

    Args:
        map_type (MapType): The map type whose key and value fields are indexed.
        key_result (Dict[str, int]): Result of visiting the key type (not used here).
        value_result (Dict[str, int]): Result of visiting the value type (not used here).

    Returns:
        Dict[str, int]: The index of full field names to field IDs.
    """
    key_field = map_type.key_field
    value_field = map_type.value_field
    # `_add_field` takes (name, field_id); each field is registered under its own name.
    self._add_field(key_field.name, key_field.field_id)
    self._add_field(value_field.name, value_field.field_id)
    return self._index


Bases: PreOrderSchemaVisitor[IcebergType]

Traverses the schema and assigns monotonically increasing ids.

Source code in pyiceberg/
class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
    """Traverses the schema and assigns monotonically increasing ids."""

    # Maps each original field ID to the fresh ID that was assigned to it.
    old_id_to_new_id: Dict[int, int]

    def __init__(self, next_id_func: Optional[Callable[[], int]] = None) -> None:
        """Initialize the visitor.

        Args:
            next_id_func (Optional[Callable[[], int]]): Callable producing the next ID.
                When omitted, IDs are drawn from a counter that starts at 1.
        """
        self.old_id_to_new_id = {}
        counter = itertools.count(1)
        self.next_id_func = next_id_func if next_id_func is not None else lambda: next(counter)

    def _get_and_increment(self, current_id: int) -> int:
        """Draw a fresh ID for `current_id`, record the old-to-new mapping, and return it."""
        new_id = self.next_id_func()
        self.old_id_to_new_id[current_id] = new_id
        return new_id

    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
        # Identifier field IDs are translated through the old-to-new mapping.
        # NOTE(review): this call is cut off in this listing (no other arguments, no
        # closing parenthesis) — confirm against the full implementation.
        return Schema(
            identifier_field_ids=[self.old_id_to_new_id[field_id] for field_id in schema.identifier_field_ids],

    def struct(self, struct: StructType, field_results: List[Callable[[], IcebergType]]) -> StructType:
        # Fresh IDs for all fields of this struct are drawn up front, before the loop.
        new_ids = [self._get_and_increment(field.field_id) for field in struct.fields]
        new_fields = []
        # NOTE(review): the loop body is missing from this listing — confirm how
        # `new_fields` is populated against the full implementation.
        for field_id, field, field_type in zip(new_ids, struct.fields, field_results):
        return StructType(*new_fields)

    def field(self, field: NestedField, field_result: Callable[[], IcebergType]) -> IcebergType:
        """Evaluate and return the deferred result for the field's type."""
        return field_result()

    def list(self, list_type: ListType, element_result: Callable[[], IcebergType]) -> ListType:
        element_id = self._get_and_increment(list_type.element_id)
        # NOTE(review): the constructor arguments are missing from this listing.
        return ListType(

    def map(self, map_type: MapType, key_result: Callable[[], IcebergType], value_result: Callable[[], IcebergType]) -> MapType:
        # The key ID is drawn before the value ID.
        key_id = self._get_and_increment(map_type.key_id)
        value_id = self._get_and_increment(map_type.value_id)
        # NOTE(review): the constructor arguments are missing from this listing.
        return MapType(

    def primitive(self, primitive: PrimitiveType) -> PrimitiveType:
        """Return the primitive type unchanged."""
        return primitive

_check_schema_compatible(requested_schema, provided_schema)

Check if the provided_schema is compatible with requested_schema.

Both Schemas must have valid IDs and share the same ID for the same field names.

Two schemas are considered compatible when: 1. All required fields in requested_schema are present and are also required in the provided_schema 2. Field Types are consistent for fields that are present in both schemas. I.e. the field type in the provided_schema can be promoted to the field type of the same field ID in requested_schema


Type Description

ValueError

If the schemas are not compatible.

Source code in pyiceberg/
def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) -> None:
    """Check if the `provided_schema` is compatible with `requested_schema`.

    Both Schemas must have valid IDs and share the same ID for the same field names.

    Two schemas are considered compatible when:
    1. All `required` fields in `requested_schema` are present and are also `required` in the `provided_schema`
    2. Field Types are consistent for fields that are present in both schemas. I.e. the field type
       in the `provided_schema` can be promoted to the field type of the same field ID in `requested_schema`

    Args:
        requested_schema (Schema): The schema that is traversed (pre-order).
        provided_schema (Schema): The schema handed to the compatibility visitor.

    Raises:
        ValueError: If the schemas are not compatible.
    """
    pre_order_visit(requested_schema, _SchemaCompatibilityVisitor(provided_schema))


Generate an index of field IDs to their parent field IDs.


Name Type Description Default
schema_or_type Union[Schema, IcebergType]

A schema or type to index.



Type Description
Dict[int, int]

Dict[int, int]: An index of field IDs to their parent field IDs.

Source code in pyiceberg/
def _index_parents(schema_or_type: Union[Schema, IcebergType]) -> Dict[int, int]:
    """Generate an index of field IDs to their parent field IDs.

    Args:
        schema_or_type (Union[Schema, IcebergType]): A schema or type to index.

    Returns:
        Dict[int, int]: An index of field IDs to their parent field IDs.
    """
    return visit(schema_or_type, _IndexParents())

assign_fresh_schema_ids(schema_or_type, next_id=None)

Traverses the schema, and sets new IDs.

Source code in pyiceberg/
def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], next_id: Optional[Callable[[], int]] = None) -> Schema:
    """Traverse the schema in pre-order with a `_SetFreshIDs` visitor and return its result.

    Args:
        schema_or_type (Union[Schema, IcebergType]): The schema or type to traverse.
        next_id (Optional[Callable[[], int]]): Optional ID generator, forwarded to the
            visitor as `next_id_func`.
    """
    fresh_id_visitor = _SetFreshIDs(next_id_func=next_id)
    return pre_order_visit(schema_or_type, fresh_id_visitor)


Generate an index of field IDs to schema position accessors.


Name Type Description Default
schema_or_type Union[Schema, IcebergType]

A schema or type to index.



Type Description
Dict[int, Accessor]

Dict[int, Accessor]: An index of field IDs to accessors.

Source code in pyiceberg/
def build_position_accessors(schema_or_type: Union[Schema, IcebergType]) -> Dict[int, Accessor]:
    """Generate an index of field IDs to schema position accessors.

    Args:
        schema_or_type (Union[Schema, IcebergType]): A schema or type to index.

    Returns:
        Dict[int, Accessor]: An index of field IDs to accessors.
    """
    return visit(schema_or_type, _BuildPositionAccessors())


Generate an index of field IDs to NestedField instances.


Name Type Description Default
schema_or_type Union[Schema, IcebergType]

A schema or type to index.



Type Description
Dict[int, NestedField]

Dict[int, NestedField]: An index of field IDs to NestedField instances.

Source code in pyiceberg/
def index_by_id(schema_or_type: Union[Schema, IcebergType]) -> Dict[int, NestedField]:
    """Generate an index of field IDs to NestedField instances.

    Args:
        schema_or_type (Union[Schema, IcebergType]): A schema or type to index.

    Returns:
        Dict[int, NestedField]: An index of field IDs to NestedField instances.
    """
    return visit(schema_or_type, _IndexById())


Generate an index of field names to field IDs.


Name Type Description Default
schema_or_type Union[Schema, IcebergType]

A schema or type to index.



Type Description
Dict[str, int]

Dict[str, int]: An index of field names to field IDs.

Source code in pyiceberg/
def index_by_name(schema_or_type: Union[Schema, IcebergType]) -> Dict[str, int]:
    """Generate an index of field names to field IDs.

    Args:
        schema_or_type (Union[Schema, IcebergType]): A schema or type to index.

    Returns:
        Dict[str, int]: An index of field names to field IDs. `EMPTY_DICT` is returned
            when `schema_or_type` has no fields.
    """
    if len(schema_or_type.fields) > 0:
        indexer = _IndexByName()
        visit(schema_or_type, indexer)
        return indexer.by_name()
    # No fields: skip the traversal. This return must sit outside the branch above,
    # otherwise it is unreachable and the function would fall through to None.
    return EMPTY_DICT


Generate an index of field IDs to full field names.


Name Type Description Default
schema_or_type Union[Schema, IcebergType]

A schema or type to index.



Type Description
Dict[int, str]

Dict[int, str]: An index of field IDs to full names.

Source code in pyiceberg/
def index_name_by_id(schema_or_type: Union[Schema, IcebergType]) -> Dict[int, str]:
    """Generate an index of field IDs to full field names.

    Args:
        schema_or_type (Union[Schema, IcebergType]): A schema or type to index.

    Returns:
        Dict[int, str]: An index of field IDs to full names.
    """
    indexer = _IndexByName()
    visit(schema_or_type, indexer)
    return indexer.by_id()

pre_order_visit(obj, visitor)

Apply a schema visitor to any point within a schema.

The function traverses the schema in pre-order fashion. This is a slimmed down version compared to the post-order traversal (missing before and after methods), mostly because we don't use the pre-order traversal much.


Name Type Description Default
obj Union[Schema, IcebergType]

An instance of a Schema or an IcebergType.

visitor PreOrderSchemaVisitor[T]

An instance of an implementation of the generic PreOrderSchemaVisitor base class.



Type Description

NotImplementedError

If attempting to visit an unrecognized object type.

Source code in pyiceberg/
def pre_order_visit(obj: Union[Schema, IcebergType], visitor: PreOrderSchemaVisitor[T]) -> T:
    """Apply a schema visitor to any point within a schema.

    The function traverses the schema in pre-order fashion. This is a slimmed down version
    compared to the post-order traversal (missing before and after methods), mostly
    because we don't use the pre-order traversal much.

    NOTE(review): the body shown here only raises; handling for recognized types is
    presumably provided elsewhere — confirm against the full module.

    Args:
        obj (Union[Schema, IcebergType]): An instance of a Schema or an IcebergType.
        visitor (PreOrderSchemaVisitor[T]): An instance of an implementation of the generic PreOrderSchemaVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unrecognized object type.
    """
    raise NotImplementedError(f"Cannot visit non-type: {obj}")

promote(file_type, read_type)

Promotes reading a file type to a read type.


Name Type Description Default
file_type IcebergType

The type of the Avro file.

read_type IcebergType

The requested read type.



Type Description

ResolveError

If attempting to resolve an unrecognized object type.

Source code in pyiceberg/
def promote(file_type: IcebergType, read_type: IcebergType) -> IcebergType:
    """Promotes reading a file type to a read type.

    As written here, only identical types are accepted; any other pairing is rejected.

    Args:
        file_type (IcebergType): The type of the Avro file.
        read_type (IcebergType): The requested read type.

    Returns:
        IcebergType: `file_type`, when it equals `read_type`.

    Raises:
        ResolveError: If `file_type` cannot be promoted to `read_type`.
    """
    if file_type == read_type:
        return file_type
    # Must sit outside the branch above: placed after the `return` it is unreachable
    # and mismatched types would silently yield None.
    raise ResolveError(f"Cannot promote {file_type} to {read_type}")

prune_columns(schema, selected, select_full_types=True)

Prunes a column by only selecting a set of field-ids.


Name Type Description Default
schema Schema

The schema to be pruned.

selected Set[int]

The field-ids to be included.

select_full_types bool

Return the full struct when a subset is recorded



Type Description

The pruned schema.

Source code in pyiceberg/
def prune_columns(schema: Schema, selected: Set[int], select_full_types: bool = True) -> Schema:
    """Prunes a column by only selecting a set of field-ids.

    Args:
        schema: The schema to be pruned.
        selected: The field-ids to be included.
        select_full_types: Return the full struct when a subset is recorded

    Returns:
        The pruned schema.
    """
    result = visit(schema.as_struct(), _PruneColumnsVisitor(selected, select_full_types))
    # A falsy visitor result falls back to the fields of an empty StructType.
    # NOTE(review): this call is cut off in this listing (remaining arguments and the
    # closing parenthesis are missing) — confirm against the full implementation.
    return Schema(
        *(result or StructType()).fields,


Sanitize column names to make them compatible with Avro.

The column name should start with '_' or a digit, followed by a string that contains only '_', digits, or letters; otherwise it will be sanitized to conform to the Avro naming convention.


Name Type Description Default
schema Schema

The schema to be sanitized.



Type Description

The sanitized schema.

Source code in pyiceberg/
def sanitize_column_names(schema: Schema) -> Schema:
    """Sanitize column names to make them compatible with Avro.

    The column name should be starting with '_' or digit followed by a string only contains '_', digit or alphabet,
    otherwise it will be sanitized to conform the avro naming convention.

    Args:
        schema: The schema to be sanitized.

    Returns:
        The sanitized schema.
    """
    result = visit(schema.as_struct(), _SanitizeColumnsVisitor())
    # A falsy visitor result falls back to the fields of an empty StructType.
    # NOTE(review): this call is cut off in this listing (remaining arguments and the
    # closing parenthesis are missing) — confirm against the full implementation.
    return Schema(
        *(result or StructType()).fields,

visit(obj, visitor)

Apply a schema visitor to any point within a schema.

The function traverses the schema in post-order fashion.


Name Type Description Default
obj Union[Schema, IcebergType]

An instance of a Schema or an IcebergType.

visitor SchemaVisitor[T]

An instance of an implementation of the generic SchemaVisitor base class.



Type Description

NotImplementedError

If attempting to visit an unrecognized object type.

Source code in pyiceberg/
def visit(obj: Union[Schema, IcebergType], visitor: SchemaVisitor[T]) -> T:
    """Apply a schema visitor to any point within a schema.

    The function traverses the schema in post-order fashion.

    NOTE(review): the body shown here only raises; handling for recognized types is
    presumably provided elsewhere — confirm against the full module.

    Args:
        obj (Union[Schema, IcebergType]): An instance of a Schema or an IcebergType.
        visitor (SchemaVisitor[T]): An instance of an implementation of the generic SchemaVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unrecognized object type.
    """
    raise NotImplementedError(f"Cannot visit non-type: {obj}")