Skip to content

resolver

ConstructWriter

Bases: SchemaVisitorPerPrimitiveType[Writer]

Construct a writer tree from an Iceberg schema.

Source code in pyiceberg/avro/resolver.py
class ConstructWriter(SchemaVisitorPerPrimitiveType[Writer]):
    """Build a tree of writers that mirrors an Iceberg schema.

    Every visit method returns the writer for one schema node. Container nodes
    (struct, list, map, optional field) wrap the writers already produced for
    their children.
    """

    def schema(self, schema: Schema, struct_result: Writer) -> Writer:
        # The top-level schema is written with the writer of its root struct.
        return struct_result

    def struct(self, struct: StructType, field_results: List[Writer]) -> Writer:
        # Pair each child writer with its position inside the struct.
        positioned = tuple(enumerate(field_results))
        return StructWriter(positioned)

    def field(self, field: NestedField, field_result: Writer) -> Writer:
        # Non-required fields get an OptionWriter around the value writer.
        if not field.required:
            return OptionWriter(field_result)
        return field_result

    def list(self, list_type: ListType, element_result: Writer) -> Writer:
        return ListWriter(element_result)

    def map(self, map_type: MapType, key_result: Writer, value_result: Writer) -> Writer:
        return MapWriter(key_result, value_result)

    def visit_fixed(self, fixed_type: FixedType) -> Writer:
        width = len(fixed_type)
        return FixedWriter(width)

    def visit_decimal(self, decimal_type: DecimalType) -> Writer:
        precision = decimal_type.precision
        scale = decimal_type.scale
        return DecimalWriter(precision, scale)

    def visit_boolean(self, boolean_type: BooleanType) -> Writer:
        return BooleanWriter()

    def visit_integer(self, integer_type: IntegerType) -> Writer:
        return IntegerWriter()

    def visit_long(self, long_type: LongType) -> Writer:
        # Longs are written with the same writer as integers.
        return IntegerWriter()

    def visit_float(self, float_type: FloatType) -> Writer:
        return FloatWriter()

    def visit_double(self, double_type: DoubleType) -> Writer:
        return DoubleWriter()

    def visit_date(self, date_type: DateType) -> Writer:
        return DateWriter()

    def visit_time(self, time_type: TimeType) -> Writer:
        return TimeWriter()

    def visit_timestamp(self, timestamp_type: TimestampType) -> Writer:
        return TimestampWriter()

    def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> Writer:
        return TimestamptzWriter()

    def visit_string(self, string_type: StringType) -> Writer:
        return StringWriter()

    def visit_uuid(self, uuid_type: UUIDType) -> Writer:
        return UUIDWriter()

    def visit_binary(self, binary_type: BinaryType) -> Writer:
        return BinaryWriter()

EnumReader

Bases: Reader

An Enum reader to wrap primitive values into an Enum.

Source code in pyiceberg/avro/resolver.py
class EnumReader(Reader):
    """An Enum reader to wrap primitive values into an Enum.

    Decoding is delegated to the wrapped ``reader``. The decoded value is then
    passed to ``enum`` to produce the returned Enum member.
    """

    __slots__ = ("enum", "reader")

    enum: Callable[..., Enum]  # called with the value decoded by ``reader``
    reader: Reader  # reader for the underlying encoded value

    def __init__(self, enum: Callable[..., Enum], reader: Reader) -> None:
        self.enum = enum
        self.reader = reader

    def read(self, decoder: BinaryDecoder) -> Enum:
        return self.enum(self.reader.read(decoder))

    def skip(self, decoder: BinaryDecoder) -> None:
        # The enum only wraps a value decoded by ``self.reader``, so skipping
        # must still consume that value's bytes. A no-op here would leave the
        # decoder positioned inside this value, corrupting every later read.
        self.reader.skip(decoder)

construct_reader(file_schema, read_types=EMPTY_DICT)

Construct a reader from a file schema.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_schema` | `Schema \| IcebergType` | The schema of the Avro file. | *required* |
| `read_types` | `Dict[int, Callable[..., StructProtocol]]` | Constructors for structs for certain field IDs. | `EMPTY_DICT` |

Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | If attempting to resolve an unrecognized object type. |

Source code in pyiceberg/avro/resolver.py
def construct_reader(
    file_schema: Union[Schema, IcebergType], read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT
) -> Reader:
    """Build a reader that reads an Avro file using the file's own schema.

    Args:
        file_schema (Schema | IcebergType): The schema of the Avro file.
        read_types (Dict[int, Callable[..., StructProtocol]]): Constructors for structs for certain field-ids

    Raises:
        NotImplementedError: If attempting to resolve an unrecognized object type.
    """
    # No projection is requested, so the file schema also serves as the read schema.
    return resolve_reader(file_schema=file_schema, read_schema=file_schema, read_types=read_types)

construct_writer(file_schema)

Construct a writer from a file schema.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_schema` | `Schema \| IcebergType` | The schema of the Avro file. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | If attempting to resolve an unrecognized object type. |

Source code in pyiceberg/avro/resolver.py
def construct_writer(file_schema: Union[Schema, IcebergType]) -> Writer:
    """Build a writer tree directly from the schema of the file being written.

    Args:
        file_schema (Schema | IcebergType): The schema of the Avro file.

    Raises:
        NotImplementedError: If attempting to resolve an unrecognized object type.
    """
    visitor = CONSTRUCT_WRITER_VISITOR
    writer = visit(file_schema, visitor)
    return writer

resolve_reader(file_schema, read_schema, read_types=EMPTY_DICT, read_enums=EMPTY_DICT)

Resolve the file and read schema to produce a reader.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_schema` | `Schema \| IcebergType` | The schema of the Avro file. | *required* |
| `read_schema` | `Schema \| IcebergType` | The requested read schema, which is equal to, a subset of, or a superset of the file schema. | *required* |
| `read_types` | `Dict[int, Callable[..., StructProtocol]]` | A dict of types to use for struct data. | `EMPTY_DICT` |
| `read_enums` | `Dict[int, Callable[..., Enum]]` | A dict of fields that have to be converted to an enum. | `EMPTY_DICT` |

Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | If attempting to resolve an unrecognized object type. |

Source code in pyiceberg/avro/resolver.py
def resolve_reader(
    file_schema: Union[Schema, IcebergType],
    read_schema: Union[Schema, IcebergType],
    read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
    read_enums: Dict[int, Callable[..., Enum]] = EMPTY_DICT,
) -> Reader:
    """Produce a reader by walking the file schema alongside the read schema.

    Args:
        file_schema (Schema | IcebergType): The schema of the Avro file.
        read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema.
        read_types (Dict[int, Callable[..., StructProtocol]]): A dict of types to use for struct data.
        read_enums (Dict[int, Callable[..., Enum]]): A dict of fields that have to be converted to an enum.

    Raises:
        NotImplementedError: If attempting to resolve an unrecognized object type.
    """
    resolver = ReadSchemaResolver(read_types, read_enums)
    accessor = SchemaPartnerAccessor()
    return visit_with_partner(file_schema, read_schema, resolver, accessor)  # type: ignore

resolve_writer(record_schema, file_schema)

Resolve the record and file schema to produce a writer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `record_schema` | `Schema \| IcebergType` | The schema of the record in memory. | *required* |
| `file_schema` | `Schema \| IcebergType` | The schema of the file that will be written. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | If attempting to resolve an unrecognized object type. |

Source code in pyiceberg/avro/resolver.py
def resolve_writer(
    record_schema: Union[Schema, IcebergType],
    file_schema: Union[Schema, IcebergType],
) -> Writer:
    """Resolve the record and file schema to produce a writer.

    When both schemas are equal, the writer is built directly from the file
    schema. Otherwise, the file schema is visited with the record schema as
    its partner, using ``WriteSchemaResolver``.

    Args:
        record_schema (Schema | IcebergType): The schema of the record in memory.
        file_schema (Schema | IcebergType): The schema of the file that will be written.

    Raises:
        NotImplementedError: If attempting to resolve an unrecognized object type.
    """
    # Fast path: no resolution is needed when the in-memory and file schemas match.
    if record_schema == file_schema:
        return construct_writer(file_schema)
    return visit_with_partner(file_schema, record_schema, WriteSchemaResolver(), SchemaPartnerAccessor())  # type: ignore