Skip to content

partitioning

PartitionField

Bases: IcebergBaseModel

PartitionField represents how one partition value is derived from the source column via transformation.

Attributes:

Name Type Description
source_id(int)

The source column id of table's schema.

field_id(int)

The partition field id across all the table partition specs.

transform(Transform)

The transform used to produce partition values from source column.

name(str)

The name of this partition field.

Source code in pyiceberg/partitioning.py
class PartitionField(IcebergBaseModel):
    """Describe how a single partition value is derived from a source column through a transform.

    Attributes:
        source_id(int): Id of the source column in the table's schema (aliased as ``source-id``).
        field_id(int): Id of this partition field across all of the table's partition specs (aliased as ``field-id``).
        transform(Transform): Transform that produces partition values from the source column.
        name(str): Name of this partition field.
    """

    source_id: int = Field(alias="source-id")
    field_id: int = Field(alias="field-id")
    # The transform is parsed with ``parse_transform`` before validation and is
    # serialized (and described in the serialization JSON schema) as a string.
    transform: Annotated[  # type: ignore
        Transform,
        BeforeValidator(parse_transform),
        PlainSerializer(lambda c: str(c), return_type=str),  # pylint: disable=W0108
        WithJsonSchema({"type": "string"}, mode="serialization"),
    ] = Field()
    name: str = Field()

    def __init__(
        self,
        source_id: Optional[int] = None,
        field_id: Optional[int] = None,
        transform: Optional[Transform[Any, Any]] = None,
        name: Optional[str] = None,
        **data: Any,
    ):
        """Build the field from explicit arguments and/or raw keyword data.

        Any explicit argument that is not ``None`` overrides the entry with the
        same key in ``data``; ids are stored under their aliased keys
        (``source-id`` / ``field-id``) before the model is initialised.
        """
        explicit_values = (
            ("source-id", source_id),
            ("field-id", field_id),
            ("transform", transform),
            ("name", name),
        )
        for key, value in explicit_values:
            if value is not None:
                data[key] = value

        super().__init__(**data)

    def __str__(self) -> str:
        """Return the field formatted as ``<field_id>: <name>: <transform>(<source_id>)``."""
        return "{}: {}: {}({})".format(self.field_id, self.name, self.transform, self.source_id)

__str__()

Return the string representation of the PartitionField class.

Source code in pyiceberg/partitioning.py
def __str__(self) -> str:
    """Return the field formatted as ``<field_id>: <name>: <transform>(<source_id>)``."""
    return "{}: {}: {}({})".format(self.field_id, self.name, self.transform, self.source_id)

PartitionSpec

Bases: IcebergBaseModel

PartitionSpec captures the transformation from table data to partition values.

Attributes:

Name Type Description
spec_id(int)

Any change to the PartitionSpec produces a new spec id.

fields(Tuple[PartitionField, ...])

list of partition fields to produce partition values.

Source code in pyiceberg/partitioning.py
class PartitionSpec(IcebergBaseModel):
    """
    Describe how table data is mapped to partition values.

    Attributes:
        spec_id(int): Id of this spec (aliased as ``spec-id``); any change to a PartitionSpec produces a new spec id.
        fields(Tuple[PartitionField, ...]): Partition fields used to produce the partition values.
    """

    spec_id: int = Field(alias="spec-id", default=INITIAL_PARTITION_SPEC_ID)
    fields: Tuple[PartitionField, ...] = Field(default_factory=tuple)

    def __init__(
        self,
        *fields: PartitionField,
        **data: Any,
    ):
        """Build the spec from positional partition fields and/or raw keyword data.

        When at least one positional field is given, the positional fields
        replace any ``fields`` entry supplied through ``data``.
        """
        if len(fields) > 0:
            data["fields"] = tuple(fields)
        super().__init__(**data)

    def __eq__(self, other: Any) -> bool:
        """
        Return True when ``other`` is a PartitionSpec equal to this one.

        Note:
            Only spec_id and the partition fields take part in the comparison.
        """
        return (
            isinstance(other, PartitionSpec)
            and self.spec_id == other.spec_id
            and self.fields == other.fields
        )

    def __str__(self) -> str:
        """
        Render the partition fields as a bracketed, human-readable listing.

        Note:
            Only the partition fields appear in the output; the spec id is omitted.
        """
        if not self.fields:
            return "[]"
        lines = ["["]
        lines.extend("  " + str(field) for field in self.fields)
        lines.append("]")
        return "\n".join(lines)

    def __repr__(self) -> str:
        """Return a constructor-like representation: the fields' reprs followed by ``spec_id=...``."""
        parts = [repr(column) for column in self.fields]
        parts.append(f"spec_id={self.spec_id}")
        return "PartitionSpec(" + ", ".join(parts) + ")"

    def is_unpartitioned(self) -> bool:
        """Return True when the spec has no partition fields."""
        return not self.fields

    @property
    def last_assigned_field_id(self) -> int:
        """Return the highest field id in use, or ``PARTITION_FIELD_ID_START - 1`` when there are no fields."""
        if not self.fields:
            return PARTITION_FIELD_ID_START - 1
        return max(pf.field_id for pf in self.fields)

    @cached_property
    def source_id_to_fields_map(self) -> Dict[int, List[PartitionField]]:
        """Group the partition fields by source column id, keeping their order within each group.

        The mapping is computed on first access and then cached on the instance.
        """
        grouped: Dict[int, List[PartitionField]] = {}
        for partition_field in self.fields:
            grouped.setdefault(partition_field.source_id, []).append(partition_field)
        return grouped

    def fields_by_source_id(self, field_id: int) -> List[PartitionField]:
        """Return the partition fields whose source column id equals ``field_id``.

        Despite its name, ``field_id`` is matched against source ids. On a hit the
        list held in the cached map is returned as-is (not copied); on a miss a new
        empty list is returned.
        """
        matches = self.source_id_to_fields_map.get(field_id)
        return [] if matches is None else matches

    def compatible_with(self, other: PartitionSpec) -> bool:
        """Return True when the two specs are compatible.

        Equal specs are compatible. Otherwise both specs must have the same number
        of fields, and fields at the same position must agree on source id,
        transform and name (field ids and spec ids are not compared).
        """
        if self == other:
            return True
        if len(self.fields) != len(other.fields):
            return False
        for this_field, that_field in zip(self.fields, other.fields):
            same_source = this_field.source_id == that_field.source_id
            if not (same_source and this_field.transform == that_field.transform and this_field.name == that_field.name):
                return False
        return True

    def partition_type(self, schema: Schema) -> StructType:
        """Produce a struct of the PartitionSpec.

        Each NestedField reuses the partition field's id and name, takes the type the
        transform yields for the source column's type, and copies the ``required``
        flag of the source column.

        NOTE(review): the stated rationale for this method is that partition fields
        should be optional:

        - All partition transforms are required to produce null if the input value is null, so it can
          happen when the source column is optional.
        - Partition fields may be added later, in which case not all files would have the result field,
          and it may be null.

        There is a case where we can guarantee that a partition field in the first and only partition spec
        that uses a required source column will never be null, but it doesn't seem worth tracking this case.
        The code nevertheless propagates the source column's ``required`` flag; confirm
        which behaviour is intended.

        :param schema: The schema to bind to.
        :return: A StructType that represents the PartitionSpec, with a NestedField for each PartitionField.
        """
        struct_fields = []
        for partition_field in self.fields:
            result_type = partition_field.transform.result_type(schema.find_type(partition_field.source_id))
            is_required = schema.find_field(partition_field.source_id).required
            struct_fields.append(
                NestedField(partition_field.field_id, partition_field.name, result_type, required=is_required)
            )
        return StructType(*struct_fields)

    def partition_to_path(self, data: Record, schema: Schema) -> str:
        """Build the partition path for ``data`` as ``name=value`` segments joined by ``/``.

        The value at position ``i`` of ``data`` is rendered with the i-th field's
        ``transform.to_human_string`` and then escaped with ``quote(..., safe="")``
        (presumably ``urllib.parse.quote``), so no character is left unescaped as "safe".

        :param data: The partition record, indexed by partition field position.
        :param schema: The schema used to derive the partition field types.
        :return: The path string; empty when the spec has no fields.
        """
        field_types = self.partition_type(schema).fields

        segments = []
        for pos, (partition_field, nested_field) in enumerate(zip(self.fields, field_types)):
            human_value = partition_field.transform.to_human_string(nested_field.field_type, value=data[pos])
            segments.append(partition_field.name + "=" + quote(human_value, safe=""))

        return "/".join(segments)

__eq__(other)

Produce a boolean to return True if two objects are considered equal.

Note

Equality of PartitionSpec is determined by spec_id and partition fields only.

Source code in pyiceberg/partitioning.py
def __eq__(self, other: Any) -> bool:
    """
    Return True when ``other`` is a PartitionSpec equal to this one.

    Note:
        Only spec_id and the partition fields take part in the comparison.
    """
    return (
        isinstance(other, PartitionSpec)
        and self.spec_id == other.spec_id
        and self.fields == other.fields
    )

__repr__()

Return the string representation of the PartitionSpec class.

Source code in pyiceberg/partitioning.py
def __repr__(self) -> str:
    """Return a constructor-like representation: the fields' reprs followed by ``spec_id=...``."""
    parts = [repr(column) for column in self.fields]
    parts.append(f"spec_id={self.spec_id}")
    return "PartitionSpec(" + ", ".join(parts) + ")"

__str__()

Produce a human-readable string representation of PartitionSpec.

Note

Only include list of partition fields in the PartitionSpec's string representation.

Source code in pyiceberg/partitioning.py
def __str__(self) -> str:
    """
    Render the partition fields as a bracketed, human-readable listing.

    Note:
        Only the partition fields appear in the output; the spec id is omitted.
    """
    if not self.fields:
        return "[]"
    lines = ["["]
    lines.extend("  " + str(field) for field in self.fields)
    lines.append("]")
    return "\n".join(lines)

compatible_with(other)

Produce a boolean to return True if two PartitionSpec are considered compatible.

Source code in pyiceberg/partitioning.py
def compatible_with(self, other: PartitionSpec) -> bool:
    """Return True when the two specs are compatible.

    Equal specs are compatible. Otherwise both specs must have the same number
    of fields, and fields at the same position must agree on source id,
    transform and name (field ids and spec ids are not compared).
    """
    if self == other:
        return True
    if len(self.fields) != len(other.fields):
        return False
    for this_field, that_field in zip(self.fields, other.fields):
        same_source = this_field.source_id == that_field.source_id
        if not (same_source and this_field.transform == that_field.transform and this_field.name == that_field.name):
            return False
    return True

partition_type(schema)

Produce a struct of the PartitionSpec.

The partition fields should be optional:

  • All partition transforms are required to produce null if the input value is null, so it can happen when the source column is optional.
  • Partition fields may be added later, in which case not all files would have the result field, and it may be null.

There is a case where we can guarantee that a partition field in the first and only partition spec that uses a required source column will never be null, but it doesn't seem worth tracking this case.

Parameters: schema – The schema to bind to.

Returns: A StructType that represents the PartitionSpec, with a NestedField for each PartitionField.

Source code in pyiceberg/partitioning.py
def partition_type(self, schema: Schema) -> StructType:
    """Produce a struct of the PartitionSpec.

    Each NestedField reuses the partition field's id and name, takes the type the
    transform yields for the source column's type, and copies the ``required``
    flag of the source column.

    NOTE(review): the stated rationale for this method is that partition fields
    should be optional:

    - All partition transforms are required to produce null if the input value is null, so it can
      happen when the source column is optional.
    - Partition fields may be added later, in which case not all files would have the result field,
      and it may be null.

    There is a case where we can guarantee that a partition field in the first and only partition spec
    that uses a required source column will never be null, but it doesn't seem worth tracking this case.
    The code nevertheless propagates the source column's ``required`` flag; confirm
    which behaviour is intended.

    :param schema: The schema to bind to.
    :return: A StructType that represents the PartitionSpec, with a NestedField for each PartitionField.
    """
    struct_fields = []
    for partition_field in self.fields:
        result_type = partition_field.transform.result_type(schema.find_type(partition_field.source_id))
        is_required = schema.find_field(partition_field.source_id).required
        struct_fields.append(
            NestedField(partition_field.field_id, partition_field.name, result_type, required=is_required)
        )
    return StructType(*struct_fields)

PartitionSpecVisitor

Bases: Generic[T], ABC

Source code in pyiceberg/partitioning.py
class PartitionSpecVisitor(Generic[T], ABC):
    """Abstract visitor with one callback per kind of partition transform.

    Every callback below is abstract, so a concrete visitor has to implement all
    of them; each callback returns a value of the generic type ``T``.
    """

    @abstractmethod
    def identity(self, field_id: int, source_name: str, source_id: int) -> T:
        """Visit identity partition field."""

    @abstractmethod
    def bucket(self, field_id: int, source_name: str, source_id: int, num_buckets: int) -> T:
        """Visit bucket partition field; ``num_buckets`` is passed in addition to the common arguments."""

    @abstractmethod
    def truncate(self, field_id: int, source_name: str, source_id: int, width: int) -> T:
        """Visit truncate partition field; ``width`` is passed in addition to the common arguments."""

    @abstractmethod
    def year(self, field_id: int, source_name: str, source_id: int) -> T:
        """Visit year partition field."""

    @abstractmethod
    def month(self, field_id: int, source_name: str, source_id: int) -> T:
        """Visit month partition field."""

    @abstractmethod
    def day(self, field_id: int, source_name: str, source_id: int) -> T:
        """Visit day partition field."""

    @abstractmethod
    def hour(self, field_id: int, source_name: str, source_id: int) -> T:
        """Visit hour partition field."""

    @abstractmethod
    def always_null(self, field_id: int, source_name: str, source_id: int) -> T:
        """Visit void partition field."""

    @abstractmethod
    def unknown(self, field_id: int, source_name: str, source_id: int, transform: str) -> T:
        """Visit unknown partition field.

        The body provided here raises ``ValueError`` with a message naming ``transform``.
        """
        raise ValueError(f"Unknown transform is not supported: {transform}")

always_null(field_id, source_name, source_id) abstractmethod

Visit void partition field.

Source code in pyiceberg/partitioning.py
@abstractmethod
def always_null(self, field_id: int, source_name: str, source_id: int) -> T:
    """Visit void partition field.

    Abstract callback with no body of its own; implementations return a value of type ``T``.
    """

bucket(field_id, source_name, source_id, num_buckets) abstractmethod

Visit bucket partition field.

Source code in pyiceberg/partitioning.py
@abstractmethod
def bucket(self, field_id: int, source_name: str, source_id: int, num_buckets: int) -> T:
    """Visit bucket partition field.

    Abstract callback with no body of its own; ``num_buckets`` is passed in addition
    to the field id, source name and source id. Implementations return a value of type ``T``.
    """

day(field_id, source_name, source_id) abstractmethod

Visit day partition field.

Source code in pyiceberg/partitioning.py
@abstractmethod
def day(self, field_id: int, source_name: str, source_id: int) -> T:
    """Visit day partition field.

    Abstract callback with no body of its own; implementations return a value of type ``T``.
    """

hour(field_id, source_name, source_id) abstractmethod

Visit hour partition field.

Source code in pyiceberg/partitioning.py
@abstractmethod
def hour(self, field_id: int, source_name: str, source_id: int) -> T:
    """Visit hour partition field.

    Abstract callback with no body of its own; implementations return a value of type ``T``.
    """

identity(field_id, source_name, source_id) abstractmethod

Visit identity partition field.

Source code in pyiceberg/partitioning.py
@abstractmethod
def identity(self, field_id: int, source_name: str, source_id: int) -> T:
    """Visit identity partition field.

    Abstract callback with no body of its own; implementations return a value of type ``T``.
    """

month(field_id, source_name, source_id) abstractmethod

Visit month partition field.

Source code in pyiceberg/partitioning.py
@abstractmethod
def month(self, field_id: int, source_name: str, source_id: int) -> T:
    """Visit month partition field.

    Abstract callback with no body of its own; implementations return a value of type ``T``.
    """

truncate(field_id, source_name, source_id, width) abstractmethod

Visit truncate partition field.

Source code in pyiceberg/partitioning.py
@abstractmethod
def truncate(self, field_id: int, source_name: str, source_id: int, width: int) -> T:
    """Visit truncate partition field.

    Abstract callback with no body of its own; ``width`` is passed in addition to the
    field id, source name and source id. Implementations return a value of type ``T``.
    """

unknown(field_id, source_name, source_id, transform) abstractmethod

Visit unknown partition field.

Source code in pyiceberg/partitioning.py
@abstractmethod
def unknown(self, field_id: int, source_name: str, source_id: int, transform: str) -> T:
    """Visit unknown partition field.

    The body provided here always raises.

    Raises:
        ValueError: Always, with a message naming ``transform``.
    """
    message = f"Unknown transform is not supported: {transform}"
    raise ValueError(message)

year(field_id, source_name, source_id) abstractmethod

Visit year partition field.

Source code in pyiceberg/partitioning.py
@abstractmethod
def year(self, field_id: int, source_name: str, source_id: int) -> T:
    """Visit year partition field.

    Abstract callback with no body of its own; implementations return a value of type ``T``.
    """

partition_record_value(partition_field, value, schema)

Return the Partition Record representation of the value.

The value is first converted to internal partition representation. For example, UUID is converted to bytes[16], DateType to days since epoch, etc.

Then the corresponding PartitionField's transform is applied to return the final partition record value.

Source code in pyiceberg/partitioning.py
def partition_record_value(partition_field: PartitionField, value: Any, schema: Schema) -> Any:
    """
    Return the Partition Record representation of the value.

    The computation has two steps:

    1. ``value`` is converted to its internal partition representation for the
       source column's type (for example, UUID is converted to bytes[16],
       DateType to days since epoch, etc.).
    2. The PartitionField's transform, bound to that same source type, is applied
       to the converted value to produce the final partition record value.
    """
    # The source column is looked up in the schema by the partition field's source id.
    source_type = schema.find_field(name_or_id=partition_field.source_id).field_type
    internal_value = _to_partition_representation(source_type, value)
    apply_transform = partition_field.transform.transform(source_type)
    return apply_transform(internal_value)