pyarrow

FileIO implementation for reading and writing table files that uses pyarrow.fs.

This file contains a FileIO implementation that relies on the filesystem interface provided by PyArrow. It relies on PyArrow's from_uri method that infers the correct filesystem type to use. Theoretically, this allows the supported storage types to grow naturally with the pyarrow library.
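
As a minimal, hedged sketch of how this module is typically used, the snippet below constructs a PyArrowFileIO configured for an S3-compatible store and reads a few bytes back. The endpoint, bucket, and credentials are placeholders; the property keys shown (s3.endpoint, s3.access-key-id, s3.secret-access-key) are standard PyIceberg FileIO properties.

    from pyiceberg.io.pyarrow import PyArrowFileIO

    # Placeholder endpoint, bucket, and credentials; substitute your own values.
    io = PyArrowFileIO(
        properties={
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        }
    )

    input_file = io.new_input("s3://warehouse/path/to/data.parquet")
    if input_file.exists():
        with input_file.open() as stream:
            magic = stream.read(4)  # Parquet files start with the bytes b"PAR1"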

ArrowScan

Source code in pyiceberg/io/pyarrow.py
class ArrowScan:
    _table_metadata: TableMetadata
    _io: FileIO
    _projected_schema: Schema
    _bound_row_filter: BooleanExpression
    _case_sensitive: bool
    _limit: Optional[int]
    _downcast_ns_timestamp_to_us: Optional[bool]
    """Scan the Iceberg Table and create an Arrow construct.

    Attributes:
        _table_metadata: Current table metadata of the Iceberg table
        _io: PyIceberg FileIO implementation from which to fetch the io properties
        _projected_schema: Iceberg Schema to project onto the data files
        _bound_row_filter: Schema bound row expression to filter the data with
        _case_sensitive: Case sensitivity when looking up column names
        _limit: Limit the number of records.
    """

    def __init__(
        self,
        table_metadata: TableMetadata,
        io: FileIO,
        projected_schema: Schema,
        row_filter: BooleanExpression,
        case_sensitive: bool = True,
        limit: Optional[int] = None,
    ) -> None:
        self._table_metadata = table_metadata
        self._io = io
        self._projected_schema = projected_schema
        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
        self._case_sensitive = case_sensitive
        self._limit = limit
        self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)

    @property
    def _projected_field_ids(self) -> Set[int]:
        """Set of field IDs that should be projected from the data files."""
        return {
            id
            for id in self._projected_schema.field_ids
            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
        }.union(extract_field_ids(self._bound_row_filter))

    def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
        """Scan the Iceberg table and return a pa.Table.

        Returns a pa.Table with data from the Iceberg table by resolving the
        right columns that match the current table schema. Only data that
        matches the provided row_filter expression is returned.

        Args:
            tasks: FileScanTasks representing the data files and delete files to read from.

        Returns:
            A PyArrow table. Total number of rows will be capped if specified.

        Raises:
            ResolveError: When a required field cannot be found in the file
            ValueError: When a field type in the file cannot be projected to the schema type
        """
        arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)

        batches = self.to_record_batches(tasks)
        try:
            first_batch = next(batches)
        except StopIteration:
            # Empty
            return arrow_schema.empty_table()

        # Note: cannot use pa.Table.from_batches(itertools.chain([first_batch], batches)))
        #       as different batches can use different schema's (due to large_ types)
        result = pa.concat_tables(
            (pa.Table.from_batches([batch]) for batch in itertools.chain([first_batch], batches)), promote_options="permissive"
        )

        if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
            deprecation_message(
                deprecated_in="0.10.0",
                removed_in="0.11.0",
                help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
            )
            result = result.cast(arrow_schema)

        return result

    def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
        """Scan the Iceberg table and return an Iterator[pa.RecordBatch].

        Returns an Iterator of pa.RecordBatch with data from the Iceberg table
        by resolving the right columns that match the current table schema.
        Only data that matches the provided row_filter expression is returned.

        Args:
            tasks: FileScanTasks representing the data files and delete files to read from.

        Returns:
            An Iterator of PyArrow RecordBatches.
            Total number of rows will be capped if specified.

        Raises:
            ResolveError: When a required field cannot be found in the file
            ValueError: When a field type in the file cannot be projected to the schema type
        """
        deletes_per_file = _read_all_delete_files(self._io, tasks)

        total_row_count = 0
        executor = ExecutorFactory.get_or_create()

        def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]:
            # Materialize the iterator here to ensure execution happens within the executor.
            # Otherwise, the iterator would be lazily consumed later (in the main thread),
            # defeating the purpose of using executor.map.
            return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))

        limit_reached = False
        for batches in executor.map(batches_for_task, tasks):
            for batch in batches:
                current_batch_size = len(batch)
                if self._limit is not None and total_row_count + current_batch_size >= self._limit:
                    yield batch.slice(0, self._limit - total_row_count)

                    limit_reached = True
                    break
                else:
                    yield batch
                    total_row_count += current_batch_size

            if limit_reached:
                # This break will also cancel all running tasks in the executor
                break

    def _record_batches_from_scan_tasks_and_deletes(
        self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
    ) -> Iterator[pa.RecordBatch]:
        total_row_count = 0
        for task in tasks:
            if self._limit is not None and total_row_count >= self._limit:
                break
            batches = _task_to_record_batches(
                self._io,
                task,
                self._bound_row_filter,
                self._projected_schema,
                self._projected_field_ids,
                deletes_per_file.get(task.file.file_path),
                self._case_sensitive,
                self._table_metadata.name_mapping(),
                self._table_metadata.specs().get(task.file.spec_id),
                self._table_metadata.format_version,
                self._downcast_ns_timestamp_to_us,
            )
            for batch in batches:
                if self._limit is not None:
                    if total_row_count >= self._limit:
                        break
                    elif total_row_count + len(batch) >= self._limit:
                        batch = batch.slice(0, self._limit - total_row_count)
                yield batch
                total_row_count += len(batch)
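
As a brief, hedged usage sketch (the catalog name, table identifier, and limit below are placeholders): an ArrowScan is typically built from a loaded table's metadata, FileIO, and schema, and is fed the file scan tasks planned by a table scan. Table.scan(...).to_arrow() is the usual high-level entry point; this shows the lower-level path.

    from pyiceberg.catalog import load_catalog
    from pyiceberg.expressions import AlwaysTrue
    from pyiceberg.io.pyarrow import ArrowScan

    catalog = load_catalog("default")           # placeholder catalog name
    tbl = catalog.load_table("examples.taxis")  # placeholder table identifier

    scan = ArrowScan(
        table_metadata=tbl.metadata,
        io=tbl.io,
        projected_schema=tbl.schema(),
        row_filter=AlwaysTrue(),
        limit=1_000,
    )
    arrow_table = scan.to_table(tbl.scan().plan_files())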

to_record_batches(tasks)

Scan the Iceberg table and return an Iterator[pa.RecordBatch].

Returns an Iterator of pa.RecordBatch with data from the Iceberg table by resolving the right columns that match the current table schema. Only data that matches the provided row_filter expression is returned.

Parameters:

    tasks (Iterable[FileScanTask], required): FileScanTasks representing the data files and delete files to read from.

Returns:

    Iterator[RecordBatch]: An Iterator of PyArrow RecordBatches. The total number of rows will be capped if a limit is specified.

Raises:

    ResolveError: When a required field cannot be found in the file.
    ValueError: When a field type in the file cannot be projected to the schema type.

Source code in pyiceberg/io/pyarrow.py
def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
    """Scan the Iceberg table and return an Iterator[pa.RecordBatch].

    Returns an Iterator of pa.RecordBatch with data from the Iceberg table
    by resolving the right columns that match the current table schema.
    Only data that matches the provided row_filter expression is returned.

    Args:
        tasks: FileScanTasks representing the data files and delete files to read from.

    Returns:
        An Iterator of PyArrow RecordBatches.
        Total number of rows will be capped if specified.

    Raises:
        ResolveError: When a required field cannot be found in the file
        ValueError: When a field type in the file cannot be projected to the schema type
    """
    deletes_per_file = _read_all_delete_files(self._io, tasks)

    total_row_count = 0
    executor = ExecutorFactory.get_or_create()

    def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]:
        # Materialize the iterator here to ensure execution happens within the executor.
        # Otherwise, the iterator would be lazily consumed later (in the main thread),
        # defeating the purpose of using executor.map.
        return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))

    limit_reached = False
    for batches in executor.map(batches_for_task, tasks):
        for batch in batches:
            current_batch_size = len(batch)
            if self._limit is not None and total_row_count + current_batch_size >= self._limit:
                yield batch.slice(0, self._limit - total_row_count)

                limit_reached = True
                break
            else:
                yield batch
                total_row_count += current_batch_size

        if limit_reached:
            # This break will also cancel all running tasks in the executor
            break
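
A minimal sketch of streaming results batch by batch instead of materializing a full table; `tbl` is assumed to be a table loaded from a catalog, and the limit is an arbitrary example value.

    from pyiceberg.expressions import AlwaysTrue
    from pyiceberg.io.pyarrow import ArrowScan

    scan = ArrowScan(tbl.metadata, tbl.io, tbl.schema(), AlwaysTrue(), limit=10_000)

    rows_seen = 0
    for batch in scan.to_record_batches(tbl.scan().plan_files()):
        rows_seen += len(batch)  # each item is a pa.RecordBatch
    print(rows_seen)             # at most 10_000 because of the limit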

to_table(tasks)

Scan the Iceberg table and return a pa.Table.

Returns a pa.Table with data from the Iceberg table by resolving the right columns that match the current table schema. Only data that matches the provided row_filter expression is returned.

Parameters:

    tasks (Iterable[FileScanTask], required): FileScanTasks representing the data files and delete files to read from.

Returns:

    Table: A PyArrow table. The total number of rows will be capped if a limit is specified.

Raises:

    ResolveError: When a required field cannot be found in the file.
    ValueError: When a field type in the file cannot be projected to the schema type.

Source code in pyiceberg/io/pyarrow.py
def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
    """Scan the Iceberg table and return a pa.Table.

    Returns a pa.Table with data from the Iceberg table by resolving the
    right columns that match the current table schema. Only data that
    matches the provided row_filter expression is returned.

    Args:
        tasks: FileScanTasks representing the data files and delete files to read from.

    Returns:
        A PyArrow table. Total number of rows will be capped if specified.

    Raises:
        ResolveError: When a required field cannot be found in the file
        ValueError: When a field type in the file cannot be projected to the schema type
    """
    arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)

    batches = self.to_record_batches(tasks)
    try:
        first_batch = next(batches)
    except StopIteration:
        # Empty
        return arrow_schema.empty_table()

    # Note: cannot use pa.Table.from_batches(itertools.chain([first_batch], batches)))
    #       as different batches can use different schema's (due to large_ types)
    result = pa.concat_tables(
        (pa.Table.from_batches([batch]) for batch in itertools.chain([first_batch], batches)), promote_options="permissive"
    )

    if property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, False):
        deprecation_message(
            deprecated_in="0.10.0",
            removed_in="0.11.0",
            help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
        )
        result = result.cast(arrow_schema)

    return result

PyArrowFile

Bases: InputFile, OutputFile

A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.

Parameters:

    location (str, required): A URI or a path to a local file.

Attributes:

    location (str): The URI or path to a local file for a PyArrowFile instance.

Examples:

>>> from pyiceberg.io.pyarrow import PyArrowFile
>>> # input_file = PyArrowFile("s3://foo/bar.txt")
>>> # Read the contents of the PyArrowFile instance
>>> # Make sure that you have permissions to read/write
>>> # file_content = input_file.open().read()
>>> # output_file = PyArrowFile("s3://baz/qux.txt")
>>> # Write bytes to a file
>>> # Make sure that you have permissions to read/write
>>> # output_file.create().write(b'foobytes')
Source code in pyiceberg/io/pyarrow.py
class PyArrowFile(InputFile, OutputFile):
    """A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.

    Args:
        location (str): A URI or a path to a local file.

    Attributes:
        location(str): The URI or path to a local file for a PyArrowFile instance.

    Examples:
        >>> from pyiceberg.io.pyarrow import PyArrowFile
        >>> # input_file = PyArrowFile("s3://foo/bar.txt")
        >>> # Read the contents of the PyArrowFile instance
        >>> # Make sure that you have permissions to read/write
        >>> # file_content = input_file.open().read()

        >>> # output_file = PyArrowFile("s3://baz/qux.txt")
        >>> # Write bytes to a file
        >>> # Make sure that you have permissions to read/write
        >>> # output_file.create().write(b'foobytes')
    """

    _filesystem: FileSystem
    _path: str
    _buffer_size: int

    def __init__(self, location: str, path: str, fs: FileSystem, buffer_size: int = ONE_MEGABYTE):
        self._filesystem = fs
        self._path = path
        self._buffer_size = buffer_size
        super().__init__(location=location)

    def _file_info(self) -> FileInfo:
        """Retrieve a pyarrow.fs.FileInfo object for the location.

        Raises:
            PermissionError: If the file at self.location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        try:
            file_info = self._filesystem.get_file_info(self._path)
        except OSError as e:
            if e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot get file info, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error

        if file_info.type == FileType.NotFound:
            raise FileNotFoundError(f"Cannot get file info, file not found: {self.location}")
        return file_info

    def __len__(self) -> int:
        """Return the total length of the file, in bytes."""
        file_info = self._file_info()
        return file_info.size

    def exists(self) -> bool:
        """Check whether the location exists."""
        try:
            self._file_info()  # raises FileNotFoundError if it does not exist
            return True
        except FileNotFoundError:
            return False

    def open(self, seekable: bool = True) -> InputStream:
        """Open the location using a PyArrow FileSystem inferred from the location.

        Args:
            seekable: If the stream should support seek, or if it is consumed sequential.

        Returns:
            pyarrow.lib.NativeFile: A NativeFile instance for the file located at `self.location`.

        Raises:
            FileNotFoundError: If the file at self.location does not exist.
            PermissionError: If the file at self.location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        try:
            if seekable:
                input_file = self._filesystem.open_input_file(self._path)
            else:
                input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
        except (FileNotFoundError, PermissionError):
            raise
        except OSError as e:
            if e.errno == 2 or "Path does not exist" in str(e):
                raise FileNotFoundError(f"Cannot open file, does not exist: {self.location}") from e
            elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot open file, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
        return input_file

    def create(self, overwrite: bool = False) -> OutputStream:
        """Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

        Args:
            overwrite (bool): Whether to overwrite the file if it already exists.

        Returns:
            pyarrow.lib.NativeFile: A NativeFile instance for the file located at self.location.

        Raises:
            FileExistsError: If the file already exists at `self.location` and `overwrite` is False.

        Note:
            This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False,
            a check is first performed to verify that the file does not exist. This is not thread-safe and
            a possibility does exist that the file can be created by a concurrent process after the existence
            check yet before the output stream is created. In such a case, the default pyarrow behavior will
            truncate the contents of the existing file when opening the output stream.
        """
        try:
            if not overwrite and self.exists() is True:
                raise FileExistsError(f"Cannot create file, already exists: {self.location}")
            output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot create file, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
        return output_file

    def to_input_file(self) -> PyArrowFile:
        """Return a new PyArrowFile for the location of an existing PyArrowFile instance.

        This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single
        PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns
        a copy of the same instance.
        """
        return self

__len__()

Return the total length of the file, in bytes.

Source code in pyiceberg/io/pyarrow.py
def __len__(self) -> int:
    """Return the total length of the file, in bytes."""
    file_info = self._file_info()
    return file_info.size

create(overwrite=False)

Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

Parameters:

    overwrite (bool, default False): Whether to overwrite the file if it already exists.

Returns:

    OutputStream: A pyarrow.lib.NativeFile instance for the file located at self.location.

Raises:

    FileExistsError: If the file already exists at self.location and overwrite is False.

Note

This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False, a check is first performed to verify that the file does not exist. This check is not thread-safe, so the file may still be created by a concurrent process after the existence check but before the output stream is opened. In that case, the default pyarrow behavior is to truncate the contents of the existing file when opening the output stream.

Source code in pyiceberg/io/pyarrow.py
def create(self, overwrite: bool = False) -> OutputStream:
    """Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

    Args:
        overwrite (bool): Whether to overwrite the file if it already exists.

    Returns:
        pyarrow.lib.NativeFile: A NativeFile instance for the file located at self.location.

    Raises:
        FileExistsError: If the file already exists at `self.location` and `overwrite` is False.

    Note:
        This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False,
        a check is first performed to verify that the file does not exist. This is not thread-safe and
        a possibility does exist that the file can be created by a concurrent process after the existence
        check yet before the output stream is created. In such a case, the default pyarrow behavior will
        truncate the contents of the existing file when opening the output stream.
    """
    try:
        if not overwrite and self.exists() is True:
            raise FileExistsError(f"Cannot create file, already exists: {self.location}")
        output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot create file, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error
    return output_file
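
A short sketch of the overwrite behavior described in the note above; the local path is hypothetical.

    from pyiceberg.io.pyarrow import PyArrowFileIO

    out = PyArrowFileIO().new_output("/tmp/example-output.bin")  # hypothetical path

    with out.create(overwrite=True) as stream:  # truncates any existing file
        stream.write(b"foobytes")

    try:
        out.create(overwrite=False)  # the default: fails if the file already exists
    except FileExistsError as err:
        print(err)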

exists()

Check whether the location exists.

Source code in pyiceberg/io/pyarrow.py
def exists(self) -> bool:
    """Check whether the location exists."""
    try:
        self._file_info()  # raises FileNotFoundError if it does not exist
        return True
    except FileNotFoundError:
        return False

open(seekable=True)

Open the location using a PyArrow FileSystem inferred from the location.

Parameters:

    seekable (bool, default True): Whether the stream should support seek, or whether it will be consumed sequentially.

Returns:

    InputStream: A pyarrow.lib.NativeFile instance for the file located at self.location.

Raises:

    FileNotFoundError: If the file at self.location does not exist.
    PermissionError: If the file at self.location cannot be accessed due to a permission error such as an AWS error code 15.

Source code in pyiceberg/io/pyarrow.py
def open(self, seekable: bool = True) -> InputStream:
    """Open the location using a PyArrow FileSystem inferred from the location.

    Args:
        seekable: If the stream should support seek, or if it is consumed sequential.

    Returns:
        pyarrow.lib.NativeFile: A NativeFile instance for the file located at `self.location`.

    Raises:
        FileNotFoundError: If the file at self.location does not exist.
        PermissionError: If the file at self.location cannot be accessed due to a permission error such as
            an AWS error code 15.
    """
    try:
        if seekable:
            input_file = self._filesystem.open_input_file(self._path)
        else:
            input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
    except (FileNotFoundError, PermissionError):
        raise
    except OSError as e:
        if e.errno == 2 or "Path does not exist" in str(e):
            raise FileNotFoundError(f"Cannot open file, does not exist: {self.location}") from e
        elif e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot open file, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error
    return input_file

to_input_file()

Return a new PyArrowFile for the location of an existing PyArrowFile instance.

This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns a copy of the same instance.

Source code in pyiceberg/io/pyarrow.py
def to_input_file(self) -> PyArrowFile:
    """Return a new PyArrowFile for the location of an existing PyArrowFile instance.

    This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single
    PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns
    a copy of the same instance.
    """
    return self

PyArrowFileIO

Bases: FileIO

Source code in pyiceberg/io/pyarrow.py
class PyArrowFileIO(FileIO):
    fs_by_scheme: Callable[[str, Optional[str]], FileSystem]

    def __init__(self, properties: Properties = EMPTY_DICT):
        self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = lru_cache(self._initialize_fs)
        super().__init__(properties=properties)

    @staticmethod
    def parse_location(location: str, properties: Properties = EMPTY_DICT) -> Tuple[str, str, str]:
        """Return (scheme, netloc, path) for the given location.

        Uses DEFAULT_SCHEME and DEFAULT_NETLOC if scheme/netloc are missing.
        """
        uri = urlparse(location)

        if not uri.scheme:
            default_scheme = properties.get("DEFAULT_SCHEME", "file")
            default_netloc = properties.get("DEFAULT_NETLOC", "")
            return default_scheme, default_netloc, os.path.abspath(location)
        elif uri.scheme in ("hdfs", "viewfs"):
            return uri.scheme, uri.netloc, uri.path
        else:
            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"

    def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
        """Initialize FileSystem for different scheme."""
        if scheme in {"oss"}:
            return self._initialize_oss_fs()

        elif scheme in {"s3", "s3a", "s3n"}:
            return self._initialize_s3_fs(netloc)

        elif scheme in {"hdfs", "viewfs"}:
            return self._initialize_hdfs_fs(scheme, netloc)

        elif scheme in {"gs", "gcs"}:
            return self._initialize_gcs_fs()

        elif scheme in {"abfs", "abfss", "wasb", "wasbs"}:
            return self._initialize_azure_fs()

        elif scheme in {"file"}:
            return self._initialize_local_fs()

        else:
            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

    def _initialize_oss_fs(self) -> FileSystem:
        from pyarrow.fs import S3FileSystem

        client_kwargs: Dict[str, Any] = {
            "endpoint_override": self.properties.get(S3_ENDPOINT),
            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
            "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
            "force_virtual_addressing": property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, True),
        }

        if proxy_uri := self.properties.get(S3_PROXY_URI):
            client_kwargs["proxy_options"] = proxy_uri

        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
            client_kwargs["connect_timeout"] = float(connect_timeout)

        if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT):
            client_kwargs["request_timeout"] = float(request_timeout)

        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
            client_kwargs["role_arn"] = role_arn

        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
            client_kwargs["session_name"] = session_name

        if s3_anonymous := self.properties.get(S3_ANONYMOUS):
            client_kwargs["anonymous"] = strtobool(s3_anonymous)

        return S3FileSystem(**client_kwargs)

    def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
        from pyarrow.fs import S3FileSystem

        provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION)

        # Do this when we don't provide the region at all, or when we explicitly enable it
        if provided_region is None or property_as_bool(self.properties, S3_RESOLVE_REGION, False) is True:
            # Resolve region from netloc(bucket), fallback to user-provided region
            # Only supported by buckets hosted by S3
            bucket_region = _cached_resolve_s3_region(bucket=netloc) or provided_region
            if provided_region is not None and bucket_region != provided_region:
                logger.warning(
                    f"PyArrow FileIO overriding S3 bucket region for bucket {netloc}: "
                    f"provided region {provided_region}, actual region {bucket_region}"
                )
        else:
            bucket_region = provided_region

        client_kwargs: Dict[str, Any] = {
            "endpoint_override": self.properties.get(S3_ENDPOINT),
            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
            "region": bucket_region,
        }

        if proxy_uri := self.properties.get(S3_PROXY_URI):
            client_kwargs["proxy_options"] = proxy_uri

        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
            client_kwargs["connect_timeout"] = float(connect_timeout)

        if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT):
            client_kwargs["request_timeout"] = float(request_timeout)

        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
            client_kwargs["role_arn"] = role_arn

        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
            client_kwargs["session_name"] = session_name

        if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None:
            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False)

        if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and (
            retry_instance := _import_retry_strategy(retry_strategy_impl)
        ):
            client_kwargs["retry_strategy"] = retry_instance

        if s3_anonymous := self.properties.get(S3_ANONYMOUS):
            client_kwargs["anonymous"] = strtobool(s3_anonymous)

        return S3FileSystem(**client_kwargs)

    def _initialize_azure_fs(self) -> FileSystem:
        # https://arrow.apache.org/docs/python/generated/pyarrow.fs.AzureFileSystem.html
        from packaging import version

        MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS = "20.0.0"
        if version.parse(pyarrow.__version__) < version.parse(MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS):
            raise ImportError(
                f"pyarrow version >= {MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS} required for AzureFileSystem support, "
                f"but found version {pyarrow.__version__}."
            )

        from pyarrow.fs import AzureFileSystem

        client_kwargs: Dict[str, str] = {}

        if account_name := self.properties.get(ADLS_ACCOUNT_NAME):
            client_kwargs["account_name"] = account_name

        if account_key := self.properties.get(ADLS_ACCOUNT_KEY):
            client_kwargs["account_key"] = account_key

        if blob_storage_authority := self.properties.get(ADLS_BLOB_STORAGE_AUTHORITY):
            client_kwargs["blob_storage_authority"] = blob_storage_authority

        if dfs_storage_authority := self.properties.get(ADLS_DFS_STORAGE_AUTHORITY):
            client_kwargs["dfs_storage_authority"] = dfs_storage_authority

        if blob_storage_scheme := self.properties.get(ADLS_BLOB_STORAGE_SCHEME):
            client_kwargs["blob_storage_scheme"] = blob_storage_scheme

        if dfs_storage_scheme := self.properties.get(ADLS_DFS_STORAGE_SCHEME):
            client_kwargs["dfs_storage_scheme"] = dfs_storage_scheme

        if sas_token := self.properties.get(ADLS_SAS_TOKEN):
            client_kwargs["sas_token"] = sas_token

        if client_id := self.properties.get(ADLS_CLIENT_ID):
            client_kwargs["client_id"] = client_id
        if client_secret := self.properties.get(ADLS_CLIENT_SECRET):
            client_kwargs["client_secret"] = client_secret
        if tenant_id := self.properties.get(ADLS_TENANT_ID):
            client_kwargs["tenant_id"] = tenant_id

        # Validate that all three are provided together for ClientSecretCredential
        credential_keys = ["client_id", "client_secret", "tenant_id"]
        provided_keys = [key for key in credential_keys if key in client_kwargs]
        if provided_keys and len(provided_keys) != len(credential_keys):
            missing_keys = [key for key in credential_keys if key not in client_kwargs]
            raise ValueError(
                f"client_id, client_secret, and tenant_id must all be provided together "
                f"to use ClientSecretCredential for Azure authentication. "
                f"Provided: {provided_keys}, Missing: {missing_keys}"
            )

        return AzureFileSystem(**client_kwargs)

    def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
        from pyarrow.fs import HadoopFileSystem

        hdfs_kwargs: Dict[str, Any] = {}
        if netloc:
            return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
        if host := self.properties.get(HDFS_HOST):
            hdfs_kwargs["host"] = host
        if port := self.properties.get(HDFS_PORT):
            # port should be an integer type
            hdfs_kwargs["port"] = int(port)
        if user := self.properties.get(HDFS_USER):
            hdfs_kwargs["user"] = user
        if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
            hdfs_kwargs["kerb_ticket"] = kerb_ticket

        return HadoopFileSystem(**hdfs_kwargs)

    def _initialize_gcs_fs(self) -> FileSystem:
        from pyarrow.fs import GcsFileSystem

        gcs_kwargs: Dict[str, Any] = {}
        if access_token := self.properties.get(GCS_TOKEN):
            gcs_kwargs["access_token"] = access_token
        if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
            gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
        if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
            gcs_kwargs["default_bucket_location"] = bucket_location
        if endpoint := self.properties.get(GCS_SERVICE_HOST):
            url_parts = urlparse(endpoint)
            gcs_kwargs["scheme"] = url_parts.scheme
            gcs_kwargs["endpoint_override"] = url_parts.netloc

        return GcsFileSystem(**gcs_kwargs)

    def _initialize_local_fs(self) -> FileSystem:
        return PyArrowLocalFileSystem()

    def new_input(self, location: str) -> PyArrowFile:
        """Get a PyArrowFile instance to read bytes from the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            PyArrowFile: A PyArrowFile instance for the given location.
        """
        scheme, netloc, path = self.parse_location(location, self.properties)
        return PyArrowFile(
            fs=self.fs_by_scheme(scheme, netloc),
            location=location,
            path=path,
            buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
        )

    def new_output(self, location: str) -> PyArrowFile:
        """Get a PyArrowFile instance to write bytes to the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            PyArrowFile: A PyArrowFile instance for the given location.
        """
        scheme, netloc, path = self.parse_location(location, self.properties)
        return PyArrowFile(
            fs=self.fs_by_scheme(scheme, netloc),
            location=location,
            path=path,
            buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
        )

    def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
        """Delete the file at the given location.

        Args:
            location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an OutputFile instance is provided,
                the location attribute for that instance is used as the location to delete.

        Raises:
            FileNotFoundError: When the file at the provided location does not exist.
            PermissionError: If the file at the provided location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
        scheme, netloc, path = self.parse_location(str_location, self.properties)
        fs = self.fs_by_scheme(scheme, netloc)

        try:
            fs.delete_file(path)
        except FileNotFoundError:
            raise
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 2 or "Path does not exist" in str(e):
                raise FileNotFoundError(f"Cannot delete file, does not exist: {location}") from e
            elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot delete file, access denied: {location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error

    def __getstate__(self) -> Dict[str, Any]:
        """Create a dictionary of the PyArrowFileIO fields used when pickling."""
        fileio_copy = copy(self.__dict__)
        fileio_copy["fs_by_scheme"] = None
        return fileio_copy

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Deserialize the state into a PyArrowFileIO instance."""
        self.__dict__ = state
        self.fs_by_scheme = lru_cache(self._initialize_fs)
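
A hedged end-to-end sketch of the FileIO surface (new_output, new_input, delete) against the local filesystem; the path is a placeholder.

    from pyiceberg.io.pyarrow import PyArrowFileIO

    io = PyArrowFileIO()
    location = "/tmp/pyiceberg-io-demo.txt"  # placeholder local path

    with io.new_output(location).create(overwrite=True) as out:
        out.write(b"hello iceberg")

    with io.new_input(location).open() as inp:
        assert inp.read() == b"hello iceberg"

    io.delete(location)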

__getstate__()

Create a dictionary of the PyArrowFileIO fields used when pickling.

Source code in pyiceberg/io/pyarrow.py
def __getstate__(self) -> Dict[str, Any]:
    """Create a dictionary of the PyArrowFileIO fields used when pickling."""
    fileio_copy = copy(self.__dict__)
    fileio_copy["fs_by_scheme"] = None
    return fileio_copy

__setstate__(state)

Deserialize the state into a PyArrowFileIO instance.

Source code in pyiceberg/io/pyarrow.py
def __setstate__(self, state: Dict[str, Any]) -> None:
    """Deserialize the state into a PyArrowFileIO instance."""
    self.__dict__ = state
    self.fs_by_scheme = lru_cache(self._initialize_fs)

delete(location)

Delete the file at the given location.

Parameters:

    location (Union[str, InputFile, OutputFile], required): The URI of the file to delete. If an InputFile or OutputFile instance is provided, its location attribute is used as the location to delete.

Raises:

    FileNotFoundError: When the file at the provided location does not exist.
    PermissionError: If the file at the provided location cannot be accessed due to a permission error such as an AWS error code 15.

Source code in pyiceberg/io/pyarrow.py
def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
    """Delete the file at the given location.

    Args:
        location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an OutputFile instance is provided,
            the location attribute for that instance is used as the location to delete.

    Raises:
        FileNotFoundError: When the file at the provided location does not exist.
        PermissionError: If the file at the provided location cannot be accessed due to a permission error such as
            an AWS error code 15.
    """
    str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
    scheme, netloc, path = self.parse_location(str_location, self.properties)
    fs = self.fs_by_scheme(scheme, netloc)

    try:
        fs.delete_file(path)
    except FileNotFoundError:
        raise
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 2 or "Path does not exist" in str(e):
            raise FileNotFoundError(f"Cannot delete file, does not exist: {location}") from e
        elif e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot delete file, access denied: {location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error

new_input(location)

Get a PyArrowFile instance to read bytes from the file at the given location.

Parameters:

    location (str, required): A URI or a path to a local file.

Returns:

    PyArrowFile: A PyArrowFile instance for the given location.

Source code in pyiceberg/io/pyarrow.py
def new_input(self, location: str) -> PyArrowFile:
    """Get a PyArrowFile instance to read bytes from the file at the given location.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        PyArrowFile: A PyArrowFile instance for the given location.
    """
    scheme, netloc, path = self.parse_location(location, self.properties)
    return PyArrowFile(
        fs=self.fs_by_scheme(scheme, netloc),
        location=location,
        path=path,
        buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
    )

new_output(location)

Get a PyArrowFile instance to write bytes to the file at the given location.

Parameters:

    location (str, required): A URI or a path to a local file.

Returns:

    PyArrowFile: A PyArrowFile instance for the given location.

Source code in pyiceberg/io/pyarrow.py
def new_output(self, location: str) -> PyArrowFile:
    """Get a PyArrowFile instance to write bytes to the file at the given location.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        PyArrowFile: A PyArrowFile instance for the given location.
    """
    scheme, netloc, path = self.parse_location(location, self.properties)
    return PyArrowFile(
        fs=self.fs_by_scheme(scheme, netloc),
        location=location,
        path=path,
        buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
    )

parse_location(location, properties=EMPTY_DICT) staticmethod

Return (scheme, netloc, path) for the given location.

Uses DEFAULT_SCHEME and DEFAULT_NETLOC if scheme/netloc are missing.

Source code in pyiceberg/io/pyarrow.py
@staticmethod
def parse_location(location: str, properties: Properties = EMPTY_DICT) -> Tuple[str, str, str]:
    """Return (scheme, netloc, path) for the given location.

    Uses DEFAULT_SCHEME and DEFAULT_NETLOC if scheme/netloc are missing.
    """
    uri = urlparse(location)

    if not uri.scheme:
        default_scheme = properties.get("DEFAULT_SCHEME", "file")
        default_netloc = properties.get("DEFAULT_NETLOC", "")
        return default_scheme, default_netloc, os.path.abspath(location)
    elif uri.scheme in ("hdfs", "viewfs"):
        return uri.scheme, uri.netloc, uri.path
    else:
        return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
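
Illustrative return values for a few location styles (the bucket, host, and paths are placeholders). For non-HDFS schemes the netloc is folded back into the path handed to pyarrow, and bare paths fall back to the "file" scheme and are resolved against the current working directory.

    from pyiceberg.io.pyarrow import PyArrowFileIO

    PyArrowFileIO.parse_location("s3://bucket/key/data.parquet")
    # ('s3', 'bucket', 'bucket/key/data.parquet')

    PyArrowFileIO.parse_location("hdfs://namenode:8020/warehouse/file.parquet")
    # ('hdfs', 'namenode:8020', '/warehouse/file.parquet')

    PyArrowFileIO.parse_location("relative/file.parquet")
    # ('file', '', '/current/working/dir/relative/file.parquet')  # depends on the cwd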

PyArrowSchemaVisitor

Bases: Generic[T], ABC

Source code in pyiceberg/io/pyarrow.py
class PyArrowSchemaVisitor(Generic[T], ABC):
    def before_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a field."""

    def after_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a field."""

    def before_list_element(self, element: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting an element within a ListType."""

    def after_list_element(self, element: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting an element within a ListType."""

    def before_map_key(self, key: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a key within a MapType."""

    def after_map_key(self, key: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a key within a MapType."""

    def before_map_value(self, value: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a value within a MapType."""

    def after_map_value(self, value: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a value within a MapType."""

    @abstractmethod
    def schema(self, schema: pa.Schema, struct_result: T) -> T:
        """Visit a schema."""

    @abstractmethod
    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
        """Visit a struct."""

    @abstractmethod
    def field(self, field: pa.Field, field_result: T) -> T:
        """Visit a field."""

    @abstractmethod
    def list(self, list_type: pa.ListType, element_result: T) -> T:
        """Visit a list."""

    @abstractmethod
    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
        """Visit a map."""

    @abstractmethod
    def primitive(self, primitive: pa.DataType) -> T:
        """Visit a primitive type."""

after_field(field)

Override this method to perform an action immediately after visiting a field.

Source code in pyiceberg/io/pyarrow.py
def after_field(self, field: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a field."""

after_list_element(element)

Override this method to perform an action immediately after visiting an element within a ListType.

Source code in pyiceberg/io/pyarrow.py
def after_list_element(self, element: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting an element within a ListType."""

after_map_key(key)

Override this method to perform an action immediately after visiting a key within a MapType.

Source code in pyiceberg/io/pyarrow.py
def after_map_key(self, key: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a key within a MapType."""

after_map_value(value)

Override this method to perform an action immediately after visiting a value within a MapType.

Source code in pyiceberg/io/pyarrow.py
def after_map_value(self, value: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a value within a MapType."""

before_field(field)

Override this method to perform an action immediately before visiting a field.

Source code in pyiceberg/io/pyarrow.py
def before_field(self, field: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a field."""

before_list_element(element)

Override this method to perform an action immediately before visiting an element within a ListType.

Source code in pyiceberg/io/pyarrow.py
def before_list_element(self, element: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting an element within a ListType."""

before_map_key(key)

Override this method to perform an action immediately before visiting a key within a MapType.

Source code in pyiceberg/io/pyarrow.py
def before_map_key(self, key: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a key within a MapType."""

before_map_value(value)

Override this method to perform an action immediately before visiting a value within a MapType.

Source code in pyiceberg/io/pyarrow.py
def before_map_value(self, value: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a value within a MapType."""

field(field, field_result) abstractmethod

Visit a field.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def field(self, field: pa.Field, field_result: T) -> T:
    """Visit a field."""

list(list_type, element_result) abstractmethod

Visit a list.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def list(self, list_type: pa.ListType, element_result: T) -> T:
    """Visit a list."""

map(map_type, key_result, value_result) abstractmethod

Visit a map.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
    """Visit a map."""

primitive(primitive) abstractmethod

Visit a primitive type.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def primitive(self, primitive: pa.DataType) -> T:
    """Visit a primitive type."""

schema(schema, struct_result) abstractmethod

Visit a schema.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def schema(self, schema: pa.Schema, struct_result: T) -> T:
    """Visit a schema."""

struct(struct, field_results) abstractmethod

Visit a struct.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
    """Visit a struct."""

UnsupportedPyArrowTypeException

Bases: Exception

Cannot convert PyArrow type to corresponding Iceberg type.

Source code in pyiceberg/io/pyarrow.py
class UnsupportedPyArrowTypeException(Exception):
    """Cannot convert PyArrow type to corresponding Iceberg type."""

    def __init__(self, field: pa.Field, *args: Any):
        self.field = field
        super().__init__(*args)

compute_statistics_plan(schema, table_properties)

Compute the statistics plan for all columns.

The resulting list is assumed to have the same length and same order as the columns in the pyarrow table. This allows the list to map from the column index to the Iceberg column ID. For each element, the desired metrics collection that was provided by the user in the configuration is computed and then adjusted according to the data type of the column. For nested columns the minimum and maximum values are not computed, and truncation is only applied to text or binary strings.

Parameters:

    schema (Schema, required): The Iceberg schema to compute the statistics plan for.
    table_properties (Dict[str, str], from pyiceberg.table.metadata.TableMetadata; required): The Iceberg table metadata properties. They are required to compute the mapping of column position to Iceberg schema type ID, and they also set the mode for column metrics collection.
Source code in pyiceberg/io/pyarrow.py
def compute_statistics_plan(
    schema: Schema,
    table_properties: Dict[str, str],
) -> Dict[int, StatisticsCollector]:
    """
    Compute the statistics plan for all columns.

    The resulting list is assumed to have the same length and same order as the columns in the pyarrow table.
    This allows the list to map from the column index to the Iceberg column ID.
    For each element, the desired metrics collection that was provided by the user in the configuration
    is computed and then adjusted according to the data type of the column. For nested columns the minimum
    and maximum values are not computed. And truncation is only applied to text of binary strings.

    Args:
        table_properties (from pyiceberg.table.metadata.TableMetadata): The Iceberg table metadata properties.
            They are required to compute the mapping of column position to iceberg schema type id. It's also
            used to set the mode for column metrics collection
    """
    stats_cols = pre_order_visit(schema, PyArrowStatisticsCollector(schema, table_properties))
    result: Dict[int, StatisticsCollector] = {}
    for stats_col in stats_cols:
        result[stats_col.field_id] = stats_col
    return result
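
A small, hedged example: the schema and the metrics property below are illustrative. The standard Iceberg table properties write.metadata.metrics.default and write.metadata.metrics.column.<column name> drive the resulting plan.

    from pyiceberg.io.pyarrow import compute_statistics_plan
    from pyiceberg.schema import Schema
    from pyiceberg.types import LongType, NestedField, StringType

    schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False),
    )

    # Keep the default metrics for `id`, but collect only counts for `name`.
    plan = compute_statistics_plan(schema, {"write.metadata.metrics.column.name": "counts"})
    # `plan` maps the field ID of each leaf column to a StatisticsCollector
    # describing which metrics to collect for it.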

data_file_statistics_from_parquet_metadata(parquet_metadata, stats_columns, parquet_column_mapping)

Compute and return DataFileStatistics that includes the following.

  • record_count
  • column_sizes
  • value_counts
  • null_value_counts
  • nan_value_counts
  • column_aggregates
  • split_offsets

Parameters:

    parquet_metadata (pyarrow.parquet.FileMetaData, required): A pyarrow metadata object.
    stats_columns (Dict[int, StatisticsCollector], required): The statistics gathering plan. It is required to set the mode for column metrics collection.
    parquet_column_mapping (Dict[str, int], required): The mapping of the parquet column path to the field ID.
Source code in pyiceberg/io/pyarrow.py
def data_file_statistics_from_parquet_metadata(
    parquet_metadata: pq.FileMetaData,
    stats_columns: Dict[int, StatisticsCollector],
    parquet_column_mapping: Dict[str, int],
) -> DataFileStatistics:
    """
    Compute and return DataFileStatistics that includes the following.

    - record_count
    - column_sizes
    - value_counts
    - null_value_counts
    - nan_value_counts
    - column_aggregates
    - split_offsets

    Args:
        parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
        stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
            set the mode for column metrics collection
        parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID
    """
    column_sizes: Dict[int, int] = {}
    value_counts: Dict[int, int] = {}
    split_offsets: List[int] = []

    null_value_counts: Dict[int, int] = {}
    nan_value_counts: Dict[int, int] = {}

    col_aggs = {}

    invalidate_col: Set[int] = set()
    for r in range(parquet_metadata.num_row_groups):
        # References:
        # https://github.com/apache/iceberg/blob/fc381a81a1fdb8f51a0637ca27cd30673bd7aad3/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L232
        # https://github.com/apache/parquet-mr/blob/ac29db4611f86a07cc6877b416aa4b183e09b353/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java#L184

        row_group = parquet_metadata.row_group(r)

        data_offset = row_group.column(0).data_page_offset
        dictionary_offset = row_group.column(0).dictionary_page_offset

        if row_group.column(0).has_dictionary_page and dictionary_offset < data_offset:
            split_offsets.append(dictionary_offset)
        else:
            split_offsets.append(data_offset)

        for pos in range(parquet_metadata.num_columns):
            column = row_group.column(pos)
            field_id = parquet_column_mapping[column.path_in_schema]

            stats_col = stats_columns[field_id]

            column_sizes.setdefault(field_id, 0)
            column_sizes[field_id] += column.total_compressed_size

            if stats_col.mode == MetricsMode(MetricModeTypes.NONE):
                continue

            value_counts[field_id] = value_counts.get(field_id, 0) + column.num_values

            if column.is_stats_set:
                try:
                    statistics = column.statistics

                    if statistics.has_null_count:
                        null_value_counts[field_id] = null_value_counts.get(field_id, 0) + statistics.null_count

                    if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS):
                        continue

                    if field_id not in col_aggs:
                        try:
                            col_aggs[field_id] = StatsAggregator(
                                stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
                            )
                        except ValueError as e:
                            raise ValueError(f"{e} for column '{stats_col.column_name}'") from e

                    if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
                        scale = stats_col.iceberg_type.scale
                        col_aggs[field_id].update_min(
                            unscaled_to_decimal(statistics.min_raw, scale)
                        ) if statistics.min_raw is not None else None
                        col_aggs[field_id].update_max(
                            unscaled_to_decimal(statistics.max_raw, scale)
                        ) if statistics.max_raw is not None else None
                    else:
                        col_aggs[field_id].update_min(statistics.min)
                        col_aggs[field_id].update_max(statistics.max)

                except pyarrow.lib.ArrowNotImplementedError as e:
                    invalidate_col.add(field_id)
                    logger.warning(e)
            else:
                invalidate_col.add(field_id)
                logger.warning("PyArrow statistics missing for column %d when writing file", pos)

    split_offsets.sort()

    for field_id in invalidate_col:
        col_aggs.pop(field_id, None)
        null_value_counts.pop(field_id, None)

    return DataFileStatistics(
        record_count=parquet_metadata.num_rows,
        column_sizes=column_sizes,
        value_counts=value_counts,
        null_value_counts=null_value_counts,
        nan_value_counts=nan_value_counts,
        column_aggregates=col_aggs,
        split_offsets=split_offsets,
    )

parquet_path_to_id_mapping(schema)

Compute the mapping of parquet column path to Iceberg ID.

For each column, the parquet file metadata has a path_in_schema attribute that follows a specific naming scheme for nested columns. This function computes a mapping of the full paths to the corresponding Iceberg IDs.

Parameters:

    schema (pyiceberg.schema.Schema, required): The current table schema.
Source code in pyiceberg/io/pyarrow.py
def parquet_path_to_id_mapping(
    schema: Schema,
) -> Dict[str, int]:
    """
    Compute the mapping of parquet column path to Iceberg ID.

    For each column, the parquet file metadata has a path_in_schema attribute that follows
    a specific naming scheme for nested columns. This function computes a mapping of
    the full paths to the corresponding Iceberg IDs.

    Args:
        schema (pyiceberg.schema.Schema): The current table schema.
    """
    result: Dict[str, int] = {}
    for pair in pre_order_visit(schema, ID2ParquetPathVisitor()):
        result[pair.parquet_path] = pair.field_id
    return result
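
A hedged sketch tying the three statistics helpers together for a freshly written Parquet file; `schema`, `table_properties`, and the file path are placeholders.

    import pyarrow.parquet as pq

    from pyiceberg.io.pyarrow import (
        compute_statistics_plan,
        data_file_statistics_from_parquet_metadata,
        parquet_path_to_id_mapping,
    )

    # `schema` is the Iceberg Schema the file was written with and
    # `table_properties` is the table's properties dict (both assumed to exist).
    metadata = pq.read_metadata("/tmp/00000-0-data.parquet")  # placeholder path

    stats = data_file_statistics_from_parquet_metadata(
        parquet_metadata=metadata,
        stats_columns=compute_statistics_plan(schema, table_properties),
        parquet_column_mapping=parquet_path_to_id_mapping(schema),
    )
    print(stats.record_count, stats.split_offsets)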

visit_pyarrow(obj, visitor)

Apply a pyarrow schema visitor to any point within a schema.

The function traverses the schema in post-order fashion.

Parameters:

    obj (Union[pa.DataType, pa.Schema], required): An instance of a pyarrow Schema or DataType.
    visitor (PyArrowSchemaVisitor[T], required): An instance of an implementation of the generic PyArrowSchemaVisitor base class.

Raises:

    NotImplementedError: If attempting to visit an unrecognized object type.

Source code in pyiceberg/io/pyarrow.py
@singledispatch
def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisitor[T]) -> T:
    """Apply a pyarrow schema visitor to any point within a schema.

    The function traverses the schema in post-order fashion.

    Args:
        obj (Union[pa.DataType, pa.Schema]): An instance of a Schema or an IcebergType.
        visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of the generic PyarrowSchemaVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unrecognized object type.
    """
    raise NotImplementedError(f"Cannot visit non-type: {obj}")