Skip to content

serializers

Compressor

Bases: ABC

Source code in pyiceberg/serializers.py
class Compressor(ABC):
    @staticmethod
    def get_compressor(location: str) -> Compressor:
        # Gzip-compressed metadata files are recognised purely by their suffix;
        # every other location gets the shared no-op compressor.
        if location.endswith(".gz.metadata.json"):
            return GzipCompressor()
        return NOOP_COMPRESSOR

    @abstractmethod
    def stream_decompressor(self, inp: InputStream) -> InputStream:
        """Wrap an input stream so that reading from it yields decompressed bytes.

        Args:
            inp: The (possibly compressed) input stream to wrap.

        Returns:
            A stream producing the decompressed content of `inp`.
        """

    @abstractmethod
    def bytes_compressor(self) -> Callable[[bytes], bytes]:
        """Provide the callable used to compress a byte payload.

        Returns:
            A function mapping raw bytes to their compressed form.
        """

bytes_compressor() abstractmethod

Return a function to compress bytes.

Returns:

Type Description
Callable[[bytes], bytes]

A function that can be used to compress bytes.

Source code in pyiceberg/serializers.py
@abstractmethod
def bytes_compressor(self) -> Callable[[bytes], bytes]:
    """Provide the callable used to compress a byte payload.

    Returns:
        A function mapping raw bytes to their compressed form.
    """

stream_decompressor(inp) abstractmethod

Return a stream decompressor.

Parameters:

Name Type Description Default
inp InputStream

The input stream that needs decompressing.

required

Returns:

Type Description
InputStream

The wrapped stream

Source code in pyiceberg/serializers.py
@abstractmethod
def stream_decompressor(self, inp: InputStream) -> InputStream:
    """Wrap an input stream so that reading from it yields decompressed bytes.

    Args:
        inp: The (possibly compressed) input stream to wrap.

    Returns:
        A stream producing the decompressed content of `inp`.
    """

FromByteStream

A collection of methods that deserialize dictionaries into Iceberg objects.

Source code in pyiceberg/serializers.py
class FromByteStream:
    """Deserialization helpers that turn raw byte streams into Iceberg objects."""

    @staticmethod
    def table_metadata(
        byte_stream: InputStream, encoding: str = UTF8, compression: Compressor = NOOP_COMPRESSOR
    ) -> TableMetadata:
        """Parse a TableMetadata object out of a byte stream.

        Args:
            byte_stream: A file-like object yielding the serialized metadata bytes.
            encoding: Text encoding used to decode the bytes (defaults to UTF8).
            compression: Compressor whose stream_decompressor wraps `byte_stream`
                before decoding (defaults to NOOP_COMPRESSOR).

        Returns:
            The TableMetadata produced by TableMetadataUtil.parse_raw on the decoded text.
        """
        # The decompressing wrapper is used as a context manager, so it is
        # exited once the whole document has been read.
        with compression.stream_decompressor(byte_stream) as decompressed:
            text_reader = codecs.getreader(encoding)(decompressed)
            document = text_reader.read()

        return TableMetadataUtil.parse_raw(document)

table_metadata(byte_stream, encoding=UTF8, compression=NOOP_COMPRESSOR) staticmethod

Instantiate a TableMetadata object from a byte stream.

Parameters:

Name Type Description Default
byte_stream InputStream

A file-like byte stream object.

required
encoding str

The byte encoder to use for the reader.

UTF8
compression Compressor

Optional compression method

NOOP_COMPRESSOR
Source code in pyiceberg/serializers.py
@staticmethod
def table_metadata(
    byte_stream: InputStream, encoding: str = UTF8, compression: Compressor = NOOP_COMPRESSOR
) -> TableMetadata:
    """Parse a TableMetadata object out of a byte stream.

    Args:
        byte_stream: A file-like object yielding the serialized metadata bytes.
        encoding: Text encoding used to decode the bytes (defaults to UTF8).
        compression: Compressor whose stream_decompressor wraps `byte_stream`
            before decoding (defaults to NOOP_COMPRESSOR).

    Returns:
        The TableMetadata produced by TableMetadataUtil.parse_raw on the decoded text.
    """
    # The decompressing wrapper is used as a context manager, so it is
    # exited once the whole document has been read.
    with compression.stream_decompressor(byte_stream) as decompressed:
        text_reader = codecs.getreader(encoding)(decompressed)
        document = text_reader.read()

    return TableMetadataUtil.parse_raw(document)

FromInputFile

A collection of methods that deserialize InputFiles into Iceberg objects.

Source code in pyiceberg/serializers.py
class FromInputFile:
    """Deserialization helpers that turn InputFile instances into Iceberg objects."""

    @staticmethod
    def table_metadata(input_file: InputFile, encoding: str = UTF8) -> TableMetadata:
        """Load a TableMetadata instance from an input file.

        Args:
            input_file (InputFile): A custom implementation of the iceberg.io.file.InputFile abstract base class.
            encoding (str): Text encoding applied when decoding the file contents.

        Returns:
            TableMetadata: The parsed table metadata.
        """
        with input_file.open() as raw_stream:
            # The compressor is chosen from the file's location (see Compressor.get_compressor).
            compressor = Compressor.get_compressor(location=input_file.location)
            return FromByteStream.table_metadata(byte_stream=raw_stream, encoding=encoding, compression=compressor)

table_metadata(input_file, encoding=UTF8) staticmethod

Create a TableMetadata instance from an input file.

Parameters:

Name Type Description Default
input_file InputFile

A custom implementation of the iceberg.io.file.InputFile abstract base class.

required
encoding str

Encoding to use when loading bytestream.

UTF8

Returns:

Name Type Description
TableMetadata TableMetadata

A table metadata instance.

Source code in pyiceberg/serializers.py
@staticmethod
def table_metadata(input_file: InputFile, encoding: str = UTF8) -> TableMetadata:
    """Load a TableMetadata instance from an input file.

    Args:
        input_file (InputFile): A custom implementation of the iceberg.io.file.InputFile abstract base class.
        encoding (str): Text encoding applied when decoding the file contents.

    Returns:
        TableMetadata: The parsed table metadata.
    """
    with input_file.open() as raw_stream:
        # The compressor is chosen from the file's location (see Compressor.get_compressor).
        compressor = Compressor.get_compressor(location=input_file.location)
        return FromByteStream.table_metadata(byte_stream=raw_stream, encoding=encoding, compression=compressor)

ToOutputFile

A collection of methods that serialize Iceberg objects into files given an OutputFile instance.

Source code in pyiceberg/serializers.py
class ToOutputFile:
    """A collection of methods that serialize Iceberg objects into files given an OutputFile instance."""

    @staticmethod
    def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: bool = False) -> None:
        """Write a TableMetadata instance to an output file.

        The metadata is serialized to JSON, encoded as UTF8 and passed through the
        compressor selected by Compressor.get_compressor(output_file.location)
        before being written.

        Args:
            metadata (TableMetadata): The table metadata to serialize.
            output_file (OutputFile): A custom implementation of the iceberg.io.file.OutputFile abstract base class.
            overwrite (bool): Whether to overwrite the file if it already exists. Defaults to `False`.
        """
        # Build the full payload *before* opening the output file, so a failure while
        # reading config, serializing or compressing does not leave an empty or partially
        # written file behind. Depending on the FileIO implementation, create() may
        # also truncate an existing file when overwrite=True.
        #
        # We need to serialize None values, in order to dump `None` current-snapshot-id as `-1`
        # (only when the legacy flag is set; otherwise None fields are omitted).
        exclude_none = not Config().get_bool("legacy-current-snapshot-id")
        json_bytes = metadata.model_dump_json(exclude_none=exclude_none).encode(UTF8)
        payload = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)

        with output_file.create(overwrite=overwrite) as output_stream:
            output_stream.write(payload)

table_metadata(metadata, output_file, overwrite=False) staticmethod

Write a TableMetadata instance to an output file.

Parameters:

Name Type Description Default
metadata TableMetadata

The table metadata to serialize.

required
output_file OutputFile

A custom implementation of the iceberg.io.file.OutputFile abstract base class.

required
overwrite bool

Whether to overwrite the file if it already exists. Defaults to False.

False
Source code in pyiceberg/serializers.py
@staticmethod
def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: bool = False) -> None:
    """Write a TableMetadata instance to an output file.

    The metadata is serialized to JSON, encoded as UTF8 and passed through the
    compressor selected by Compressor.get_compressor(output_file.location)
    before being written.

    Args:
        metadata (TableMetadata): The table metadata to serialize.
        output_file (OutputFile): A custom implementation of the iceberg.io.file.OutputFile abstract base class.
        overwrite (bool): Whether to overwrite the file if it already exists. Defaults to `False`.
    """
    # Build the full payload *before* opening the output file, so a failure while
    # reading config, serializing or compressing does not leave an empty or partially
    # written file behind. Depending on the FileIO implementation, create() may
    # also truncate an existing file when overwrite=True.
    #
    # We need to serialize None values, in order to dump `None` current-snapshot-id as `-1`
    # (only when the legacy flag is set; otherwise None fields are omitted).
    exclude_none = not Config().get_bool("legacy-current-snapshot-id")
    json_bytes = metadata.model_dump_json(exclude_none=exclude_none).encode(UTF8)
    payload = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)

    with output_file.create(overwrite=overwrite) as output_stream:
        output_stream.write(payload)