Skip to content

decoder

BinaryDecoder

Bases: ABC

Decodes bytes into Python physical primitives.

Source code in pyiceberg/avro/decoder.py
class BinaryDecoder(ABC):
    """Decodes bytes into Python physical primitives.

    Subclasses implement ``tell``, ``read`` and ``skip``; the typed
    ``read_*`` and ``skip_*`` helpers are all expressed in terms of those.
    """

    @abstractmethod
    def tell(self) -> int:
        """Return the current position."""

    @abstractmethod
    def read(self, n: int) -> bytes:
        """Read n bytes."""

    @abstractmethod
    def skip(self, n: int) -> None:
        """Skip n bytes."""

    def read_boolean(self) -> bool:
        """Read one byte from the stream and interpret it as a boolean.

        A boolean is written as a single byte whose value is either
        0 (false) or 1 (true); any byte other than 1 decodes as False.
        """
        flag = ord(self.read(1))
        return flag == 1

    def read_int(self) -> int:
        """Read an int/long value.

        int/long values are written using variable-length, zigzag coding.
        """
        byte = ord(self.read(1))
        encoded = byte & 0x7F
        offset = 7
        # The high bit of every byte signals that another 7-bit group follows.
        while byte & 0x80:
            byte = ord(self.read(1))
            encoded |= (byte & 0x7F) << offset
            offset += 7
        # Undo the zigzag mapping: the lowest bit carries the sign.
        return (encoded >> 1) ^ -(encoded & 1)

    def read_ints(self, n: int) -> Tuple[int, ...]:
        """Read n int/long values and return them as a tuple."""
        return tuple([self.read_int() for _ in range(n)])

    def read_int_bytes_dict(self, n: int, dest: Dict[int, bytes]) -> None:
        """Read n (int key, bytes value) pairs into the destination dictionary.

        Args:
            n: Number of key/value pairs to read.
            dest: Dictionary that is updated in place; existing keys are overwritten.
        """
        for _ in range(n):
            # The key is read before the value, matching their order in the stream.
            key = self.read_int()
            dest[key] = self.read_bytes()

    def read_float(self) -> float:
        """Read a value from the stream as a float.

        A float is written as 4 bytes.
        The float is converted into a 32-bit integer using a method equivalent to
        Java's floatToIntBits and then encoded in little-endian format.
        """
        unpacked = cast(Tuple[float, ...], STRUCT_FLOAT.unpack(self.read(4)))
        return float(unpacked[0])

    def read_double(self) -> float:
        """Read a value from the stream as a double.

        A double is written as 8 bytes.
        The double is converted into a 64-bit integer using a method equivalent to
        Java's doubleToLongBits and then encoded in little-endian format.
        """
        unpacked = cast(Tuple[float, ...], STRUCT_DOUBLE.unpack(self.read(8)))
        return float(unpacked[0])

    def read_bytes(self) -> bytes:
        """Read a length-prefixed byte string.

        Bytes are encoded as a long followed by that many bytes of data.
        A length of zero or less yields an empty byte string.
        """
        length = self.read_int()
        if length <= 0:
            return b""
        return self.read(length)

    def read_utf8(self) -> str:
        """Read a UTF-8 encoded string from the stream.

        A string is encoded as a long followed by
        that many bytes of UTF-8 encoded character data.
        """
        raw = self.read_bytes()
        return raw.decode(UTF8)

    def skip_boolean(self) -> None:
        """Skip a boolean, which occupies a single byte."""
        self.skip(1)

    def skip_int(self) -> None:
        """Skip a variable-length int/long by consuming bytes until the high bit is clear."""
        while ord(self.read(1)) & 0x80:
            pass

    def skip_float(self) -> None:
        """Skip a float, which occupies 4 bytes."""
        self.skip(4)

    def skip_double(self) -> None:
        """Skip a double, which occupies 8 bytes."""
        self.skip(8)

    def skip_bytes(self) -> None:
        """Skip a length-prefixed byte string.

        Mirrors ``read_bytes``: a length of zero or less means there is no
        payload, so nothing is skipped. Passing a negative length to ``skip``
        could otherwise move the position backwards.
        """
        length = self.read_int()
        if length > 0:
            self.skip(length)

    def skip_utf8(self) -> None:
        """Skip a UTF-8 string, which shares the length-prefixed layout of bytes."""
        self.skip_bytes()

read(n) abstractmethod

Read n bytes.

Source code in pyiceberg/avro/decoder.py
@abstractmethod
def read(self, n: int) -> bytes:
    """Read n bytes.

    Abstract: concrete decoders provide the implementation.

    Args:
        n: Number of bytes to read.

    Returns:
        The bytes that were read.
    """

read_boolean()

Read a value from the stream as a boolean.

A boolean is written as a single byte whose value is either 0 (false) or 1 (true).

Source code in pyiceberg/avro/decoder.py
def read_boolean(self) -> bool:
    """Read one byte from the stream and interpret it as a boolean.

    A boolean is written as a single byte whose value is either
    0 (false) or 1 (true); any byte other than 1 decodes as False.
    """
    flag = ord(self.read(1))
    return flag == 1

read_bytes()

Bytes are encoded as a long followed by that many bytes of data.

Source code in pyiceberg/avro/decoder.py
def read_bytes(self) -> bytes:
    """Read a length-prefixed byte string.

    Bytes are encoded as a long followed by that many bytes of data.
    A length of zero or less yields an empty byte string.
    """
    length = self.read_int()
    if length <= 0:
        return b""
    return self.read(length)

read_double()

Read a value from the stream as a double.

A double is written as 8 bytes. The double is converted into a 64-bit integer using a method equivalent to Java's doubleToLongBits and then encoded in little-endian format.

Source code in pyiceberg/avro/decoder.py
def read_double(self) -> float:
    """Read a value from the stream as a double.

    A double occupies 8 bytes on the wire. Per the encoding, the value is
    converted into a 64-bit integer using a method equivalent to Java's
    doubleToLongBits and then written in little-endian format.
    """
    # STRUCT_DOUBLE.unpack yields a tuple; its first element is the decoded value.
    unpacked = cast(Tuple[float, ...], STRUCT_DOUBLE.unpack(self.read(8)))
    return float(unpacked[0])

read_float()

Read a value from the stream as a float.

A float is written as 4 bytes. The float is converted into a 32-bit integer using a method equivalent to Java's floatToIntBits and then encoded in little-endian format.

Source code in pyiceberg/avro/decoder.py
def read_float(self) -> float:
    """Read a value from the stream as a float.

    A float occupies 4 bytes on the wire. Per the encoding, the value is
    converted into a 32-bit integer using a method equivalent to Java's
    floatToIntBits and then written in little-endian format.
    """
    # STRUCT_FLOAT.unpack yields a tuple; its first element is the decoded value.
    unpacked = cast(Tuple[float, ...], STRUCT_FLOAT.unpack(self.read(4)))
    return float(unpacked[0])

read_int()

Read an int/long value.

int/long values are written using variable-length, zigzag coding.

Source code in pyiceberg/avro/decoder.py
def read_int(self) -> int:
    """Read an int/long value.

    int/long values are written using variable-length, zigzag coding.
    """
    byte = ord(self.read(1))
    encoded = byte & 0x7F
    offset = 7
    # The high bit of every byte signals that another 7-bit group follows.
    while byte & 0x80:
        byte = ord(self.read(1))
        encoded |= (byte & 0x7F) << offset
        offset += 7
    # Undo the zigzag mapping: the lowest bit carries the sign.
    return (encoded >> 1) ^ -(encoded & 1)

read_int_bytes_dict(n, dest)

Read a dictionary of integers for keys and bytes for values into a destination dictionary.

Source code in pyiceberg/avro/decoder.py
def read_int_bytes_dict(self, n: int, dest: Dict[int, bytes]) -> None:
    """Read n (int key, bytes value) pairs into the destination dictionary.

    Args:
        n: Number of key/value pairs to read.
        dest: Dictionary that is updated in place; existing keys are overwritten.
    """
    for _ in range(n):
        # The key is read before the value, matching their order in the stream.
        key = self.read_int()
        dest[key] = self.read_bytes()

read_ints(n)

Read n integers and return them as a tuple.

Source code in pyiceberg/avro/decoder.py
def read_ints(self, n: int) -> Tuple[int, ...]:
    """Read n int/long values and return them as a tuple."""
    return tuple([self.read_int() for _ in range(n)])

read_utf8()

Read a UTF-8 encoded string from the stream.

A string is encoded as a long followed by that many bytes of UTF-8 encoded character data.

Source code in pyiceberg/avro/decoder.py
def read_utf8(self) -> str:
    """Read a UTF-8 encoded string from the stream.

    A string is encoded as a long followed by
    that many bytes of UTF-8 encoded character data.
    """
    raw = self.read_bytes()
    return raw.decode(UTF8)

skip(n) abstractmethod

Skip n bytes.

Source code in pyiceberg/avro/decoder.py
@abstractmethod
def skip(self, n: int) -> None:
    """Skip n bytes.

    Abstract: concrete decoders provide the implementation.

    Args:
        n: Number of bytes to skip.
    """

tell() abstractmethod

Return the current position.

Source code in pyiceberg/avro/decoder.py
@abstractmethod
def tell(self) -> int:
    """Return the current position.

    Abstract: concrete decoders provide the implementation.
    """

StreamingBinaryDecoder

Bases: BinaryDecoder

Decodes bytes into Python physical primitives.

Source code in pyiceberg/avro/decoder.py
class StreamingBinaryDecoder(BinaryDecoder):
    """Decodes bytes into Python physical primitives, pulling them from a stream."""

    __slots__ = "_input_stream"
    _input_stream: InputStream

    def __init__(self, input_stream: Union[bytes, InputStream]) -> None:
        """Create a decoder over raw bytes or over an object offering read, seek, and tell."""
        # Raw bytes are wrapped in a BytesIO so both inputs are handled as a stream.
        self._input_stream = io.BytesIO(input_stream) if isinstance(input_stream, bytes) else input_stream

    def tell(self) -> int:
        """Return the current stream position."""
        stream = self._input_stream
        return stream.tell()

    def read(self, n: int) -> bytes:
        """Read n bytes, issuing repeated reads if the stream returns fewer than requested.

        Raises:
            ValueError: If n is negative.
            EOFError: If the stream returns no data before n bytes were collected.
        """
        if n < 0:
            raise ValueError(f"Requested {n} bytes to read, expected positive integer.")

        chunks: List[bytes] = []
        outstanding = n
        while outstanding > 0:
            chunk = self._input_stream.read(outstanding)
            got = len(chunk)
            if got <= 0:
                raise EOFError(f"EOF: read {got} bytes")
            if got == n:
                # A single read delivered the whole request, so no join is needed.
                return chunk
            chunks.append(chunk)
            outstanding -= got

        return b"".join(chunks)

    def skip(self, n: int) -> None:
        """Skip n bytes by seeking the underlying stream relative to its current position."""
        self._input_stream.seek(n, SEEK_CUR)

__init__(input_stream)

The input stream is either raw bytes (which are wrapped in a BytesIO) or a Python object on which we can call read, seek, and tell.

Source code in pyiceberg/avro/decoder.py
def __init__(self, input_stream: Union[bytes, InputStream]) -> None:
    """Create a decoder over raw bytes or over an object offering read, seek, and tell."""
    # Raw bytes are wrapped in a BytesIO so both inputs are handled as a stream.
    self._input_stream = io.BytesIO(input_stream) if isinstance(input_stream, bytes) else input_stream

read(n)

Read n bytes.

Source code in pyiceberg/avro/decoder.py
def read(self, n: int) -> bytes:
    """Read n bytes, issuing repeated reads if the stream returns fewer than requested.

    Raises:
        ValueError: If n is negative.
        EOFError: If the stream returns no data before n bytes were collected.
    """
    if n < 0:
        raise ValueError(f"Requested {n} bytes to read, expected positive integer.")

    chunks: List[bytes] = []
    outstanding = n
    while outstanding > 0:
        chunk = self._input_stream.read(outstanding)
        got = len(chunk)
        if got <= 0:
            raise EOFError(f"EOF: read {got} bytes")
        if got == n:
            # A single read delivered the whole request, so no join is needed.
            return chunk
        chunks.append(chunk)
        outstanding -= got

    return b"".join(chunks)

tell()

Return the current stream position.

Source code in pyiceberg/avro/decoder.py
def tell(self) -> int:
    """Return the position reported by the underlying stream's ``tell()``."""
    stream = self._input_stream
    return stream.tell()