Skip to content

transforms

BoundTransform

Bases: BoundTerm[L]

A transform expression.

Source code in pyiceberg/transforms.py
class BoundTransform(BoundTerm[L]):
    """A bound term paired with the transform that is applied to it."""

    # The transform associated with the wrapped term.
    transform: Transform[L, Any]

    def __init__(self, term: BoundTerm[L], transform: Transform[L, Any]):
        """Store the bound term and its transform on the instance.

        Args:
          term (BoundTerm[L]): The bound term being transformed.
          transform (Transform[L, Any]): The transform associated with ``term``.
        """
        self.transform = transform
        self.term: BoundTerm[L] = term

BucketTransform

Bases: Transform[S, int]

Base Transform class to transform a value into a bucket partition value.

Transforms are parameterized by a number of buckets. Bucket partition transforms use a 32-bit hash of the source value to produce a non-negative value by taking the hash modulo the number of buckets.

Parameters:

Name Type Description Default
num_buckets int

The number of buckets.

required
Source code in pyiceberg/transforms.py
class BucketTransform(Transform[S, int]):
    """Base Transform class to transform a value into a bucket partition value.

    Transforms are parameterized by a number of buckets. Bucket partition transforms use a 32-bit
    hash of the source value, mask it with ``IntegerType.max`` and take the result modulo the
    number of buckets.

    Args:
      num_buckets (int): The number of buckets.
    """

    root: str = Field()
    _num_buckets: PositiveInt = PrivateAttr()

    def __init__(self, num_buckets: int, **data: Any) -> None:
        """Create a bucket transform whose root string is ``bucket[<num_buckets>]``."""
        self._num_buckets = num_buckets
        super().__init__(f"bucket[{num_buckets}]", **data)

    @property
    def num_buckets(self) -> int:
        """Return the number of buckets this transform was created with."""
        return self._num_buckets

    def hash(self, value: S) -> int:
        """Hash a single value; not implemented on this base class."""
        raise NotImplementedError()

    def apply(self, value: Optional[S]) -> Optional[int]:
        """Return the bucket for ``value``, or None when ``value`` is None.

        Only None maps to None: falsy but valid values such as ``0``, ``""`` or ``b""``
        are hashed like any other value, matching the callable built by ``transform``.
        """
        return (self.hash(value) & IntegerType.max) % self._num_buckets if value is not None else None

    def result_type(self, source: IcebergType) -> IcebergType:
        """Return the result type, which is IntegerType for every source type."""
        return IntegerType()

    def project(self, name: str, pred: BoundPredicate[L]) -> Optional[UnboundPredicate[Any]]:
        """Project a bound predicate onto the bucketed partition field ``name``.

        Returns:
          The projected unbound predicate, or None when the predicate cannot be projected.
        """
        # Built up front, so an unsupported source type raises ValueError before any branch runs.
        transformer = self.transform(pred.term.ref().field.field_type)

        if isinstance(pred.term, BoundTransform):
            return _project_transform_predicate(self, name, pred)
        elif isinstance(pred, BoundUnaryPredicate):
            return pred.as_unbound(Reference(name))
        elif isinstance(pred, BoundEqualTo):
            return pred.as_unbound(Reference(name), _transform_literal(transformer, pred.literal))
        elif isinstance(pred, BoundIn):  # NotIn can't be projected
            return pred.as_unbound(Reference(name), {_transform_literal(transformer, literal) for literal in pred.literals})
        else:
            # - Comparison predicates can't be projected, notEq can't be projected
            # - Small ranges can be projected:
            #   For example, (x > 0) and (x < 3) can be turned into in({1, 2}) and projected.
            return None

    def can_transform(self, source: IcebergType) -> bool:
        """Return True when ``source`` is one of the types this transform can bucket."""
        return isinstance(
            source,
            (
                IntegerType,
                DateType,
                LongType,
                TimeType,
                TimestampType,
                TimestamptzType,
                DecimalType,
                StringType,
                FixedType,
                BinaryType,
                UUIDType,
            ),
        )

    def transform(self, source: IcebergType, bucket: bool = True) -> Callable[[Optional[Any]], Optional[int]]:
        """Build the callable that hashes (and optionally buckets) values of type ``source``.

        Args:
          source (IcebergType): The type of the values that will be passed to the callable.
          bucket (bool): When True (default) the callable returns the bucket number and maps
            None to None; when False it returns the raw hash function, which has no None handling.

        Raises:
          ValueError: If ``source`` is not a supported type.
        """
        if isinstance(source, (IntegerType, LongType, DateType, TimeType, TimestampType, TimestamptzType)):

            def hash_func(v: Any) -> int:
                # Integer-like values are hashed as 8-byte little-endian signed integers.
                return mmh3.hash(struct.pack("<q", v))

        elif isinstance(source, DecimalType):

            def hash_func(v: Any) -> int:
                return mmh3.hash(decimal_to_bytes(v))

        elif isinstance(source, (StringType, FixedType, BinaryType)):

            def hash_func(v: Any) -> int:
                return mmh3.hash(v)

        elif isinstance(source, UUIDType):

            def hash_func(v: Any) -> int:
                # UUID objects are hashed via their raw bytes; any other value is hashed as given.
                if isinstance(v, UUID):
                    return mmh3.hash(v.bytes)
                return mmh3.hash(v)

        else:
            raise ValueError(f"Unknown type {source}")

        if bucket:
            return lambda v: (hash_func(v) & IntegerType.max) % self._num_buckets if v is not None else None
        return hash_func

    def __repr__(self) -> str:
        """Return the string representation of the BucketTransform class."""
        return f"BucketTransform(num_buckets={self._num_buckets})"

__repr__()

Return the string representation of the BucketTransform class.

Source code in pyiceberg/transforms.py
def __repr__(self) -> str:
    """Give the constructor-style representation, including the bucket count."""
    bucket_count = self._num_buckets
    return f"BucketTransform(num_buckets={bucket_count})"

DayTransform

Bases: TimeTransform[S]

Transforms a datetime value into a day value.

Example

transform = DayTransform() transform.transform(DateType())(17501) 17501

Source code in pyiceberg/transforms.py
class DayTransform(TimeTransform[S]):
    """Transforms a datetime value into a day value.

    Example:
        >>> transform = DayTransform()
        >>> transform.transform(DateType())(17501)
        17501
    """

    root: LiteralType["day"] = Field(default="day")  # noqa: F821

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int]]:
        """Build the callable that converts values of type ``source`` into a day value.

        The returned callable maps None to None.

        Raises:
          ValueError: If ``source`` is not a date or timestamp type.
        """
        if isinstance(source, DateType):

            def day_func(v: Any) -> int:
                # Date values are returned unchanged.
                return v

        elif isinstance(source, (TimestampType, TimestamptzType)):

            def day_func(v: Any) -> int:
                return datetime.micros_to_days(v)

        else:
            raise ValueError(f"Cannot apply day transform for type: {source}")

        return lambda v: day_func(v) if v is not None else None

    def can_transform(self, source: IcebergType) -> bool:
        """Return True for date and timestamp (with or without zone) source types."""
        return isinstance(source, (DateType, TimestampType, TimestamptzType))

    def result_type(self, source: IcebergType) -> IcebergType:
        """Return DateType, regardless of the source type."""
        return DateType()

    @property
    def granularity(self) -> TimeResolution:
        """Return the time resolution of this transform (DAY)."""
        return TimeResolution.DAY

    def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
        """Render an int day value via ``datetime.to_human_day``; anything else becomes "null"."""
        return datetime.to_human_day(value) if isinstance(value, int) else "null"

    def __repr__(self) -> str:
        """Return the string representation of the DayTransform class."""
        return "DayTransform()"

__repr__()

Return the string representation of the DayTransform class.

Source code in pyiceberg/transforms.py
def __repr__(self) -> str:
    """Give the constructor-style representation of a DayTransform."""
    representation = "DayTransform()"
    return representation

HourTransform

Bases: TimeTransform[S]

Transforms a datetime value into an hour value.

Example

transform = HourTransform() transform.transform(TimestampType())(1512151975038194) 420042

Source code in pyiceberg/transforms.py
class HourTransform(TimeTransform[S]):
    """Transforms a datetime value into an hour value.

    Example:
        >>> transform = HourTransform()
        >>> transform.transform(TimestampType())(1512151975038194)
        420042
    """

    root: LiteralType["hour"] = Field(default="hour")  # noqa: F821

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int]]:
        """Build the callable that converts timestamp values into an hour value.

        The returned callable maps None to None.

        Raises:
          ValueError: If ``source`` is not a timestamp type.
        """
        if not isinstance(source, (TimestampType, TimestamptzType)):
            raise ValueError(f"Cannot apply hour transform for type: {source}")

        def to_hours(micros: Any) -> int:
            return datetime.micros_to_hours(micros)

        return lambda v: None if v is None else to_hours(v)

    def can_transform(self, source: IcebergType) -> bool:
        """Return True only for timestamp source types (with or without zone)."""
        supported = (TimestampType, TimestamptzType)
        return isinstance(source, supported)

    @property
    def granularity(self) -> TimeResolution:
        """Return the time resolution of this transform (HOUR)."""
        return TimeResolution.HOUR

    def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
        """Render an int hour value via ``datetime.to_human_hour``; anything else becomes "null"."""
        if isinstance(value, int):
            return datetime.to_human_hour(value)
        return "null"

    def __repr__(self) -> str:
        """Give the constructor-style representation of an HourTransform."""
        return "HourTransform()"

__repr__()

Return the string representation of the HourTransform class.

Source code in pyiceberg/transforms.py
def __repr__(self) -> str:
    """Give the constructor-style representation of an HourTransform."""
    representation = "HourTransform()"
    return representation

IdentityTransform

Bases: Transform[S, S]

Transforms a value into itself.

Example

transform = IdentityTransform() transform.transform(StringType())('hello-world') 'hello-world'

Source code in pyiceberg/transforms.py
class IdentityTransform(Transform[S, S]):
    """Transforms a value into itself.

    Example:
        >>> transform = IdentityTransform()
        >>> transform.transform(StringType())('hello-world')
        'hello-world'
    """

    root: LiteralType["identity"] = Field(default="identity")  # noqa: F821

    def __init__(self) -> None:
        """Initialise the transform with the root string "identity"."""
        super().__init__("identity")

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[S]]:
        """Return a callable that hands back its argument unchanged, whatever ``source`` is."""
        return lambda value: value

    def can_transform(self, source: IcebergType) -> bool:
        """Report whether ``source`` is a primitive type."""
        return source.is_primitive

    def result_type(self, source: IcebergType) -> IcebergType:
        """Return ``source`` itself as the result type."""
        return source

    def project(self, name: str, pred: BoundPredicate[L]) -> Optional[UnboundPredicate[Any]]:
        """Project a bound predicate onto the partition field ``name``.

        Raises:
          ValueError: If the predicate is of a kind that is not handled here.
        """
        if isinstance(pred.term, BoundTransform):
            return _project_transform_predicate(self, name, pred)

        reference = Reference(name)
        if isinstance(pred, BoundUnaryPredicate):
            return pred.as_unbound(reference)
        if isinstance(pred, BoundLiteralPredicate):
            return pred.as_unbound(reference, pred.literal)
        if isinstance(pred, (BoundIn, BoundNotIn)):
            return pred.as_unbound(reference, pred.literals)

        raise ValueError(f"Could not project: {pred}")

    @property
    def preserves_order(self) -> bool:
        """Return True: this transform reports that it preserves ordering."""
        return True

    def satisfies_order_of(self, other: Transform[S, T]) -> bool:
        """Return whether ``other`` preserves order, which is the only condition checked."""
        return other.preserves_order

    def to_human_string(self, source_type: IcebergType, value: Optional[S]) -> str:
        """Render ``value`` via ``_human_string``; None becomes "null"."""
        if value is None:
            return "null"
        return _human_string(value, source_type)

    def __str__(self) -> str:
        """Give the short name of the transform."""
        return "identity"

    def __repr__(self) -> str:
        """Give the constructor-style representation of an IdentityTransform."""
        return "IdentityTransform()"

__repr__()

Return the string representation of the IdentityTransform class.

Source code in pyiceberg/transforms.py
def __repr__(self) -> str:
    """Give the constructor-style representation of an IdentityTransform."""
    representation = "IdentityTransform()"
    return representation

__str__()

Return the string representation of the IdentityTransform class.

Source code in pyiceberg/transforms.py
def __str__(self) -> str:
    """Give the short name of the IdentityTransform."""
    short_name = "identity"
    return short_name

satisfies_order_of(other)

Ordering by value is the same as long as the other preserves order.

Source code in pyiceberg/transforms.py
def satisfies_order_of(self, other: Transform[S, T]) -> bool:
    """Return whether ``other`` preserves order, which is the only condition checked."""
    other_keeps_order = other.preserves_order
    return other_keeps_order

MonthTransform

Bases: TimeTransform[S]

Transforms a datetime value into a month value.

Example

transform = MonthTransform() transform.transform(DateType())(17501) 575

Source code in pyiceberg/transforms.py
class MonthTransform(TimeTransform[S]):
    """Transforms a datetime value into a month value.

    Example:
        >>> transform = MonthTransform()
        >>> transform.transform(DateType())(17501)
        575
    """

    root: LiteralType["month"] = Field(default="month")  # noqa: F821

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int]]:
        """Build the callable that converts values of type ``source`` into a month value.

        The returned callable maps None to None.

        Raises:
          ValueError: If ``source`` is not a date or timestamp type.
        """
        if isinstance(source, DateType):

            def to_months(days: Any) -> int:
                return datetime.days_to_months(days)

        elif isinstance(source, (TimestampType, TimestamptzType)):

            def to_months(micros: Any) -> int:
                return datetime.micros_to_months(micros)

        else:
            raise ValueError(f"Cannot apply month transform for type: {source}")

        return lambda v: None if v is None else to_months(v)

    def can_transform(self, source: IcebergType) -> bool:
        """Return True for date and timestamp (with or without zone) source types."""
        supported = (DateType, TimestampType, TimestamptzType)
        return isinstance(source, supported)

    @property
    def granularity(self) -> TimeResolution:
        """Return the time resolution of this transform (MONTH)."""
        return TimeResolution.MONTH

    def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
        """Render an int month value via ``datetime.to_human_month``; anything else becomes "null"."""
        if isinstance(value, int):
            return datetime.to_human_month(value)
        return "null"

    def __repr__(self) -> str:
        """Give the constructor-style representation of a MonthTransform."""
        return "MonthTransform()"

__repr__()

Return the string representation of the MonthTransform class.

Source code in pyiceberg/transforms.py
def __repr__(self) -> str:
    """Give the constructor-style representation of a MonthTransform."""
    representation = "MonthTransform()"
    return representation

Transform

Bases: IcebergRootModel[str], ABC, Generic[S, T]

Transform base class for concrete transforms.

A base class to transform values and project predicates on partition values. This class is not used directly. Instead, use one of the module methods to create the child classes.

Source code in pyiceberg/transforms.py
class Transform(IcebergRootModel[str], ABC, Generic[S, T]):
    """Transform base class for concrete transforms.

    A base class to transform values and project predicates on partition values.
    This class is not used directly. Instead, use one of the module methods to create the child classes.
    """

    root: str = Field()

    @abstractmethod
    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[T]]:
        """Build the callable that transforms values of type ``source``."""
        ...

    @abstractmethod
    def can_transform(self, source: IcebergType) -> bool:
        """Report whether values of type ``source`` can be transformed; the base body answers False."""
        return False

    @abstractmethod
    def result_type(self, source: IcebergType) -> IcebergType:
        """Return the type produced when transforming values of type ``source``."""
        ...

    @abstractmethod
    def project(self, name: str, pred: BoundPredicate[L]) -> Optional[UnboundPredicate[Any]]:
        """Project a bound predicate onto the partition field ``name``."""
        ...

    @property
    def preserves_order(self) -> bool:
        """Return False by default; subclasses in this module override it where applicable."""
        return False

    def satisfies_order_of(self, other: Any) -> bool:
        """Return True when ``other`` compares equal to this transform."""
        return self == other

    def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
        """Render ``value`` with ``str``; None becomes "null"."""
        if value is None:
            return "null"
        return str(value)

    @property
    def dedup_name(self) -> str:
        """Return the same text as ``__str__``."""
        return self.__str__()

    def __str__(self) -> str:
        """Give the transform's root string."""
        return self.root

    def __eq__(self, other: Any) -> bool:
        """Compare by root string; anything that is not a Transform is unequal."""
        return isinstance(other, Transform) and self.root == other.root

__eq__(other)

Return the equality of two instances of the Transform class.

Source code in pyiceberg/transforms.py
def __eq__(self, other: Any) -> bool:
    """Compare by root string; anything that is not a Transform is unequal."""
    return isinstance(other, Transform) and self.root == other.root

__str__()

Return the string representation of the Transform class.

Source code in pyiceberg/transforms.py
def __str__(self) -> str:
    """Give the transform's root string."""
    root_text = self.root
    return root_text

TruncateTransform

Bases: Transform[S, S]

A transform for truncating a value to a specified width.

Parameters:

Name Type Description Default
width int

The truncate width, should be positive.

required

Raises: ValueError: If a type is provided that is incompatible with a Truncate transform.

Source code in pyiceberg/transforms.py
class TruncateTransform(Transform[S, S]):
    """A transform for truncating a value to a specified width.

    Args:
      width (int): The truncate width, should be positive.
    Raises:
      ValueError: If a type is provided that is incompatible with a Truncate transform.
    """

    root: str = Field()
    # NOTE(review): nothing in this class assigns _source_type, yet source_type reads it - confirm
    # where it is expected to be populated.
    _source_type: IcebergType = PrivateAttr()
    _width: PositiveInt = PrivateAttr()

    def __init__(self, width: int, **data: Any):
        """Create a truncate transform whose root string is ``truncate[<width>]``."""
        super().__init__(root=f"truncate[{width}]", **data)
        self._width = width

    def can_transform(self, source: IcebergType) -> bool:
        """Return True for integer, long, string, binary and decimal source types."""
        supported = (IntegerType, LongType, StringType, BinaryType, DecimalType)
        return isinstance(source, supported)

    def result_type(self, source: IcebergType) -> IcebergType:
        """Return ``source`` itself as the result type."""
        return source

    @property
    def preserves_order(self) -> bool:
        """Return True: this transform reports that it preserves ordering."""
        return True

    @property
    def source_type(self) -> IcebergType:
        """Return the stored source type."""
        return self._source_type

    def project(self, name: str, pred: BoundPredicate[L]) -> Optional[UnboundPredicate[Any]]:
        """Project a bound predicate onto the truncated partition field ``name``.

        Returns:
          The projected unbound predicate, or None when no projection applies.
        """
        field_type = pred.term.ref().field.field_type

        if isinstance(pred.term, BoundTransform):
            return _project_transform_predicate(self, name, pred)
        if isinstance(pred, BoundUnaryPredicate):
            return pred.as_unbound(Reference(name))
        if isinstance(pred, BoundIn):
            return _set_apply_transform(name, pred, self.transform(field_type))

        # Everything left is only projectable when it is a literal predicate.
        if not isinstance(pred, BoundLiteralPredicate):
            return None
        if isinstance(field_type, (IntegerType, LongType, DecimalType)):
            return _truncate_number(name, pred, self.transform(field_type))
        if isinstance(field_type, (BinaryType, StringType)):
            return _truncate_array(name, pred, self.transform(field_type))
        return None

    @property
    def width(self) -> int:
        """Return the truncate width."""
        return self._width

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[S]]:
        """Build the callable that truncates values of type ``source``.

        The returned callable maps None to None.

        Raises:
          ValueError: If ``source`` is not an integer, long, string, binary or decimal type.
        """
        if isinstance(source, (IntegerType, LongType)):

            def shorten(v: Any) -> Any:
                # Subtract the remainder so the result is a multiple of the width.
                remainder = v % self._width
                return v - remainder

        elif isinstance(source, (StringType, BinaryType)):

            def shorten(v: Any) -> Any:
                end = min(self._width, len(v))
                return v[0:end]

        elif isinstance(source, DecimalType):

            def shorten(v: Any) -> Any:
                return truncate_decimal(v, self._width)

        else:
            raise ValueError(f"Cannot truncate for type: {source}")

        return lambda v: None if v is None else shorten(v)

    def satisfies_order_of(self, other: Transform[S, T]) -> bool:
        """Return True for an equal transform, or for string truncations no wider than this one."""
        if self == other:
            return True

        both_truncate_strings = (
            isinstance(self.source_type, StringType)
            and isinstance(other, TruncateTransform)
            and isinstance(other.source_type, StringType)
        )
        return both_truncate_strings and self.width >= other.width

    def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
        """Render ``value``: "null" for None, ``_base64encode`` for bytes, ``str`` otherwise."""
        if value is None:
            return "null"
        if isinstance(value, bytes):
            return _base64encode(value)
        return str(value)

    def __repr__(self) -> str:
        """Give the constructor-style representation, including the width."""
        return f"TruncateTransform(width={self._width})"

__repr__()

Return the string representation of the TruncateTransform class.

Source code in pyiceberg/transforms.py
def __repr__(self) -> str:
    """Give the constructor-style representation, including the width."""
    truncate_width = self._width
    return f"TruncateTransform(width={truncate_width})"

UnknownTransform

Bases: Transform[S, T]

A transform that represents when an unknown transform is provided.

Parameters:

Name Type Description Default
transform str

A string name of a transform.

required

Other Parameters:

Name Type Description
source_type IcebergType

An Iceberg Type.

Source code in pyiceberg/transforms.py
class UnknownTransform(Transform[S, T]):
    """A placeholder transform used when an unknown transform is provided.

    Args:
      transform (str): A string name of a transform.

    Keyword Args:
      source_type (IcebergType): An Iceberg `Type`.
    """

    root: LiteralType["unknown"] = Field(default="unknown")  # noqa: F821
    _transform: str = PrivateAttr()

    def __init__(self, transform: str, **data: Any):
        """Initialise the model from ``data`` and remember the transform name."""
        super().__init__(**data)
        self._transform = transform

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[T]]:
        """Always raise: an unknown transform cannot be applied.

        Raises:
          AttributeError: On every call.
        """
        raise AttributeError(f"Cannot apply unsupported transform: {self}")

    def can_transform(self, source: IcebergType) -> bool:
        """Return False for every source type."""
        return False

    def result_type(self, source: IcebergType) -> StringType:
        """Return StringType, regardless of the source type."""
        return StringType()

    def project(self, name: str, pred: BoundPredicate[L]) -> Optional[UnboundPredicate[Any]]:
        """Return None: no predicate is projected through an unknown transform."""
        return None

    def __repr__(self) -> str:
        """Give the constructor-style representation, including the transform name."""
        return f"UnknownTransform(transform={self._transform!r})"

__repr__()

Return the string representation of the UnknownTransform class.

Source code in pyiceberg/transforms.py
def __repr__(self) -> str:
    """Give the constructor-style representation, including the transform name."""
    return f"UnknownTransform(transform={self._transform!r})"

VoidTransform

Bases: Transform[S, None], Singleton

A transform that always returns None.

Source code in pyiceberg/transforms.py
class VoidTransform(Transform[S, None], Singleton):
    """A transform whose result is always None."""

    root: str = "void"

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[T]]:
        """Return a callable that ignores its argument and yields None."""
        return lambda _value: None

    def can_transform(self, _: IcebergType) -> bool:
        """Return True for every source type."""
        return True

    def result_type(self, source: IcebergType) -> IcebergType:
        """Return ``source`` itself as the result type."""
        return source

    def project(self, name: str, pred: BoundPredicate[L]) -> Optional[UnboundPredicate[Any]]:
        """Return None: no predicate is projected through a void transform."""
        return None

    def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
        """Return "null" for every value."""
        return "null"

    def __repr__(self) -> str:
        """Give the constructor-style representation of a VoidTransform."""
        return "VoidTransform()"

__repr__()

Return the string representation of the VoidTransform class.

Source code in pyiceberg/transforms.py
def __repr__(self) -> str:
    """Give the constructor-style representation of a VoidTransform."""
    representation = "VoidTransform()"
    return representation

YearTransform

Bases: TimeTransform[S]

Transforms a datetime value into a year value.

Example

transform = YearTransform() transform.transform(TimestampType())(1512151975038194) 47

Source code in pyiceberg/transforms.py
class YearTransform(TimeTransform[S]):
    """Transforms a datetime value into a year value.

    Example:
        >>> transform = YearTransform()
        >>> transform.transform(TimestampType())(1512151975038194)
        47
    """

    root: LiteralType["year"] = Field(default="year")  # noqa: F821

    def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int]]:
        """Build the callable that converts values of type ``source`` into a year value.

        The returned callable maps None to None.

        Raises:
          ValueError: If ``source`` is not a date or timestamp type.
        """
        if isinstance(source, DateType):

            def to_years(days: Any) -> int:
                return datetime.days_to_years(days)

        elif isinstance(source, (TimestampType, TimestamptzType)):

            def to_years(micros: Any) -> int:
                return datetime.micros_to_years(micros)

        else:
            raise ValueError(f"Cannot apply year transform for type: {source}")

        return lambda v: None if v is None else to_years(v)

    def can_transform(self, source: IcebergType) -> bool:
        """Return True for date and timestamp (with or without zone) source types."""
        supported = (DateType, TimestampType, TimestamptzType)
        return isinstance(source, supported)

    @property
    def granularity(self) -> TimeResolution:
        """Return the time resolution of this transform (YEAR)."""
        return TimeResolution.YEAR

    def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
        """Render an int year value via ``datetime.to_human_year``; anything else becomes "null"."""
        if isinstance(value, int):
            return datetime.to_human_year(value)
        return "null"

    def __repr__(self) -> str:
        """Give the constructor-style representation of a YearTransform."""
        return "YearTransform()"

__repr__()

Return the string representation of the YearTransform class.

Source code in pyiceberg/transforms.py
def __repr__(self) -> str:
    """Give the constructor-style representation of a YearTransform."""
    representation = "YearTransform()"
    return representation