Collects position delete files and indexes them by sequence number.
Source code in pyiceberg/table/delete_file_index.py
class PositionDeletes:
    """Accumulate position delete files and serve them ordered by sequence number.

    Files are buffered by :meth:`add` and sorted exactly once, on the first
    call to :meth:`filter_by_seq`. Building the index freezes the collection:
    any later :meth:`add` raises ``ValueError``.
    """

    __slots__ = ("_buffer", "_seqs", "_files")

    def __init__(self) -> None:
        # Pending (file, sequence number) pairs. Set to None once the index
        # has been built, which doubles as the "frozen" marker.
        self._buffer: list[tuple[DataFile, int]] | None = []
        # Sequence numbers in ascending order, parallel to ``_files``.
        self._seqs: list[int] = []
        # (file, sequence number) pairs sorted by sequence number.
        self._files: list[tuple[DataFile, int]] = []

    def add(self, delete_file: DataFile, seq_num: int) -> None:
        """Register a delete file together with its sequence number.

        Args:
            delete_file: The delete file to record.
            seq_num: Sequence number the file is indexed under.

        Raises:
            ValueError: If the index has already been built.
        """
        pending = self._buffer
        if pending is None:
            raise ValueError("Cannot add files after indexing")
        pending.append((delete_file, seq_num))

    def _ensure_indexed(self) -> None:
        """Build the sorted index from the buffer; a no-op once it is built."""
        pending = self._buffer
        if pending is None:
            return

        # Sort a copy so nothing is published if the sort itself fails.
        # list.sort is stable, so equal sequence numbers keep insertion order.
        ordered = list(pending)
        ordered.sort(key=lambda entry: entry[1])

        self._files = ordered
        self._seqs = [entry[1] for entry in ordered]
        self._buffer = None

    def filter_by_seq(self, seq: int) -> list[DataFile]:
        """Return the delete files whose sequence number is at least ``seq``.

        The first call builds the index, after which no more files can be
        added.

        Args:
            seq: Lowest sequence number to include.

        Returns:
            Matching delete files in ascending sequence-number order; an empty
            list when nothing was added or nothing matches.
        """
        self._ensure_indexed()
        indexed = self._files
        if not indexed:
            return []

        # bisect_left lands on the first entry with sequence number >= seq.
        first = bisect_left(self._seqs, seq)
        return [entry[0] for entry in indexed[first:]]
|