Skip to content

delete_file_index

DeleteFileIndex

Indexes position delete files by partition and by exact data file path.

Source code in pyiceberg/table/delete_file_index.py
class DeleteFileIndex:
    """Lookup structure mapping data files to the position delete files that may apply to them.

    Delete files that reference a single data file are bucketed by that exact path; all
    other delete files are bucketed by ``(spec_id, partition)`` via ``_partition_key``.
    """

    def __init__(self) -> None:
        # Partition-scoped delete buckets, keyed by the result of ``_partition_key``.
        self._by_partition: dict[tuple[int, Record], PositionDeletes] = {}
        # File-scoped delete buckets, keyed by the exact referenced data file path.
        self._by_path: dict[str, PositionDeletes] = {}

    def is_empty(self) -> bool:
        """Return True when neither bucket map holds any entry."""
        return not (self._by_partition or self._by_path)

    def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None:
        """Register the delete file carried by ``manifest_entry``.

        Args:
            manifest_entry: Entry whose ``data_file`` is the delete file. Its
                ``sequence_number`` is recorded with the file; a falsy value falls back
                to ``INITIAL_SEQUENCE_NUMBER``.
            partition_key: Partition record used to build the partition bucket key when
                the delete file does not reference a single data file.
        """
        delete_file = manifest_entry.data_file
        sequence_number = manifest_entry.sequence_number or INITIAL_SEQUENCE_NUMBER
        referenced_path = _referenced_data_file_path(delete_file)

        if not referenced_path:
            # No single target file: the delete is scoped to its (spec, partition).
            bucket_key = _partition_key(delete_file.spec_id or 0, partition_key)
            self._by_partition.setdefault(bucket_key, PositionDeletes()).add(delete_file, sequence_number)
            return

        self._by_path.setdefault(referenced_path, PositionDeletes()).add(delete_file, sequence_number)

    def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record | None = None) -> set[DataFile]:
        """Collect the registered delete files that apply to ``data_file``.

        Args:
            seq_num: Sequence number of ``data_file``; only deletes recorded with a
                sequence number >= this value are candidates (see ``filter_by_seq``).
            data_file: The data file being matched.
            partition_key: Partition record of ``data_file``; combined with its spec id
                to locate the partition-scoped bucket.

        Returns:
            Partition-scoped candidates accepted by ``_applies_to_data_file``, plus every
            path-scoped candidate registered for ``data_file.file_path``.
        """
        if self.is_empty():
            return set()

        matched: set[DataFile] = set()

        scoped = self._by_partition.get(_partition_key(data_file.spec_id or 0, partition_key))
        if scoped:
            # Partition-level deletes still need a per-file applicability check.
            matched.update(
                candidate
                for candidate in scoped.filter_by_seq(seq_num)
                if _applies_to_data_file(candidate, data_file)
            )

        targeted = self._by_path.get(data_file.file_path)
        if targeted:
            # Path-level deletes were registered for this exact file, so no extra check.
            matched.update(targeted.filter_by_seq(seq_num))

        return matched

PositionDeletes

Collects position delete files and indexes them by sequence number.

Source code in pyiceberg/table/delete_file_index.py
class PositionDeletes:
    """Accumulates position delete files and answers sequence-number threshold queries.

    Entries are buffered while being added. The first query sorts them by sequence
    number (a stable sort, so ties keep insertion order) and discards the buffer, after
    which further ``add`` calls are rejected.
    """

    __slots__ = ("_buffer", "_seqs", "_files")

    def __init__(self) -> None:
        # Pending (file, seq) pairs; becomes None once the sorted index is built.
        self._buffer: list[tuple[DataFile, int]] | None = []
        # Sequence numbers in ascending order, parallel to ``_files``, for bisection.
        self._seqs: list[int] = []
        # (file, seq) pairs sorted ascending by seq.
        self._files: list[tuple[DataFile, int]] = []

    def add(self, delete_file: DataFile, seq_num: int) -> None:
        """Buffer ``delete_file`` together with ``seq_num``.

        Raises:
            ValueError: If the collection was already indexed by a query.
        """
        pending = self._buffer
        if pending is None:
            raise ValueError("Cannot add files after indexing")
        pending.append((delete_file, seq_num))

    def _ensure_indexed(self) -> None:
        """Build the sorted index from the buffer exactly once, then drop the buffer."""
        if self._buffer is None:
            return
        ordered = list(self._buffer)
        ordered.sort(key=lambda entry: entry[1])
        self._files = ordered
        self._seqs = [entry[1] for entry in ordered]
        self._buffer = None

    def filter_by_seq(self, seq: int) -> list[DataFile]:
        """Return the delete files whose sequence number is >= ``seq``, in ascending seq order."""
        self._ensure_indexed()
        # bisect_left lands on the first entry whose seq is not below ``seq``.
        cutoff = bisect_left(self._seqs, seq)
        return [entry[0] for entry in self._files[cutoff:]]