Skip to content

reader

Classes for building the Reader tree.

Constructing a reader tree from the schema makes it easy to decouple the reader implementation from the schema.

The reader tree can be adapted so that the schema used for reading differs from the schema the data was written with, while still respecting the read schema.

BinaryReader

Bases: Reader

Read a binary value.

First reads an integer, to get the length of the binary value, then reads the binary field itself.

Source code in pyiceberg/avro/reader.py
class BinaryReader(Reader):
    """Reader for variable-length binary values.

    The value is encoded as an integer holding the length of the payload,
    followed by the payload bytes themselves. Both steps are delegated to
    the decoder.
    """

    def read(self, decoder: BinaryDecoder) -> bytes:
        """Return the next binary value obtained from ``decoder.read_bytes()``."""
        payload = decoder.read_bytes()
        return payload

    def skip(self, decoder: BinaryDecoder) -> None:
        """Skip the next binary value by calling ``decoder.skip_bytes()``."""
        decoder.skip_bytes()

DateReader

Bases: IntegerReader

Reads a day granularity date from the stream.

The number of days from 1 January 1970.

Source code in pyiceberg/avro/reader.py
class DateReader(IntegerReader):
    """Reads a day granularity date from the stream.

    The value is the number of days from 1 January 1970. This class adds no
    methods of its own; reading and skipping are inherited from
    ``IntegerReader``.
    """

DecimalReader dataclass

Bases: Reader

Reads a value as a decimal.

Decimal bytes are decoded as signed short, int or long depending on the size of bytes.

Source code in pyiceberg/avro/reader.py
@dataclass(frozen=True, init=False)
class DecimalReader(Reader):
    """Reads a value as a decimal.

    Decimal bytes are decoded as signed short, int or long depending on the
    size of bytes.

    The encoded value occupies a fixed number of bytes (``_length``) that is
    derived from ``precision`` when the reader is constructed.
    """

    precision: int = dataclassfield()
    scale: int = dataclassfield()
    # Number of bytes of one encoded value: decimal_required_bytes(precision).
    _length: int

    def __init__(self, precision: int, scale: int):
        """Store ``precision`` and ``scale`` and pre-compute the byte length.

        The dataclass is frozen, so the attributes are assigned through
        ``object.__setattr__``.
        """
        object.__setattr__(self, "precision", precision)
        object.__setattr__(self, "scale", scale)
        object.__setattr__(self, "_length", decimal_required_bytes(precision))

    def read(self, decoder: BinaryDecoder) -> Decimal:
        """Read ``_length`` bytes and convert them with ``bytes_to_decimal`` using ``scale``."""
        return bytes_to_decimal(decoder.read(self._length), self.scale)

    def skip(self, decoder: BinaryDecoder) -> None:
        """Skip one encoded decimal.

        ``read`` consumes exactly ``_length`` raw bytes with no length
        prefix, so skipping must advance by the same amount. The previous
        ``decoder.skip_bytes()`` call is the length-prefixed variant used by
        ``BinaryReader`` and would leave the stream misaligned.
        """
        decoder.skip(self._length)

    def __repr__(self) -> str:
        """Return the string representation of the DecimalReader class."""
        return f"DecimalReader({self.precision}, {self.scale})"

__repr__()

Return the string representation of the DecimalReader class.

Source code in pyiceberg/avro/reader.py
def __repr__(self) -> str:
    """Render the reader as ``DecimalReader(<precision>, <scale>)``."""
    precision, scale = self.precision, self.scale
    return f"DecimalReader({precision}, {scale})"

FixedReader dataclass

Bases: Reader

Source code in pyiceberg/avro/reader.py
@dataclass(frozen=True)
class FixedReader(Reader):
    """Reader for binary values with a fixed size of ``_len`` bytes."""

    _len: int = dataclassfield()

    def read(self, decoder: BinaryDecoder) -> bytes:
        """Read ``len(self)`` bytes from ``decoder`` and return them."""
        size = len(self)
        return decoder.read(size)

    def skip(self, decoder: BinaryDecoder) -> None:
        """Skip ``len(self)`` bytes on ``decoder``."""
        size = len(self)
        decoder.skip(size)

    def __len__(self) -> int:
        """Return the configured size, ``_len``."""
        return self._len

    def __repr__(self) -> str:
        """Render the reader as ``FixedReader(<length>)``."""
        size = self._len
        return f"FixedReader({size})"

__len__()

Return the length of an instance of the FixedReader class.

Source code in pyiceberg/avro/reader.py
def __len__(self) -> int:
    """Return the length of an instance of the FixedReader class.

    This is the value stored in ``_len``.
    """
    return self._len

__repr__()

Return the string representation of the FixedReader class.

Source code in pyiceberg/avro/reader.py
def __repr__(self) -> str:
    """Render the reader as ``FixedReader(<length>)``."""
    size = self._len
    return f"FixedReader({size})"

IntegerReader

Bases: Reader

Longs and ints are encoded the same way, and there is no long in Python.

Source code in pyiceberg/avro/reader.py
class IntegerReader(Reader):
    """Reader shared by ints and longs.

    Longs and ints are encoded the same way, and there is no long in Python,
    so a single reader serves both.
    """

    def read(self, decoder: BinaryDecoder) -> int:
        """Return the next integer obtained from ``decoder.read_int()``."""
        value = decoder.read_int()
        return value

    def skip(self, decoder: BinaryDecoder) -> None:
        """Skip the next integer by calling ``decoder.skip_int()``."""
        decoder.skip_int()

ListReader dataclass

Bases: Reader

Source code in pyiceberg/avro/reader.py
@dataclass(frozen=False, init=False)
class ListReader(Reader):
    """Reader for a list whose items are all read by the same ``element`` reader."""

    __slots__ = ("element", "_is_int_list", "_hash")
    element: Reader

    def __init__(self, element: Reader) -> None:
        """Store the element reader and pre-compute the hash and the int fast-path flag."""
        super().__init__()
        self.element = element
        self._hash = hash(self.element)
        self._is_int_list = isinstance(self.element, IntegerReader)

    def read(self, decoder: BinaryDecoder) -> List[Any]:
        """Read every block of the list and return the items in stream order.

        Blocks are read until a block count of zero is encountered. A
        negative count means the count is followed by one extra integer,
        which is read and discarded.
        """
        collected: List[Any] = []
        while True:
            count = decoder.read_int()
            if count == 0:
                break
            if count < 0:
                count = -count
                # The integer following a negative count is not used.
                decoder.read_int()
            if self._is_int_list:
                # Integer elements are fetched from the decoder in one call.
                collected.extend(decoder.read_ints(count))
            else:
                collected.extend(self.element.read(decoder) for _ in range(count))
        return collected

    def skip(self, decoder: BinaryDecoder) -> None:
        """Skip the whole list via ``_skip_map_array``, skipping items with the element reader."""

        def skip_element() -> None:
            return self.element.skip(decoder)

        _skip_map_array(decoder, skip_element)

    def __hash__(self) -> int:
        """Return the hash of the element reader computed in ``__init__``."""
        return self._hash

__hash__()

Return a hashed representation of the ListReader class.

Source code in pyiceberg/avro/reader.py
def __hash__(self) -> int:
    """Return a hashed representation of the ListReader class.

    The value is read from ``_hash``, which ``__init__`` sets to the hash of
    the element reader.
    """
    return self._hash

MapReader dataclass

Bases: Reader

Source code in pyiceberg/avro/reader.py
@dataclass(frozen=False, init=False)
class MapReader(Reader):
    """Reader for a map, built from one reader for the keys and one for the values.

    Two combinations get a dedicated fast path: integer keys with integer
    values, and integer keys with binary values. Every other combination is
    read entry by entry with the key and value readers.
    """

    __slots__ = ("key", "value", "_is_int_int", "_is_int_bytes", "_key_reader", "_value_reader", "_hash")
    key: Reader
    value: Reader

    def __init__(self, key: Reader, value: Reader) -> None:
        """Store the readers and pre-compute the fast-path flags and the hash."""
        super().__init__()
        self.key = key
        self.value = value

        key_is_int = isinstance(self.key, IntegerReader)
        self._is_int_int = key_is_int and isinstance(self.value, IntegerReader)
        self._is_int_bytes = key_is_int and isinstance(self.value, BinaryReader)

        # Always bind the generic readers. Previously they were only set when
        # the key was not an IntegerReader, so an int-keyed map whose values
        # are neither ints nor binary fell through to the generic path in
        # read() and failed with AttributeError.
        self._key_reader = self.key.read
        self._value_reader = self.value.read

        self._hash = hash((self.key, self.value))

    def _read_int_int(self, decoder: BinaryDecoder) -> Mapping[int, int]:
        """Read a mapping from int to int from the decoder.

        Read a map of ints to ints from the decoder, since this is such a common
        data type, it is optimized to be faster than the generic map reader, by
        using a lazy dict.

        The time it takes to create the python dictionary is much larger than
        the time it takes to read the data from the decoder as an array, so the
        lazy dict defers creating the python dictionary until it is actually
        accessed.

        """
        block_count = decoder.read_int()

        # Often times the map is empty, so we can just return an empty dict without
        # instancing the LazyDict
        if block_count == 0:
            return EMPTY_DICT

        contents_array: List[Tuple[int, ...]] = []

        while block_count != 0:
            if block_count < 0:
                block_count = -block_count
                # We ignore the block size for now
                decoder.skip_int()

            # Since the integers are encoding right next to each other
            # just read them all at once.
            contents_array.append(decoder.read_ints(block_count * 2))
            block_count = decoder.read_int()

        return LazyDict(contents_array)

    def read(self, decoder: BinaryDecoder) -> Mapping[Any, Any]:
        """Read every block of the map and return the entries as a mapping.

        Int-to-int maps are delegated to ``_read_int_int``. For all other
        maps, blocks are read until a block count of zero is encountered; a
        negative count is followed by one extra integer that is read and
        ignored.
        """
        if self._is_int_int:
            return self._read_int_int(decoder)

        read_items: dict[Any, Any] = {}

        block_count = decoder.read_int()
        while block_count != 0:
            if block_count < 0:
                block_count = -block_count
                # We ignore the block size for now
                _ = decoder.read_int()

            if self._is_int_bytes:
                # The decoder fills read_items in place for int -> bytes maps.
                decoder.read_int_bytes_dict(block_count, read_items)
            else:
                for _ in range(block_count):
                    key = self._key_reader(decoder)
                    read_items[key] = self._value_reader(decoder)

            block_count = decoder.read_int()

        return read_items

    def skip(self, decoder: BinaryDecoder) -> None:
        """Skip the whole map via ``_skip_map_array``, skipping a key and a value per entry."""

        def skip() -> None:
            self.key.skip(decoder)
            self.value.skip(decoder)

        _skip_map_array(decoder, skip)

    def __hash__(self) -> int:
        """Return a hashed representation of the MapReader class."""
        return self._hash

__hash__()

Return a hashed representation of the MapReader class.

Source code in pyiceberg/avro/reader.py
def __hash__(self) -> int:
    """Return a hashed representation of the MapReader class.

    The value is read from ``_hash``, which ``__init__`` sets to the hash of
    the ``(key, value)`` reader pair.
    """
    return self._hash

Reader

Bases: Singleton

Source code in pyiceberg/avro/reader.py
class Reader(Singleton):
    """Base class of the reader tree.

    Concrete readers provide ``read`` to produce a value from a decoder and
    ``skip`` to move past a value without producing it.
    """

    @abstractmethod
    def read(self, decoder: BinaryDecoder) -> Any:
        """Read one value from ``decoder`` and return it."""

    @abstractmethod
    def skip(self, decoder: BinaryDecoder) -> None:
        """Skip one value on ``decoder``."""

    def __repr__(self) -> str:
        """Render the reader as its class name followed by ``()``."""
        class_name = self.__class__.__name__
        return f"{class_name}()"

__repr__()

Return the string representation of the Reader class.

Source code in pyiceberg/avro/reader.py
def __repr__(self) -> str:
    """Render the reader as its class name followed by ``()``."""
    class_name = self.__class__.__name__
    return f"{class_name}()"

StructReader

Bases: Reader

Source code in pyiceberg/avro/reader.py
class StructReader(Reader):
    """Reader that builds a struct by running one reader per field.

    ``field_readers`` pairs each field reader with the position in the
    resulting struct that receives its value. A position of ``None`` means
    the field is skipped on the decoder instead of being read.
    """

    __slots__ = ("field_readers", "create_struct", "struct", "_create_with_keyword", "_field_reader_functions", "_hash")
    field_readers: Tuple[Tuple[Optional[int], Reader], ...]
    create_struct: Callable[..., StructProtocol]
    struct: StructType
    # NOTE: this line assigns (``=``) rather than annotates, so it creates a
    # class attribute holding a typing alias whose shape does not match the
    # tuples built in __init__. It is kept only so that existing references to
    # ``StructReader.field_reader_functions`` keep working; the accurate
    # annotation for the slot follows below.
    field_reader_functions = Tuple[Tuple[Optional[str], int, Optional[Callable[[BinaryDecoder], Any]]], ...]
    # True when create_struct accepts the ``struct`` keyword argument.
    _create_with_keyword: bool
    # (position, callback) pairs; the callback is the field reader's ``read``
    # when a position is given and its ``skip`` when the position is None.
    _field_reader_functions: Tuple[Tuple[Optional[int], Callable[[BinaryDecoder], Any]], ...]
    _hash: int

    def __init__(
        self,
        field_readers: Tuple[Tuple[Optional[int], Reader], ...],
        create_struct: Callable[..., StructProtocol],
        struct: StructType,
    ) -> None:
        """Validate ``create_struct`` and pre-compute the per-field callbacks.

        Args:
            field_readers: Pairs of (position or None, reader), in stream order.
            create_struct: Factory for the struct that ``read`` fills.
            struct: Struct type passed to ``create_struct`` as the ``struct``
                keyword when the factory accepts it.

        Raises:
            ValueError: If ``create_struct`` raises a ``TypeError`` other than
                the "invalid keyword argument" one, or if the object it
                produces is not a ``StructProtocol``.
        """
        self.field_readers = field_readers
        self.create_struct = create_struct
        self.struct = struct

        try:
            # Try initializing the struct, first with the struct keyword argument
            created_struct = self.create_struct(struct=self.struct)
            self._create_with_keyword = True
        except TypeError as e:
            if "'struct' is an invalid keyword argument for" in str(e):
                # The factory does not take ``struct``; call it without arguments.
                created_struct = self.create_struct()
                self._create_with_keyword = False
            else:
                raise ValueError(f"Unable to initialize struct: {self.create_struct}") from e

        if not isinstance(created_struct, StructProtocol):
            raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}")

        # Resolve the bound method once per field so read() does not have to
        # choose between read and skip for every record.
        self._field_reader_functions = tuple(
            (pos, field.read if pos is not None else field.skip) for pos, field in field_readers
        )
        self._hash = hash(self._field_reader_functions)

    def read(self, decoder: BinaryDecoder) -> StructProtocol:
        """Create a new struct and fill it from ``decoder``.

        Fields with a position are stored at that position; fields without
        one are consumed from the decoder and discarded.
        """
        struct = self.create_struct(struct=self.struct) if self._create_with_keyword else self.create_struct()
        for pos, field_reader in self._field_reader_functions:
            if pos is not None:
                struct[pos] = field_reader(decoder)  # later: pass reuse in here
            else:
                field_reader(decoder)

        return struct

    def skip(self, decoder: BinaryDecoder) -> None:
        """Skip every field of the struct, in order."""
        for _, field in self.field_readers:
            field.skip(decoder)

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the StructReader class.

        Two readers are equal when both ``field_readers`` and
        ``create_struct`` compare equal; any non-StructReader is unequal.
        """
        if not isinstance(other, StructReader):
            return False
        return self.field_readers == other.field_readers and self.create_struct == other.create_struct

    def __repr__(self) -> str:
        """Return the string representation of the StructReader class."""
        fields = ",".join(map(repr, self.field_readers))
        return f"StructReader(({fields}), {self.create_struct!r})"

    def __hash__(self) -> int:
        """Return a hashed representation of the StructReader class."""
        return self._hash

__eq__(other)

Return the equality of two instances of the StructReader class.

Source code in pyiceberg/avro/reader.py
def __eq__(self, other: Any) -> bool:
    """Compare two StructReader instances.

    Anything that is not a StructReader is unequal. Otherwise both
    ``field_readers`` and ``create_struct`` have to compare equal.
    """
    if not isinstance(other, StructReader):
        return False
    return self.field_readers == other.field_readers and self.create_struct == other.create_struct

__hash__()

Return a hashed representation of the StructReader class.

Source code in pyiceberg/avro/reader.py
def __hash__(self) -> int:
    """Return a hashed representation of the StructReader class.

    The value is read from ``_hash``, which ``__init__`` sets to the hash of
    the pre-computed (position, callback) tuple.
    """
    return self._hash

__repr__()

Return the string representation of the StructReader class.

Source code in pyiceberg/avro/reader.py
def __repr__(self) -> str:
    """Render the reader with its field readers and its struct factory."""
    fields = ",".join(map(repr, self.field_readers))
    return f"StructReader(({fields}), {self.create_struct!r})"

TimeReader

Bases: IntegerReader

Reads a microsecond granularity timestamp from the stream.

Long is decoded as an integer which represents the number of microseconds from the unix epoch, 1 January 1970.

Source code in pyiceberg/avro/reader.py
class TimeReader(IntegerReader):
    """Reads a microsecond granularity time from the stream.

    The long is decoded as a Python integer. This class adds no methods of
    its own; reading and skipping are inherited from ``IntegerReader``.

    NOTE(review): the previous text described the value as "the number of
    microseconds from the unix epoch, 1 January 1970", identical to the
    timestamp readers. For a time-of-day type this is presumably the number
    of microseconds from midnight -- confirm against the Iceberg spec.
    """

TimestampReader

Bases: IntegerReader

Reads a microsecond granularity timestamp from the stream.

Long is decoded as python integer which represents the number of microseconds from the unix epoch, 1 January 1970.

Source code in pyiceberg/avro/reader.py
class TimestampReader(IntegerReader):
    """Reads a microsecond granularity timestamp from the stream.

    The long is decoded as a Python integer which represents the number of
    microseconds from the unix epoch, 1 January 1970. This class adds no
    methods of its own; reading and skipping are inherited from
    ``IntegerReader``.
    """

TimestamptzReader

Bases: IntegerReader

Reads a microsecond granularity timestamptz from the stream.

Long is decoded as python integer which represents the number of microseconds from the unix epoch, 1 January 1970.

Adjusted to UTC.

Source code in pyiceberg/avro/reader.py
class TimestamptzReader(IntegerReader):
    """Reads a microsecond granularity timestamptz from the stream.

    The long is decoded as a Python integer which represents the number of
    microseconds from the unix epoch, 1 January 1970.

    Adjusted to UTC.

    This class adds no methods of its own; reading and skipping are
    inherited from ``IntegerReader``.
    """