Skip to content

visitors

BindVisitor

Bases: BooleanExpressionVisitor[BooleanExpression]

Rewrites a boolean expression by replacing unbound references with references to fields in a struct schema.

Parameters:

Name Type Description Default
schema Schema

A schema to use when binding the expression.

required
case_sensitive bool

Whether to consider case when binding a reference to a field in a schema. The constructor defines no default, so this argument must be supplied.

required

Raises:

Type Description
TypeError

In the case a predicate is already bound.

Source code in pyiceberg/expressions/visitors.py
class BindVisitor(BooleanExpressionVisitor[BooleanExpression]):
    """Bind every unbound predicate of a boolean expression to a schema.

    The visitor rebuilds the expression: constants and the Not/And/Or operators
    are recreated from the results of their children, while each unbound
    predicate is replaced by the result of binding it to ``schema``.

    Args:
      schema (Schema): A schema to use when binding the expression.
      case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema.
        The constructor defines no default, so it must be supplied.

    Raises:
        TypeError: In the case a predicate is already bound.
    """

    schema: Schema
    case_sensitive: bool

    def __init__(self, schema: Schema, case_sensitive: bool) -> None:
        self.schema, self.case_sensitive = schema, case_sensitive

    def visit_true(self) -> BooleanExpression:
        """Return a fresh AlwaysTrue; constants need no binding."""
        return AlwaysTrue()

    def visit_false(self) -> BooleanExpression:
        """Return a fresh AlwaysFalse; constants need no binding."""
        return AlwaysFalse()

    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
        """Wrap the already-bound child in a Not expression."""
        negated = Not(child=child_result)
        return negated

    def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        """Combine the already-bound operands in an And expression."""
        conjunction = And(left=left_result, right=right_result)
        return conjunction

    def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        """Combine the already-bound operands in an Or expression."""
        disjunction = Or(left=left_result, right=right_result)
        return disjunction

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
        """Bind the predicate to the visitor's schema and return the result."""
        bound_expression = predicate.bind(self.schema, case_sensitive=self.case_sensitive)
        return bound_expression

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
        """Reject a predicate that is already bound.

        Raises:
            TypeError: Always.
        """
        message = f"Found already bound predicate: {predicate}"
        raise TypeError(message)

BooleanExpressionVisitor

Bases: Generic[T], ABC

Source code in pyiceberg/expressions/visitors.py
class BooleanExpressionVisitor(Generic[T], ABC):
    """Abstract visitor over a boolean expression tree that produces results of type T.

    The visit methods for Not, And and Or receive the results of visiting their
    children, not the child expressions themselves.
    """

    @abstractmethod
    def visit_true(self) -> T:
        """Visit method for an AlwaysTrue boolean expression.

        Note: This visit method has no arguments since AlwaysTrue instances have no context.
        """

    @abstractmethod
    def visit_false(self) -> T:
        """Visit method for an AlwaysFalse boolean expression.

        Note: This visit method has no arguments since AlwaysFalse instances have no context.
        """

    @abstractmethod
    def visit_not(self, child_result: T) -> T:
        """Visit method for a Not boolean expression.

        Args:
            child_result (T): The result of visiting the child of the Not boolean expression.
        """

    @abstractmethod
    def visit_and(self, left_result: T, right_result: T) -> T:
        """Visit method for an And boolean expression.

        Args:
            left_result (T): The result of visiting the left side of the expression.
            right_result (T): The result of visiting the right side of the expression.
        """

    @abstractmethod
    def visit_or(self, left_result: T, right_result: T) -> T:
        """Visit method for an Or boolean expression.

        Args:
            left_result (T): The result of visiting the left side of the expression.
            right_result (T): The result of visiting the right side of the expression.
        """

    @abstractmethod
    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> T:
        """Visit method for an unbound predicate in an expression tree.

        Args:
            predicate (UnboundPredicate[L]): An instance of an UnboundPredicate.
        """

    @abstractmethod
    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> T:
        """Visit method for a bound predicate in an expression tree.

        Args:
            predicate (BoundPredicate[L]): An instance of a BoundPredicate.
        """

visit_and(left_result, right_result) abstractmethod

Visit method for an And boolean expression.

Parameters:

Name Type Description Default
left_result T

The result of visiting the left side of the expression.

required
right_result T

The result of visiting the right side of the expression.

required
Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_and(self, left_result: T, right_result: T) -> T:
    """Visit method for an And boolean expression.

    Args:
        left_result (T): The result of visiting the left side of the expression.
        right_result (T): The result of visiting the right side of the expression.

    Returns:
        T: The visitor's result for the And expression.
    """

visit_bound_predicate(predicate) abstractmethod

Visit method for a bound predicate in an expression tree.

Parameters:

Name Type Description Default
predicate BoundPredicate[L]

An instance of a BoundPredicate.

required
Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> T:
    """Visit method for a bound predicate in an expression tree.

    Args:
        predicate (BoundPredicate[L]): An instance of a BoundPredicate.

    Returns:
        T: The visitor's result for the bound predicate.
    """

visit_false() abstractmethod

Visit method for an AlwaysFalse boolean expression.

Note: This visit method has no arguments since AlwaysFalse instances have no context.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_false(self) -> T:
    """Visit method for an AlwaysFalse boolean expression.

    Note: This visit method has no arguments since AlwaysFalse instances have no context.

    Returns:
        T: The visitor's result for the AlwaysFalse expression.
    """

visit_not(child_result) abstractmethod

Visit method for a Not boolean expression.

Parameters:

Name Type Description Default
child_result T

The result of visiting the child of the Not boolean expression.

required
Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_not(self, child_result: T) -> T:
    """Visit method for a Not boolean expression.

    Args:
        child_result (T): The result of visiting the child of the Not boolean expression.

    Returns:
        T: The visitor's result for the Not expression.
    """

visit_or(left_result, right_result) abstractmethod

Visit method for an Or boolean expression.

Parameters:

Name Type Description Default
left_result T

The result of visiting the left side of the expression.

required
right_result T

The result of visiting the right side of the expression.

required
Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_or(self, left_result: T, right_result: T) -> T:
    """Visit method for an Or boolean expression.

    Args:
        left_result (T): The result of visiting the left side of the expression.
        right_result (T): The result of visiting the right side of the expression.

    Returns:
        T: The visitor's result for the Or expression.
    """

visit_true() abstractmethod

Visit method for an AlwaysTrue boolean expression.

Note: This visit method has no arguments since AlwaysTrue instances have no context.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_true(self) -> T:
    """Visit method for an AlwaysTrue boolean expression.

    Note: This visit method has no arguments since AlwaysTrue instances have no context.

    Returns:
        T: The visitor's result for the AlwaysTrue expression.
    """

visit_unbound_predicate(predicate) abstractmethod

Visit method for an unbound predicate in an expression tree.

Parameters:

Name Type Description Default
predicate UnboundPredicate[L]

An instance of an UnboundPredicate.

required
Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> T:
    """Visit method for an unbound predicate in an expression tree.

    Args:
        predicate (UnboundPredicate[L]): An instance of an UnboundPredicate.

    Returns:
        T: The visitor's result for the unbound predicate.
    """

BoundBooleanExpressionVisitor

Bases: BooleanExpressionVisitor[T], ABC

Source code in pyiceberg/expressions/visitors.py
class BoundBooleanExpressionVisitor(BooleanExpressionVisitor[T], ABC):
    """Abstract visitor for boolean expressions whose predicates are all bound.

    Subclasses implement one method per predicate kind; each receives the
    predicate's bound term and, where applicable, its literal or set of literals.
    Bound predicates are handed to the module-level ``visit_bound_predicate``
    function together with this visitor, and unbound predicates are rejected
    with a TypeError.
    """

    @abstractmethod
    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> T:
        """Visit a bound In predicate."""

    @abstractmethod
    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> T:
        """Visit a bound NotIn predicate."""

    @abstractmethod
    def visit_is_nan(self, term: BoundTerm[L]) -> T:
        """Visit a bound IsNan predicate."""

    @abstractmethod
    def visit_not_nan(self, term: BoundTerm[L]) -> T:
        """Visit a bound NotNan predicate."""

    @abstractmethod
    def visit_is_null(self, term: BoundTerm[L]) -> T:
        """Visit a bound IsNull predicate."""

    @abstractmethod
    def visit_not_null(self, term: BoundTerm[L]) -> T:
        """Visit a bound NotNull predicate."""

    @abstractmethod
    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound Equal predicate."""

    @abstractmethod
    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound NotEqual predicate."""

    @abstractmethod
    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound GreaterThanOrEqual predicate."""

    @abstractmethod
    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound GreaterThan predicate."""

    @abstractmethod
    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound LessThan predicate."""

    @abstractmethod
    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound LessThanOrEqual predicate."""

    @abstractmethod
    def visit_true(self) -> T:
        """Visit a bound True predicate."""

    @abstractmethod
    def visit_false(self) -> T:
        """Visit a bound False predicate."""

    @abstractmethod
    def visit_not(self, child_result: T) -> T:
        """Visit a bound Not predicate."""

    @abstractmethod
    def visit_and(self, left_result: T, right_result: T) -> T:
        """Visit a bound And predicate."""

    @abstractmethod
    def visit_or(self, left_result: T, right_result: T) -> T:
        """Visit a bound Or predicate."""

    @abstractmethod
    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound StartsWith predicate."""

    @abstractmethod
    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound NotStartsWith predicate."""

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> T:
        """Visit an unbound predicate.

        Args:
            predicate (UnboundPredicate[L]): An unbound predicate.

        Raises:
            TypeError: This always raises since an unbound predicate is not expected in a bound boolean expression.
        """
        raise TypeError(f"Not a bound predicate: {predicate}")

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> T:
        """Visit a bound predicate.

        Args:
            predicate (BoundPredicate[L]): A bound predicate.

        Returns:
            T: Whatever the module-level ``visit_bound_predicate`` function returns
            for this predicate and visitor.
        """
        # Not recursion: inside a method body the bare name resolves to the
        # module-level function of the same name, not to this method.
        return visit_bound_predicate(predicate, self)

visit_and(left_result, right_result) abstractmethod

Visit a bound And predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_and(self, left_result: T, right_result: T) -> T:
    """Visit a bound And predicate.

    Args:
        left_result (T): The result of visiting the left operand.
        right_result (T): The result of visiting the right operand.

    Returns:
        T: The visitor's result for the And expression.
    """

visit_bound_predicate(predicate)

Visit a bound predicate.

Parameters:

Name Type Description Default
predicate BoundPredicate[L]

A bound predicate.

required
Source code in pyiceberg/expressions/visitors.py
def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> T:
    """Visit a bound predicate.

    Args:
        predicate (BoundPredicate[L]): A bound predicate.

    Returns:
        T: Whatever the module-level ``visit_bound_predicate`` function returns
        for this predicate and visitor.
    """
    # Within the class this bare name refers to the module-level function of the
    # same name, so the call hands the predicate and this visitor over to it.
    return visit_bound_predicate(predicate, self)

visit_equal(term, literal) abstractmethod

Visit a bound Equal predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
    """Visit a bound Equal predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.
        literal (Literal[L]): The literal of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_false() abstractmethod

Visit a bound False predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_false(self) -> T:
    """Visit a bound False predicate.

    Returns:
        T: The visitor's result for the constant false expression.
    """

visit_greater_than(term, literal) abstractmethod

Visit a bound GreaterThan predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> T:
    """Visit a bound GreaterThan predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.
        literal (Literal[L]): The literal of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_greater_than_or_equal(term, literal) abstractmethod

Visit a bound GreaterThanOrEqual predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
    """Visit a bound GreaterThanOrEqual predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.
        literal (Literal[L]): The literal of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_in(term, literals) abstractmethod

Visit a bound In predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> T:
    """Visit a bound In predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.
        literals (Set[L]): The set of values of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_is_nan(term) abstractmethod

Visit a bound IsNan predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_is_nan(self, term: BoundTerm[L]) -> T:
    """Visit a bound IsNan predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_is_null(term) abstractmethod

Visit a bound IsNull predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_is_null(self, term: BoundTerm[L]) -> T:
    """Visit a bound IsNull predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_less_than(term, literal) abstractmethod

Visit a bound LessThan predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> T:
    """Visit a bound LessThan predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.
        literal (Literal[L]): The literal of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_less_than_or_equal(term, literal) abstractmethod

Visit a bound LessThanOrEqual predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
    """Visit a bound LessThanOrEqual predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.
        literal (Literal[L]): The literal of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_not(child_result) abstractmethod

Visit a bound Not predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_not(self, child_result: T) -> T:
    """Visit a bound Not predicate.

    Args:
        child_result (T): The result of visiting the child of the Not expression.

    Returns:
        T: The visitor's result for the Not expression.
    """

visit_not_equal(term, literal) abstractmethod

Visit a bound NotEqual predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
    """Visit a bound NotEqual predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.
        literal (Literal[L]): The literal of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_not_in(term, literals) abstractmethod

Visit a bound NotIn predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> T:
    """Visit a bound NotIn predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.
        literals (Set[L]): The set of values of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_not_nan(term) abstractmethod

Visit a bound NotNan predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_not_nan(self, term: BoundTerm[L]) -> T:
    """Visit a bound NotNan predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_not_null(term) abstractmethod

Visit a bound NotNull predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_not_null(self, term: BoundTerm[L]) -> T:
    """Visit a bound NotNull predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_not_starts_with(term, literal) abstractmethod

Visit bound NotStartsWith predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> T:
    """Visit a bound NotStartsWith predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.
        literal (Literal[L]): The literal of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_or(left_result, right_result) abstractmethod

Visit a bound Or predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_or(self, left_result: T, right_result: T) -> T:
    """Visit a bound Or predicate.

    Args:
        left_result (T): The result of visiting the left operand.
        right_result (T): The result of visiting the right operand.

    Returns:
        T: The visitor's result for the Or expression.
    """

visit_starts_with(term, literal) abstractmethod

Visit bound StartsWith predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> T:
    """Visit a bound StartsWith predicate.

    Args:
        term (BoundTerm[L]): The bound term of the predicate.
        literal (Literal[L]): The literal of the predicate.

    Returns:
        T: The visitor's result for the predicate.
    """

visit_true() abstractmethod

Visit a bound True predicate.

Source code in pyiceberg/expressions/visitors.py
@abstractmethod
def visit_true(self) -> T:
    """Visit a bound True predicate.

    Returns:
        T: The visitor's result for the constant true expression.
    """

visit_unbound_predicate(predicate)

Visit an unbound predicate.

Parameters:

Name Type Description Default
predicate UnboundPredicate[L]

An unbound predicate.

required

Raises: TypeError: This always raises since an unbound predicate is not expected in a bound boolean expression.

Source code in pyiceberg/expressions/visitors.py
def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> T:
    """Visit an unbound predicate.

    Args:
        predicate (UnboundPredicate[L]): An unbound predicate.

    Raises:
        TypeError: This always raises since an unbound predicate is not expected in a bound boolean expression.
    """
    raise TypeError(f"Not a bound predicate: {predicate}")

ResidualVisitor

Bases: BoundBooleanExpressionVisitor[BooleanExpression], ABC

Finds the residual of an Expression for the partitions of the given PartitionSpec.

A residual expression is made by partially evaluating an expression using partition values. For example, if a table is partitioned by day(utc_timestamp) and is read with a filter expression utc_timestamp > a and utc_timestamp < b, then there are 4 possible residuals expressions for the partition data, d:

  1. If d > day(a) and d < day(b), the residual is always true
  2. If d == day(a) and d != day(b), the residual is utc_timestamp > a
  3. if d == day(b) and d != day(a), the residual is utc_timestamp < b
  4. If d == day(a) == day(b), the residual is utc_timestamp > a and utc_timestamp < b

Partition data is passed as a Record to eval(), which returns the residual expression.
Source code in pyiceberg/expressions/visitors.py
class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
    """Finds the residual of an Expression for the partitions of the given PartitionSpec.

    A residual expression is made by partially evaluating an expression using partition values.
    For example, if a table is partitioned by day(utc_timestamp) and is read with a filter expression
    utc_timestamp > a and utc_timestamp < b, then there are 4 possible residual expressions
    for the partition data, d:

    1. If d > day(a) and d < day(b), the residual is always true
    2. If d == day(a) and d != day(b), the residual is utc_timestamp > a
    3. If d == day(b) and d != day(a), the residual is utc_timestamp < b
    4. If d == day(a) == day(b), the residual is utc_timestamp > a and utc_timestamp < b

    Partition data is passed as a Record to eval(), which returns the residual expression.

    Args:
        schema (Schema): Schema used to bind unbound predicates and to derive the partition type.
        spec (PartitionSpec): Partition spec whose fields are projected against each predicate.
        case_sensitive (bool): Forwarded to every bind() call made by this visitor.
        expr (BooleanExpression): The expression that eval() visits.
    """

    schema: Schema
    spec: PartitionSpec
    case_sensitive: bool
    expr: BooleanExpression
    # Partition data of the current eval() call; only assigned by eval(), so the
    # per-predicate visit methods must not be called before it.
    struct: Record

    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool, expr: BooleanExpression) -> None:
        self.schema = schema
        self.spec = spec
        self.case_sensitive = case_sensitive
        self.expr = expr

    def eval(self, partition_data: Record) -> BooleanExpression:
        """Return the residual of the visitor's expression for the given partition data.

        Args:
            partition_data (Record): The partition values the predicates are evaluated against.
        """
        self.struct = partition_data
        return visit(self.expr, visitor=self)

    def visit_true(self) -> BooleanExpression:
        return AlwaysTrue()

    def visit_false(self) -> BooleanExpression:
        return AlwaysFalse()

    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
        return Not(child_result)

    def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return And(left_result, right_result)

    def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return Or(left_result, right_result)

    def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
        if term.eval(self.struct) is None:
            return AlwaysTrue()
        else:
            return AlwaysFalse()

    def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
        if term.eval(self.struct) is not None:
            return AlwaysTrue()
        else:
            return AlwaysFalse()

    def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
        val = term.eval(self.struct)
        if isinstance(val, SupportsFloat) and math.isnan(val):
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
        val = term.eval(self.struct)
        # NOTE(review): a value that is not SupportsFloat (e.g. None) yields
        # AlwaysFalse here, although such a value is not NaN either - confirm
        # this is the intended result for null partition values.
        if isinstance(val, SupportsFloat) and not math.isnan(val):
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) < literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) <= literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) > literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) >= literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) == literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) != literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> BooleanExpression:
        if term.eval(self.struct) in literals:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> BooleanExpression:
        if term.eval(self.struct) not in literals:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        eval_res = term.eval(self.struct)
        if eval_res is not None and str(eval_res).startswith(str(literal.value)):
            return AlwaysTrue()
        else:
            return AlwaysFalse()

    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        # visit_starts_with returns an expression object, not a bool, so its
        # truthiness cannot be used for negation; check which constant came back.
        if isinstance(self.visit_starts_with(term, literal), AlwaysFalse):
            return AlwaysTrue()
        else:
            return AlwaysFalse()

    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> BooleanExpression:
        """Return the residual of a bound predicate for the current partition data.

        If there is no strict projection or if it evaluates to false, then return the predicate.

        Get the strict projection and inclusive projection of this predicate in partition data,
        then use them to determine whether to return the original predicate. The strict projection
        returns true iff the original predicate would have returned true, so the predicate can be
        eliminated if the strict projection evaluates to true. Similarly the inclusive projection
        returns false iff the original predicate would have returned false, so the predicate can
        also be eliminated if the inclusive projection evaluates to false.

        Args:
            predicate (BoundPredicate[Any]): A predicate bound to the table schema.

        Returns:
            BooleanExpression: AlwaysTrue, AlwaysFalse, or the unchanged predicate.
        """
        parts = self.spec.fields_by_source_id(predicate.term.ref().field.field_id)
        if not parts:
            # The predicate's field is not a partition source, so nothing can be decided.
            return predicate

        # The partition schema is the same for every partition field; build it once.
        partition_schema = Schema(*self.spec.partition_type(self.schema).fields)

        for part in parts:
            strict_projection = part.transform.strict_project(part.name, predicate)
            strict_result = None

            if strict_projection is not None:
                bound = strict_projection.bind(partition_schema, case_sensitive=self.case_sensitive)
                if isinstance(bound, BoundPredicate):
                    # Evaluate the projected predicate against the partition data
                    # through the parent class' handling of bound predicates.
                    strict_result = super().visit_bound_predicate(bound)
                else:
                    # if the result is not a predicate, then it must be a constant like alwaysTrue or alwaysFalse
                    strict_result = bound

            if isinstance(strict_result, AlwaysTrue):
                return AlwaysTrue()

            inclusive_projection = part.transform.project(part.name, predicate)
            inclusive_result = None
            if inclusive_projection is not None:
                bound_inclusive = inclusive_projection.bind(partition_schema, case_sensitive=self.case_sensitive)
                if isinstance(bound_inclusive, BoundPredicate):
                    # using predicate method specific to inclusive
                    inclusive_result = super().visit_bound_predicate(bound_inclusive)
                else:
                    # if the result is not a predicate, then it must be a constant like alwaysTrue or
                    # alwaysFalse
                    inclusive_result = bound_inclusive
            if isinstance(inclusive_result, AlwaysFalse):
                return AlwaysFalse()

        # No partition field could decide the predicate; it remains as the residual.
        return predicate

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
        """Bind the predicate to the table schema and return its residual.

        When the residual is not a constant, the original unbound predicate is
        returned rather than its bound form.
        """
        bound = predicate.bind(self.schema, case_sensitive=self.case_sensitive)

        if isinstance(bound, BoundPredicate):
            bound_residual = self.visit_bound_predicate(predicate=bound)
            if not isinstance(bound_residual, (AlwaysFalse, AlwaysTrue)):
                # replace inclusive original unbound predicate
                return predicate

            # use the non-predicate residual (e.g. alwaysTrue)
            return bound_residual

        # if binding didn't result in a Predicate, return the expression
        return bound

visit_bound_predicate(predicate)

If there is no strict projection or if it evaluates to false, then return the predicate.

Get the strict projection and inclusive projection of this predicate in partition data, then use them to determine whether to return the original predicate. The strict projection returns true iff the original predicate would have returned true, so the predicate can be eliminated if the strict projection evaluates to true. Similarly the inclusive projection returns false iff the original predicate would have returned false, so the predicate can also be eliminated if the inclusive projection evaluates to false.

Source code in pyiceberg/expressions/visitors.py
def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> BooleanExpression:
    """Return the residual of a bound predicate for the current partition data.

    If there is no strict projection or if it evaluates to false, then return the predicate.

    Get the strict projection and inclusive projection of this predicate in partition data,
    then use them to determine whether to return the original predicate. The strict projection
    returns true iff the original predicate would have returned true, so the predicate can be
    eliminated if the strict projection evaluates to true. Similarly the inclusive projection
    returns false iff the original predicate would have returned false, so the predicate can
    also be eliminated if the inclusive projection evaluates to false.

    Args:
        predicate (BoundPredicate[Any]): A predicate bound to the table schema.

    Returns:
        BooleanExpression: AlwaysTrue, AlwaysFalse, or the unchanged predicate.
    """
    parts = self.spec.fields_by_source_id(predicate.term.ref().field.field_id)
    if parts == []:
        # The predicate's field is not a partition source, so nothing can be decided.
        return predicate

    def struct_to_schema(struct: StructType) -> Schema:
        # Wrap the fields of the partition struct in a Schema so projections can bind to it.
        return Schema(*struct.fields)

    for part in parts:
        strict_projection = part.transform.strict_project(part.name, predicate)
        strict_result = None

        if strict_projection is not None:
            bound = strict_projection.bind(
                struct_to_schema(self.spec.partition_type(self.schema)), case_sensitive=self.case_sensitive
            )
            if isinstance(bound, BoundPredicate):
                # Evaluate the projected predicate through the parent class' handling of bound predicates.
                strict_result = super().visit_bound_predicate(bound)
            else:
                # if the result is not a predicate, then it must be a constant like alwaysTrue or alwaysFalse
                strict_result = bound

        # The strict check comes first: a true strict projection settles the predicate.
        if isinstance(strict_result, AlwaysTrue):
            return AlwaysTrue()

        inclusive_projection = part.transform.project(part.name, predicate)
        inclusive_result = None
        if inclusive_projection is not None:
            bound_inclusive = inclusive_projection.bind(
                struct_to_schema(self.spec.partition_type(self.schema)), case_sensitive=self.case_sensitive
            )
            if isinstance(bound_inclusive, BoundPredicate):
                # using predicate method specific to inclusive
                inclusive_result = super().visit_bound_predicate(bound_inclusive)
            else:
                # if the result is not a predicate, then it must be a constant like alwaysTrue or
                # alwaysFalse
                inclusive_result = bound_inclusive
        if isinstance(inclusive_result, AlwaysFalse):
            return AlwaysFalse()

    # No partition field could decide the predicate; it remains as the residual.
    return predicate

_ColumnNameTranslator

Bases: BooleanExpressionVisitor[BooleanExpression]

Converts the column names with the ones in the actual file.

Parameters:

Name Type Description Default
file_schema Schema

The schema of the file.

required
case_sensitive bool

Whether to consider case when binding a reference to a field in a schema. The constructor defines no default, so this argument must be supplied.

required

Raises:

Type Description
TypeError

In the case of an UnboundPredicate.

ValueError

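When a bound predicate is not a unary, literal, or set predicate. A column that is missing from the file schema does not raise; it yields AlwaysTrue for an IsNull predicate and AlwaysFalse otherwise.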

Source code in pyiceberg/expressions/visitors.py
class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
    """Rewrite bound predicates as unbound ones that use the file's column names.

    Each bound predicate is looked up in the file schema by field ID and turned
    back into an unbound predicate that refers to the column name found there.

    Args:
      file_schema (Schema): The schema of the file.
      case_sensitive (bool): Stored on the instance; no method of this class reads it.

    Raises:
        TypeError: In the case of an UnboundPredicate.
        ValueError: When a bound predicate is not a unary, literal, or set predicate.
    """

    file_schema: Schema
    case_sensitive: bool

    def __init__(self, file_schema: Schema, case_sensitive: bool) -> None:
        self.file_schema, self.case_sensitive = file_schema, case_sensitive

    def visit_true(self) -> BooleanExpression:
        """Return a fresh AlwaysTrue."""
        return AlwaysTrue()

    def visit_false(self) -> BooleanExpression:
        """Return a fresh AlwaysFalse."""
        return AlwaysFalse()

    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
        """Wrap the translated child in a Not expression."""
        negated = Not(child=child_result)
        return negated

    def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        """Combine the translated operands in an And expression."""
        conjunction = And(left=left_result, right=right_result)
        return conjunction

    def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        """Combine the translated operands in an Or expression."""
        disjunction = Or(left=left_result, right=right_result)
        return disjunction

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
        """Reject unbound predicates; only bound predicates carry a field ID to translate."""
        message = f"Expected Bound Predicate, got: {predicate.term}"
        raise TypeError(message)

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
        """Translate one bound predicate to the file's column name.

        Returns:
            BooleanExpression: The unbound predicate on the file's column, or a
            constant when the column is absent from the file schema.
        """
        field_id = predicate.term.ref().field.field_id
        column_name = self.file_schema.find_column_name(field_id)

        if column_name is None:
            # In the case of schema evolution, the column might not be present
            # in the file schema when reading older data
            return AlwaysTrue() if isinstance(predicate, BoundIsNull) else AlwaysFalse()

        # The arguments as_unbound needs depend on the predicate family.
        if isinstance(predicate, BoundUnaryPredicate):
            return predicate.as_unbound(column_name)
        if isinstance(predicate, BoundLiteralPredicate):
            return predicate.as_unbound(column_name, predicate.literal)
        if isinstance(predicate, BoundSetPredicate):
            return predicate.as_unbound(column_name, predicate.literals)
        raise ValueError(f"Unsupported predicate: {predicate}")

_ExpressionFieldIDs

Bases: BooleanExpressionVisitor[Set[int]]

Extracts the field IDs used in the BooleanExpression.

Source code in pyiceberg/expressions/visitors.py
class _ExpressionFieldIDs(BooleanExpressionVisitor[Set[int]]):
    """Collect the IDs of every field referenced by a bound BooleanExpression."""

    def visit_true(self) -> Set[int]:
        """A constant references no fields."""
        no_fields: Set[int] = set()
        return no_fields

    def visit_false(self) -> Set[int]:
        """A constant references no fields."""
        no_fields: Set[int] = set()
        return no_fields

    def visit_not(self, child_result: Set[int]) -> Set[int]:
        """Negation references exactly the fields of its child."""
        return child_result

    def visit_and(self, left_result: Set[int], right_result: Set[int]) -> Set[int]:
        """Merge the field IDs gathered from both operands."""
        merged = left_result.union(right_result)
        return merged

    def visit_or(self, left_result: Set[int], right_result: Set[int]) -> Set[int]:
        """Merge the field IDs gathered from both operands."""
        merged = left_result.union(right_result)
        return merged

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> Set[int]:
        """Reject unbound predicates.

        Raises:
            ValueError: Always; this visitor only handles bound predicates.
        """
        raise ValueError("Only works on bound records")

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> Set[int]:
        """Return a single-element set holding the predicate's field ID."""
        referenced_field = predicate.term.ref().field
        return {referenced_field.field_id}

_InclusiveMetricsEvaluator

Bases: _MetricsEvaluator

Source code in pyiceberg/expressions/visitors.py
class _InclusiveMetricsEvaluator(_MetricsEvaluator):
    """Decide from a data file's column metrics whether it may contain matching rows.

    The evaluation is inclusive: a ``visit_*`` method returns ``ROWS_CANNOT_MATCH``
    only when the metrics rule a match out, and ``ROWS_MIGHT_MATCH`` in every
    other case (including when the relevant metrics are missing).
    """

    # Struct view of the schema the expression was bound against.
    struct: StructType
    # The expression after NOT-rewriting and binding.
    expr: BooleanExpression

    def __init__(
        self, schema: Schema, expr: BooleanExpression, case_sensitive: bool = True, include_empty_files: bool = False
    ) -> None:
        """Prepare the evaluator by rewriting negations and binding the expression.

        Args:
            schema: Schema used to bind the expression.
            expr: The expression to evaluate against file metrics.
            case_sensitive: Passed through to ``bind``.
            include_empty_files: When False, ``eval`` reports files with a
                record count of 0 as unable to match.
        """
        self.struct = schema.as_struct()
        self.include_empty_files = include_empty_files
        self.expr = bind(schema, rewrite_not(expr), case_sensitive)

    def eval(self, file: DataFile) -> bool:
        """Test whether the file may contain records that match the expression."""
        if not self.include_empty_files and file.record_count == 0:
            return ROWS_CANNOT_MATCH

        if file.record_count < 0:
            # Older version don't correctly implement record count from avro file and thus
            # set record count -1 when importing avro tables to iceberg tables. This should
            # be updated once we implemented and set correct record count.
            return ROWS_MIGHT_MATCH

        # Missing metric maps are replaced by EMPTY_DICT so the visitors can use .get().
        self.value_counts = file.value_counts or EMPTY_DICT
        self.null_counts = file.null_value_counts or EMPTY_DICT
        self.nan_counts = file.nan_value_counts or EMPTY_DICT
        self.lower_bounds = file.lower_bounds or EMPTY_DICT
        self.upper_bounds = file.upper_bounds or EMPTY_DICT

        return visit(self.expr, self)

    def _may_contain_null(self, field_id: int) -> bool:
        """Return True when the null-count metrics indicate the column may hold nulls.

        A recorded null count of 0 proves there are no nulls, so it must yield
        False; the previous ``is not None`` test treated that case as "may
        contain null", which prevented ``visit_not_starts_with`` from ever
        pruning a file that carries null statistics.
        """
        if self.null_counts is None:
            return True

        # NOTE(review): a field with no recorded null count yields False here
        # (unchanged from before) — confirm that missing statistics should be
        # read as "no nulls".
        return field_id in self.null_counts and self.null_counts.get(field_id) != 0

    def _contains_nans_only(self, field_id: int) -> bool:
        """Return True when the NaN count and value count are both non-zero and equal."""
        if (nan_count := self.nan_counts.get(field_id)) and (value_count := self.value_counts.get(field_id)):
            return nan_count == value_count
        return False

    def visit_is_null(self, term: BoundTerm[L]) -> bool:
        """A file cannot match ``is null`` when its recorded null count is 0."""
        field_id = term.ref().field.field_id

        if self.null_counts.get(field_id) == 0:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_null(self, term: BoundTerm[L]) -> bool:
        """A file cannot match ``not null`` when the column holds only nulls."""
        # no need to check whether the field is required because binding evaluates that case
        # if the column has no non-null values, the expression cannot match
        field_id = term.ref().field.field_id

        if self._contains_nulls_only(field_id):
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_is_nan(self, term: BoundTerm[L]) -> bool:
        """A file cannot match ``is NaN`` when it has no NaNs or holds only nulls."""
        field_id = term.ref().field.field_id

        if self.nan_counts.get(field_id) == 0:
            return ROWS_CANNOT_MATCH

        # when there's no nanCounts information, but we already know the column only contains null,
        # it's guaranteed that there's no NaN value
        if self._contains_nulls_only(field_id):
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_nan(self, term: BoundTerm[L]) -> bool:
        """A file cannot match ``not NaN`` when the column holds only NaNs."""
        field_id = term.ref().field.field_id

        if self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """A file cannot match ``< literal`` when its lower bound is >= the literal.

        Raises:
            ValueError: If the field's type is not a ``PrimitiveType``.
        """
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if lower_bound_bytes := self.lower_bounds.get(field_id):
            lower_bound = from_bytes(field.field_type, lower_bound_bytes)

            if self._is_nan(lower_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            if lower_bound >= literal.value:  # type: ignore[operator]
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """A file cannot match ``<= literal`` when its lower bound is > the literal.

        Raises:
            ValueError: If the field's type is not a ``PrimitiveType``.
        """
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if lower_bound_bytes := self.lower_bounds.get(field_id):
            lower_bound = from_bytes(field.field_type, lower_bound_bytes)
            if self._is_nan(lower_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            if lower_bound > literal.value:  # type: ignore[operator]
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """A file cannot match ``> literal`` when its upper bound is <= the literal.

        Raises:
            ValueError: If the field's type is not a ``PrimitiveType``.
        """
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if upper_bound_bytes := self.upper_bounds.get(field_id):
            upper_bound = from_bytes(field.field_type, upper_bound_bytes)
            # The NaN check runs before the comparison, as in the other bound
            # visitors: nested inside the comparison branch it could never fire,
            # because any comparison against NaN is False.
            if self._is_nan(upper_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            if upper_bound <= literal.value:  # type: ignore[operator]
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """A file cannot match ``>= literal`` when its upper bound is < the literal.

        Raises:
            ValueError: If the field's type is not a ``PrimitiveType``.
        """
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if upper_bound_bytes := self.upper_bounds.get(field_id):
            upper_bound = from_bytes(field.field_type, upper_bound_bytes)
            # NaN check first, for the same reason as in visit_greater_than.
            if self._is_nan(upper_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            if upper_bound < literal.value:  # type: ignore[operator]
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """A file cannot match ``== literal`` when the literal lies outside its bounds.

        Raises:
            ValueError: If the field's type is not a ``PrimitiveType``.
        """
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if lower_bound_bytes := self.lower_bounds.get(field_id):
            lower_bound = from_bytes(field.field_type, lower_bound_bytes)
            if self._is_nan(lower_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            if lower_bound > literal.value:  # type: ignore[operator]
                return ROWS_CANNOT_MATCH

        if upper_bound_bytes := self.upper_bounds.get(field_id):
            upper_bound = from_bytes(field.field_type, upper_bound_bytes)
            if self._is_nan(upper_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            if upper_bound < literal.value:  # type: ignore[operator]
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """Never prune on ``!= literal``; this always reports a possible match."""
        return ROWS_MIGHT_MATCH

    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        """A file cannot match ``in literals`` when no literal lies within its bounds.

        Sets larger than ``IN_PREDICATE_LIMIT`` are not evaluated and always
        report a possible match.

        Raises:
            ValueError: If the field's type is not a ``PrimitiveType``.
        """
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if len(literals) > IN_PREDICATE_LIMIT:
            # skip evaluating the predicate if the number of values is too big
            return ROWS_MIGHT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if lower_bound_bytes := self.lower_bounds.get(field_id):
            lower_bound = from_bytes(field.field_type, lower_bound_bytes)
            if self._is_nan(lower_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            # Keep only the literals at or above the lower bound.
            literals = {lit for lit in literals if lower_bound <= lit}  # type: ignore[operator]
            if len(literals) == 0:
                return ROWS_CANNOT_MATCH

        if upper_bound_bytes := self.upper_bounds.get(field_id):
            upper_bound = from_bytes(field.field_type, upper_bound_bytes)
            # this is different from Java, here NaN is always larger
            if self._is_nan(upper_bound):
                return ROWS_MIGHT_MATCH

            # Of the survivors, keep only the literals at or below the upper bound.
            literals = {lit for lit in literals if upper_bound >= lit}  # type: ignore[operator]
            if len(literals) == 0:
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        """Never prune on ``not in literals``; this always reports a possible match."""
        # because the bounds are not necessarily a min or max value, this cannot be answered using
        # them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in col.
        return ROWS_MIGHT_MATCH

    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """A file cannot match ``starts with prefix`` when the prefix lies outside its bounds.

        Both bounds are compared to the prefix after being cut to the prefix length.

        Raises:
            ValueError: If the field's type is not a ``PrimitiveType``.
        """
        field = term.ref().field
        field_id: int = field.field_id

        if self._contains_nulls_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        prefix = str(literal.value)
        len_prefix = len(prefix)

        if lower_bound_bytes := self.lower_bounds.get(field_id):
            lower_bound = str(from_bytes(field.field_type, lower_bound_bytes))

            # truncate lower bound so that its length is not greater than the length of prefix
            if lower_bound and lower_bound[:len_prefix] > prefix:
                return ROWS_CANNOT_MATCH

        if upper_bound_bytes := self.upper_bounds.get(field_id):
            upper_bound = str(from_bytes(field.field_type, upper_bound_bytes))

            # truncate upper bound so that its length is not greater than the length of prefix
            if upper_bound is not None and upper_bound[:len_prefix] < prefix:
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """A file cannot match ``not starts with prefix`` when both bounds start with the prefix.

        A column that may contain nulls always reports a possible match.

        Raises:
            ValueError: If the field's type is not a ``PrimitiveType``.
        """
        field = term.ref().field
        field_id: int = field.field_id

        if self._may_contain_null(field_id):
            return ROWS_MIGHT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        prefix = str(literal.value)
        len_prefix = len(prefix)

        # not_starts_with will match unless all values must start with the prefix. This happens when
        # the lower and upper bounds both start with the prefix.
        if (lower_bound_bytes := self.lower_bounds.get(field_id)) and (upper_bound_bytes := self.upper_bounds.get(field_id)):
            lower_bound = str(from_bytes(field.field_type, lower_bound_bytes))
            upper_bound = str(from_bytes(field.field_type, upper_bound_bytes))

            # if lower is shorter than the prefix then lower doesn't start with the prefix
            if len(lower_bound) < len_prefix:
                return ROWS_MIGHT_MATCH

            if lower_bound[:len_prefix] == prefix:
                # if upper is shorter than the prefix then upper can't start with the prefix
                if len(upper_bound) < len_prefix:
                    return ROWS_MIGHT_MATCH

                if upper_bound[:len_prefix] == prefix:
                    return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

eval(file)

Test whether the file may contain records that match the expression.

Source code in pyiceberg/expressions/visitors.py
def eval(self, file: DataFile) -> bool:
    """Report whether ``file`` may hold at least one record matching the expression.

    Args:
        file: The data file whose record count and column metrics are inspected.

    Returns:
        ``ROWS_CANNOT_MATCH`` for an empty file (unless empty files are included),
        ``ROWS_MIGHT_MATCH`` for a negative record count, otherwise the result of
        visiting the bound expression with this evaluator.
    """
    skip_empty_files = not self.include_empty_files
    if skip_empty_files and file.record_count == 0:
        return ROWS_CANNOT_MATCH

    if file.record_count < 0:
        # A negative count (-1) was written by older versions when importing
        # avro tables, because the real record count was not derived from the
        # avro file; nothing can be concluded from it.
        return ROWS_MIGHT_MATCH

    # Copy each metric map onto the evaluator, substituting EMPTY_DICT for
    # anything falsy so the visit methods can always call .get().
    metric_sources = (
        ("value_counts", file.value_counts),
        ("null_counts", file.null_value_counts),
        ("nan_counts", file.nan_value_counts),
        ("lower_bounds", file.lower_bounds),
        ("upper_bounds", file.upper_bounds),
    )
    for attribute, metric in metric_sources:
        setattr(self, attribute, metric or EMPTY_DICT)

    return visit(self.expr, self)

_RewriteNotVisitor

Bases: BooleanExpressionVisitor[BooleanExpression]

Inverts the negations.

Source code in pyiceberg/expressions/visitors.py
class _RewriteNotVisitor(BooleanExpressionVisitor[BooleanExpression]):
    """Rebuild an expression, replacing each negation with the inverse of its child.

    The inversion itself is delegated to the child expression's ``~`` operator.
    """

    def visit_true(self) -> BooleanExpression:
        """Return a fresh ``AlwaysTrue``."""
        return AlwaysTrue()

    def visit_false(self) -> BooleanExpression:
        """Return a fresh ``AlwaysFalse``."""
        return AlwaysFalse()

    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
        """Return the inverse of the already-rewritten child instead of a ``Not`` node."""
        inverted = ~child_result
        return inverted

    def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        """Re-create the conjunction from the rewritten operands."""
        conjunction = And(left=left_result, right=right_result)
        return conjunction

    def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        """Re-create the disjunction from the rewritten operands."""
        disjunction = Or(left=left_result, right=right_result)
        return disjunction

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
        """Leaf predicates are passed through unchanged."""
        return predicate

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
        """Leaf predicates are passed through unchanged."""
        return predicate

_StrictMetricsEvaluator

Bases: _MetricsEvaluator

Source code in pyiceberg/expressions/visitors.py
class _StrictMetricsEvaluator(_MetricsEvaluator):
    """Decide from a data file's column metrics whether every row must match.

    The evaluation is strict: a ``visit_*`` method returns ``ROWS_MUST_MATCH``
    only when the metrics prove that all rows satisfy the predicate, and
    ``ROWS_MIGHT_NOT_MATCH`` in every other case (including when the relevant
    metrics are missing).
    """

    # Struct view of the schema the expression was bound against.
    struct: StructType
    # The expression after NOT-rewriting and binding.
    expr: BooleanExpression

    def __init__(
        self, schema: Schema, expr: BooleanExpression, case_sensitive: bool = True, include_empty_files: bool = False
    ) -> None:
        """Prepare the evaluator by rewriting negations and binding the expression.

        Args:
            schema: Schema used to bind the expression and to look up fields.
            expr: The expression to evaluate against file metrics.
            case_sensitive: Passed through to ``bind``.
            include_empty_files: Stored on the instance; not read by this class.
        """
        self.struct = schema.as_struct()
        self.include_empty_files = include_empty_files
        self.expr = bind(schema, rewrite_not(expr), case_sensitive)

    def eval(self, file: DataFile) -> bool:
        """Test whether all records within the file match the expression.

        Args:
            file: A data file

        Returns: false if the file may contain any row that doesn't match
                    the expression, true otherwise.
        """
        if file.record_count <= 0:
            # Older version don't correctly implement record count from avro file and thus
            # set record count -1 when importing avro tables to iceberg tables. This should
            # be updated once we implemented and set correct record count.
            return ROWS_MUST_MATCH

        # Missing metric maps are replaced by EMPTY_DICT so the visitors can use .get().
        self.value_counts = file.value_counts or EMPTY_DICT
        self.null_counts = file.null_value_counts or EMPTY_DICT
        self.nan_counts = file.nan_value_counts or EMPTY_DICT
        self.lower_bounds = file.lower_bounds or EMPTY_DICT
        self.upper_bounds = file.upper_bounds or EMPTY_DICT

        return visit(self.expr, self)

    def visit_is_null(self, term: BoundTerm[L]) -> bool:
        """All rows match ``is null`` only when the column holds nothing but nulls."""
        # no need to check whether the field is required because binding evaluates that case
        # if the column has any non-null values, the expression does not match
        field_id = term.ref().field.field_id

        if self._contains_nulls_only(field_id):
            return ROWS_MUST_MATCH
        else:
            return ROWS_MIGHT_NOT_MATCH

    def visit_not_null(self, term: BoundTerm[L]) -> bool:
        """All rows match ``not null`` only when the recorded null count is 0."""
        # no need to check whether the field is required because binding evaluates that case
        # if the column has any null values, the expression does not match
        field_id = term.ref().field.field_id

        if (null_count := self.null_counts.get(field_id)) is not None and null_count == 0:
            return ROWS_MUST_MATCH
        else:
            return ROWS_MIGHT_NOT_MATCH

    def visit_is_nan(self, term: BoundTerm[L]) -> bool:
        """All rows match ``is NaN`` only when the column holds nothing but NaNs."""
        field_id = term.ref().field.field_id

        if self._contains_nans_only(field_id):
            return ROWS_MUST_MATCH
        else:
            return ROWS_MIGHT_NOT_MATCH

    def visit_not_nan(self, term: BoundTerm[L]) -> bool:
        """All rows match ``not NaN`` when the NaN count is 0 or the column is all nulls."""
        field_id = term.ref().field.field_id

        if (nan_count := self.nan_counts.get(field_id)) is not None and nan_count == 0:
            return ROWS_MUST_MATCH

        if self._contains_nulls_only(field_id):
            return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """All rows match ``< literal`` when the upper bound is below the literal."""
        # Rows must match when: <----------Min----Max---X------->

        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        if upper_bytes := self.upper_bounds.get(field_id):
            field = self._get_field(field_id)
            upper = _from_byte_buffer(field.field_type, upper_bytes)

            if upper < literal.value:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """All rows match ``<= literal`` when the upper bound is at or below the literal."""
        # Rows must match when: <----------Min----Max---X------->

        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        if upper_bytes := self.upper_bounds.get(field_id):
            field = self._get_field(field_id)
            upper = _from_byte_buffer(field.field_type, upper_bytes)

            if upper <= literal.value:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """All rows match ``> literal`` when the lower bound is above the literal."""
        # Rows must match when: <-------X---Min----Max---------->

        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        if lower_bytes := self.lower_bounds.get(field_id):
            field = self._get_field(field_id)
            lower = _from_byte_buffer(field.field_type, lower_bytes)

            if self._is_nan(lower):
                # NaN indicates unreliable bounds.
                # See the _StrictMetricsEvaluator docs for more.
                return ROWS_MIGHT_NOT_MATCH

            if lower > literal.value:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """All rows match ``>= literal`` when the lower bound is at or above the literal."""
        # Rows must match when: <-------X---Min----Max---------->
        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        if lower_bytes := self.lower_bounds.get(field_id):
            field = self._get_field(field_id)
            lower = _from_byte_buffer(field.field_type, lower_bytes)

            if self._is_nan(lower):
                # NaN indicates unreliable bounds.
                # See the _StrictMetricsEvaluator docs for more.
                return ROWS_MIGHT_NOT_MATCH

            if lower >= literal.value:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """All rows match ``== literal`` when both bounds equal the literal."""
        # Rows must match when Min == X == Max
        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        if (lower_bytes := self.lower_bounds.get(field_id)) and (upper_bytes := self.upper_bounds.get(field_id)):
            field = self._get_field(field_id)
            lower = _from_byte_buffer(field.field_type, lower_bytes)
            upper = _from_byte_buffer(field.field_type, upper_bytes)

            if lower != literal.value or upper != literal.value:
                return ROWS_MIGHT_NOT_MATCH
            else:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """All rows match ``!= literal`` when the literal lies outside the bounds.

        A column holding only nulls, or only NaNs, also matches on every row.

        Raises:
            ValueError: If the field cannot be found in the struct.
        """
        # Rows must match when X < Min or Max < X because it is not in the range
        field_id = term.ref().field.field_id

        # Only an all-null / all-NaN column guarantees that no row equals the
        # literal. A column that merely contains some nulls or NaNs may still
        # hold the literal among its other values, so that case has to be
        # decided by the bounds below rather than short-circuited here.
        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_MUST_MATCH

        field = self._get_field(field_id)

        if lower_bytes := self.lower_bounds.get(field_id):
            lower = _from_byte_buffer(field.field_type, lower_bytes)

            if self._is_nan(lower):
                # NaN indicates unreliable bounds.
                # See the _StrictMetricsEvaluator docs for more.
                return ROWS_MIGHT_NOT_MATCH

            if lower > literal.value:
                return ROWS_MUST_MATCH

        if upper_bytes := self.upper_bounds.get(field_id):
            upper = _from_byte_buffer(field.field_type, upper_bytes)

            if upper < literal.value:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        """All rows match ``in literals`` when both bounds are equal and in the set.

        Raises:
            ValueError: If the field cannot be found in the struct.
        """
        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        field = self._get_field(field_id)

        if (lower_bytes := self.lower_bounds.get(field_id)) and (upper_bytes := self.upper_bounds.get(field_id)):
            # similar to the implementation in eq, first check if the lower bound is in the set
            lower = _from_byte_buffer(field.field_type, lower_bytes)
            if lower not in literals:
                return ROWS_MIGHT_NOT_MATCH

            # check if the upper bound is in the set
            upper = _from_byte_buffer(field.field_type, upper_bytes)
            if upper not in literals:
                return ROWS_MIGHT_NOT_MATCH

            # finally check if the lower bound and the upper bound are equal
            if lower != upper:
                return ROWS_MIGHT_NOT_MATCH

            # All values must be in the set if the lower bound and the upper bound are
            # in the set and are equal.
            return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        """All rows match ``not in literals`` when no literal lies within the bounds.

        A column holding only nulls, or only NaNs, also matches on every row.

        Raises:
            ValueError: If the field cannot be found in the struct.
        """
        field_id = term.ref().field.field_id

        # As in visit_not_equal: only an all-null / all-NaN column guarantees a
        # match. A column with some nulls or NaNs may still hold one of the
        # literals, so it is decided by the bounds below.
        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_MUST_MATCH

        field = self._get_field(field_id)

        if lower_bytes := self.lower_bounds.get(field_id):
            lower = _from_byte_buffer(field.field_type, lower_bytes)

            if self._is_nan(lower):
                # NaN indicates unreliable bounds.
                # See the StrictMetricsEvaluator docs for more.
                return ROWS_MIGHT_NOT_MATCH

            # Keep only the literals at or above the lower bound.
            literals = {val for val in literals if lower <= val}
            if len(literals) == 0:
                return ROWS_MUST_MATCH

        if upper_bytes := self.upper_bounds.get(field_id):
            upper = _from_byte_buffer(field.field_type, upper_bytes)

            # Of the survivors, keep only the literals at or below the upper bound.
            literals = {val for val in literals if upper >= val}

            if len(literals) == 0:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """Never claim a guaranteed match for ``starts with``."""
        return ROWS_MIGHT_NOT_MATCH

    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        """Never claim a guaranteed match for ``not starts with``."""
        return ROWS_MIGHT_NOT_MATCH

    def _get_field(self, field_id: int) -> NestedField:
        """Look up a field of ``self.struct`` by ID.

        Raises:
            ValueError: If the struct has no field with that ID.
        """
        field = self.struct.field(field_id=field_id)
        if field is None:
            raise ValueError(f"Cannot find field, might be nested or missing: {field_id}")

        return field

    def _can_contain_nulls(self, field_id: int) -> bool:
        """Return True when a null count is recorded for the field and is above 0."""
        return (null_count := self.null_counts.get(field_id)) is not None and null_count > 0

    def _can_contain_nans(self, field_id: int) -> bool:
        """Return True when a NaN count is recorded for the field and is above 0."""
        return (nan_count := self.nan_counts.get(field_id)) is not None and nan_count > 0

eval(file)

Test whether all records within the file match the expression.

Parameters:

Name Type Description Default
file DataFile

A data file

required

Returns:

Type Description
bool

False if the file may contain any row that doesn't match the expression, True otherwise.

Source code in pyiceberg/expressions/visitors.py
def eval(self, file: DataFile) -> bool:
    """Report whether every record in ``file`` is guaranteed to match the expression.

    Args:
        file: The data file whose record count and column metrics are inspected.

    Returns:
        ``ROWS_MUST_MATCH`` when the record count is zero or negative, otherwise
        the result of visiting the bound expression with this evaluator.
    """
    has_no_usable_count = file.record_count <= 0
    if has_no_usable_count:
        # A count of -1 was written by older versions when importing avro
        # tables, because the real record count was not derived from the avro
        # file. Such files, like empty ones, are reported as matching.
        return ROWS_MUST_MATCH

    # Copy each metric map onto the evaluator, substituting EMPTY_DICT for
    # anything falsy so the visit methods can always call .get().
    metric_sources = (
        ("value_counts", file.value_counts),
        ("null_counts", file.null_value_counts),
        ("nan_counts", file.nan_value_counts),
        ("lower_bounds", file.lower_bounds),
        ("upper_bounds", file.upper_bounds),
    )
    for attribute, metric in metric_sources:
        setattr(self, attribute, metric or EMPTY_DICT)

    return visit(self.expr, self)

bind(schema, expression, case_sensitive)

Traverse an expression to bind its predicates to the schema.

Parameters:

Name Type Description Default
schema Schema

A schema to use when binding the expression.

required
expression BooleanExpression

An expression containing UnboundPredicates that can be bound.

required
case_sensitive bool

Whether to consider case when binding a reference to a field in a schema.

required

Raises:

Type Description
TypeError

In the case a predicate is already bound.

Source code in pyiceberg/expressions/visitors.py
def bind(schema: Schema, expression: BooleanExpression, case_sensitive: bool) -> BooleanExpression:
    """Traverse an expression and bind its predicates to the schema.

    Args:
      schema (Schema): A schema to use when binding the expression.
      expression (BooleanExpression): An expression containing UnboundPredicates that can be bound.
      case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema.

    Returns:
        The expression produced by visiting ``expression`` with a ``BindVisitor``.

    Raises:
        TypeError: In the case a predicate is already bound.
    """
    binder = BindVisitor(schema, case_sensitive)
    return visit(expression, binder)

expression_to_plain_format(expressions, cast_int_to_datetime=False)

Format a Disjunctive Normal Form expression.

These are the formats that the expression can be fed into:

  • https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
  • https://docs.dask.org/en/stable/generated/dask.dataframe.read_parquet.html

Unlike general DNF, which may contain Not expressions, the expressions passed here must already have had their negations rewritten. This can be done using rewrite_not(...).

Keep in mind that this is only used for page skipping, and still needs to filter on a row level.

Parameters:

Name Type Description Default
expressions Tuple[BooleanExpression, ...]

Expression in Disjunctive Normal Form.

required

Returns:

Type Description
List[List[Tuple[str, str, Any]]]

Formatted filter compatible with Dask and PyArrow.

Source code in pyiceberg/expressions/visitors.py
def expression_to_plain_format(
    expressions: Tuple[BooleanExpression, ...], cast_int_to_datetime: bool = False
) -> List[List[Tuple[str, str, Any]]]:
    """Convert an expression in Disjunctive Normal Form into plain filter tuples.

    The result can be handed to readers that accept this filter format:

    - https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
    - https://docs.dask.org/en/stable/generated/dask.dataframe.read_parquet.html

    Unlike general DNF, the expressions must not contain ``Not`` nodes; rewrite
    them beforehand, for example with ``rewrite_not(...)``.

    The filter is only used for page skipping, so rows still have to be
    filtered individually afterwards.

    Args:
        expressions: Expression in Disjunctive Normal Form, one entry per disjunct.
        cast_int_to_datetime: Forwarded unchanged to ``ExpressionToPlainFormat``.

    Returns:
        Formatted filter compatible with Dask and PyArrow.
    """
    # One shared visitor formats every disjunct of expr1 ∨ expr2 ∨ ... ∨ exprN.
    formatter = ExpressionToPlainFormat(cast_int_to_datetime)
    formatted_disjuncts = [visit(disjunct, formatter) for disjunct in expressions]
    return formatted_disjuncts

visit(obj, visitor)

Apply a boolean expression visitor to any point within an expression.

The function traverses the expression in post-order fashion.

Parameters:

Name Type Description Default
obj BooleanExpression

An instance of a BooleanExpression.

required
visitor BooleanExpressionVisitor[T]

An instance of an implementation of the generic BooleanExpressionVisitor base class.

required

Raises:

Type Description
NotImplementedError

If attempting to visit an unsupported expression.

Source code in pyiceberg/expressions/visitors.py
@singledispatch
def visit(obj: BooleanExpression, visitor: BooleanExpressionVisitor[T]) -> T:
    """Apply a boolean expression visitor to any point within an expression.

    The expression is traversed in post-order fashion. This is the
    single-dispatch fallback: it runs only for an ``obj`` whose type has no
    registered handler.

    Args:
        obj (BooleanExpression): An instance of a BooleanExpression.
        visitor (BooleanExpressionVisitor[T]): An instance of an implementation of the generic BooleanExpressionVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unsupported expression.
    """
    message = f"Cannot visit unsupported expression: {obj}"
    raise NotImplementedError(message)