Skip to content

schema

Accessor dataclass

An accessor for a specific position in a container that implements the StructProtocol.

Source code in pyiceberg/schema.py
@dataclass(init=True, eq=True, frozen=True)
class Accessor:
    """Positional accessor for a value inside a container implementing the StructProtocol.

    An accessor may wrap an `inner` accessor; `get` then keeps indexing into the
    value found at each level until the chain ends.
    """

    position: int
    inner: Optional[Accessor] = None

    def __str__(self) -> str:
        """Return the accessor formatted as ``Accessor(position=...,inner=...)``."""
        return f"Accessor(position={self.position},inner={self.inner})"

    def __repr__(self) -> str:
        """Return the same text as ``__str__``."""
        return self.__str__()

    def get(self, container: StructProtocol) -> Any:
        """Look up the value this accessor points at inside `container`.

        Args:
            container (StructProtocol): The container to read from, starting at `self.position`.

        Returns:
            Any: The value reached after following the whole chain of `inner` accessors.
        """
        value = container[self.position]
        accessor = self.inner
        # Each nested accessor indexes into the value produced by its parent.
        while accessor:
            value = value[accessor.position]
            accessor = accessor.inner

        return value

__repr__()

Return the string representation of the Accessor class.

Source code in pyiceberg/schema.py
def __repr__(self) -> str:
    """Return the string representation of the Accessor class.

    Delegates to ``__str__``, so ``repr()`` and ``str()`` produce the same text.
    """
    return self.__str__()

__str__()

Return the string representation of the Accessor class.

Source code in pyiceberg/schema.py
def __str__(self) -> str:
    """Return the string representation of the Accessor class.

    The text has the form ``Accessor(position=<position>,inner=<inner>)``, where
    ``<inner>`` is the formatted value of ``self.inner``.
    """
    return f"Accessor(position={self.position},inner={self.inner})"

get(container)

Return the value at self.position in container.

Parameters:

Name Type Description Default
container StructProtocol

A container to access at position self.position.

required

Returns:

Name Type Description
Any Any

The value at position self.position in the container.

Source code in pyiceberg/schema.py
def get(self, container: StructProtocol) -> Any:
    """Look up the value this accessor points at inside `container`.

    The lookup starts at `self.position` and then follows the chain of `inner`
    accessors, indexing each intermediate value by that accessor's position.

    Args:
        container (StructProtocol): The container to read from.

    Returns:
        Any: The value reached after following the whole accessor chain.
    """
    value = container[self.position]
    accessor = self.inner
    # Each nested accessor indexes into the value produced by its parent.
    while accessor:
        value = value[accessor.position]
        accessor = accessor.inner

    return value

PartnerAccessor

Bases: Generic[P], ABC

Source code in pyiceberg/schema.py
class PartnerAccessor(Generic[P], ABC):
    """Abstract interface for finding the partner counterpart of each part of a schema.

    Implementations map a schema, a struct field, a list element, a map key and a
    map value to the equivalent piece of a partner structure of type ``P``. Every
    method accepts and returns ``Optional[P]``.
    """

    @abstractmethod
    def schema_partner(self, partner: Optional[P]) -> Optional[P]:
        """Return the equivalent of the schema as a struct.

        Args:
            partner (Optional[P]): The partner for the schema; may be None.

        Returns:
            Optional[P]: The partner struct for the schema; the signature allows None.
        """

    @abstractmethod
    def field_partner(self, partner_struct: Optional[P], field_id: int, field_name: str) -> Optional[P]:
        """Return the equivalent struct field by name or id in the partner struct.

        Args:
            partner_struct (Optional[P]): The partner struct to search; may be None.
            field_id (int): The ID of the field to look up.
            field_name (str): The name of the field to look up.

        Returns:
            Optional[P]: The matching partner field; the signature allows None.
        """

    @abstractmethod
    def list_element_partner(self, partner_list: Optional[P]) -> Optional[P]:
        """Return the equivalent list element in the partner list.

        Args:
            partner_list (Optional[P]): The partner list; may be None.

        Returns:
            Optional[P]: The partner of the list element; the signature allows None.
        """

    @abstractmethod
    def map_key_partner(self, partner_map: Optional[P]) -> Optional[P]:
        """Return the equivalent map key in the partner map.

        Args:
            partner_map (Optional[P]): The partner map; may be None.

        Returns:
            Optional[P]: The partner of the map key; the signature allows None.
        """

    @abstractmethod
    def map_value_partner(self, partner_map: Optional[P]) -> Optional[P]:
        """Return the equivalent map value in the partner map.

        Args:
            partner_map (Optional[P]): The partner map; may be None.

        Returns:
            Optional[P]: The partner of the map value; the signature allows None.
        """

field_partner(partner_struct, field_id, field_name) abstractmethod

Return the equivalent struct field by name or id in the partner struct.

Source code in pyiceberg/schema.py
@abstractmethod
def field_partner(self, partner_struct: Optional[P], field_id: int, field_name: str) -> Optional[P]:
    """Return the equivalent struct field by name or id in the partner struct.

    Args:
        partner_struct (Optional[P]): The partner struct to search; may be None.
        field_id (int): The ID of the field to look up.
        field_name (str): The name of the field to look up.

    Returns:
        Optional[P]: The matching partner field; the signature allows None.
    """

list_element_partner(partner_list) abstractmethod

Return the equivalent list element in the partner list.

Source code in pyiceberg/schema.py
@abstractmethod
def list_element_partner(self, partner_list: Optional[P]) -> Optional[P]:
    """Return the equivalent list element in the partner list.

    Args:
        partner_list (Optional[P]): The partner list; may be None.

    Returns:
        Optional[P]: The partner of the list element; the signature allows None.
    """

map_key_partner(partner_map) abstractmethod

Return the equivalent map key in the partner map.

Source code in pyiceberg/schema.py
@abstractmethod
def map_key_partner(self, partner_map: Optional[P]) -> Optional[P]:
    """Return the equivalent map key in the partner map.

    Args:
        partner_map (Optional[P]): The partner map; may be None.

    Returns:
        Optional[P]: The partner of the map key; the signature allows None.
    """

map_value_partner(partner_map) abstractmethod

Return the equivalent map value in the partner map.

Source code in pyiceberg/schema.py
@abstractmethod
def map_value_partner(self, partner_map: Optional[P]) -> Optional[P]:
    """Return the equivalent map value in the partner map.

    Args:
        partner_map (Optional[P]): The partner map; may be None.

    Returns:
        Optional[P]: The partner of the map value; the signature allows None.
    """

schema_partner(partner) abstractmethod

Return the equivalent of the schema as a struct.

Source code in pyiceberg/schema.py
@abstractmethod
def schema_partner(self, partner: Optional[P]) -> Optional[P]:
    """Return the equivalent of the schema as a struct.

    Args:
        partner (Optional[P]): The partner for the schema; may be None.

    Returns:
        Optional[P]: The partner struct for the schema; the signature allows None.
    """

PreOrderSchemaVisitor

Bases: Generic[T], ABC

Source code in pyiceberg/schema.py
class PreOrderSchemaVisitor(Generic[T], ABC):
    """Abstract visitor over a schema whose child results are supplied as callables.

    Each method receives the node being visited and, for nodes with children,
    zero-argument callables that return the child result of type ``T``.
    """

    @abstractmethod
    def schema(self, schema: Schema, struct_result: Callable[[], T]) -> T:
        """Visit a Schema.

        Args:
            schema (Schema): The schema being visited.
            struct_result (Callable[[], T]): Zero-argument callable returning the struct result.

        Returns:
            T: The result for the schema.
        """

    @abstractmethod
    def struct(self, struct: StructType, field_results: List[Callable[[], T]]) -> T:
        """Visit a StructType.

        Args:
            struct (StructType): The struct being visited.
            field_results (List[Callable[[], T]]): Zero-argument callables returning the field results.

        Returns:
            T: The result for the struct.
        """

    @abstractmethod
    def field(self, field: NestedField, field_result: Callable[[], T]) -> T:
        """Visit a NestedField.

        Args:
            field (NestedField): The field being visited.
            field_result (Callable[[], T]): Zero-argument callable returning the field result.

        Returns:
            T: The result for the field.
        """

    @abstractmethod
    def list(self, list_type: ListType, element_result: Callable[[], T]) -> T:
        """Visit a ListType.

        Args:
            list_type (ListType): The list type being visited.
            element_result (Callable[[], T]): Zero-argument callable returning the element result.

        Returns:
            T: The result for the list.
        """

    @abstractmethod
    def map(self, map_type: MapType, key_result: Callable[[], T], value_result: Callable[[], T]) -> T:
        """Visit a MapType.

        Args:
            map_type (MapType): The map type being visited.
            key_result (Callable[[], T]): Zero-argument callable returning the key result.
            value_result (Callable[[], T]): Zero-argument callable returning the value result.

        Returns:
            T: The result for the map.
        """

    @abstractmethod
    def primitive(self, primitive: PrimitiveType) -> T:
        """Visit a PrimitiveType.

        Args:
            primitive (PrimitiveType): The primitive type being visited.

        Returns:
            T: The result for the primitive.
        """

field(field, field_result) abstractmethod

Visit a NestedField.

Source code in pyiceberg/schema.py
@abstractmethod
def field(self, field: NestedField, field_result: Callable[[], T]) -> T:
    """Visit a NestedField.

    Args:
        field (NestedField): The field being visited.
        field_result (Callable[[], T]): Zero-argument callable returning the field result.

    Returns:
        T: The result for the field.
    """

list(list_type, element_result) abstractmethod

Visit a ListType.

Source code in pyiceberg/schema.py
@abstractmethod
def list(self, list_type: ListType, element_result: Callable[[], T]) -> T:
    """Visit a ListType.

    Args:
        list_type (ListType): The list type being visited.
        element_result (Callable[[], T]): Zero-argument callable returning the element result.

    Returns:
        T: The result for the list.
    """

map(map_type, key_result, value_result) abstractmethod

Visit a MapType.

Source code in pyiceberg/schema.py
@abstractmethod
def map(self, map_type: MapType, key_result: Callable[[], T], value_result: Callable[[], T]) -> T:
    """Visit a MapType.

    Args:
        map_type (MapType): The map type being visited.
        key_result (Callable[[], T]): Zero-argument callable returning the key result.
        value_result (Callable[[], T]): Zero-argument callable returning the value result.

    Returns:
        T: The result for the map.
    """

primitive(primitive) abstractmethod

Visit a PrimitiveType.

Source code in pyiceberg/schema.py
@abstractmethod
def primitive(self, primitive: PrimitiveType) -> T:
    """Visit a PrimitiveType.

    Args:
        primitive (PrimitiveType): The primitive type being visited.

    Returns:
        T: The result for the primitive.
    """

schema(schema, struct_result) abstractmethod

Visit a Schema.

Source code in pyiceberg/schema.py
@abstractmethod
def schema(self, schema: Schema, struct_result: Callable[[], T]) -> T:
    """Visit a Schema.

    Args:
        schema (Schema): The schema being visited.
        struct_result (Callable[[], T]): Zero-argument callable returning the struct result.

    Returns:
        T: The result for the schema.
    """

struct(struct, field_results) abstractmethod

Visit a StructType.

Source code in pyiceberg/schema.py
@abstractmethod
def struct(self, struct: StructType, field_results: List[Callable[[], T]]) -> T:
    """Visit a StructType.

    Args:
        struct (StructType): The struct being visited.
        field_results (List[Callable[[], T]]): Zero-argument callables returning the field results.

    Returns:
        T: The result for the struct.
    """

PrimitiveWithPartnerVisitor

Bases: SchemaWithPartnerVisitor[P, T]

Source code in pyiceberg/schema.py
class PrimitiveWithPartnerVisitor(SchemaWithPartnerVisitor[P, T]):
    """Partner visitor that dispatches each primitive type to a dedicated ``visit_*`` method.

    Subclasses implement one ``visit_*`` method per primitive type; `primitive`
    selects the method with ``isinstance`` checks in a fixed order.
    """

    def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) -> T:
        """Visit a PrimitiveType by delegating to the matching ``visit_*`` method.

        Args:
            primitive (PrimitiveType): The primitive type being visited.
            primitive_partner (Optional[P]): The partner for the primitive; may be None.

        Raises:
            ValueError: When the primitive matches none of the handled types.

        Returns:
            T: The result of the selected ``visit_*`` method.
        """
        # The checks run in this fixed order; the first matching type wins.
        if isinstance(primitive, BooleanType):
            return self.visit_boolean(primitive, primitive_partner)
        if isinstance(primitive, IntegerType):
            return self.visit_integer(primitive, primitive_partner)
        if isinstance(primitive, LongType):
            return self.visit_long(primitive, primitive_partner)
        if isinstance(primitive, FloatType):
            return self.visit_float(primitive, primitive_partner)
        if isinstance(primitive, DoubleType):
            return self.visit_double(primitive, primitive_partner)
        if isinstance(primitive, DecimalType):
            return self.visit_decimal(primitive, primitive_partner)
        if isinstance(primitive, DateType):
            return self.visit_date(primitive, primitive_partner)
        if isinstance(primitive, TimeType):
            return self.visit_time(primitive, primitive_partner)
        if isinstance(primitive, TimestampType):
            return self.visit_timestamp(primitive, primitive_partner)
        if isinstance(primitive, TimestamptzType):
            return self.visit_timestamptz(primitive, primitive_partner)
        if isinstance(primitive, StringType):
            return self.visit_string(primitive, primitive_partner)
        if isinstance(primitive, UUIDType):
            return self.visit_uuid(primitive, primitive_partner)
        if isinstance(primitive, FixedType):
            return self.visit_fixed(primitive, primitive_partner)
        if isinstance(primitive, BinaryType):
            return self.visit_binary(primitive, primitive_partner)

        raise ValueError(f"Unknown type: {primitive}")

    @abstractmethod
    def visit_boolean(self, boolean_type: BooleanType, partner: Optional[P]) -> T:
        """Visit a BooleanType."""

    @abstractmethod
    def visit_integer(self, integer_type: IntegerType, partner: Optional[P]) -> T:
        """Visit an IntegerType."""

    @abstractmethod
    def visit_long(self, long_type: LongType, partner: Optional[P]) -> T:
        """Visit a LongType."""

    @abstractmethod
    def visit_float(self, float_type: FloatType, partner: Optional[P]) -> T:
        """Visit a FloatType."""

    @abstractmethod
    def visit_double(self, double_type: DoubleType, partner: Optional[P]) -> T:
        """Visit a DoubleType."""

    @abstractmethod
    def visit_decimal(self, decimal_type: DecimalType, partner: Optional[P]) -> T:
        """Visit a DecimalType."""

    @abstractmethod
    def visit_date(self, date_type: DateType, partner: Optional[P]) -> T:
        """Visit a DateType."""

    @abstractmethod
    def visit_time(self, time_type: TimeType, partner: Optional[P]) -> T:
        """Visit a TimeType."""

    @abstractmethod
    def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[P]) -> T:
        """Visit a TimestampType."""

    @abstractmethod
    def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[P]) -> T:
        """Visit a TimestamptzType."""

    @abstractmethod
    def visit_string(self, string_type: StringType, partner: Optional[P]) -> T:
        """Visit a StringType."""

    @abstractmethod
    def visit_uuid(self, uuid_type: UUIDType, partner: Optional[P]) -> T:
        """Visit a UUIDType."""

    @abstractmethod
    def visit_fixed(self, fixed_type: FixedType, partner: Optional[P]) -> T:
        """Visit a FixedType."""

    @abstractmethod
    def visit_binary(self, binary_type: BinaryType, partner: Optional[P]) -> T:
        """Visit a BinaryType."""

primitive(primitive, primitive_partner)

Visit a PrimitiveType.

Source code in pyiceberg/schema.py
def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) -> T:
    """Visit a PrimitiveType by delegating to the matching ``visit_*`` method.

    Args:
        primitive (PrimitiveType): The primitive type being visited.
        primitive_partner (Optional[P]): The partner for the primitive; may be None.

    Raises:
        ValueError: When the primitive matches none of the handled types.

    Returns:
        T: The result of the selected ``visit_*`` method.
    """
    # The checks run in this fixed order; the first matching type wins.
    if isinstance(primitive, BooleanType):
        return self.visit_boolean(primitive, primitive_partner)
    if isinstance(primitive, IntegerType):
        return self.visit_integer(primitive, primitive_partner)
    if isinstance(primitive, LongType):
        return self.visit_long(primitive, primitive_partner)
    if isinstance(primitive, FloatType):
        return self.visit_float(primitive, primitive_partner)
    if isinstance(primitive, DoubleType):
        return self.visit_double(primitive, primitive_partner)
    if isinstance(primitive, DecimalType):
        return self.visit_decimal(primitive, primitive_partner)
    if isinstance(primitive, DateType):
        return self.visit_date(primitive, primitive_partner)
    if isinstance(primitive, TimeType):
        return self.visit_time(primitive, primitive_partner)
    if isinstance(primitive, TimestampType):
        return self.visit_timestamp(primitive, primitive_partner)
    if isinstance(primitive, TimestamptzType):
        return self.visit_timestamptz(primitive, primitive_partner)
    if isinstance(primitive, StringType):
        return self.visit_string(primitive, primitive_partner)
    if isinstance(primitive, UUIDType):
        return self.visit_uuid(primitive, primitive_partner)
    if isinstance(primitive, FixedType):
        return self.visit_fixed(primitive, primitive_partner)
    if isinstance(primitive, BinaryType):
        return self.visit_binary(primitive, primitive_partner)

    raise ValueError(f"Unknown type: {primitive}")

visit_binary(binary_type, partner) abstractmethod

Visit a BinaryType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_binary(self, binary_type: BinaryType, partner: Optional[P]) -> T:
    """Visit a BinaryType.

    Args:
        binary_type (BinaryType): The binary type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the binary type.
    """

visit_boolean(boolean_type, partner) abstractmethod

Visit a BooleanType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_boolean(self, boolean_type: BooleanType, partner: Optional[P]) -> T:
    """Visit a BooleanType.

    Args:
        boolean_type (BooleanType): The boolean type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the boolean type.
    """

visit_date(date_type, partner) abstractmethod

Visit a DateType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_date(self, date_type: DateType, partner: Optional[P]) -> T:
    """Visit a DateType.

    Args:
        date_type (DateType): The date type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the date type.
    """

visit_decimal(decimal_type, partner) abstractmethod

Visit a DecimalType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_decimal(self, decimal_type: DecimalType, partner: Optional[P]) -> T:
    """Visit a DecimalType.

    Args:
        decimal_type (DecimalType): The decimal type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the decimal type.
    """

visit_double(double_type, partner) abstractmethod

Visit a DoubleType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_double(self, double_type: DoubleType, partner: Optional[P]) -> T:
    """Visit a DoubleType.

    Args:
        double_type (DoubleType): The double type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the double type.
    """

visit_fixed(fixed_type, partner) abstractmethod

Visit a FixedType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_fixed(self, fixed_type: FixedType, partner: Optional[P]) -> T:
    """Visit a FixedType.

    Args:
        fixed_type (FixedType): The fixed type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the fixed type.
    """

visit_float(float_type, partner) abstractmethod

Visit a FloatType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_float(self, float_type: FloatType, partner: Optional[P]) -> T:
    """Visit a FloatType.

    Args:
        float_type (FloatType): The float type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the float type.
    """

visit_integer(integer_type, partner) abstractmethod

Visit an IntegerType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_integer(self, integer_type: IntegerType, partner: Optional[P]) -> T:
    """Visit an IntegerType.

    Args:
        integer_type (IntegerType): The integer type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the integer type.
    """

visit_long(long_type, partner) abstractmethod

Visit a LongType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_long(self, long_type: LongType, partner: Optional[P]) -> T:
    """Visit a LongType.

    Args:
        long_type (LongType): The long type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the long type.
    """

visit_string(string_type, partner) abstractmethod

Visit a StringType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_string(self, string_type: StringType, partner: Optional[P]) -> T:
    """Visit a StringType.

    Args:
        string_type (StringType): The string type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the string type.
    """

visit_time(time_type, partner) abstractmethod

Visit a TimeType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_time(self, time_type: TimeType, partner: Optional[P]) -> T:
    """Visit a TimeType.

    Args:
        time_type (TimeType): The time type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the time type.
    """

visit_timestamp(timestamp_type, partner) abstractmethod

Visit a TimestampType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[P]) -> T:
    """Visit a TimestampType.

    Args:
        timestamp_type (TimestampType): The timestamp type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the timestamp type.
    """

visit_timestamptz(timestamptz_type, partner) abstractmethod

Visit a TimestamptzType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[P]) -> T:
    """Visit a TimestamptzType.

    Args:
        timestamptz_type (TimestamptzType): The timestamptz type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the timestamptz type.
    """

visit_uuid(uuid_type, partner) abstractmethod

Visit a UUIDType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_uuid(self, uuid_type: UUIDType, partner: Optional[P]) -> T:
    """Visit a UUIDType.

    Args:
        uuid_type (UUIDType): The UUID type being visited.
        partner (Optional[P]): The partner for the type; may be None.

    Returns:
        T: The result for the UUID type.
    """

Schema

Bases: IcebergBaseModel

A table Schema.

Example

>>> from pyiceberg import schema
>>> from pyiceberg import types

Source code in pyiceberg/schema.py
class Schema(IcebergBaseModel):
    """A table Schema.

    Example:
        >>> from pyiceberg import schema
        >>> from pyiceberg import types
    """

    type: Literal["struct"] = "struct"
    fields: Tuple[NestedField, ...] = Field(default_factory=tuple)
    schema_id: int = Field(alias="schema-id", default=INITIAL_SCHEMA_ID)
    identifier_field_ids: List[int] = Field(alias="identifier-field-ids", default_factory=list)

    # Index of field name to field ID; filled in by __init__ through index_by_name.
    _name_to_id: Dict[str, int] = PrivateAttr()

    def __init__(self, *fields: NestedField, **data: Any):
        """Create a schema from positional fields and/or keyword model data.

        Args:
            *fields (NestedField): Top-level fields; when given they replace any ``fields`` entry in `data`.
            **data (Any): Remaining model data passed on to the base model.
        """
        if fields:
            data["fields"] = fields
        super().__init__(**data)
        self._name_to_id = index_by_name(self)

    def __str__(self) -> str:
        """Return the string representation of the Schema class."""
        return "table {\n" + "\n".join(["  " + str(field) for field in self.columns]) + "\n}"

    def __repr__(self) -> str:
        """Return the string representation of the Schema class."""
        return f"Schema({', '.join(repr(column) for column in self.columns)}, schema_id={self.schema_id}, identifier_field_ids={self.identifier_field_ids})"

    def __len__(self) -> int:
        """Return the number of top-level fields in the Schema."""
        return len(self.fields)

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the Schema class.

        Two schemas are equal when they have the same identifier field IDs and
        their top-level columns are pairwise equal. The schema ID is not compared.

        Args:
            other (Any): The object to compare against.

        Returns:
            bool: True when `other` is an equal Schema, False otherwise.
        """
        # No truthiness shortcut here: Schema defines __len__, so a schema without
        # fields is falsy and `if not other` made empty schemas never compare equal.
        # The isinstance check already rejects None and every other non-Schema value.
        if not isinstance(other, Schema):
            return False

        if len(self.columns) != len(other.columns):
            return False

        identifier_field_ids_is_equal = self.identifier_field_ids == other.identifier_field_ids
        schema_is_equal = all(lhs == rhs for lhs, rhs in zip(self.columns, other.columns))

        return identifier_field_ids_is_equal and schema_is_equal

    @model_validator(mode="after")
    def check_schema(self) -> Schema:
        """Validate every identifier field ID of the schema.

        Raises:
            ValueError: When an identifier field is not valid.

        Returns:
            Schema: This schema, unchanged.
        """
        if self.identifier_field_ids:
            for field_id in self.identifier_field_ids:
                self._validate_identifier_field(field_id)

        return self

    @property
    def columns(self) -> Tuple[NestedField, ...]:
        """A tuple of the top-level fields."""
        return self.fields

    @cached_property
    def _lazy_id_to_field(self) -> Dict[int, NestedField]:
        """Return an index of field ID to NestedField instance.

        This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.
        """
        return index_by_id(self)

    @cached_property
    def _lazy_id_to_parent(self) -> Dict[int, int]:
        """Return an index of field ID to parent field IDs.

        This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.
        """
        return _index_parents(self)

    @cached_property
    def _lazy_name_to_id_lower(self) -> Dict[str, int]:
        """Return an index of lower-case field names to field IDs.

        This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.
        """
        return {name.lower(): field_id for name, field_id in self._name_to_id.items()}

    @cached_property
    def _lazy_id_to_name(self) -> Dict[int, str]:
        """Return an index of field ID to full name.

        This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.
        """
        return index_name_by_id(self)

    @cached_property
    def _lazy_id_to_accessor(self) -> Dict[int, Accessor]:
        """Return an index of field ID to accessor.

        This is calculated once when called for the first time. Subsequent calls to this method will use a cached index.
        """
        return build_position_accessors(self)

    def as_struct(self) -> StructType:
        """Return the schema as a struct."""
        return StructType(*self.fields)

    def as_arrow(self) -> "pa.Schema":
        """Return the schema as an Arrow schema."""
        from pyiceberg.io.pyarrow import schema_to_pyarrow

        return schema_to_pyarrow(self)

    def find_field(self, name_or_id: Union[str, int], case_sensitive: bool = True) -> NestedField:
        """Find a field using a field name or field ID.

        Args:
            name_or_id (Union[str, int]): Either a field name or a field ID.
            case_sensitive (bool, optional): Whether to perform a case-sensitive lookup using a field name. Defaults to True.

        Raises:
            ValueError: When the value cannot be found.

        Returns:
            NestedField: The matched NestedField.
        """
        if isinstance(name_or_id, int):
            if name_or_id not in self._lazy_id_to_field:
                raise ValueError(f"Could not find field with id: {name_or_id}")
            return self._lazy_id_to_field[name_or_id]

        if case_sensitive:
            field_id = self._name_to_id.get(name_or_id)
        else:
            field_id = self._lazy_name_to_id_lower.get(name_or_id.lower())

        if field_id is None:
            raise ValueError(f"Could not find field with name {name_or_id}, case_sensitive={case_sensitive}")

        return self._lazy_id_to_field[field_id]

    def find_type(self, name_or_id: Union[str, int], case_sensitive: bool = True) -> IcebergType:
        """Find a field type using a field name or field ID.

        Args:
            name_or_id (Union[str, int]): Either a field name or a field ID.
            case_sensitive (bool, optional): Whether to perform a case-sensitive lookup using a field name. Defaults to True.

        Raises:
            ValueError: When the field cannot be found.

        Returns:
            IcebergType: The type of the matched NestedField.
        """
        field = self.find_field(name_or_id=name_or_id, case_sensitive=case_sensitive)
        # find_field raises when nothing matches, so this guard is only defensive.
        if not field:
            raise ValueError(f"Could not find field with name or id {name_or_id}, case_sensitive={case_sensitive}")
        return field.field_type

    @property
    def highest_field_id(self) -> int:
        """Return the highest field ID in the ID-to-name index, or 0 when the index is empty."""
        return max(self._lazy_id_to_name.keys(), default=0)

    @cached_property
    def name_mapping(self) -> NameMapping:
        """Return the name mapping created from this schema.

        This is calculated once when called for the first time and cached afterwards.
        """
        from pyiceberg.table.name_mapping import create_mapping_from_schema

        return create_mapping_from_schema(self)

    def find_column_name(self, column_id: int) -> Optional[str]:
        """Find a column name given a column ID.

        Args:
            column_id (int): The ID of the column.

        Returns:
            Optional[str]: The column name, or None if the column ID cannot be found.
        """
        return self._lazy_id_to_name.get(column_id)

    @property
    def column_names(self) -> List[str]:
        """
        Return a list of all the column names, including nested fields.

        Excludes short names.

        Returns:
            List[str]: The column names.
        """
        return list(self._lazy_id_to_name.values())

    def accessor_for_field(self, field_id: int) -> Accessor:
        """Find a schema position accessor given a field ID.

        Args:
            field_id (int): The ID of the field.

        Raises:
            ValueError: When the value cannot be found.

        Returns:
            Accessor: An accessor for the given field ID.
        """
        if field_id not in self._lazy_id_to_accessor:
            raise ValueError(f"Could not find accessor for field with id: {field_id}")

        return self._lazy_id_to_accessor[field_id]

    def identifier_field_names(self) -> Set[str]:
        """Return the names of the identifier fields.

        Raises:
            ValueError: When an identifier field ID has no column name.

        Returns:
            Set of names of the identifier fields
        """
        ids = set()
        for field_id in self.identifier_field_ids:
            column_name = self.find_column_name(field_id)
            if column_name is None:
                raise ValueError(f"Could not find identifier column id: {field_id}")
            ids.add(column_name)

        return ids

    def select(self, *names: str, case_sensitive: bool = True) -> Schema:
        """Return a new schema instance pruned to a subset of columns.

        Args:
            names (List[str]): A list of column names.
            case_sensitive (bool, optional): Whether to perform a case-sensitive lookup for each column name. Defaults to True.

        Returns:
            Schema: A new schema with pruned columns.

        Raises:
            ValueError: If a column is selected that doesn't exist.
        """
        try:
            if case_sensitive:
                ids = {self._name_to_id[name] for name in names}
            else:
                ids = {self._lazy_name_to_id_lower[name.lower()] for name in names}
        except KeyError as e:
            raise ValueError(f"Could not find column: {e}") from e

        return prune_columns(self, ids)

    @property
    def field_ids(self) -> Set[int]:
        """Return the IDs of the current schema."""
        return set(self._name_to_id.values())

    def _validate_identifier_field(self, field_id: int) -> None:
        """Validate that the field with the given ID is a valid identifier field.

        Args:
          field_id: The ID of the field to validate.

        Raises:
          ValueError: If the field is not valid.
        """
        field = self.find_field(field_id)
        if not field.field_type.is_primitive:
            raise ValueError(f"Identifier field {field_id} invalid: not a primitive type field")

        if not field.required:
            raise ValueError(f"Identifier field {field_id} invalid: not a required field")

        if isinstance(field.field_type, (DoubleType, FloatType)):
            raise ValueError(f"Identifier field {field_id} invalid: must not be float or double field")

        # Check whether the nested field is in a chain of required struct fields
        # Exploring from root for better error message for list and map types
        parent_id = self._lazy_id_to_parent.get(field.field_id)
        fields: List[int] = []
        while parent_id is not None:
            fields.append(parent_id)
            parent_id = self._lazy_id_to_parent.get(parent_id)

        # Ancestors were collected leaf-to-root, so popping visits them root first.
        while fields:
            parent = self.find_field(fields.pop())
            if not parent.field_type.is_struct:
                raise ValueError(f"Cannot add field {field.name} as an identifier field: must not be nested in {parent}")

            if not parent.required:
                raise ValueError(
                    f"Cannot add field {field.name} as an identifier field: must not be nested in an optional field {parent}"
                )

column_names: List[str] property

Return a list of all the column names, including nested fields.

Excludes short names.

Returns:

Type Description
List[str]

List[str]: The column names.

columns: Tuple[NestedField, ...] property

A tuple of the top-level fields.

field_ids: Set[int] property

Return the IDs of the current schema.

__eq__(other)

Return the equality of two instances of the Schema class.

Source code in pyiceberg/schema.py
def __eq__(self, other: Any) -> bool:
    """Return the equality of two instances of the Schema class.

    Two schemas are equal when they have the same identifier field IDs and
    their top-level columns are pairwise equal. The schema ID is not compared.

    Args:
        other (Any): The object to compare against.

    Returns:
        bool: True when `other` is an equal Schema, False otherwise.
    """
    # No truthiness shortcut here: Schema defines __len__, so a schema without
    # fields is falsy and `if not other` made empty schemas never compare equal.
    # The isinstance check already rejects None and every other non-Schema value.
    if not isinstance(other, Schema):
        return False

    if len(self.columns) != len(other.columns):
        return False

    identifier_field_ids_is_equal = self.identifier_field_ids == other.identifier_field_ids
    schema_is_equal = all(lhs == rhs for lhs, rhs in zip(self.columns, other.columns))

    return identifier_field_ids_is_equal and schema_is_equal

__len__()

Return the number of top-level fields in the Schema.

Source code in pyiceberg/schema.py
def __len__(self) -> int:
    """Return the number of top-level fields in the Schema."""
    return len(self.fields)

__repr__()

Return the string representation of the Schema class.

Source code in pyiceberg/schema.py
def __repr__(self) -> str:
    """Return the string representation of the Schema class.

    The text lists the ``repr`` of every column, followed by the schema ID and
    the identifier field IDs.
    """
    return f"Schema({', '.join(repr(column) for column in self.columns)}, schema_id={self.schema_id}, identifier_field_ids={self.identifier_field_ids})"

__str__()

Return the string representation of the Schema class.

Source code in pyiceberg/schema.py
def __str__(self) -> str:
    """Return the string representation of the Schema class.

    The text is a ``table { ... }`` block with one column per line, each
    indented by two spaces.
    """
    return "table {\n" + "\n".join(["  " + str(field) for field in self.columns]) + "\n}"

accessor_for_field(field_id)

Find a schema position accessor given a field ID.

Parameters:

Name Type Description Default
field_id int

The ID of the field.

required

Raises:

Type Description
ValueError

When the value cannot be found.

Returns:

Name Type Description
Accessor Accessor

An accessor for the given field ID.

Source code in pyiceberg/schema.py
def accessor_for_field(self, field_id: int) -> Accessor:
    """Look up the position accessor registered for a field ID.

    Args:
        field_id (int): The ID of the field.

    Raises:
        ValueError: When no accessor exists for the field ID.

    Returns:
        Accessor: The accessor for the given field ID.
    """
    accessors = self._lazy_id_to_accessor
    if field_id in accessors:
        return accessors[field_id]

    raise ValueError(f"Could not find accessor for field with id: {field_id}")

as_arrow()

Return the schema as an Arrow schema.

Source code in pyiceberg/schema.py
def as_arrow(self) -> "pa.Schema":
    """Return the schema as an Arrow schema.

    The conversion is delegated to ``schema_to_pyarrow``.
    """
    # Imported inside the method rather than at module level; presumably this
    # keeps pyarrow optional or avoids an import cycle (confirm before moving).
    from pyiceberg.io.pyarrow import schema_to_pyarrow

    return schema_to_pyarrow(self)

as_struct()

Return the schema as a struct.

Source code in pyiceberg/schema.py
def as_struct(self) -> StructType:
    """Return the schema as a struct.

    Returns:
        StructType: A new struct built from this schema's fields.
    """
    return StructType(*self.fields)

find_column_name(column_id)

Find a column name given a column ID.

Parameters:

Name Type Description Default
column_id int

The ID of the column.

required

Returns:

Name Type Description
Optional[str] Optional[str]

The column name (or None if the column ID cannot be found).

Source code in pyiceberg/schema.py
def find_column_name(self, column_id: int) -> Optional[str]:
    """Find a column name given a column ID.

    Args:
        column_id (int): The ID of the column.

    Returns:
        Optional[str]: The column name, or None if the column ID cannot be found.
    """
    return self._lazy_id_to_name.get(column_id)

find_field(name_or_id, case_sensitive=True)

Find a field using a field name or field ID.

Parameters:

Name Type Description Default
name_or_id Union[str, int]

Either a field name or a field ID.

required
case_sensitive bool

Whether to perform a case-sensitive lookup using a field name. Defaults to True.

True

Raises:

Type Description
ValueError

When the value cannot be found.

Returns:

Name Type Description
NestedField NestedField

The matched NestedField.

Source code in pyiceberg/schema.py
def find_field(self, name_or_id: Union[str, int], case_sensitive: bool = True) -> NestedField:
    """Resolve a field from either its ID or its name.

    Args:
        name_or_id (Union[str, int]): A field ID (int) or a field name (str).
        case_sensitive (bool, optional): Only used for name lookups. When False, the
            lower-cased name is looked up in the lower-case name index. Defaults to True.

    Raises:
        ValueError: When no field matches the given ID or name.

    Returns:
        NestedField: The matched NestedField.
    """
    if isinstance(name_or_id, int):
        # ID lookup goes straight to the id -> field index.
        fields_by_id = self._lazy_id_to_field
        if name_or_id in fields_by_id:
            return fields_by_id[name_or_id]
        raise ValueError(f"Could not find field with id: {name_or_id}")

    # Name lookup: translate the name into an ID first, then fetch the field.
    field_id = (
        self._name_to_id.get(name_or_id) if case_sensitive else self._lazy_name_to_id_lower.get(name_or_id.lower())
    )
    if field_id is None:
        raise ValueError(f"Could not find field with name {name_or_id}, case_sensitive={case_sensitive}")

    return self._lazy_id_to_field[field_id]

find_type(name_or_id, case_sensitive=True)

Find a field type using a field name or field ID.

Parameters:

Name Type Description Default
name_or_id Union[str, int]

Either a field name or a field ID.

required
case_sensitive bool

Whether to perform a case-sensitive lookup using a field name. Defaults to True.

True

Returns:

Name Type Description
IcebergType IcebergType

The type of the matched NestedField.

Source code in pyiceberg/schema.py
def find_type(self, name_or_id: Union[str, int], case_sensitive: bool = True) -> IcebergType:
    """Return the type of the field matching a field name or field ID.

    Args:
        name_or_id (Union[str, int]): Either a field name or a field ID.
        case_sensitive (bool, optional): Whether to perform a case-sensitive lookup using a field name. Defaults to True.

    Raises:
        ValueError: When `find_field` raises it, or when `find_field` returns a falsy value.

    Returns:
        IcebergType: The `field_type` of the matched field.
    """
    matched = self.find_field(name_or_id=name_or_id, case_sensitive=case_sensitive)
    if matched:
        return matched.field_type
    raise ValueError(f"Could not find field with name or id {name_or_id}, case_sensitive={case_sensitive}")

identifier_field_names()

Return the names of the identifier fields.

Returns:

Type Description
Set[str]

Set of names of the identifier fields

Source code in pyiceberg/schema.py
def identifier_field_names(self) -> Set[str]:
    """Return the names of the identifier fields.

    Raises:
        ValueError: When an identifier field ID has no column name.

    Returns:
        Set of names of the identifier fields
    """

    def _name_for(field_id: int) -> str:
        # Translate one identifier ID into its column name, failing on unknown IDs.
        column_name = self.find_column_name(field_id)
        if column_name is None:
            raise ValueError(f"Could not find identifier column id: {field_id}")
        return column_name

    return {_name_for(identifier_id) for identifier_id in self.identifier_field_ids}

select(*names, case_sensitive=True)

Return a new schema instance pruned to a subset of columns.

Parameters:

Name Type Description Default
names List[str]

A list of column names.

()
case_sensitive bool

Whether to perform a case-sensitive lookup for each column name. Defaults to True.

True

Returns:

Name Type Description
Schema Schema

A new schema with pruned columns.

Raises:

Type Description
ValueError

If a column is selected that doesn't exist.

Source code in pyiceberg/schema.py
def select(self, *names: str, case_sensitive: bool = True) -> Schema:
    """Return a new schema instance pruned to a subset of columns.

    Args:
        *names (str): The column names to keep, passed as positional arguments.
        case_sensitive (bool, optional): Whether to perform a case-sensitive lookup for each column name. Defaults to True.

    Returns:
        Schema: The result of `prune_columns` for the IDs of the selected columns.

    Raises:
        ValueError: If a column is selected that doesn't exist.
    """
    try:
        selected_ids = (
            {self._name_to_id[column] for column in names}
            if case_sensitive
            else {self._lazy_name_to_id_lower[column.lower()] for column in names}
        )
    except KeyError as missing:
        # Surface an unknown column as ValueError while keeping the KeyError as the cause.
        raise ValueError(f"Could not find column: {missing}") from missing

    return prune_columns(self, selected_ids)

SchemaVisitor

Bases: Generic[T], ABC

Source code in pyiceberg/schema.py
class SchemaVisitor(Generic[T], ABC):
    """Abstract base class for schema visitors that produce a result of type T.

    Subclasses must implement `schema`, `struct`, `field`, `list`, `map` and `primitive`.
    The `before_*` / `after_*` hooks are optional: the field hooks do nothing by default,
    and the list-element, map-key and map-value hooks forward to the field hooks.
    """

    def before_field(self, field: NestedField) -> None:
        """Override this method to perform an action immediately before visiting a field."""

    def after_field(self, field: NestedField) -> None:
        """Override this method to perform an action immediately after visiting a field."""

    def before_list_element(self, element: NestedField) -> None:
        """Override this method to perform an action immediately before visiting an element within a ListType."""
        # Default: treat a list element like any other field.
        self.before_field(element)

    def after_list_element(self, element: NestedField) -> None:
        """Override this method to perform an action immediately after visiting an element within a ListType."""
        # Default: treat a list element like any other field.
        self.after_field(element)

    def before_map_key(self, key: NestedField) -> None:
        """Override this method to perform an action immediately before visiting a key within a MapType."""
        # Default: treat a map key like any other field.
        self.before_field(key)

    def after_map_key(self, key: NestedField) -> None:
        """Override this method to perform an action immediately after visiting a key within a MapType."""
        # Default: treat a map key like any other field.
        self.after_field(key)

    def before_map_value(self, value: NestedField) -> None:
        """Override this method to perform an action immediately before visiting a value within a MapType."""
        # Default: treat a map value like any other field.
        self.before_field(value)

    def after_map_value(self, value: NestedField) -> None:
        """Override this method to perform an action immediately after visiting a value within a MapType."""
        # Default: treat a map value like any other field.
        self.after_field(value)

    # The methods below have no default; every concrete visitor must provide them.

    @abstractmethod
    def schema(self, schema: Schema, struct_result: T) -> T:
        """Visit a Schema."""

    @abstractmethod
    def struct(self, struct: StructType, field_results: List[T]) -> T:
        """Visit a StructType."""

    @abstractmethod
    def field(self, field: NestedField, field_result: T) -> T:
        """Visit a NestedField."""

    @abstractmethod
    def list(self, list_type: ListType, element_result: T) -> T:
        """Visit a ListType."""

    @abstractmethod
    def map(self, map_type: MapType, key_result: T, value_result: T) -> T:
        """Visit a MapType."""

    @abstractmethod
    def primitive(self, primitive: PrimitiveType) -> T:
        """Visit a PrimitiveType."""

after_field(field)

Override this method to perform an action immediately after visiting a field.

Source code in pyiceberg/schema.py
def after_field(self, field: NestedField) -> None:
    """Override this method to perform an action immediately after visiting a field.

    The base implementation does nothing.

    Args:
        field (NestedField): The field that was just visited.
    """

after_list_element(element)

Override this method to perform an action immediately after visiting an element within a ListType.

Source code in pyiceberg/schema.py
def after_list_element(self, element: NestedField) -> None:
    """Override this method to perform an action immediately after visiting an element within a ListType.

    The default implementation forwards `element` to `after_field`.

    Args:
        element (NestedField): The list element that was just visited.
    """
    self.after_field(element)

after_map_key(key)

Override this method to perform an action immediately after visiting a key within a MapType.

Source code in pyiceberg/schema.py
def after_map_key(self, key: NestedField) -> None:
    """Override this method to perform an action immediately after visiting a key within a MapType.

    The default implementation forwards `key` to `after_field`.

    Args:
        key (NestedField): The map key that was just visited.
    """
    self.after_field(key)

after_map_value(value)

Override this method to perform an action immediately after visiting a value within a MapType.

Source code in pyiceberg/schema.py
def after_map_value(self, value: NestedField) -> None:
    """Override this method to perform an action immediately after visiting a value within a MapType.

    The default implementation forwards `value` to `after_field`.

    Args:
        value (NestedField): The map value that was just visited.
    """
    self.after_field(value)

before_field(field)

Override this method to perform an action immediately before visiting a field.

Source code in pyiceberg/schema.py
def before_field(self, field: NestedField) -> None:
    """Override this method to perform an action immediately before visiting a field.

    The base implementation does nothing.

    Args:
        field (NestedField): The field that is about to be visited.
    """

before_list_element(element)

Override this method to perform an action immediately before visiting an element within a ListType.

Source code in pyiceberg/schema.py
def before_list_element(self, element: NestedField) -> None:
    """Override this method to perform an action immediately before visiting an element within a ListType.

    The default implementation forwards `element` to `before_field`.

    Args:
        element (NestedField): The list element that is about to be visited.
    """
    self.before_field(element)

before_map_key(key)

Override this method to perform an action immediately before visiting a key within a MapType.

Source code in pyiceberg/schema.py
def before_map_key(self, key: NestedField) -> None:
    """Override this method to perform an action immediately before visiting a key within a MapType.

    The default implementation forwards `key` to `before_field`.

    Args:
        key (NestedField): The map key that is about to be visited.
    """
    self.before_field(key)

before_map_value(value)

Override this method to perform an action immediately before visiting a value within a MapType.

Source code in pyiceberg/schema.py
def before_map_value(self, value: NestedField) -> None:
    """Override this method to perform an action immediately before visiting a value within a MapType.

    The default implementation forwards `value` to `before_field`.

    Args:
        value (NestedField): The map value that is about to be visited.
    """
    self.before_field(value)

field(field, field_result) abstractmethod

Visit a NestedField.

Source code in pyiceberg/schema.py
@abstractmethod
def field(self, field: NestedField, field_result: T) -> T:
    """Visit a NestedField.

    Args:
        field (NestedField): The field being visited.
        field_result (T): A previously computed result for this field
            (presumably from visiting the field's type - TODO confirm against the traversal).

    Returns:
        T: The visitor's result for this field.
    """

list(list_type, element_result) abstractmethod

Visit a ListType.

Source code in pyiceberg/schema.py
@abstractmethod
def list(self, list_type: ListType, element_result: T) -> T:
    """Visit a ListType.

    Args:
        list_type (ListType): The list type being visited.
        element_result (T): A previously computed result for the list's element
            (presumably from visiting the element type - TODO confirm against the traversal).

    Returns:
        T: The visitor's result for this list type.
    """

map(map_type, key_result, value_result) abstractmethod

Visit a MapType.

Source code in pyiceberg/schema.py
@abstractmethod
def map(self, map_type: MapType, key_result: T, value_result: T) -> T:
    """Visit a MapType.

    Args:
        map_type (MapType): The map type being visited.
        key_result (T): A previously computed result for the map's key
            (presumably from visiting the key type - TODO confirm against the traversal).
        value_result (T): A previously computed result for the map's value
            (presumably from visiting the value type - TODO confirm against the traversal).

    Returns:
        T: The visitor's result for this map type.
    """

primitive(primitive) abstractmethod

Visit a PrimitiveType.

Source code in pyiceberg/schema.py
@abstractmethod
def primitive(self, primitive: PrimitiveType) -> T:
    """Visit a PrimitiveType.

    Args:
        primitive (PrimitiveType): The primitive type being visited.

    Returns:
        T: The visitor's result for this primitive type.
    """

schema(schema, struct_result) abstractmethod

Visit a Schema.

Source code in pyiceberg/schema.py
@abstractmethod
def schema(self, schema: Schema, struct_result: T) -> T:
    """Visit a Schema.

    Args:
        schema (Schema): The schema being visited.
        struct_result (T): A previously computed result for the schema's struct
            (presumably from visiting its top-level struct - TODO confirm against the traversal).

    Returns:
        T: The visitor's result for this schema.
    """

struct(struct, field_results) abstractmethod

Visit a StructType.

Source code in pyiceberg/schema.py
@abstractmethod
def struct(self, struct: StructType, field_results: List[T]) -> T:
    """Visit a StructType.

    Args:
        struct (StructType): The struct type being visited.
        field_results (List[T]): Previously computed results for the struct's fields
            (presumably one per field, in field order - TODO confirm against the traversal).

    Returns:
        T: The visitor's result for this struct type.
    """

SchemaVisitorPerPrimitiveType

Bases: SchemaVisitor[T], ABC

Source code in pyiceberg/schema.py
class SchemaVisitorPerPrimitiveType(SchemaVisitor[T], ABC):
    """A SchemaVisitor that routes `primitive` to one `visit_*` method per primitive type.

    Subclasses implement the `visit_*` methods instead of `primitive`.
    """

    def primitive(self, primitive: PrimitiveType) -> T:
        """Visit a PrimitiveType.

        Dispatches to the `visit_*` method matching the first `isinstance` check that succeeds.

        Raises:
            ValueError: When `primitive` matches none of the checked types.
        """
        if isinstance(primitive, FixedType):
            return self.visit_fixed(primitive)
        elif isinstance(primitive, DecimalType):
            return self.visit_decimal(primitive)
        elif isinstance(primitive, BooleanType):
            return self.visit_boolean(primitive)
        elif isinstance(primitive, IntegerType):
            return self.visit_integer(primitive)
        elif isinstance(primitive, LongType):
            return self.visit_long(primitive)
        elif isinstance(primitive, FloatType):
            return self.visit_float(primitive)
        elif isinstance(primitive, DoubleType):
            return self.visit_double(primitive)
        elif isinstance(primitive, DateType):
            return self.visit_date(primitive)
        elif isinstance(primitive, TimeType):
            return self.visit_time(primitive)
        elif isinstance(primitive, TimestampType):
            return self.visit_timestamp(primitive)
        elif isinstance(primitive, TimestamptzType):
            return self.visit_timestamptz(primitive)
        elif isinstance(primitive, StringType):
            return self.visit_string(primitive)
        elif isinstance(primitive, UUIDType):
            return self.visit_uuid(primitive)
        elif isinstance(primitive, BinaryType):
            return self.visit_binary(primitive)
        else:
            raise ValueError(f"Unknown type: {primitive}")

    @abstractmethod
    def visit_fixed(self, fixed_type: FixedType) -> T:
        """Visit a FixedType."""

    @abstractmethod
    def visit_decimal(self, decimal_type: DecimalType) -> T:
        """Visit a DecimalType."""

    @abstractmethod
    def visit_boolean(self, boolean_type: BooleanType) -> T:
        """Visit a BooleanType."""

    @abstractmethod
    def visit_integer(self, integer_type: IntegerType) -> T:
        """Visit an IntegerType."""

    @abstractmethod
    def visit_long(self, long_type: LongType) -> T:
        """Visit a LongType."""

    @abstractmethod
    def visit_float(self, float_type: FloatType) -> T:
        """Visit a FloatType."""

    @abstractmethod
    def visit_double(self, double_type: DoubleType) -> T:
        """Visit a DoubleType."""

    @abstractmethod
    def visit_date(self, date_type: DateType) -> T:
        """Visit a DateType."""

    @abstractmethod
    def visit_time(self, time_type: TimeType) -> T:
        """Visit a TimeType."""

    @abstractmethod
    def visit_timestamp(self, timestamp_type: TimestampType) -> T:
        """Visit a TimestampType."""

    @abstractmethod
    def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> T:
        """Visit a TimestamptzType."""

    @abstractmethod
    def visit_string(self, string_type: StringType) -> T:
        """Visit a StringType."""

    @abstractmethod
    def visit_uuid(self, uuid_type: UUIDType) -> T:
        """Visit a UUIDType."""

    @abstractmethod
    def visit_binary(self, binary_type: BinaryType) -> T:
        """Visit a BinaryType."""

primitive(primitive)

Visit a PrimitiveType.

Source code in pyiceberg/schema.py
def primitive(self, primitive: PrimitiveType) -> T:
    """Route a PrimitiveType to the `visit_*` method for its concrete type.

    The checks run in a fixed order and the first match wins.

    Args:
        primitive (PrimitiveType): The primitive type being visited.

    Raises:
        ValueError: When `primitive` matches none of the checked types.

    Returns:
        T: Whatever the selected `visit_*` method returns.
    """
    if isinstance(primitive, FixedType):
        return self.visit_fixed(primitive)
    if isinstance(primitive, DecimalType):
        return self.visit_decimal(primitive)
    if isinstance(primitive, BooleanType):
        return self.visit_boolean(primitive)
    if isinstance(primitive, IntegerType):
        return self.visit_integer(primitive)
    if isinstance(primitive, LongType):
        return self.visit_long(primitive)
    if isinstance(primitive, FloatType):
        return self.visit_float(primitive)
    if isinstance(primitive, DoubleType):
        return self.visit_double(primitive)
    if isinstance(primitive, DateType):
        return self.visit_date(primitive)
    if isinstance(primitive, TimeType):
        return self.visit_time(primitive)
    if isinstance(primitive, TimestampType):
        return self.visit_timestamp(primitive)
    if isinstance(primitive, TimestamptzType):
        return self.visit_timestamptz(primitive)
    if isinstance(primitive, StringType):
        return self.visit_string(primitive)
    if isinstance(primitive, UUIDType):
        return self.visit_uuid(primitive)
    if isinstance(primitive, BinaryType):
        return self.visit_binary(primitive)

    # Nothing matched: the type is not one this visitor knows how to dispatch.
    raise ValueError(f"Unknown type: {primitive}")

visit_binary(binary_type) abstractmethod

Visit a BinaryType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_binary(self, binary_type: BinaryType) -> T:
    """Visit a BinaryType.

    Called by `primitive` when the visited type is a BinaryType.

    Args:
        binary_type (BinaryType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_boolean(boolean_type) abstractmethod

Visit a BooleanType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_boolean(self, boolean_type: BooleanType) -> T:
    """Visit a BooleanType.

    Called by `primitive` when the visited type is a BooleanType.

    Args:
        boolean_type (BooleanType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_date(date_type) abstractmethod

Visit a DateType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_date(self, date_type: DateType) -> T:
    """Visit a DateType.

    Called by `primitive` when the visited type is a DateType.

    Args:
        date_type (DateType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_decimal(decimal_type) abstractmethod

Visit a DecimalType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_decimal(self, decimal_type: DecimalType) -> T:
    """Visit a DecimalType.

    Called by `primitive` when the visited type is a DecimalType.

    Args:
        decimal_type (DecimalType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_double(double_type) abstractmethod

Visit a DoubleType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_double(self, double_type: DoubleType) -> T:
    """Visit a DoubleType.

    Called by `primitive` when the visited type is a DoubleType.

    Args:
        double_type (DoubleType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_fixed(fixed_type) abstractmethod

Visit a FixedType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_fixed(self, fixed_type: FixedType) -> T:
    """Visit a FixedType.

    Called by `primitive` when the visited type is a FixedType.

    Args:
        fixed_type (FixedType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_float(float_type) abstractmethod

Visit a FloatType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_float(self, float_type: FloatType) -> T:
    """Visit a FloatType.

    Called by `primitive` when the visited type is a FloatType.

    Args:
        float_type (FloatType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_integer(integer_type) abstractmethod

Visit an IntegerType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_integer(self, integer_type: IntegerType) -> T:
    """Visit an IntegerType.

    Called by `primitive` when the visited type is an IntegerType.

    Args:
        integer_type (IntegerType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_long(long_type) abstractmethod

Visit a LongType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_long(self, long_type: LongType) -> T:
    """Visit a LongType.

    Called by `primitive` when the visited type is a LongType.

    Args:
        long_type (LongType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_string(string_type) abstractmethod

Visit a StringType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_string(self, string_type: StringType) -> T:
    """Visit a StringType.

    Called by `primitive` when the visited type is a StringType.

    Args:
        string_type (StringType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_time(time_type) abstractmethod

Visit a TimeType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_time(self, time_type: TimeType) -> T:
    """Visit a TimeType.

    Called by `primitive` when the visited type is a TimeType.

    Args:
        time_type (TimeType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_timestamp(timestamp_type) abstractmethod

Visit a TimestampType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_timestamp(self, timestamp_type: TimestampType) -> T:
    """Visit a TimestampType.

    Called by `primitive` when the visited type is a TimestampType.

    Args:
        timestamp_type (TimestampType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_timestamptz(timestamptz_type) abstractmethod

Visit a TimestamptzType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> T:
    """Visit a TimestamptzType.

    Called by `primitive` when the visited type is a TimestamptzType.

    Args:
        timestamptz_type (TimestamptzType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

visit_uuid(uuid_type) abstractmethod

Visit a UUIDType.

Source code in pyiceberg/schema.py
@abstractmethod
def visit_uuid(self, uuid_type: UUIDType) -> T:
    """Visit a UUIDType.

    Called by `primitive` when the visited type is a UUIDType.

    Args:
        uuid_type (UUIDType): The type being visited.

    Returns:
        T: The visitor's result for this type.
    """

SchemaWithPartnerVisitor

Bases: Generic[P, T], ABC

Source code in pyiceberg/schema.py
class SchemaWithPartnerVisitor(Generic[P, T], ABC):
    """Abstract base class for schema visitors that also receive a partner of type P.

    Every hook and visit method takes an `Optional[P]` partner alongside the schema
    element and, for the visit methods, returns a result of type T. Subclasses must
    implement `schema`, `struct`, `field`, `list`, `map` and `primitive`. The field hooks
    do nothing by default, and the list-element, map-key and map-value hooks forward to
    the field hooks.
    """

    def before_field(self, field: NestedField, field_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately before visiting a field."""

    def after_field(self, field: NestedField, field_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately after visiting a field."""

    def before_list_element(self, element: NestedField, element_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately before visiting an element within a ListType."""
        # Default: treat a list element like any other field.
        self.before_field(element, element_partner)

    def after_list_element(self, element: NestedField, element_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately after visiting an element within a ListType."""
        # Default: treat a list element like any other field.
        self.after_field(element, element_partner)

    def before_map_key(self, key: NestedField, key_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately before visiting a key within a MapType."""
        # Default: treat a map key like any other field.
        self.before_field(key, key_partner)

    def after_map_key(self, key: NestedField, key_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately after visiting a key within a MapType."""
        # Default: treat a map key like any other field.
        self.after_field(key, key_partner)

    def before_map_value(self, value: NestedField, value_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately before visiting a value within a MapType."""
        # Default: treat a map value like any other field.
        self.before_field(value, value_partner)

    def after_map_value(self, value: NestedField, value_partner: Optional[P]) -> None:
        """Override this method to perform an action immediately after visiting a value within a MapType."""
        # Default: treat a map value like any other field.
        self.after_field(value, value_partner)

    # The methods below have no default; every concrete visitor must provide them.

    @abstractmethod
    def schema(self, schema: Schema, schema_partner: Optional[P], struct_result: T) -> T:
        """Visit a schema with a partner."""

    @abstractmethod
    def struct(self, struct: StructType, struct_partner: Optional[P], field_results: List[T]) -> T:
        """Visit a struct type with a partner."""

    @abstractmethod
    def field(self, field: NestedField, field_partner: Optional[P], field_result: T) -> T:
        """Visit a nested field with a partner."""

    @abstractmethod
    def list(self, list_type: ListType, list_partner: Optional[P], element_result: T) -> T:
        """Visit a list type with a partner."""

    @abstractmethod
    def map(self, map_type: MapType, map_partner: Optional[P], key_result: T, value_result: T) -> T:
        """Visit a map type with a partner."""

    @abstractmethod
    def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) -> T:
        """Visit a primitive type with a partner."""

after_field(field, field_partner)

Override this method to perform an action immediately after visiting a field.

Source code in pyiceberg/schema.py
def after_field(self, field: NestedField, field_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately after visiting a field.

    The base implementation does nothing.

    Args:
        field (NestedField): The field that was just visited.
        field_partner (Optional[P]): The partner passed along with the field; may be None.
    """

after_list_element(element, element_partner)

Override this method to perform an action immediately after visiting an element within a ListType.

Source code in pyiceberg/schema.py
def after_list_element(self, element: NestedField, element_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately after visiting an element within a ListType.

    The default implementation forwards both arguments to `after_field`.

    Args:
        element (NestedField): The list element that was just visited.
        element_partner (Optional[P]): The partner passed along with the element; may be None.
    """
    self.after_field(element, element_partner)

after_map_key(key, key_partner)

Override this method to perform an action immediately after visiting a key within a MapType.

Source code in pyiceberg/schema.py
def after_map_key(self, key: NestedField, key_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately after visiting a key within a MapType.

    The default implementation forwards both arguments to `after_field`.

    Args:
        key (NestedField): The map key that was just visited.
        key_partner (Optional[P]): The partner passed along with the key; may be None.
    """
    self.after_field(key, key_partner)

after_map_value(value, value_partner)

Override this method to perform an action immediately after visiting a value within a MapType.

Source code in pyiceberg/schema.py
def after_map_value(self, value: NestedField, value_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately after visiting a value within a MapType.

    The default implementation forwards both arguments to `after_field`.

    Args:
        value (NestedField): The map value that was just visited.
        value_partner (Optional[P]): The partner passed along with the value; may be None.
    """
    self.after_field(value, value_partner)

before_field(field, field_partner)

Override this method to perform an action immediately before visiting a field.

Source code in pyiceberg/schema.py
def before_field(self, field: NestedField, field_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately before visiting a field.

    The base implementation does nothing.

    Args:
        field (NestedField): The field that is about to be visited.
        field_partner (Optional[P]): The partner passed along with the field; may be None.
    """

before_list_element(element, element_partner)

Override this method to perform an action immediately before visiting an element within a ListType.

Source code in pyiceberg/schema.py
def before_list_element(self, element: NestedField, element_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately before visiting an element within a ListType.

    The default implementation forwards both arguments to `before_field`.

    Args:
        element (NestedField): The list element that is about to be visited.
        element_partner (Optional[P]): The partner passed along with the element; may be None.
    """
    self.before_field(element, element_partner)

before_map_key(key, key_partner)

Override this method to perform an action immediately before visiting a key within a MapType.

Source code in pyiceberg/schema.py
def before_map_key(self, key: NestedField, key_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately before visiting a key within a MapType.

    The default implementation forwards both arguments to `before_field`.

    Args:
        key (NestedField): The map key that is about to be visited.
        key_partner (Optional[P]): The partner passed along with the key; may be None.
    """
    self.before_field(key, key_partner)

before_map_value(value, value_partner)

Override this method to perform an action immediately before visiting a value within a MapType.

Source code in pyiceberg/schema.py
def before_map_value(self, value: NestedField, value_partner: Optional[P]) -> None:
    """Override this method to perform an action immediately before visiting a value within a MapType.

    The default implementation forwards both arguments to `before_field`.

    Args:
        value (NestedField): The map value that is about to be visited.
        value_partner (Optional[P]): The partner passed along with the value; may be None.
    """
    self.before_field(value, value_partner)

field(field, field_partner, field_result) abstractmethod

Visit a nested field with a partner.

Source code in pyiceberg/schema.py
@abstractmethod
def field(self, field: NestedField, field_partner: Optional[P], field_result: T) -> T:
    """Visit a nested field with a partner.

    Args:
        field (NestedField): The field being visited.
        field_partner (Optional[P]): The partner passed along with the field; may be None.
        field_result (T): A previously computed result for this field
            (presumably from visiting the field's type - TODO confirm against the traversal).

    Returns:
        T: The visitor's result for this field.
    """

list(list_type, list_partner, element_result) abstractmethod

Visit a list type with a partner.

Source code in pyiceberg/schema.py
@abstractmethod
def list(self, list_type: ListType, list_partner: Optional[P], element_result: T) -> T:
    """Visit a list type with a partner.

    Args:
        list_type (ListType): The list type being visited.
        list_partner (Optional[P]): The partner passed along with the list; may be None.
        element_result (T): A previously computed result for the list's element
            (presumably from visiting the element type - TODO confirm against the traversal).

    Returns:
        T: The visitor's result for this list type.
    """

map(map_type, map_partner, key_result, value_result) abstractmethod

Visit a map type with a partner.

Source code in pyiceberg/schema.py
@abstractmethod
def map(self, map_type: MapType, map_partner: Optional[P], key_result: T, value_result: T) -> T:
    """Visit a map type with a partner.

    Args:
        map_type (MapType): The map type being visited.
        map_partner (Optional[P]): The partner passed along with the map; may be None.
        key_result (T): A previously computed result for the map's key
            (presumably from visiting the key type - TODO confirm against the traversal).
        value_result (T): A previously computed result for the map's value
            (presumably from visiting the value type - TODO confirm against the traversal).

    Returns:
        T: The visitor's result for this map type.
    """

primitive(primitive, primitive_partner) abstractmethod

Visit a primitive type with a partner.

Source code in pyiceberg/schema.py
@abstractmethod
def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) -> T:
    """Visit a primitive type with a partner.

    Args:
        primitive (PrimitiveType): The primitive type being visited.
        primitive_partner (Optional[P]): The partner passed along with the primitive; may be None.

    Returns:
        T: The visitor's result for this primitive type.
    """

schema(schema, schema_partner, struct_result) abstractmethod

Visit a schema with a partner.

Source code in pyiceberg/schema.py
@abstractmethod
def schema(self, schema: Schema, schema_partner: Optional[P], struct_result: T) -> T:
    """Visit a schema with a partner.

    Args:
        schema (Schema): The schema being visited.
        schema_partner (Optional[P]): The partner passed along with the schema; may be None.
        struct_result (T): A previously computed result for the schema's struct
            (presumably from visiting its top-level struct - TODO confirm against the traversal).

    Returns:
        T: The visitor's result for this schema.
    """

struct(struct, struct_partner, field_results) abstractmethod

Visit a struct type with a partner.

Source code in pyiceberg/schema.py
@abstractmethod
def struct(self, struct: StructType, struct_partner: Optional[P], field_results: List[T]) -> T:
    """Visit a struct type with a partner.

    Args:
        struct (StructType): The struct type being visited.
        struct_partner (Optional[P]): The partner passed along with the struct; may be None.
        field_results (List[T]): Previously computed results for the struct's fields
            (presumably one per field, in field order - TODO confirm against the traversal).

    Returns:
        T: The visitor's result for this struct type.
    """

assign_fresh_schema_ids(schema_or_type, next_id=None)

Traverses the schema, and sets new IDs.

Source code in pyiceberg/schema.py
def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], next_id: Optional[Callable[[], int]] = None) -> Schema:
    """Traverse a schema or type in pre-order and assign new field IDs.

    Args:
        schema_or_type (Union[Schema, IcebergType]): The schema or type to traverse.
        next_id (Optional[Callable[[], int]]): Optional zero-argument callable handed to the
            visitor as `next_id_func` (presumably it supplies each new ID - TODO confirm
            against `_SetFreshIDs`). Defaults to None.

    Returns:
        Schema: The result of the pre-order traversal with the ID-assigning visitor.
    """
    fresh_id_visitor = _SetFreshIDs(next_id_func=next_id)
    return pre_order_visit(schema_or_type, fresh_id_visitor)

build_position_accessors(schema_or_type)

Generate an index of field IDs to schema position accessors.

Parameters:

Name Type Description Default
schema_or_type Union[Schema, IcebergType]

A schema or type to index.

required

Returns:

Type Description
Dict[int, Accessor]

Dict[int, Accessor]: An index of field IDs to accessors.

Source code in pyiceberg/schema.py
def build_position_accessors(schema_or_type: Union[Schema, IcebergType]) -> Dict[int, Accessor]:
    """Build a lookup from field ID to the accessor for that field's position.

    Args:
        schema_or_type (Union[Schema, IcebergType]): A schema or type to index.

    Returns:
        Dict[int, Accessor]: An index of field IDs to accessors.
    """
    accessor_builder = _BuildPositionAccessors()
    return visit(schema_or_type, accessor_builder)

index_by_id(schema_or_type)

Generate an index of field IDs to NestedField instances.

Parameters:

Name Type Description Default
schema_or_type Union[Schema, IcebergType]

A schema or type to index.

required

Returns:

Type Description
Dict[int, NestedField]

Dict[int, NestedField]: An index of field IDs to NestedField instances.

Source code in pyiceberg/schema.py
def index_by_id(schema_or_type: Union[Schema, IcebergType]) -> Dict[int, NestedField]:
    """Build a lookup from field ID to its NestedField.

    Args:
        schema_or_type (Union[Schema, IcebergType]): A schema or type to index.

    Returns:
        Dict[int, NestedField]: An index of field IDs to NestedField instances.
    """
    id_indexer = _IndexById()
    return visit(schema_or_type, id_indexer)

index_by_name(schema_or_type)

Generate an index of field names to field IDs.

Parameters:

Name Type Description Default
schema_or_type Union[Schema, IcebergType]

A schema or type to index.

required

Returns:

Type Description
Dict[str, int]

Dict[str, int]: An index of field names to field IDs.

Source code in pyiceberg/schema.py
def index_by_name(schema_or_type: Union[Schema, IcebergType]) -> Dict[str, int]:
    """Build a lookup from field name to field ID.

    Args:
        schema_or_type (Union[Schema, IcebergType]): A schema or type to index; its
            `fields` attribute is read to detect the empty case.

    Returns:
        Dict[str, int]: An index of field names to field IDs, or `EMPTY_DICT` when
        there are no fields.
    """
    # Nothing to index: skip the traversal and return the shared empty mapping.
    if len(schema_or_type.fields) == 0:
        return EMPTY_DICT

    name_indexer = _IndexByName()
    visit(schema_or_type, name_indexer)
    return name_indexer.by_name()

index_name_by_id(schema_or_type)

Generate an index of field IDs to full field names.

Parameters:

Name Type Description Default
schema_or_type Union[Schema, IcebergType]

A schema or type to index.

required

Returns:

Type Description
Dict[int, str]

Dict[int, str]: An index of field IDs to full names.

Source code in pyiceberg/schema.py
def index_name_by_id(schema_or_type: Union[Schema, IcebergType]) -> Dict[int, str]:
    """Generate an index of field IDs to full field names.

    Args:
        schema_or_type (Union[Schema, IcebergType]): A schema or type to index.

    Returns:
        Dict[int, str]: An index of field IDs to full names.
    """
    name_indexer = _IndexByName()
    # The traversal fills the indexer; the ID-keyed view is read from it afterwards.
    visit(schema_or_type, name_indexer)
    names_by_id = name_indexer.by_id()
    return names_by_id

pre_order_visit(obj, visitor)

Apply a schema visitor to any point within a schema.

The function traverses the schema in pre-order fashion. This is a slimmed down version compared to the post-order traversal (missing before and after methods), mostly because we don't use the pre-order traversal much.

Parameters:

Name Type Description Default
obj Union[Schema, IcebergType]

An instance of a Schema or an IcebergType.

required
visitor PreOrderSchemaVisitor[T]

An instance of an implementation of the generic PreOrderSchemaVisitor base class.

required

Raises:

Type Description
NotImplementedError

If attempting to visit an unrecognized object type.

Source code in pyiceberg/schema.py
@singledispatch
def pre_order_visit(obj: Union[Schema, IcebergType], visitor: PreOrderSchemaVisitor[T]) -> T:
    """Apply a schema visitor to any point within a schema.

    The function traverses the schema in pre-order fashion. This is a slimmed down version
    compared to the post-order traversal (missing before and after methods), mostly
    because we don't use the pre-order traversal much.

    Args:
        obj (Union[Schema, IcebergType]): An instance of a Schema or an IcebergType.
        visitor (PreOrderSchemaVisitor[T]): An instance of an implementation of the generic PreOrderSchemaVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unrecognized object type.
    """
    raise NotImplementedError(f"Cannot visit non-type: {obj}")

promote(file_type, read_type)

Promotes reading a file type to a read type.

Parameters:

Name Type Description Default
file_type IcebergType

The type of the Avro file.

required
read_type IcebergType

The requested read type.

required

Raises:

Type Description
ResolveError

If attempting to resolve an unrecognized object type.

Source code in pyiceberg/schema.py
@singledispatch
def promote(file_type: IcebergType, read_type: IcebergType) -> IcebergType:
    """Promotes reading a file type to a read type.

    Args:
        file_type (IcebergType): The type of the Avro file.
        read_type (IcebergType): The requested read type.

    Raises:
        ResolveError: If attempting to resolve an unrecognized object type.
    """
    if file_type == read_type:
        return file_type
    else:
        raise ResolveError(f"Cannot promote {file_type} to {read_type}")

prune_columns(schema, selected, select_full_types=True)

Prunes a column by only selecting a set of field-ids.

Parameters:

Name Type Description Default
schema Schema

The schema to be pruned.

required
selected Set[int]

The field-ids to be included.

required
select_full_types bool

Return the full struct when a subset is recorded

True

Returns:

Type Description
Schema

The pruned schema.

Source code in pyiceberg/schema.py
def prune_columns(schema: Schema, selected: Set[int], select_full_types: bool = True) -> Schema:
    """Build a new schema that keeps only the columns named by a set of field-ids.

    Args:
        schema: The schema to be pruned.
        selected: The field-ids to be included.
        select_full_types: Return the full struct when a subset is recorded

    Returns:
        The pruned schema. It reuses the schema id of `schema` and keeps only
        those identifier field-ids that are also present in `selected`.
    """
    source_struct = schema.as_struct()
    pruner = _PruneColumnsVisitor(selected, select_full_types)
    pruned_struct = visit(source_struct, pruner)

    # A falsy visitor result is replaced by an empty struct so `.fields` is always available.
    if not pruned_struct:
        pruned_struct = StructType()

    kept_fields = pruned_struct.fields
    schema_id = schema.schema_id
    kept_identifier_ids = list(selected.intersection(schema.identifier_field_ids))
    return Schema(*kept_fields, schema_id=schema_id, identifier_field_ids=kept_identifier_ids)

sanitize_column_names(schema)

Sanitize column names to make them compatible with Avro.

The column name should start with '_' or a digit, followed by a string that contains only '_', digits, or letters; otherwise it will be sanitized to conform to the Avro naming convention.

Parameters:

Name Type Description Default
schema Schema

The schema to be sanitized.

required

Returns:

Type Description
Schema

The sanitized schema.

Source code in pyiceberg/schema.py
def sanitize_column_names(schema: Schema) -> Schema:
    """Produce a copy of the schema whose column names are compatible with Avro.

    Column names that do not conform to the Avro naming convention are sanitized
    so that they do; the rewriting itself is delegated to `_SanitizeColumnsVisitor`.

    Args:
        schema: The schema to be sanitized.

    Returns:
        The sanitized schema, with the schema id and identifier field-ids of
        `schema` carried over unchanged.
    """
    source_struct = schema.as_struct()
    sanitized_struct = visit(source_struct, _SanitizeColumnsVisitor())

    # A falsy visitor result is replaced by an empty struct so `.fields` is always available.
    if not sanitized_struct:
        sanitized_struct = StructType()

    sanitized_fields = sanitized_struct.fields
    return Schema(
        *sanitized_fields,
        schema_id=schema.schema_id,
        identifier_field_ids=schema.identifier_field_ids,
    )

visit(obj, visitor)

Apply a schema visitor to any point within a schema.

The function traverses the schema in post-order fashion.

Parameters:

Name Type Description Default
obj Union[Schema, IcebergType]

An instance of a Schema or an IcebergType.

required
visitor SchemaVisitor[T]

An instance of an implementation of the generic SchemaVisitor base class.

required

Raises:

Type Description
NotImplementedError

If attempting to visit an unrecognized object type.

Source code in pyiceberg/schema.py
@singledispatch
def visit(obj: Union[Schema, IcebergType], visitor: SchemaVisitor[T]) -> T:
    """Walk a schema, or any type nested inside one, with a post-order visitor.

    This is the generic fallback of the single-dispatch function: it is reached
    for any object whose type has no registered implementation, and always raises.

    Args:
        obj (Union[Schema, IcebergType]): An instance of a Schema or an IcebergType.
        visitor (SchemaVisitor[T]): An instance of an implementation of the generic SchemaVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unrecognized object type.
    """
    message = f"Cannot visit non-type: {obj}"
    raise NotImplementedError(message)