Skip to content

manifest

DataFile

Bases: Record

Source code in pyiceberg/manifest.py
class DataFile(Record):
    """Record holding the fields of a data file; hashed and compared by ``file_path``."""

    __slots__ = (
        "content",
        "file_path",
        "file_format",
        "partition",
        "record_count",
        "file_size_in_bytes",
        "column_sizes",
        "value_counts",
        "null_value_counts",
        "nan_value_counts",
        "lower_bounds",
        "upper_bounds",
        "key_metadata",
        "split_offsets",
        "equality_ids",
        "sort_order_id",
        "spec_id",
    )
    content: DataFileContent
    file_path: str
    file_format: FileFormat
    partition: Record
    record_count: int
    file_size_in_bytes: int
    column_sizes: Dict[int, int]
    value_counts: Dict[int, int]
    null_value_counts: Dict[int, int]
    nan_value_counts: Dict[int, int]
    lower_bounds: Dict[int, bytes]
    upper_bounds: Dict[int, bytes]
    key_metadata: Optional[bytes]
    split_offsets: Optional[List[int]]
    equality_ids: Optional[List[int]]
    sort_order_id: Optional[int]
    spec_id: int

    def __setattr__(self, name: str, value: Any) -> None:
        """Set a field, looking ``file_format`` up as a ``FileFormat`` member by name."""
        # file_format is stored as a string on disk, so it is converted to the Enum here
        coerced = FileFormat[value] if name == "file_format" else value
        super().__setattr__(name, coerced)

    def __init__(self, format_version: TableVersion = DEFAULT_READ_VERSION, *data: Any, **named_data: Any) -> None:
        # Caller keywords are merged last, so an explicit "struct" overrides the default
        init_kwargs: Dict[str, Any] = {"struct": DATA_FILE_TYPE[format_version]}
        init_kwargs.update(named_data)
        super().__init__(*data, **init_kwargs)

    def __hash__(self) -> int:
        """Hash the data file by its ``file_path``."""
        path = self.file_path
        return hash(path)

    def __eq__(self, other: Any) -> bool:
        """Return True only when ``other`` is a DataFile with the same ``file_path``.

        Any object that is not a DataFile compares unequal.
        """
        if not isinstance(other, DataFile):
            return False
        return self.file_path == other.file_path

__eq__(other)

Compare the datafile with another object.

If it is a datafile, it will compare based on the file_path.

Source code in pyiceberg/manifest.py
def __eq__(self, other: Any) -> bool:
    """Return True only when ``other`` is a DataFile with the same ``file_path``.

    Any object that is not a DataFile compares unequal.
    """
    if not isinstance(other, DataFile):
        return False
    return self.file_path == other.file_path

__hash__()

Return the hash of the file path.

Source code in pyiceberg/manifest.py
def __hash__(self) -> int:
    """Hash the data file by its ``file_path``."""
    path = self.file_path
    return hash(path)

__setattr__(name, value)

Assign a key/value to a DataFile.

Source code in pyiceberg/manifest.py
def __setattr__(self, name: str, value: Any) -> None:
    """Set a field, looking ``file_format`` up as a ``FileFormat`` member by name."""
    # file_format is stored as a string on disk, so it is converted to the Enum here
    coerced = FileFormat[value] if name == "file_format" else value
    super().__setattr__(name, coerced)

DataFileContent

Bases: int, Enum

Source code in pyiceberg/manifest.py
class DataFileContent(int, Enum):
    """Integer-valued enumeration of data file content kinds."""

    DATA = 0
    POSITION_DELETES = 1
    EQUALITY_DELETES = 2

    def __repr__(self) -> str:
        """Render the member as ``DataFileContent.<NAME>``."""
        return "DataFileContent.{}".format(self.name)

__repr__()

Return the string representation of the DataFileContent class.

Source code in pyiceberg/manifest.py
def __repr__(self) -> str:
    """Render the member as ``DataFileContent.<NAME>``."""
    return "DataFileContent.{}".format(self.name)

FileFormat

Bases: str, Enum

Source code in pyiceberg/manifest.py
class FileFormat(str, Enum):
    """String-valued enumeration of file formats."""

    AVRO = "AVRO"
    PARQUET = "PARQUET"
    ORC = "ORC"

    def __repr__(self) -> str:
        """Render the member as ``FileFormat.<NAME>``."""
        return "FileFormat.{}".format(self.name)

__repr__()

Return the string representation of the FileFormat class.

Source code in pyiceberg/manifest.py
def __repr__(self) -> str:
    """Render the member as ``FileFormat.<NAME>``."""
    return "FileFormat.{}".format(self.name)

ManifestContent

Bases: int, Enum

Source code in pyiceberg/manifest.py
class ManifestContent(int, Enum):
    """Integer-valued enumeration of manifest content kinds."""

    DATA = 0
    DELETES = 1

    def __repr__(self) -> str:
        """Render the member as ``ManifestContent.<NAME>``."""
        return "ManifestContent.{}".format(self.name)

__repr__()

Return the string representation of the ManifestContent class.

Source code in pyiceberg/manifest.py
def __repr__(self) -> str:
    """Render the member as ``ManifestContent.<NAME>``."""
    return "ManifestContent.{}".format(self.name)

ManifestEntryStatus

Bases: int, Enum

Source code in pyiceberg/manifest.py
class ManifestEntryStatus(int, Enum):
    """Integer-valued enumeration of manifest entry statuses."""

    EXISTING = 0
    ADDED = 1
    DELETED = 2

    def __repr__(self) -> str:
        """Render the member as ``ManifestEntryStatus.<NAME>``."""
        return "ManifestEntryStatus.{}".format(self.name)

__repr__()

Return the string representation of the ManifestEntryStatus class.

Source code in pyiceberg/manifest.py
def __repr__(self) -> str:
    """Render the member as ``ManifestEntryStatus.<NAME>``."""
    return "ManifestEntryStatus.{}".format(self.name)

ManifestFile

Bases: Record

Source code in pyiceberg/manifest.py
class ManifestFile(Record):
    """Record holding the fields of one manifest file as listed in a manifest list."""

    __slots__ = (
        "manifest_path",
        "manifest_length",
        "partition_spec_id",
        "content",
        "sequence_number",
        "min_sequence_number",
        "added_snapshot_id",
        "added_files_count",
        "existing_files_count",
        "deleted_files_count",
        "added_rows_count",
        "existing_rows_count",
        "deleted_rows_count",
        "partitions",
        "key_metadata",
    )
    manifest_path: str
    manifest_length: int
    partition_spec_id: int
    content: ManifestContent
    sequence_number: int
    min_sequence_number: int
    added_snapshot_id: int
    added_files_count: Optional[int]
    existing_files_count: Optional[int]
    deleted_files_count: Optional[int]
    added_rows_count: Optional[int]
    existing_rows_count: Optional[int]
    deleted_rows_count: Optional[int]
    partitions: Optional[List[PartitionFieldSummary]]
    key_metadata: Optional[bytes]

    def __init__(self, *data: Any, **named_data: Any) -> None:
        # Caller keywords are merged last, so an explicit "struct" overrides the default
        init_kwargs: Dict[str, Any] = {"struct": MANIFEST_LIST_FILE_STRUCTS[DEFAULT_READ_VERSION]}
        init_kwargs.update(named_data)
        super().__init__(*data, **init_kwargs)

    def has_added_files(self) -> bool:
        """Return True when the added-files count is unknown (None) or positive."""
        count = self.added_files_count
        if count is None:
            return True
        return count > 0

    def has_existing_files(self) -> bool:
        """Return True when the existing-files count is unknown (None) or positive."""
        count = self.existing_files_count
        if count is None:
            return True
        return count > 0

    def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List[ManifestEntry]:
        """
        Read the manifest entries from the manifest file.

        Args:
            io: The FileIO used to open ``manifest_path``.
            discard_deleted: When True, entries whose status is DELETED are skipped.

        Returns:
            A list of manifest entries, each passed through ``_inherit_from_manifest``.
        """
        manifest_input = io.new_input(self.manifest_path)
        avro_reader = AvroFile[ManifestEntry](
            manifest_input,
            MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
            read_types={-1: ManifestEntry, 2: DataFile},
            read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
        )
        with avro_reader as reader:
            entries: List[ManifestEntry] = []
            for entry in reader:
                if discard_deleted and entry.status == ManifestEntryStatus.DELETED:
                    continue
                entries.append(_inherit_from_manifest(entry, self))
            return entries

fetch_manifest_entry(io, discard_deleted=True)

Read the manifest entries from the manifest file.

Parameters:

Name Type Description Default
io FileIO

The FileIO to fetch the file.

required
discard_deleted bool

Filter on live entries.

True

Returns:

Type Description
List[ManifestEntry]

A list of manifest entries.

Source code in pyiceberg/manifest.py
def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List[ManifestEntry]:
    """
    Read the manifest entries from the manifest file.

    Args:
        io: The FileIO used to open ``manifest_path``.
        discard_deleted: When True, entries whose status is DELETED are skipped.

    Returns:
        A list of manifest entries, each passed through ``_inherit_from_manifest``.
    """
    manifest_input = io.new_input(self.manifest_path)
    avro_reader = AvroFile[ManifestEntry](
        manifest_input,
        MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
        read_types={-1: ManifestEntry, 2: DataFile},
        read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
    )
    with avro_reader as reader:
        entries: List[ManifestEntry] = []
        for entry in reader:
            if discard_deleted and entry.status == ManifestEntryStatus.DELETED:
                continue
            entries.append(_inherit_from_manifest(entry, self))
        return entries

ManifestListWriter

Bases: ABC

Source code in pyiceberg/manifest.py
class ManifestListWriter(ABC):
    """Abstract context manager that writes ManifestFile records to an Avro output file.

    Subclasses implement ``prepare_manifest`` to transform each manifest before it is written.
    """

    _format_version: TableVersion
    _output_file: OutputFile
    _meta: Dict[str, str]
    _manifest_files: List[ManifestFile]
    _commit_snapshot_id: int
    _writer: AvroOutputFile[ManifestFile]

    def __init__(self, format_version: TableVersion, output_file: OutputFile, meta: Dict[str, Any]):
        self._manifest_files = []
        self._meta = meta
        self._output_file = output_file
        self._format_version = format_version

    def __enter__(self) -> ManifestListWriter:
        """Create the underlying Avro writer, enter it, and return this writer."""
        # Records use the default read-version schema; the file uses this writer's format version
        record_schema = MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION]
        file_schema = MANIFEST_LIST_FILE_SCHEMAS[self._format_version]
        self._writer = AvroOutputFile[ManifestFile](
            output_file=self._output_file,
            record_schema=record_schema,
            file_schema=file_schema,
            schema_name="manifest_file",
            metadata=self._meta,
        )
        self._writer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the underlying Avro writer, forwarding the exception details."""
        self._writer.__exit__(exc_type, exc_value, traceback)

    @abstractmethod
    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ...

    def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWriter:
        """Prepare every manifest, write them as a single block, and return this writer."""
        write_block = self._writer.write_block
        prepared = []
        for manifest_file in manifest_files:
            prepared.append(self.prepare_manifest(manifest_file))
        write_block(prepared)
        return self

__enter__()

Open the writer for writing.

Source code in pyiceberg/manifest.py
def __enter__(self) -> ManifestListWriter:
    """Create the underlying Avro writer, enter it, and return this writer."""
    # Records use the default read-version schema; the file uses this writer's format version
    record_schema = MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION]
    file_schema = MANIFEST_LIST_FILE_SCHEMAS[self._format_version]
    self._writer = AvroOutputFile[ManifestFile](
        output_file=self._output_file,
        record_schema=record_schema,
        file_schema=file_schema,
        schema_name="manifest_file",
        metadata=self._meta,
    )
    self._writer.__enter__()
    return self

__exit__(exc_type, exc_value, traceback)

Close the writer.

Source code in pyiceberg/manifest.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Exit the underlying Avro writer, forwarding the exception details."""
    self._writer.__exit__(exc_type, exc_value, traceback)

ManifestWriter

Bases: ABC

Source code in pyiceberg/manifest.py
class ManifestWriter(ABC):
    """Abstract context manager that writes manifest entries to an Avro output file.

    While entries are added, the writer keeps per-status file and row counters, the
    partitions of the written data files, and the smallest sequence number seen on
    ADDED/EXISTING entries. ``to_manifest_file`` turns that state into a ManifestFile.

    Subclasses provide ``content``, ``version`` and ``prepare_entry``.
    """

    closed: bool
    _spec: PartitionSpec
    _schema: Schema
    _output_file: OutputFile
    _writer: AvroOutputFile[ManifestEntry]
    _snapshot_id: int
    _added_files: int
    _added_rows: int
    _existing_files: int
    _existing_rows: int
    _deleted_files: int
    _deleted_rows: int
    _min_sequence_number: Optional[int]
    _partitions: List[Record]
    _reused_entry_wrapper: ManifestEntry

    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None:
        self.closed = False
        self._spec = spec
        self._schema = schema
        self._output_file = output_file
        self._snapshot_id = snapshot_id

        self._added_files = 0
        self._added_rows = 0
        self._existing_files = 0
        self._existing_rows = 0
        self._deleted_files = 0
        self._deleted_rows = 0
        self._min_sequence_number = None
        self._partitions = []
        self._reused_entry_wrapper = ManifestEntry()

    def __enter__(self) -> ManifestWriter:
        """Open the writer."""
        self._writer = self.new_writer()
        self._writer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the writer.

        The underlying Avro writer is always exited, so its output is released even
        when no entries were added.

        Raises:
            ValueError: If no entries were added and no other exception is already
                propagating out of the ``with`` block.
        """
        is_empty = (self._added_files + self._existing_files + self._deleted_files) == 0

        self.closed = True
        # Exit the underlying writer before the guard below so the output is never left open
        self._writer.__exit__(exc_type, exc_value, traceback)

        # Guard against empty manifest files. It is skipped while another exception is
        # in flight so that the original error is not replaced by this one.
        if is_empty and exc_type is None:
            raise ValueError("An empty manifest file has been written")

    @abstractmethod
    def content(self) -> ManifestContent: ...

    @property
    @abstractmethod
    def version(self) -> TableVersion: ...

    @property
    def _meta(self) -> Dict[str, str]:
        """Return the metadata key/value pairs passed to the Avro writer."""
        return {
            "schema": self._schema.model_dump_json(),
            "partition-spec": to_json(self._spec.fields).decode("utf-8"),
            "partition-spec-id": str(self._spec.spec_id),
            "format-version": str(self.version),
        }

    def _with_partition(self, format_version: TableVersion) -> Schema:
        """Return the manifest entry schema for ``format_version`` using this spec's partition type."""
        data_file_type = data_file_with_partition(
            format_version=format_version, partition_type=self._spec.partition_type(self._schema)
        )
        return manifest_entry_schema_with_data_file(format_version=format_version, data_file=data_file_type)

    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
        """Create the Avro writer for this manifest.

        The file schema follows this writer's ``version``; the record schema follows
        ``DEFAULT_READ_VERSION``.
        """
        return AvroOutputFile[ManifestEntry](
            output_file=self._output_file,
            file_schema=self._with_partition(self.version),
            record_schema=self._with_partition(DEFAULT_READ_VERSION),
            schema_name="manifest_entry",
            metadata=self._meta,
        )

    @abstractmethod
    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: ...

    def to_manifest_file(self) -> ManifestFile:
        """Return the manifest file.

        Calling this marks the writer as closed, so no further entries can be added.
        """
        # once the manifest file is generated, no more entries can be added
        self.closed = True
        # Only a missing minimum maps to UNASSIGNED_SEQ; 0 is a tracked value and is
        # kept as-is (a plain `or` would discard it because 0 is falsy).
        min_sequence_number = UNASSIGNED_SEQ if self._min_sequence_number is None else self._min_sequence_number
        return ManifestFile(
            manifest_path=self._output_file.location,
            manifest_length=len(self._writer.output_file),
            partition_spec_id=self._spec.spec_id,
            content=self.content(),
            sequence_number=UNASSIGNED_SEQ,
            min_sequence_number=min_sequence_number,
            added_snapshot_id=self._snapshot_id,
            added_files_count=self._added_files,
            existing_files_count=self._existing_files,
            deleted_files_count=self._deleted_files,
            added_rows_count=self._added_rows,
            existing_rows_count=self._existing_rows,
            deleted_rows_count=self._deleted_rows,
            partitions=construct_partition_summaries(self._spec, self._schema, self._partitions),
            key_metadata=None,
        )

    def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
        """Update the counters for ``entry`` and write it as its own block.

        Raises:
            RuntimeError: If the writer has already been closed.
            ValueError: If the entry status is not ADDED, EXISTING or DELETED.
        """
        if self.closed:
            raise RuntimeError("Cannot add entry to closed manifest writer")
        if entry.status == ManifestEntryStatus.ADDED:
            self._added_files += 1
            self._added_rows += entry.data_file.record_count
        elif entry.status == ManifestEntryStatus.EXISTING:
            self._existing_files += 1
            self._existing_rows += entry.data_file.record_count
        elif entry.status == ManifestEntryStatus.DELETED:
            self._deleted_files += 1
            self._deleted_rows += entry.data_file.record_count
        else:
            raise ValueError(f"Unknown entry: {entry.status}")

        self._partitions.append(entry.data_file.partition)

        # Only ADDED/EXISTING entries with a sequence number contribute to the minimum
        if (
            (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING)
            and entry.sequence_number is not None
            and (self._min_sequence_number is None or entry.sequence_number < self._min_sequence_number)
        ):
            self._min_sequence_number = entry.sequence_number

        self._writer.write_block([self.prepare_entry(entry)])
        return self

    def add(self, entry: ManifestEntry) -> ManifestWriter:
        """Write ``entry``'s data file through the reused wrapper's ``_wrap_append``.

        A missing or negative sequence number is passed on as ``None``.
        """
        if entry.sequence_number is not None and entry.sequence_number >= 0:
            self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.sequence_number, entry.data_file))
        else:
            self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file))
        return self

    def delete(self, entry: ManifestEntry) -> ManifestWriter:
        """Write ``entry`` through the reused wrapper's ``_wrap_delete`` under this writer's snapshot id."""
        self.add_entry(
            self._reused_entry_wrapper._wrap_delete(
                self._snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
            )
        )
        return self

    def existing(self, entry: ManifestEntry) -> ManifestWriter:
        """Write ``entry`` through the reused wrapper's ``_wrap_existing``, keeping the entry's own snapshot id."""
        self.add_entry(
            self._reused_entry_wrapper._wrap_existing(
                entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
            )
        )
        return self

__enter__()

Open the writer.

Source code in pyiceberg/manifest.py
def __enter__(self) -> ManifestWriter:
    """Create the underlying writer via ``new_writer``, enter it, and return this writer."""
    avro_writer = self.new_writer()
    self._writer = avro_writer
    self._writer.__enter__()
    return self

__exit__(exc_type, exc_value, traceback)

Close the writer.

Source code in pyiceberg/manifest.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Close the writer.

    The underlying Avro writer is always exited, so its output is released even
    when no entries were added.

    Raises:
        ValueError: If no entries were added and no other exception is already
            propagating out of the ``with`` block.
    """
    is_empty = (self._added_files + self._existing_files + self._deleted_files) == 0

    self.closed = True
    # Exit the underlying writer before the guard below so the output is never left open
    self._writer.__exit__(exc_type, exc_value, traceback)

    # Guard against empty manifest files. It is skipped while another exception is
    # in flight so that the original error is not replaced by this one.
    if is_empty and exc_type is None:
        raise ValueError("An empty manifest file has been written")

to_manifest_file()

Return the manifest file.

Source code in pyiceberg/manifest.py
def to_manifest_file(self) -> ManifestFile:
    """Return the manifest file.

    Calling this marks the writer as closed, so no further entries can be added.
    """
    # once the manifest file is generated, no more entries can be added
    self.closed = True
    # Only a missing minimum maps to UNASSIGNED_SEQ; 0 is a tracked value and is
    # kept as-is (a plain `or` would discard it because 0 is falsy).
    min_sequence_number = UNASSIGNED_SEQ if self._min_sequence_number is None else self._min_sequence_number
    return ManifestFile(
        manifest_path=self._output_file.location,
        manifest_length=len(self._writer.output_file),
        partition_spec_id=self._spec.spec_id,
        content=self.content(),
        sequence_number=UNASSIGNED_SEQ,
        min_sequence_number=min_sequence_number,
        added_snapshot_id=self._snapshot_id,
        added_files_count=self._added_files,
        existing_files_count=self._existing_files,
        deleted_files_count=self._deleted_files,
        added_rows_count=self._added_rows,
        existing_rows_count=self._existing_rows,
        deleted_rows_count=self._deleted_rows,
        partitions=construct_partition_summaries(self._spec, self._schema, self._partitions),
        key_metadata=None,
    )

read_manifest_list(input_file)

Read the manifests from the manifest list.

Parameters:

Name Type Description Default
input_file InputFile

The input file where the stream can be read from.

required

Returns:

Type Description
Iterator[ManifestFile]

An iterator of ManifestFiles that are part of the list.

Source code in pyiceberg/manifest.py
def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
    """
    Lazily yield the ManifestFile records stored in a manifest list.

    Args:
        input_file: The input file the manifest list is read from.

    Returns:
        An iterator over the ManifestFiles in the list; the reader stays open while it is consumed.
    """
    list_schema = MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION]
    record_types = {-1: ManifestFile, 508: PartitionFieldSummary}
    enum_types = {517: ManifestContent}
    with AvroFile[ManifestFile](input_file, list_schema, read_types=record_types, read_enums=enum_types) as reader:
        yield from reader