Skip to content

file

Avro reader for reading Avro files.

AvroFile

Bases: Generic[D]

Source code in pyiceberg/avro/file.py
class AvroFile(Generic[D]):
    """Context manager and iterator over the records of an Avro file.

    Entering the context reads the complete input file into a decoder, parses
    the file header and resolves a reader from the file schema to the read
    schema. Iterating afterwards yields the records one block at a time.
    """

    __slots__ = (
        "input_file",
        "read_schema",
        "read_types",
        "read_enums",
        "header",
        "schema",
        "reader",
        "decoder",
        "block",
    )
    input_file: InputFile
    read_schema: Optional[Schema]
    read_types: Dict[int, Callable[..., StructProtocol]]
    read_enums: Dict[int, Callable[..., Enum]]
    header: AvroFileHeader
    schema: Schema
    reader: Reader

    decoder: BinaryDecoder
    block: Optional[Block[D]]

    def __init__(
        self,
        input_file: InputFile,
        read_schema: Optional[Schema] = None,
        read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
        read_enums: Dict[int, Callable[..., Enum]] = EMPTY_DICT,
    ) -> None:
        """Remember the inputs; nothing is read until the context is entered.

        Args:
            input_file: The file whose contents will be decoded.
            read_schema: Schema to read with. When falsy, the schema stored in
                the file header is used instead (decided in ``__enter__``).
            read_types: Callables keyed by int, forwarded to ``resolve_reader``.
            read_enums: Callables keyed by int, forwarded to ``resolve_reader``.
        """
        # No block has been decoded yet; _read_block relies on this to know
        # that no sync marker precedes the first block.
        self.block = None
        self.read_enums = read_enums
        self.read_types = read_types
        self.read_schema = read_schema
        self.input_file = input_file

    def __enter__(self) -> AvroFile[D]:
        """Read the file, parse its header and build the record reader.

        Returns:
            This AvroFile, which can then be iterated to obtain the records.
        """
        with self.input_file.open() as stream:
            # The whole file is pulled into memory, so the stream can be
            # closed as soon as the decoder has been created.
            self.decoder = new_decoder(stream.read())
        self.header = self._read_header()
        self.schema = self.header.get_schema()
        # Fall back to the schema found in the file when none was requested.
        self.read_schema = self.read_schema or self.schema

        self.reader = resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)

        return self

    def __exit__(
        self,
        exctype: Optional[Type[BaseException]],
        excinst: Optional[BaseException],
        exctb: Optional[TracebackType],
    ) -> None:
        """Leave the ``with`` block; the input stream was already closed in ``__enter__``."""
        return None

    def __iter__(self) -> AvroFile[D]:
        """Return this object, which acts as its own iterator."""
        return self

    def _read_block(self) -> int:
        """Decode the next block and store it in ``self.block``.

        Returns:
            The number of records declared by the block.

        Raises:
            ValueError: If the bytes after the previous block do not match the
                sync marker from the header.
        """
        # A previously read block leaves its trailing sync marker unread, so
        # consume and verify it before decoding the next block.
        if self.block:
            marker = self.decoder.read(SYNC_SIZE)
            if marker != self.header.sync:
                raise ValueError(f"Expected sync bytes {self.header.sync!r}, but got {marker!r}")
        record_count = self.decoder.read_int()

        payload = self.decoder.read_bytes()
        codec = self.header.compression_codec()
        if codec:
            payload = codec.decompress(payload)

        self.block = Block(reader=self.reader, block_records=record_count, block_decoder=new_decoder(payload))
        return record_count

    def __next__(self) -> D:
        """Return the next record, loading a new block when the current one is used up.

        Raises:
            StopIteration: When the decoder hits end-of-file, or when the newly
                read block declares no records.
        """
        current = self.block
        if current and current.has_next():
            return next(current)

        try:
            record_count = self._read_block()
        except EOFError as exc:
            raise StopIteration from exc

        if record_count <= 0:
            raise StopIteration
        # The fresh block has records, so this call returns its first one.
        return self.__next__()

    def _read_header(self) -> AvroFileHeader:
        """Decode the file header from the decoder using ``META_SCHEMA``."""
        header_reader = construct_reader(META_SCHEMA, {-1: AvroFileHeader})
        return header_reader.read(self.decoder)

__enter__()

Generate a reader tree for the payload within an avro file.

Return

The AvroFile itself, which can then be iterated to obtain the AvroStructs.

Source code in pyiceberg/avro/file.py
def __enter__(self) -> AvroFile[D]:
    """Read the file, parse its header and build the record reader.

    Returns:
        This AvroFile, which can then be iterated to obtain the records.
    """
    with self.input_file.open() as stream:
        # The whole file is pulled into memory, so the stream can be closed
        # as soon as the decoder has been created.
        self.decoder = new_decoder(stream.read())
    self.header = self._read_header()
    self.schema = self.header.get_schema()
    # Fall back to the schema found in the file when none was requested.
    self.read_schema = self.read_schema or self.schema

    self.reader = resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)

    return self

__exit__(exctype, excinst, exctb)

Perform cleanup when exiting the scope of a 'with' statement.

Source code in pyiceberg/avro/file.py
def __exit__(
    self,
    exctype: Optional[Type[BaseException]],
    excinst: Optional[BaseException],
    exctb: Optional[TracebackType],
) -> None:
    """Leave the ``with`` block without doing any cleanup work.

    Returning ``None`` means an exception raised inside the block is not
    suppressed.
    """
    return None

__iter__()

Return an iterator for the AvroFile class.

Source code in pyiceberg/avro/file.py
def __iter__(self) -> AvroFile[D]:
    """Return this object, which acts as its own iterator."""
    return self

__next__()

Return the next item when iterating over the AvroFile class.

Source code in pyiceberg/avro/file.py
def __next__(self) -> D:
    """Return the next record, loading a new block when the current one is used up.

    Raises:
        StopIteration: When the decoder hits end-of-file, or when the newly
            read block declares no records.
    """
    current = self.block
    if current and current.has_next():
        return next(current)

    try:
        record_count = self._read_block()
    except EOFError as exc:
        raise StopIteration from exc

    if record_count <= 0:
        raise StopIteration
    # The fresh block has records, so this call returns its first one.
    return self.__next__()

AvroFileHeader

Bases: Record

Source code in pyiceberg/avro/file.py
class AvroFileHeader(Record):
    """Header record of an Avro file: magic bytes, metadata map and sync marker."""

    __slots__ = ("magic", "meta", "sync")
    magic: bytes
    meta: Dict[str, str]
    sync: bytes

    def compression_codec(self) -> Optional[Type[Codec]]:
        """Look up the compression codec named in the file metadata.

        The codec name defaults to ``"null"`` when the metadata has no codec
        entry. For the null codec the lookup is expected to give ``None``,
        meaning no compression or decompression is needed.

        Returns:
            The ``KNOWN_CODECS`` entry registered under the codec name.

        Raises:
            ValueError: If the codec name is not present in ``KNOWN_CODECS``.
        """
        name = self.meta.get(_CODEC_KEY, "null")
        if name in KNOWN_CODECS:
            return KNOWN_CODECS[name]

        raise ValueError(f"Unsupported codec: {name}")

    def get_schema(self) -> Schema:
        """Convert the Avro schema stored in the metadata into an Iceberg schema.

        Returns:
            The result of ``AvroSchemaConversion().avro_to_iceberg`` applied to
            the JSON-decoded schema string.

        Raises:
            ValueError: If the metadata carries no schema entry.
        """
        if _SCHEMA_KEY not in self.meta:
            raise ValueError("No schema found in Avro file headers")

        schema_json = self.meta[_SCHEMA_KEY]
        parsed_schema = json.loads(schema_json)
        return AvroSchemaConversion().avro_to_iceberg(parsed_schema)

compression_codec()

Get the file's compression codec algorithm from the file's metadata.

In the case of a null codec, we return None, indicating that no compression or decompression is needed.

Source code in pyiceberg/avro/file.py
def compression_codec(self) -> Optional[Type[Codec]]:
    """Look up the compression codec named in the file metadata.

    The codec name defaults to ``"null"`` when the metadata has no codec
    entry. For the null codec the lookup is expected to give ``None``,
    meaning no compression or decompression is needed.

    Returns:
        The ``KNOWN_CODECS`` entry registered under the codec name.

    Raises:
        ValueError: If the codec name is not present in ``KNOWN_CODECS``.
    """
    name = self.meta.get(_CODEC_KEY, "null")
    if name in KNOWN_CODECS:
        return KNOWN_CODECS[name]

    raise ValueError(f"Unsupported codec: {name}")

AvroOutputFile

Bases: Generic[D]

Source code in pyiceberg/avro/file.py
class AvroOutputFile(Generic[D]):
    """Context manager that writes records to an Avro file.

    Entering the context creates the output stream and writes the file header;
    ``write_block`` then appends one block per call, and leaving the context
    closes the stream.
    """

    output_file: OutputFile
    output_stream: OutputStream
    file_schema: Schema
    schema_name: str
    encoder: BinaryEncoder
    sync_bytes: bytes
    writer: Writer
    metadata: Dict[str, str]

    def __init__(
        self,
        output_file: OutputFile,
        file_schema: Schema,
        schema_name: str,
        record_schema: Optional[Schema] = None,
        metadata: Dict[str, str] = EMPTY_DICT,
    ) -> None:
        """Prepare the writer; the file itself is only created in ``__enter__``.

        Args:
            output_file: Destination that the stream is created on.
            file_schema: Schema written into the header and used for encoding.
            schema_name: Name passed to the Iceberg-to-Avro schema conversion.
            record_schema: Schema of the records handed to ``write_block``.
                When ``None`` the writer is built from ``file_schema`` alone;
                otherwise it is resolved between the two schemas.
            metadata: Extra key/value pairs stored in the header metadata.
        """
        self.output_file = output_file
        self.file_schema = file_schema
        self.schema_name = schema_name
        # Random marker written into the header and after every block.
        self.sync_bytes = os.urandom(SYNC_SIZE)
        if record_schema is None:
            self.writer = construct_writer(file_schema=self.file_schema)
        else:
            self.writer = resolve_writer(record_schema=record_schema, file_schema=self.file_schema)
        self.metadata = metadata

    def __enter__(self) -> AvroOutputFile[D]:
        """
        Open the file and write the header.

        Returns:
            The file object to write records to
        """
        self.output_stream = self.output_file.create(overwrite=True)
        try:
            self.encoder = BinaryEncoder(self.output_stream)
            self._write_header()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the stream
            # has to be released here to avoid leaking it.
            self.output_stream.close()
            raise

        return self

    def __exit__(
        self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
    ) -> None:
        """Close the output stream when leaving the 'with' statement."""
        self.output_stream.close()

    def _write_header(self) -> None:
        """Write the header: magic bytes, metadata and the sync marker.

        The schema and codec entries are placed after the caller-supplied
        metadata, so they take precedence over entries with the same keys.
        The codec is always recorded as ``"null"``.
        """
        json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
        meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
        header = AvroFileHeader(magic=MAGIC, meta=meta, sync=self.sync_bytes)
        construct_writer(META_SCHEMA).write(self.encoder, header)

    def write_block(self, objects: List[D]) -> None:
        """Encode ``objects`` as a single block and append it to the file.

        The block is laid out as: record count, byte length of the encoded
        records, the encoded records, and finally the sync marker.

        Args:
            objects: Records to encode with the configured writer.
        """
        # Encode into memory first because the byte length has to be written
        # before the content itself.
        in_memory = io.BytesIO()
        block_content_encoder = BinaryEncoder(output_stream=in_memory)
        for obj in objects:
            self.writer.write(block_content_encoder, obj)
        block_content = in_memory.getvalue()

        self.encoder.write_int(len(objects))
        self.encoder.write_int(len(block_content))
        self.encoder.write(block_content)
        self.encoder.write(self.sync_bytes)

__enter__()

Open the file and write the header.

Returns:

Type Description
AvroOutputFile[D]

The file object to write records to

Source code in pyiceberg/avro/file.py
def __enter__(self) -> AvroOutputFile[D]:
    """
    Open the file and write the header.

    Returns:
        The file object to write records to
    """
    self.output_stream = self.output_file.create(overwrite=True)
    try:
        self.encoder = BinaryEncoder(self.output_stream)
        self._write_header()
    except BaseException:
        # __exit__ is not invoked when __enter__ raises, so the stream has
        # to be released here to avoid leaking it.
        self.output_stream.close()
        raise

    return self

__exit__(exctype, excinst, exctb)

Perform cleanup when exiting the scope of a 'with' statement.

Source code in pyiceberg/avro/file.py
def __exit__(
    self,
    exctype: Optional[Type[BaseException]],
    excinst: Optional[BaseException],
    exctb: Optional[TracebackType],
) -> None:
    """Close the output stream when leaving the ``with`` block.

    The exception arguments are not inspected, and an exception raised
    inside the block is not suppressed.
    """
    stream = self.output_stream
    stream.close()

Block dataclass

Bases: Generic[D]

Source code in pyiceberg/avro/file.py
@dataclass
class Block(Generic[D]):
    """Iterator over the records of one decoded block.

    ``position`` counts the records handed out so far and is compared with
    ``block_records`` to decide whether more records remain.
    """

    reader: Reader
    block_records: int
    block_decoder: BinaryDecoder
    position: int = 0

    def __iter__(self) -> Block[D]:
        """Return this block, which acts as its own iterator."""
        return self

    def has_next(self) -> bool:
        """Tell whether fewer records were handed out than the block declares."""
        return self.block_records > self.position

    def __next__(self) -> D:
        """Read and return the next record from the block decoder.

        Raises:
            StopIteration: When every declared record has been returned.
        """
        if not self.has_next():
            raise StopIteration

        # Advance before reading, matching the order of the bookkeeping.
        self.position += 1
        return self.reader.read(self.block_decoder)

__iter__()

Return an iterator for the Block class.

Source code in pyiceberg/avro/file.py
def __iter__(self) -> Block[D]:
    """Return this block, which acts as its own iterator."""
    return self

__next__()

Return the next item when iterating over the Block class.

Source code in pyiceberg/avro/file.py
def __next__(self) -> D:
    """Read and return the next record from the block decoder.

    Raises:
        StopIteration: When every declared record has been returned.
    """
    if not self.has_next():
        raise StopIteration

    # Advance before reading, matching the order of the bookkeeping.
    self.position += 1
    return self.reader.read(self.block_decoder)