Skip to content

manifest

DataFile

Bases: Record

Source code in pyiceberg/manifest.py
class DataFile(Record):
    """Record describing a data file carried by a manifest entry.

    Every read-only property below returns one fixed position of ``self._data``.
    Equality and hashing consider ``file_path`` only.
    """

    @classmethod
    def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> DataFile:
        """Bind ``arguments`` to the data-file struct of the given table format version."""
        return super()._bind(DATA_FILE_TYPE[_table_format_version], **arguments)

    @property
    def content(self) -> DataFileContent:
        """Return position 0 of the record data."""
        return self._data[0]

    @property
    def file_path(self) -> str:
        """Return position 1 of the record data."""
        return self._data[1]

    @property
    def file_format(self) -> FileFormat:
        """Return position 2 of the record data."""
        return self._data[2]

    @property
    def partition(self) -> Record:
        """Return position 3 of the record data."""
        return self._data[3]

    @property
    def record_count(self) -> int:
        """Return position 4 of the record data."""
        return self._data[4]

    @property
    def file_size_in_bytes(self) -> int:
        """Return position 5 of the record data."""
        return self._data[5]

    @property
    def column_sizes(self) -> Dict[int, int]:
        """Return position 6 of the record data."""
        return self._data[6]

    @property
    def value_counts(self) -> Dict[int, int]:
        """Return position 7 of the record data."""
        return self._data[7]

    @property
    def null_value_counts(self) -> Dict[int, int]:
        """Return position 8 of the record data."""
        return self._data[8]

    @property
    def nan_value_counts(self) -> Dict[int, int]:
        """Return position 9 of the record data."""
        return self._data[9]

    @property
    def lower_bounds(self) -> Dict[int, bytes]:
        """Return position 10 of the record data."""
        return self._data[10]

    @property
    def upper_bounds(self) -> Dict[int, bytes]:
        """Return position 11 of the record data."""
        return self._data[11]

    @property
    def key_metadata(self) -> Optional[bytes]:
        """Return position 12 of the record data."""
        return self._data[12]

    @property
    def split_offsets(self) -> Optional[List[int]]:
        """Return position 13 of the record data."""
        return self._data[13]

    @property
    def equality_ids(self) -> Optional[List[int]]:
        """Return position 14 of the record data."""
        return self._data[14]

    @property
    def sort_order_id(self) -> Optional[int]:
        """Return position 15 of the record data."""
        return self._data[15]

    # Spec ID should not be stored in the file
    _spec_id: int

    @property
    def spec_id(self) -> int:
        """Return the spec id kept on the instance (outside the record data)."""
        return self._spec_id

    @spec_id.setter
    def spec_id(self, value: int) -> None:
        self._spec_id = value

    def __setattr__(self, name: str, value: Any) -> None:
        """Assign a key/value to a DataFile."""
        # file_format arrives as a string, so it is looked up by name in the FileFormat enum
        coerced = FileFormat[value] if name == "file_format" else value
        super().__setattr__(name, coerced)

    def __hash__(self) -> int:
        """Return the hash of the file path."""
        path = self.file_path
        return hash(path)

    def __eq__(self, other: Any) -> bool:
        """Compare the datafile with another object.

        Two datafiles are equal when their file_path matches; any other
        object compares unequal.
        """
        if not isinstance(other, DataFile):
            return False
        return self.file_path == other.file_path

__eq__(other)

Compare the datafile with another object.

If it is a datafile, it will compare based on the file_path.

Source code in pyiceberg/manifest.py
def __eq__(self, other: Any) -> bool:
    """Compare the datafile with another object.

    Two datafiles are equal when their file_path matches; any other
    object compares unequal.
    """
    if not isinstance(other, DataFile):
        return False
    return self.file_path == other.file_path

__hash__()

Return the hash of the file path.

Source code in pyiceberg/manifest.py
def __hash__(self) -> int:
    """Hash the datafile by its file_path only."""
    path = self.file_path
    return hash(path)

__setattr__(name, value)

Assign a key/value to a DataFile.

Source code in pyiceberg/manifest.py
def __setattr__(self, name: str, value: Any) -> None:
    """Assign a key/value to a DataFile, converting ``file_format`` to the enum."""
    # file_format arrives as a string, so it is looked up by name in the FileFormat enum
    coerced = FileFormat[value] if name == "file_format" else value
    super().__setattr__(name, coerced)

DataFileContent

Bases: int, Enum

Source code in pyiceberg/manifest.py
class DataFileContent(int, Enum):
    """Integer-valued enum with the members DATA, POSITION_DELETES and EQUALITY_DELETES."""

    DATA = 0
    POSITION_DELETES = 1
    EQUALITY_DELETES = 2

    def __repr__(self) -> str:
        """Render the member as ``DataFileContent.<NAME>``."""
        return "DataFileContent.{}".format(self.name)

__repr__()

Return the string representation of the DataFileContent class.

Source code in pyiceberg/manifest.py
def __repr__(self) -> str:
    """Render the member as ``DataFileContent.<NAME>``."""
    return "DataFileContent.{}".format(self.name)

FileFormat

Bases: str, Enum

Source code in pyiceberg/manifest.py
class FileFormat(str, Enum):
    """String-valued enum of file formats; lookup by value ignores case."""

    AVRO = "AVRO"
    PARQUET = "PARQUET"
    ORC = "ORC"
    PUFFIN = "PUFFIN"

    @classmethod
    def _missing_(cls, value: object) -> Union[None, str]:
        """Resolve a value that had no exact match by comparing its upper-cased string form."""
        wanted = str(value).upper()
        # First member whose value equals the upper-cased input, or None when nothing matches
        return next((member for member in cls if member.value == wanted), None)

    def __repr__(self) -> str:
        """Render the member as ``FileFormat.<NAME>``."""
        return "FileFormat.{}".format(self.name)

__repr__()

Return the string representation of the FileFormat class.

Source code in pyiceberg/manifest.py
def __repr__(self) -> str:
    """Render the member as ``FileFormat.<NAME>``."""
    return "FileFormat.{}".format(self.name)

ManifestContent

Bases: int, Enum

Source code in pyiceberg/manifest.py
class ManifestContent(int, Enum):
    """Integer-valued enum with the members DATA and DELETES."""

    DATA = 0
    DELETES = 1

    def __repr__(self) -> str:
        """Render the member as ``ManifestContent.<NAME>``."""
        return "ManifestContent.{}".format(self.name)

__repr__()

Return the string representation of the ManifestContent class.

Source code in pyiceberg/manifest.py
def __repr__(self) -> str:
    """Render the member as ``ManifestContent.<NAME>``."""
    return "ManifestContent.{}".format(self.name)

ManifestEntryStatus

Bases: int, Enum

Source code in pyiceberg/manifest.py
class ManifestEntryStatus(int, Enum):
    """Integer-valued enum with the members EXISTING, ADDED and DELETED."""

    EXISTING = 0
    ADDED = 1
    DELETED = 2

    def __repr__(self) -> str:
        """Render the member as ``ManifestEntryStatus.<NAME>``."""
        return "ManifestEntryStatus.{}".format(self.name)

__repr__()

Return the string representation of the ManifestEntryStatus class.

Source code in pyiceberg/manifest.py
def __repr__(self) -> str:
    """Render the member as ``ManifestEntryStatus.<NAME>``."""
    return "ManifestEntryStatus.{}".format(self.name)

ManifestFile

Bases: Record

Source code in pyiceberg/manifest.py
class ManifestFile(Record):
    """Record describing one manifest, as read from or written to a manifest list.

    Every property below returns one fixed position of ``self._data``.
    Equality and hashing consider ``manifest_path`` only.
    """

    @classmethod
    def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestFile:
        """Bind ``arguments`` to the manifest-list schema of the given table format version."""
        schema = MANIFEST_LIST_FILE_SCHEMAS[_table_format_version]
        return super()._bind(**arguments, struct=schema)

    @property
    def manifest_path(self) -> str:
        """Return position 0 of the record data."""
        return self._data[0]

    @property
    def manifest_length(self) -> int:
        """Return position 1 of the record data."""
        return self._data[1]

    @property
    def partition_spec_id(self) -> int:
        """Return position 2 of the record data."""
        return self._data[2]

    @property
    def content(self) -> ManifestContent:
        """Return position 3 of the record data."""
        return self._data[3]

    @property
    def sequence_number(self) -> int:
        """Return position 4 of the record data."""
        return self._data[4]

    @sequence_number.setter
    def sequence_number(self, value: int) -> None:
        self._data[4] = value

    @property
    def min_sequence_number(self) -> int:
        """Return position 5 of the record data."""
        return self._data[5]

    @min_sequence_number.setter
    def min_sequence_number(self, value: int) -> None:
        self._data[5] = value

    @property
    def added_snapshot_id(self) -> Optional[int]:
        """Return position 6 of the record data."""
        return self._data[6]

    @property
    def added_files_count(self) -> Optional[int]:
        """Return position 7 of the record data."""
        return self._data[7]

    @property
    def existing_files_count(self) -> Optional[int]:
        """Return position 8 of the record data."""
        return self._data[8]

    @property
    def deleted_files_count(self) -> Optional[int]:
        """Return position 9 of the record data."""
        return self._data[9]

    @property
    def added_rows_count(self) -> Optional[int]:
        """Return position 10 of the record data."""
        return self._data[10]

    @property
    def existing_rows_count(self) -> Optional[int]:
        """Return position 11 of the record data."""
        return self._data[11]

    @property
    def deleted_rows_count(self) -> Optional[int]:
        """Return position 12 of the record data."""
        return self._data[12]

    @property
    def partitions(self) -> Optional[List[PartitionFieldSummary]]:
        """Return position 13 of the record data."""
        return self._data[13]

    @property
    def key_metadata(self) -> Optional[bytes]:
        """Return position 14 of the record data."""
        return self._data[14]

    def has_added_files(self) -> bool:
        """Return True when the added-files count is unknown (None) or positive."""
        count = self.added_files_count
        return count is None or count > 0

    def has_existing_files(self) -> bool:
        """Return True when the existing-files count is unknown (None) or positive."""
        count = self.existing_files_count
        return count is None or count > 0

    def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List[ManifestEntry]:
        """
        Read the manifest entries from the manifest file.

        Args:
            io: The FileIO used to open ``manifest_path``.
            discard_deleted: When True, entries whose status is DELETED are skipped.

        Returns:
            A list of manifest entries, each passed through ``_inherit_from_manifest``.
        """
        manifest_input = io.new_input(self.manifest_path)
        entries = []
        with AvroFile[ManifestEntry](
            manifest_input,
            MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
            read_types={-1: ManifestEntry, 2: DataFile},
            read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
        ) as reader:
            for entry in reader:
                if discard_deleted and entry.status == ManifestEntryStatus.DELETED:
                    continue
                entries.append(_inherit_from_manifest(entry, self))
        return entries

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the ManifestFile class."""
        if not isinstance(other, ManifestFile):
            return False
        return self.manifest_path == other.manifest_path

    def __hash__(self) -> int:
        """Return the hash of manifest_path."""
        path = self.manifest_path
        return hash(path)

__eq__(other)

Return the equality of two instances of the ManifestFile class.

Source code in pyiceberg/manifest.py
def __eq__(self, other: Any) -> bool:
    """Compare two ManifestFile instances by manifest_path; other objects compare unequal."""
    if not isinstance(other, ManifestFile):
        return False
    return self.manifest_path == other.manifest_path

__hash__()

Return the hash of manifest_path.

Source code in pyiceberg/manifest.py
def __hash__(self) -> int:
    """Hash the manifest file by its manifest_path only."""
    path = self.manifest_path
    return hash(path)

fetch_manifest_entry(io, discard_deleted=True)

Read the manifest entries from the manifest file.

Parameters:

Name Type Description Default
io FileIO

The FileIO to fetch the file.

required
discard_deleted bool

Filter on live entries.

True

Returns:

Type Description
List[ManifestEntry]

A list of manifest entries.

Source code in pyiceberg/manifest.py
def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List[ManifestEntry]:
    """
    Read the manifest entries from the manifest file.

    Args:
        io: The FileIO used to open ``manifest_path``.
        discard_deleted: When True, entries whose status is DELETED are skipped.

    Returns:
        A list of manifest entries, each passed through ``_inherit_from_manifest``.
    """
    manifest_input = io.new_input(self.manifest_path)
    entries = []
    with AvroFile[ManifestEntry](
        manifest_input,
        MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
        read_types={-1: ManifestEntry, 2: DataFile},
        read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
    ) as reader:
        for entry in reader:
            if discard_deleted and entry.status == ManifestEntryStatus.DELETED:
                continue
            entries.append(_inherit_from_manifest(entry, self))
    return entries

ManifestListWriter

Bases: ABC

Source code in pyiceberg/manifest.py
class ManifestListWriter(ABC):
    """Context manager that writes ManifestFile records through an AvroOutputFile.

    Subclasses implement ``prepare_manifest``, which is applied to every
    manifest before it is written.
    """

    _format_version: TableVersion
    _output_file: OutputFile
    _meta: Dict[str, str]
    _manifest_files: List[ManifestFile]
    _commit_snapshot_id: int
    _writer: AvroOutputFile[ManifestFile]

    def __init__(self, format_version: TableVersion, output_file: OutputFile, meta: Dict[str, Any]):
        """Store the target format version, output file and file metadata."""
        self._manifest_files = []
        self._meta = meta
        self._output_file = output_file
        self._format_version = format_version

    def __enter__(self) -> ManifestListWriter:
        """Open the writer for writing."""
        # Records use the DEFAULT_READ_VERSION schema; the file uses the schema of the target version.
        avro_writer = AvroOutputFile[ManifestFile](
            output_file=self._output_file,
            record_schema=MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
            file_schema=MANIFEST_LIST_FILE_SCHEMAS[self._format_version],
            schema_name="manifest_file",
            metadata=self._meta,
        )
        self._writer = avro_writer
        avro_writer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the writer by forwarding the exit arguments to the underlying Avro writer."""
        self._writer.__exit__(exc_type, exc_value, traceback)

    @abstractmethod
    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
        """Return the ManifestFile that should be written for ``manifest_file``."""

    def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWriter:
        """Prepare every manifest and write them as a single block; returns ``self`` for chaining."""
        self._writer.write_block(list(map(self.prepare_manifest, manifest_files)))
        return self

__enter__()

Open the writer for writing.

Source code in pyiceberg/manifest.py
def __enter__(self) -> ManifestListWriter:
    """Create the underlying Avro writer, enter it, and return this writer."""
    # Records use the DEFAULT_READ_VERSION schema; the file uses the schema of the target version.
    avro_writer = AvroOutputFile[ManifestFile](
        output_file=self._output_file,
        record_schema=MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
        file_schema=MANIFEST_LIST_FILE_SCHEMAS[self._format_version],
        schema_name="manifest_file",
        metadata=self._meta,
    )
    self._writer = avro_writer
    avro_writer.__enter__()
    return self

__exit__(exc_type, exc_value, traceback)

Close the writer.

Source code in pyiceberg/manifest.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Close the writer by forwarding the exit arguments to the underlying Avro writer."""
    avro_writer = self._writer
    avro_writer.__exit__(exc_type, exc_value, traceback)

ManifestWriter

Bases: ABC

Source code in pyiceberg/manifest.py
class ManifestWriter(ABC):
    """Context manager that writes manifest entries to an Avro file and tracks summary counters.

    Subclasses implement ``content``, ``version`` and ``prepare_entry``. The
    counters collected in ``add_entry`` feed the ManifestFile built by
    ``to_manifest_file``.
    """

    closed: bool
    _spec: PartitionSpec
    _schema: Schema
    _output_file: OutputFile
    _writer: AvroOutputFile[ManifestEntry]
    _snapshot_id: int
    _added_files: int
    _added_rows: int
    _existing_files: int
    _existing_rows: int
    _deleted_files: int
    _deleted_rows: int
    _min_sequence_number: Optional[int]
    _partitions: List[Record]
    _compression: AvroCompressionCodec

    def __init__(
        self,
        spec: PartitionSpec,
        schema: Schema,
        output_file: OutputFile,
        snapshot_id: int,
        avro_compression: AvroCompressionCodec,
    ) -> None:
        """Store the inputs and reset all counters; the Avro writer is created in ``__enter__``."""
        self.closed = False
        self._spec = spec
        self._schema = schema
        self._output_file = output_file
        self._snapshot_id = snapshot_id

        self._added_files = 0
        self._added_rows = 0
        self._existing_files = 0
        self._existing_rows = 0
        self._deleted_files = 0
        self._deleted_rows = 0
        self._min_sequence_number = None
        self._partitions = []
        self._compression = avro_compression

    def __enter__(self) -> ManifestWriter:
        """Open the writer."""
        self._writer = self.new_writer()
        self._writer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the writer.

        Raises:
            ValueError: If no entry was added, existing or deleted.
        """
        if (self._added_files + self._existing_files + self._deleted_files) == 0:
            # This is just a guard to ensure that we don't write empty manifest files
            # NOTE(review): on this path the underlying writer's __exit__ is not called.
            raise ValueError("An empty manifest file has been written")

        self.closed = True
        self._writer.__exit__(exc_type, exc_value, traceback)

    @abstractmethod
    def content(self) -> ManifestContent:
        """Return the content value stored in the ManifestFile built by ``to_manifest_file``."""

    @property
    @abstractmethod
    def version(self) -> TableVersion:
        """Return the format version used for the file schema and the ``format-version`` metadata."""

    @property
    def _meta(self) -> Dict[str, str]:
        """Return the key/value metadata passed to the Avro writer."""
        return {
            "schema": self._schema.model_dump_json(),
            "partition-spec": to_json(self._spec.fields).decode("utf-8"),
            "partition-spec-id": str(self._spec.spec_id),
            "format-version": str(self.version),
            AVRO_CODEC_KEY: self._compression,
        }

    def _with_partition(self, format_version: TableVersion) -> Schema:
        """Return the manifest-entry schema for ``format_version`` with this spec's partition type."""
        data_file_type = data_file_with_partition(
            format_version=format_version, partition_type=self._spec.partition_type(self._schema)
        )
        return manifest_entry_schema_with_data_file(format_version=format_version, data_file=data_file_type)

    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
        """Create the Avro writer: file schema from ``self.version``, record schema from DEFAULT_READ_VERSION."""
        return AvroOutputFile[ManifestEntry](
            output_file=self._output_file,
            file_schema=self._with_partition(self.version),
            record_schema=self._with_partition(DEFAULT_READ_VERSION),
            schema_name="manifest_entry",
            metadata=self._meta,
        )

    @abstractmethod
    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
        """Return the entry that should actually be written for ``entry``."""

    def to_manifest_file(self) -> ManifestFile:
        """Return the manifest file.

        Marks the writer as closed and builds a ManifestFile from the counters
        collected so far. ``sequence_number`` is always UNASSIGNED_SEQ;
        ``min_sequence_number`` is UNASSIGNED_SEQ only when no minimum was
        recorded by ``add_entry``.
        """
        # once the manifest file is generated, no more entries can be added
        self.closed = True
        # Compare against None explicitly: add_entry can record 0 as the minimum,
        # and a truthiness test would replace that 0 with UNASSIGNED_SEQ.
        if self._min_sequence_number is None:
            min_sequence_number = UNASSIGNED_SEQ
        else:
            min_sequence_number = self._min_sequence_number
        return ManifestFile.from_args(
            manifest_path=self._output_file.location,
            manifest_length=len(self._writer.output_file),
            partition_spec_id=self._spec.spec_id,
            content=self.content(),
            sequence_number=UNASSIGNED_SEQ,
            min_sequence_number=min_sequence_number,
            added_snapshot_id=self._snapshot_id,
            added_files_count=self._added_files,
            existing_files_count=self._existing_files,
            deleted_files_count=self._deleted_files,
            added_rows_count=self._added_rows,
            existing_rows_count=self._existing_rows,
            deleted_rows_count=self._deleted_rows,
            partitions=construct_partition_summaries(self._spec, self._schema, self._partitions),
            key_metadata=None,
        )

    def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
        """Count ``entry`` by status, record its partition and sequence number, and write it.

        Raises:
            RuntimeError: If the writer is already closed.
            ValueError: If the entry status is not ADDED, EXISTING or DELETED.
        """
        if self.closed:
            raise RuntimeError("Cannot add entry to closed manifest writer")
        if entry.status == ManifestEntryStatus.ADDED:
            self._added_files += 1
            self._added_rows += entry.data_file.record_count
        elif entry.status == ManifestEntryStatus.EXISTING:
            self._existing_files += 1
            self._existing_rows += entry.data_file.record_count
        elif entry.status == ManifestEntryStatus.DELETED:
            self._deleted_files += 1
            self._deleted_rows += entry.data_file.record_count
        else:
            raise ValueError(f"Unknown entry: {entry.status}")

        self._partitions.append(entry.data_file.partition)

        # Only ADDED/EXISTING entries with a known sequence number lower the tracked minimum
        if (
            (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING)
            and entry.sequence_number is not None
            and (self._min_sequence_number is None or entry.sequence_number < self._min_sequence_number)
        ):
            self._min_sequence_number = entry.sequence_number

        self._writer.write_block([self.prepare_entry(entry)])
        return self

    def add(self, entry: ManifestEntry) -> ManifestWriter:
        """Write ``entry``'s data file as ADDED under this writer's snapshot id.

        A sequence number equal to UNASSIGNED_SEQ is passed on as None.
        """
        self.add_entry(
            ManifestEntry.from_args(
                status=ManifestEntryStatus.ADDED,
                snapshot_id=self._snapshot_id,
                sequence_number=entry.sequence_number if entry.sequence_number != UNASSIGNED_SEQ else None,
                data_file=entry.data_file,
            )
        )

        return self

    def delete(self, entry: ManifestEntry) -> ManifestWriter:
        """Write ``entry``'s data file as DELETED under this writer's snapshot id, keeping its sequence numbers."""
        self.add_entry(
            ManifestEntry.from_args(
                status=ManifestEntryStatus.DELETED,
                snapshot_id=self._snapshot_id,
                sequence_number=entry.sequence_number,
                file_sequence_number=entry.file_sequence_number,
                data_file=entry.data_file,
            )
        )
        return self

    def existing(self, entry: ManifestEntry) -> ManifestWriter:
        """Write ``entry``'s data file as EXISTING, keeping its snapshot id and sequence numbers."""
        self.add_entry(
            ManifestEntry.from_args(
                status=ManifestEntryStatus.EXISTING,
                snapshot_id=entry.snapshot_id,
                sequence_number=entry.sequence_number,
                file_sequence_number=entry.file_sequence_number,
                data_file=entry.data_file,
            )
        )
        return self

__enter__()

Open the writer.

Source code in pyiceberg/manifest.py
def __enter__(self) -> ManifestWriter:
    """Create the Avro writer via ``new_writer``, enter it, and return this writer."""
    avro_writer = self.new_writer()
    self._writer = avro_writer
    avro_writer.__enter__()
    return self

__exit__(exc_type, exc_value, traceback)

Close the writer.

Source code in pyiceberg/manifest.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Mark the writer closed and exit the underlying Avro writer.

    Raises:
        ValueError: If no entry was added, existing or deleted.
    """
    total_entries = self._added_files + self._existing_files + self._deleted_files
    if total_entries == 0:
        # Guard so that an empty manifest file is never silently produced
        raise ValueError("An empty manifest file has been written")

    self.closed = True
    self._writer.__exit__(exc_type, exc_value, traceback)

to_manifest_file()

Return the manifest file.

Source code in pyiceberg/manifest.py
def to_manifest_file(self) -> ManifestFile:
    """Return the manifest file.

    Marks the writer as closed and builds a ManifestFile from the counters
    collected so far. ``sequence_number`` is always UNASSIGNED_SEQ;
    ``min_sequence_number`` is UNASSIGNED_SEQ only when
    ``self._min_sequence_number`` is None.
    """
    # once the manifest file is generated, no more entries can be added
    self.closed = True
    # Compare against None explicitly: a recorded minimum of 0 is falsy, and a
    # truthiness test would replace it with UNASSIGNED_SEQ.
    if self._min_sequence_number is None:
        min_sequence_number = UNASSIGNED_SEQ
    else:
        min_sequence_number = self._min_sequence_number
    return ManifestFile.from_args(
        manifest_path=self._output_file.location,
        manifest_length=len(self._writer.output_file),
        partition_spec_id=self._spec.spec_id,
        content=self.content(),
        sequence_number=UNASSIGNED_SEQ,
        min_sequence_number=min_sequence_number,
        added_snapshot_id=self._snapshot_id,
        added_files_count=self._added_files,
        existing_files_count=self._existing_files,
        deleted_files_count=self._deleted_files,
        added_rows_count=self._added_rows,
        existing_rows_count=self._existing_rows,
        deleted_rows_count=self._deleted_rows,
        partitions=construct_partition_summaries(self._spec, self._schema, self._partitions),
        key_metadata=None,
    )

read_manifest_list(input_file)

Read the manifests from the manifest list.

Parameters:

Name Type Description Default
input_file InputFile

The input file where the stream can be read from.

required

Returns:

Type Description
Iterator[ManifestFile]

An iterator of ManifestFiles that are part of the list.

Source code in pyiceberg/manifest.py
def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
    """
    Lazily yield the ManifestFile records stored in a manifest list.

    The Avro reader stays open while the generator is being consumed.

    Args:
        input_file: The input file where the stream can be read from.

    Returns:
        An iterator of ManifestFiles that are part of the list.
    """
    manifest_list_reader = AvroFile[ManifestFile](
        input_file,
        MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
        read_types={-1: ManifestFile, 508: PartitionFieldSummary},
        read_enums={517: ManifestContent},
    )
    with manifest_list_reader as reader:
        yield from reader