pyarrow

FileIO implementation for reading and writing table files that uses pyarrow.fs.

This file contains a FileIO implementation that relies on the filesystem interface provided by PyArrow. It relies on PyArrow's from_uri method that infers the correct filesystem type to use. Theoretically, this allows the supported storage types to grow naturally with the pyarrow library.
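For orientation, here is a minimal sketch of the typical round trip through PyArrowFileIO; the local path is an arbitrary placeholder.

# A minimal sketch of a read/write round trip with PyArrowFileIO.
from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()

# Write bytes through an OutputFile.
output_file = io.new_output("/tmp/example.txt")  # placeholder path
with output_file.create(overwrite=True) as out:
    out.write(b"hello from pyarrow fileio")

# Read the bytes back through an InputFile.
input_file = io.new_input("/tmp/example.txt")
with input_file.open() as f:
    assert f.read() == b"hello from pyarrow fileio"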

PyArrowFile

Bases: InputFile, OutputFile

A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.

Parameters:

    location (str): A URI or a path to a local file. (required)

Attributes:

    location (str): The URI or path to a local file for a PyArrowFile instance.

Examples:

>>> from pyiceberg.io.pyarrow import PyArrowFile
>>> # input_file = PyArrowFile("s3://foo/bar.txt")
>>> # Read the contents of the PyArrowFile instance
>>> # Make sure that you have permissions to read/write
>>> # file_content = input_file.open().read()
>>> # output_file = PyArrowFile("s3://baz/qux.txt")
>>> # Write bytes to a file
>>> # Make sure that you have permissions to read/write
>>> # output_file.create().write(b'foobytes')
Source code in pyiceberg/io/pyarrow.py
class PyArrowFile(InputFile, OutputFile):
    """A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.

    Args:
        location (str): A URI or a path to a local file.

    Attributes:
        location (str): The URI or path to a local file for a PyArrowFile instance.

    Examples:
        >>> from pyiceberg.io.pyarrow import PyArrowFile
        >>> # input_file = PyArrowFile("s3://foo/bar.txt")
        >>> # Read the contents of the PyArrowFile instance
        >>> # Make sure that you have permissions to read/write
        >>> # file_content = input_file.open().read()

        >>> # output_file = PyArrowFile("s3://baz/qux.txt")
        >>> # Write bytes to a file
        >>> # Make sure that you have permissions to read/write
        >>> # output_file.create().write(b'foobytes')
    """

    _fs: FileSystem
    _path: str
    _buffer_size: int

    def __init__(self, location: str, path: str, fs: FileSystem, buffer_size: int = ONE_MEGABYTE):
        self._filesystem = fs
        self._path = path
        self._buffer_size = buffer_size
        super().__init__(location=location)

    def _file_info(self) -> FileInfo:
        """Retrieve a pyarrow.fs.FileInfo object for the location.

        Raises:
            PermissionError: If the file at self.location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        try:
            file_info = self._filesystem.get_file_info(self._path)
        except OSError as e:
            if e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot get file info, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error

        if file_info.type == FileType.NotFound:
            raise FileNotFoundError(f"Cannot get file info, file not found: {self.location}")
        return file_info

    def __len__(self) -> int:
        """Return the total length of the file, in bytes."""
        file_info = self._file_info()
        return file_info.size

    def exists(self) -> bool:
        """Check whether the location exists."""
        try:
            self._file_info()  # raises FileNotFoundError if it does not exist
            return True
        except FileNotFoundError:
            return False

    def open(self, seekable: bool = True) -> InputStream:
        """Open the location using a PyArrow FileSystem inferred from the location.

        Args:
            seekable: If the stream should support seek, or if it is consumed sequentially.

        Returns:
            pyarrow.lib.NativeFile: A NativeFile instance for the file located at `self.location`.

        Raises:
            FileNotFoundError: If the file at self.location does not exist.
            PermissionError: If the file at self.location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        try:
            if seekable:
                input_file = self._filesystem.open_input_file(self._path)
            else:
                input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
        except FileNotFoundError:
            raise
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 2 or "Path does not exist" in str(e):
                raise FileNotFoundError(f"Cannot open file, does not exist: {self.location}") from e
            elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot open file, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
        return input_file

    def create(self, overwrite: bool = False) -> OutputStream:
        """Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

        Args:
            overwrite (bool): Whether to overwrite the file if it already exists.

        Returns:
            pyarrow.lib.NativeFile: A NativeFile instance for the file located at self.location.

        Raises:
            FileExistsError: If the file already exists at `self.location` and `overwrite` is False.

        Note:
            This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False,
            a check is first performed to verify that the file does not exist. This is not thread-safe and
            a possibility does exist that the file can be created by a concurrent process after the existence
            check yet before the output stream is created. In such a case, the default pyarrow behavior will
            truncate the contents of the existing file when opening the output stream.
        """
        try:
            if not overwrite and self.exists() is True:
                raise FileExistsError(f"Cannot create file, already exists: {self.location}")
            output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot create file, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
        return output_file

    def to_input_file(self) -> PyArrowFile:
        """Return a new PyArrowFile for the location of an existing PyArrowFile instance.

        This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single
        PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns
        a copy of the same instance.
        """
        return self

__len__()

Return the total length of the file, in bytes.

Source code in pyiceberg/io/pyarrow.py
def __len__(self) -> int:
    """Return the total length of the file, in bytes."""
    file_info = self._file_info()
    return file_info.size

create(overwrite=False)

Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

Parameters:

    overwrite (bool): Whether to overwrite the file if it already exists. (default: False)

Returns:

    OutputStream: A pyarrow.lib.NativeFile instance for the file located at self.location.

Raises:

    FileExistsError: If the file already exists at self.location and overwrite is False.

Note

This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False, a check is first performed to verify that the file does not exist. This check is not thread-safe: a concurrent process can create the file after the existence check but before the output stream is opened. In that case, the default pyarrow behavior truncates the contents of the existing file when opening the output stream.
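A small sketch of one way a caller might handle the resulting FileExistsError, assuming last-writer-wins is acceptable and using a placeholder location:

from pyiceberg.io.pyarrow import PyArrowFileIO

output_file = PyArrowFileIO().new_output("/tmp/report.csv")  # placeholder location
try:
    stream = output_file.create()  # overwrite defaults to False
except FileExistsError:
    # Another process created the file between the check and the open;
    # fall back to an explicit overwrite if last-writer-wins is acceptable.
    stream = output_file.create(overwrite=True)
with stream:
    stream.write(b"id,value\n1,42\n")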

Source code in pyiceberg/io/pyarrow.py
def create(self, overwrite: bool = False) -> OutputStream:
    """Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

    Args:
        overwrite (bool): Whether to overwrite the file if it already exists.

    Returns:
        pyarrow.lib.NativeFile: A NativeFile instance for the file located at self.location.

    Raises:
        FileExistsError: If the file already exists at `self.location` and `overwrite` is False.

    Note:
        This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False,
        a check is first performed to verify that the file does not exist. This is not thread-safe and
        a possibility does exist that the file can be created by a concurrent process after the existence
        check yet before the output stream is created. In such a case, the default pyarrow behavior will
        truncate the contents of the existing file when opening the output stream.
    """
    try:
        if not overwrite and self.exists() is True:
            raise FileExistsError(f"Cannot create file, already exists: {self.location}")
        output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot create file, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error
    return output_file

exists()

Check whether the location exists.

Source code in pyiceberg/io/pyarrow.py
def exists(self) -> bool:
    """Check whether the location exists."""
    try:
        self._file_info()  # raises FileNotFoundError if it does not exist
        return True
    except FileNotFoundError:
        return False

open(seekable=True)

Open the location using a PyArrow FileSystem inferred from the location.

Parameters:

    seekable (bool): If the stream should support seek, or if it is consumed sequentially. (default: True)

Returns:

    InputStream: A pyarrow.lib.NativeFile instance for the file located at self.location.

Raises:

    FileNotFoundError: If the file at self.location does not exist.
    PermissionError: If the file at self.location cannot be accessed due to a permission error such as an AWS error code 15.
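A short sketch contrasting the two modes, assuming the placeholder file from the earlier sketch exists:

from pyiceberg.io.pyarrow import PyArrowFileIO

input_file = PyArrowFileIO().new_input("/tmp/example.txt")  # placeholder location

# seekable=True (default): a random-access NativeFile that supports seek().
with input_file.open() as f:
    f.seek(6)
    tail = f.read()

# seekable=False: a buffered stream for strictly sequential consumption.
with input_file.open(seekable=False) as stream:
    head = stream.read(5)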

Source code in pyiceberg/io/pyarrow.py
def open(self, seekable: bool = True) -> InputStream:
    """Open the location using a PyArrow FileSystem inferred from the location.

    Args:
        seekable: If the stream should support seek, or if it is consumed sequentially.

    Returns:
        pyarrow.lib.NativeFile: A NativeFile instance for the file located at `self.location`.

    Raises:
        FileNotFoundError: If the file at self.location does not exist.
        PermissionError: If the file at self.location cannot be accessed due to a permission error such as
            an AWS error code 15.
    """
    try:
        if seekable:
            input_file = self._filesystem.open_input_file(self._path)
        else:
            input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
    except FileNotFoundError:
        raise
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 2 or "Path does not exist" in str(e):
            raise FileNotFoundError(f"Cannot open file, does not exist: {self.location}") from e
        elif e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot open file, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error
    return input_file

to_input_file()

Return a new PyArrowFile for the location of an existing PyArrowFile instance.

This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns a copy of the same instance.

Source code in pyiceberg/io/pyarrow.py
def to_input_file(self) -> PyArrowFile:
    """Return a new PyArrowFile for the location of an existing PyArrowFile instance.

    This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single
    PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns
    a copy of the same instance.
    """
    return self

PyArrowFileIO

Bases: FileIO

Source code in pyiceberg/io/pyarrow.py
class PyArrowFileIO(FileIO):
    fs_by_scheme: Callable[[str, Optional[str]], FileSystem]

    def __init__(self, properties: Properties = EMPTY_DICT):
        self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = lru_cache(self._initialize_fs)
        super().__init__(properties=properties)

    @staticmethod
    def parse_location(location: str) -> Tuple[str, str, str]:
        """Return the path without the scheme."""
        uri = urlparse(location)
        if not uri.scheme:
            return "file", uri.netloc, os.path.abspath(location)
        elif uri.scheme == "hdfs":
            return uri.scheme, uri.netloc, location
        else:
            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"

    def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
        if scheme in {"s3", "s3a", "s3n"}:
            from pyarrow.fs import S3FileSystem

            client_kwargs: Dict[str, Any] = {
                "endpoint_override": self.properties.get(S3_ENDPOINT),
                "access_key": self.properties.get(S3_ACCESS_KEY_ID),
                "secret_key": self.properties.get(S3_SECRET_ACCESS_KEY),
                "session_token": self.properties.get(S3_SESSION_TOKEN),
                "region": self.properties.get(S3_REGION),
            }

            if proxy_uri := self.properties.get(S3_PROXY_URI):
                client_kwargs["proxy_options"] = proxy_uri

            if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
                client_kwargs["connect_timeout"] = float(connect_timeout)

            return S3FileSystem(**client_kwargs)
        elif scheme == "hdfs":
            from pyarrow.fs import HadoopFileSystem

            hdfs_kwargs: Dict[str, Any] = {}
            if netloc:
                return HadoopFileSystem.from_uri(f"hdfs://{netloc}")
            if host := self.properties.get(HDFS_HOST):
                hdfs_kwargs["host"] = host
            if port := self.properties.get(HDFS_PORT):
                # port should be an integer type
                hdfs_kwargs["port"] = int(port)
            if user := self.properties.get(HDFS_USER):
                hdfs_kwargs["user"] = user
            if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
                hdfs_kwargs["kerb_ticket"] = kerb_ticket

            return HadoopFileSystem(**hdfs_kwargs)
        elif scheme in {"gs", "gcs"}:
            from pyarrow.fs import GcsFileSystem

            gcs_kwargs: Dict[str, Any] = {}
            if access_token := self.properties.get(GCS_TOKEN):
                gcs_kwargs["access_token"] = access_token
            if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
                gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
            if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
                gcs_kwargs["default_bucket_location"] = bucket_location
            if endpoint := self.properties.get(GCS_ENDPOINT):
                url_parts = urlparse(endpoint)
                gcs_kwargs["scheme"] = url_parts.scheme
                gcs_kwargs["endpoint_override"] = url_parts.netloc

            return GcsFileSystem(**gcs_kwargs)
        elif scheme == "file":
            return PyArrowLocalFileSystem()
        else:
            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

    def new_input(self, location: str) -> PyArrowFile:
        """Get a PyArrowFile instance to read bytes from the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            PyArrowFile: A PyArrowFile instance for the given location.
        """
        scheme, netloc, path = self.parse_location(location)
        return PyArrowFile(
            fs=self.fs_by_scheme(scheme, netloc),
            location=location,
            path=path,
            buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
        )

    def new_output(self, location: str) -> PyArrowFile:
        """Get a PyArrowFile instance to write bytes to the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            PyArrowFile: A PyArrowFile instance for the given location.
        """
        scheme, netloc, path = self.parse_location(location)
        return PyArrowFile(
            fs=self.fs_by_scheme(scheme, netloc),
            location=location,
            path=path,
            buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
        )

    def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
        """Delete the file at the given location.

        Args:
            location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an OutputFile instance is provided,
                the location attribute for that instance is used as the location to delete.

        Raises:
            FileNotFoundError: When the file at the provided location does not exist.
            PermissionError: If the file at the provided location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
        scheme, netloc, path = self.parse_location(str_location)
        fs = self.fs_by_scheme(scheme, netloc)

        try:
            fs.delete_file(path)
        except FileNotFoundError:
            raise
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 2 or "Path does not exist" in str(e):
                raise FileNotFoundError(f"Cannot delete file, does not exist: {location}") from e
            elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot delete file, access denied: {location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
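A hedged configuration sketch: the S3 property keys shown ("s3.endpoint", "s3.access-key-id", "s3.secret-access-key", "s3.region") and the endpoint, bucket, and credentials are placeholders for your own environment.

from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO(
    properties={
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "s3.region": "us-east-1",
    }
)
# The scheme of the location decides which filesystem is initialized and cached.
input_file = io.new_input("s3://warehouse/path/to/data.parquet")  # placeholder URI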

delete(location)

Delete the file at the given location.

Parameters:

    location (Union[str, InputFile, OutputFile]): The URI to the file. If an InputFile or OutputFile instance is provided, its location attribute is used as the location to delete. (required)

Raises:

    FileNotFoundError: When the file at the provided location does not exist.
    PermissionError: If the file at the provided location cannot be accessed due to a permission error such as an AWS error code 15.
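A small sketch of the two accepted argument forms, assuming the placeholder files from the earlier sketches exist:

from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()
io.delete("/tmp/example.txt")   # plain string location

stale_output = io.new_output("/tmp/report.csv")
io.delete(stale_output)         # InputFile/OutputFile: its location attribute is used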

Source code in pyiceberg/io/pyarrow.py
def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
    """Delete the file at the given location.

    Args:
        location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an OutputFile instance is provided,
            the location attribute for that instance is used as the location to delete.

    Raises:
        FileNotFoundError: When the file at the provided location does not exist.
        PermissionError: If the file at the provided location cannot be accessed due to a permission error such as
            an AWS error code 15.
    """
    str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
    scheme, netloc, path = self.parse_location(str_location)
    fs = self.fs_by_scheme(scheme, netloc)

    try:
        fs.delete_file(path)
    except FileNotFoundError:
        raise
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 2 or "Path does not exist" in str(e):
            raise FileNotFoundError(f"Cannot delete file, does not exist: {location}") from e
        elif e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot delete file, access denied: {location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error

new_input(location)

Get a PyArrowFile instance to read bytes from the file at the given location.

Parameters:

    location (str): A URI or a path to a local file. (required)

Returns:

    PyArrowFile: A PyArrowFile instance for the given location.

Source code in pyiceberg/io/pyarrow.py
def new_input(self, location: str) -> PyArrowFile:
    """Get a PyArrowFile instance to read bytes from the file at the given location.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        PyArrowFile: A PyArrowFile instance for the given location.
    """
    scheme, netloc, path = self.parse_location(location)
    return PyArrowFile(
        fs=self.fs_by_scheme(scheme, netloc),
        location=location,
        path=path,
        buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
    )

new_output(location)

Get a PyArrowFile instance to write bytes to the file at the given location.

Parameters:

    location (str): A URI or a path to a local file. (required)

Returns:

    PyArrowFile: A PyArrowFile instance for the given location.

Source code in pyiceberg/io/pyarrow.py
def new_output(self, location: str) -> PyArrowFile:
    """Get a PyArrowFile instance to write bytes to the file at the given location.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        PyArrowFile: A PyArrowFile instance for the given location.
    """
    scheme, netloc, path = self.parse_location(location)
    return PyArrowFile(
        fs=self.fs_by_scheme(scheme, netloc),
        location=location,
        path=path,
        buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
    )

parse_location(location) staticmethod

Parse a location string into a (scheme, netloc, path) tuple.

Source code in pyiceberg/io/pyarrow.py
@staticmethod
def parse_location(location: str) -> Tuple[str, str, str]:
    """Return the path without the scheme."""
    uri = urlparse(location)
    if not uri.scheme:
        return "file", uri.netloc, os.path.abspath(location)
    elif uri.scheme == "hdfs":
        return uri.scheme, uri.netloc, location
    else:
        return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
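Representative results, derived from the implementation above (bucket, host, and paths are placeholders):

from pyiceberg.io.pyarrow import PyArrowFileIO

PyArrowFileIO.parse_location("s3://bucket/folder/file.parquet")
# -> ("s3", "bucket", "bucket/folder/file.parquet")

PyArrowFileIO.parse_location("hdfs://namenode:8020/folder/file.parquet")
# -> ("hdfs", "namenode:8020", "hdfs://namenode:8020/folder/file.parquet")

PyArrowFileIO.parse_location("relative/file.parquet")
# -> ("file", "", "<current working directory>/relative/file.parquet")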

PyArrowSchemaVisitor

Bases: Generic[T], ABC

Source code in pyiceberg/io/pyarrow.py
class PyArrowSchemaVisitor(Generic[T], ABC):
    def before_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a field."""

    def after_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a field."""

    def before_list_element(self, element: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting an element within a ListType."""

    def after_list_element(self, element: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting an element within a ListType."""

    def before_map_key(self, key: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a key within a MapType."""

    def after_map_key(self, key: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a key within a MapType."""

    def before_map_value(self, value: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a value within a MapType."""

    def after_map_value(self, value: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a value within a MapType."""

    @abstractmethod
    def schema(self, schema: pa.Schema, struct_result: T) -> T:
        """Visit a schema."""

    @abstractmethod
    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
        """Visit a struct."""

    @abstractmethod
    def field(self, field: pa.Field, field_result: T) -> T:
        """Visit a field."""

    @abstractmethod
    def list(self, list_type: pa.ListType, element_result: T) -> T:
        """Visit a list."""

    @abstractmethod
    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
        """Visit a map."""

    @abstractmethod
    def primitive(self, primitive: pa.DataType) -> T:
        """Visit a primitive type."""

after_field(field)

Override this method to perform an action immediately after visiting a field.

Source code in pyiceberg/io/pyarrow.py
def after_field(self, field: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a field."""

after_list_element(element)

Override this method to perform an action immediately after visiting an element within a ListType.

Source code in pyiceberg/io/pyarrow.py
def after_list_element(self, element: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting an element within a ListType."""

after_map_key(key)

Override this method to perform an action immediately after visiting a key within a MapType.

Source code in pyiceberg/io/pyarrow.py
def after_map_key(self, key: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a key within a MapType."""

after_map_value(value)

Override this method to perform an action immediately after visiting a value within a MapType.

Source code in pyiceberg/io/pyarrow.py
def after_map_value(self, value: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a value within a MapType."""

before_field(field)

Override this method to perform an action immediately before visiting a field.

Source code in pyiceberg/io/pyarrow.py
def before_field(self, field: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a field."""

before_list_element(element)

Override this method to perform an action immediately before visiting an element within a ListType.

Source code in pyiceberg/io/pyarrow.py
def before_list_element(self, element: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting an element within a ListType."""

before_map_key(key)

Override this method to perform an action immediately before visiting a key within a MapType.

Source code in pyiceberg/io/pyarrow.py
def before_map_key(self, key: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a key within a MapType."""

before_map_value(value)

Override this method to perform an action immediately before visiting a value within a MapType.

Source code in pyiceberg/io/pyarrow.py
def before_map_value(self, value: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a value within a MapType."""

field(field, field_result) abstractmethod

Visit a field.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def field(self, field: pa.Field, field_result: T) -> T:
    """Visit a field."""

list(list_type, element_result) abstractmethod

Visit a list.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def list(self, list_type: pa.ListType, element_result: T) -> T:
    """Visit a list."""

map(map_type, key_result, value_result) abstractmethod

Visit a map.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
    """Visit a map."""

primitive(primitive) abstractmethod

Visit a primitive type.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def primitive(self, primitive: pa.DataType) -> T:
    """Visit a primitive type."""

schema(schema, struct_result) abstractmethod

Visit a schema.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def schema(self, schema: pa.Schema, struct_result: T) -> T:
    """Visit a schema."""

struct(struct, field_results) abstractmethod

Visit a struct.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
    """Visit a struct."""

compute_statistics_plan(schema, table_properties)

Compute the statistics plan for all columns.

The resulting list is assumed to have the same length and order as the columns in the pyarrow table. This allows the list to map from the column index to the Iceberg column ID. For each element, the metrics mode requested by the user in the configuration is resolved and then adjusted according to the data type of the column. For nested columns, the minimum and maximum values are not computed, and truncation is only applied to text or binary strings.

Parameters:

    schema (Schema): The Iceberg schema of the table being written. (required)
    table_properties (Dict[str, str]): The Iceberg table metadata properties (see pyiceberg.table.metadata.TableMetadata). They are required to compute the mapping of column position to Iceberg schema type ID and to set the mode for column metrics collection. (required)
Source code in pyiceberg/io/pyarrow.py
def compute_statistics_plan(
    schema: Schema,
    table_properties: Dict[str, str],
) -> Dict[int, StatisticsCollector]:
    """
    Compute the statistics plan for all columns.

    The resulting list is assumed to have the same length and same order as the columns in the pyarrow table.
    This allows the list to map from the column index to the Iceberg column ID.
    For each element, the desired metrics collection that was provided by the user in the configuration
    is computed and then adjusted according to the data type of the column. For nested columns the minimum
    and maximum values are not computed. And truncation is only applied to text or binary strings.

    Args:
        table_properties (from pyiceberg.table.metadata.TableMetadata): The Iceberg table metadata properties.
            They are required to compute the mapping of column position to iceberg schema type id. It's also
            used to set the mode for column metrics collection
    """
    stats_cols = pre_order_visit(schema, PyArrowStatisticsCollector(schema, table_properties))
    result: Dict[int, StatisticsCollector] = {}
    for stats_col in stats_cols:
        result[stats_col.field_id] = stats_col
    return result
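A hedged sketch of building a plan, assuming the standard Iceberg metrics property key "write.metadata.metrics.default" and a trivial two-column schema:

from pyiceberg.io.pyarrow import compute_statistics_plan
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
)

# Request count-only metrics for every column instead of the default truncated bounds.
plan = compute_statistics_plan(schema, {"write.metadata.metrics.default": "counts"})
# plan maps each Iceberg field ID (1 and 2 here) to its StatisticsCollector.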

fill_parquet_file_metadata(data_file, parquet_metadata, stats_columns, parquet_column_mapping)

Compute and fill the following fields of the DataFile object.

  • file_format
  • column_sizes
  • value_counts
  • null_value_counts
  • nan_value_counts
  • lower_bounds
  • upper_bounds
  • split_offsets

Parameters:

    data_file (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled. (required)
    parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object. (required)
    stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to set the mode for column metrics collection. (required)
    parquet_column_mapping (Dict[str, int]): The mapping of Parquet column paths to Iceberg field IDs, as produced by parquet_path_to_id_mapping. (required)
Source code in pyiceberg/io/pyarrow.py
def fill_parquet_file_metadata(
    data_file: DataFile,
    parquet_metadata: pq.FileMetaData,
    stats_columns: Dict[int, StatisticsCollector],
    parquet_column_mapping: Dict[str, int],
) -> None:
    """
    Compute and fill the following fields of the DataFile object.

    - file_format
    - column_sizes
    - value_counts
    - null_value_counts
    - nan_value_counts
    - lower_bounds
    - upper_bounds
    - split_offsets

    Args:
        data_file (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
        parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
        stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
            set the mode for column metrics collection
    """
    if parquet_metadata.num_columns != len(stats_columns):
        raise ValueError(
            f"Number of columns in statistics configuration ({len(stats_columns)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})"
        )

    if parquet_metadata.num_columns != len(parquet_column_mapping):
        raise ValueError(
            f"Number of columns in column mapping ({len(parquet_column_mapping)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})"
        )

    column_sizes: Dict[int, int] = {}
    value_counts: Dict[int, int] = {}
    split_offsets: List[int] = []

    null_value_counts: Dict[int, int] = {}
    nan_value_counts: Dict[int, int] = {}

    col_aggs = {}

    for r in range(parquet_metadata.num_row_groups):
        # References:
        # https://github.com/apache/iceberg/blob/fc381a81a1fdb8f51a0637ca27cd30673bd7aad3/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L232
        # https://github.com/apache/parquet-mr/blob/ac29db4611f86a07cc6877b416aa4b183e09b353/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java#L184

        row_group = parquet_metadata.row_group(r)

        data_offset = row_group.column(0).data_page_offset
        dictionary_offset = row_group.column(0).dictionary_page_offset

        if row_group.column(0).has_dictionary_page and dictionary_offset < data_offset:
            split_offsets.append(dictionary_offset)
        else:
            split_offsets.append(data_offset)

        invalidate_col: Set[int] = set()

        for pos in range(parquet_metadata.num_columns):
            column = row_group.column(pos)
            field_id = parquet_column_mapping[column.path_in_schema]

            stats_col = stats_columns[field_id]

            column_sizes.setdefault(field_id, 0)
            column_sizes[field_id] += column.total_compressed_size

            if stats_col.mode == MetricsMode(MetricModeTypes.NONE):
                continue

            value_counts[field_id] = value_counts.get(field_id, 0) + column.num_values

            if column.is_stats_set:
                try:
                    statistics = column.statistics

                    if statistics.has_null_count:
                        null_value_counts[field_id] = null_value_counts.get(field_id, 0) + statistics.null_count

                    if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS):
                        continue

                    if field_id not in col_aggs:
                        col_aggs[field_id] = StatsAggregator(
                            stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
                        )

                    col_aggs[field_id].update_min(statistics.min)
                    col_aggs[field_id].update_max(statistics.max)

                except pyarrow.lib.ArrowNotImplementedError as e:
                    invalidate_col.add(field_id)
                    logger.warning(e)
            else:
                invalidate_col.add(field_id)
                logger.warning("PyArrow statistics missing for column %d when writing file", pos)

    split_offsets.sort()

    lower_bounds = {}
    upper_bounds = {}

    for k, agg in col_aggs.items():
        _min = agg.min_as_bytes()
        if _min is not None:
            lower_bounds[k] = _min
        _max = agg.max_as_bytes()
        if _max is not None:
            upper_bounds[k] = _max

    for field_id in invalidate_col:
        del lower_bounds[field_id]
        del upper_bounds[field_id]
        del null_value_counts[field_id]

    data_file.record_count = parquet_metadata.num_rows
    data_file.column_sizes = column_sizes
    data_file.value_counts = value_counts
    data_file.null_value_counts = null_value_counts
    data_file.nan_value_counts = nan_value_counts
    data_file.lower_bounds = lower_bounds
    data_file.upper_bounds = upper_bounds
    data_file.split_offsets = split_offsets
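A minimal usage sketch combining the helpers documented on this page; data_file is assumed to be an existing pyiceberg.manifest.DataFile for the written Parquet file, and schema and table_properties are assumed to describe the target table (all placeholders):

import pyarrow.parquet as pq

from pyiceberg.io.pyarrow import (
    compute_statistics_plan,
    fill_parquet_file_metadata,
    parquet_path_to_id_mapping,
)

# Placeholder path; in practice this is the Parquet file that was just written.
parquet_metadata = pq.read_metadata("/tmp/00000-0-data.parquet")

fill_parquet_file_metadata(
    data_file=data_file,  # assumed to exist (pyiceberg.manifest.DataFile)
    parquet_metadata=parquet_metadata,
    stats_columns=compute_statistics_plan(schema, table_properties),
    parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
# data_file now carries record_count, column_sizes, value_counts,
# null/nan counts, lower/upper bounds, and split_offsets.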

parquet_path_to_id_mapping(schema)

Compute the mapping of parquet column path to Iceberg ID.

For each column, the parquet file metadata has a path_in_schema attribute that follows a specific naming scheme for nested columns. This function computes a mapping of the full paths to the corresponding Iceberg IDs.

Parameters:

    schema (Schema): The current table schema. (required)
Source code in pyiceberg/io/pyarrow.py
def parquet_path_to_id_mapping(
    schema: Schema,
) -> Dict[str, int]:
    """
    Compute the mapping of parquet column path to Iceberg ID.

    For each column, the parquet file metadata has a path_in_schema attribute that follows
    a specific naming scheme for nested columns. This function computes a mapping of
    the full paths to the corresponding Iceberg IDs.

    Args:
        schema (pyiceberg.schema.Schema): The current table schema.
    """
    result: Dict[str, int] = {}
    for pair in pre_order_visit(schema, ID2ParquetPathVisitor()):
        result[pair.parquet_path] = pair.field_id
    return result

project_table(tasks, table, row_filter, projected_schema, case_sensitive=True, limit=None)

Read the given file scan tasks into a single pyarrow table, resolving the projected columns by Iceberg field ID.

Parameters:

    tasks (Iterable[FileScanTask]): The file scan tasks to read. (required)
    table (Table): The table that is being queried. (required)
    row_filter (BooleanExpression): The expression for filtering rows. (required)
    projected_schema (Schema): The output schema. (required)
    case_sensitive (bool): Case sensitivity when looking up column names. (default: True)
    limit (Optional[int]): Limit the number of records. (default: None)

Raises:

    ResolveError: When an incompatible query is done.
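In practice this function is usually reached through a table scan rather than called directly; a hedged sketch, assuming a configured catalog named "default" and a table "db.events" with a "status" column (all placeholders):

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

table = load_catalog("default").load_table("db.events")
arrow_table = table.scan(
    row_filter=EqualTo("status", "active"),
    selected_fields=("id", "status"),
    limit=100,
).to_arrow()  # internally resolves columns by field ID via project_table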

Source code in pyiceberg/io/pyarrow.py
def project_table(
    tasks: Iterable[FileScanTask],
    table: Table,
    row_filter: BooleanExpression,
    projected_schema: Schema,
    case_sensitive: bool = True,
    limit: Optional[int] = None,
) -> pa.Table:
    """Resolve the right columns based on the identifier.

    Args:
        tasks (Iterable[FileScanTask]): The file scan tasks to read.
        table (Table): The table that's being queried.
        row_filter (BooleanExpression): The expression for filtering rows.
        projected_schema (Schema): The output schema.
        case_sensitive (bool): Case sensitivity when looking up column names.
        limit (Optional[int]): Limit the number of records.

    Raises:
        ResolveError: When an incompatible query is done.
    """
    scheme, netloc, _ = PyArrowFileIO.parse_location(table.location())
    if isinstance(table.io, PyArrowFileIO):
        fs = table.io.fs_by_scheme(scheme, netloc)
    else:
        try:
            from pyiceberg.io.fsspec import FsspecFileIO

            if isinstance(table.io, FsspecFileIO):
                from pyarrow.fs import PyFileSystem

                fs = PyFileSystem(FSSpecHandler(table.io.get_fs(scheme)))
            else:
                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}")
        except ModuleNotFoundError as e:
            # When FsSpec is not installed
            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}") from e

    bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)

    projected_field_ids = {
        id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
    }.union(extract_field_ids(bound_row_filter))

    row_counts: List[int] = []
    deletes_per_file = _read_all_delete_files(fs, tasks)
    executor = ExecutorFactory.get_or_create()
    futures = [
        executor.submit(
            _task_to_table,
            fs,
            task,
            bound_row_filter,
            projected_schema,
            projected_field_ids,
            deletes_per_file.get(task.file.file_path),
            case_sensitive,
            row_counts,
            limit,
            table.name_mapping(),
        )
        for task in tasks
    ]

    # for consistent ordering, we need to maintain future order
    futures_index = {f: i for i, f in enumerate(futures)}
    completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
    for future in concurrent.futures.as_completed(futures):
        completed_futures.add(future)

        # stop early if limit is satisfied
        if limit is not None and sum(row_counts) >= limit:
            break

    # by now, we've either completed all tasks or satisfied the limit
    if limit is not None:
        _ = [f.cancel() for f in futures if not f.done()]

    tables = [f.result() for f in completed_futures if f.result()]

    if len(tables) < 1:
        return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema))

    result = pa.concat_tables(tables)

    if limit is not None:
        return result.slice(0, limit)

    return result

visit_pyarrow(obj, visitor)

Apply a pyarrow schema visitor to any point within a schema.

The function traverses the schema in post-order fashion.

Parameters:

    obj (Union[pa.DataType, pa.Schema]): A pyarrow Schema or DataType instance. (required)
    visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of the generic PyArrowSchemaVisitor base class. (required)

Raises:

    NotImplementedError: If attempting to visit an unrecognized object type.

Source code in pyiceberg/io/pyarrow.py
@singledispatch
def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisitor[T]) -> T:
    """Apply a pyarrow schema visitor to any point within a schema.

    The function traverses the schema in post-order fashion.

    Args:
        obj (Union[pa.DataType, pa.Schema]): An instance of a pyarrow Schema or DataType.
        visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of the generic PyArrowSchemaVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unrecognized object type.
    """
    raise NotImplementedError(f"Cannot visit non-type: {obj}")
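A minimal concrete visitor as a hedged sketch, assuming the registered visit_pyarrow overloads traverse the schema post-order as described above; it counts the primitive (leaf) types of an arbitrary pyarrow schema:

from typing import List

import pyarrow as pa

from pyiceberg.io.pyarrow import PyArrowSchemaVisitor, visit_pyarrow


class LeafCounter(PyArrowSchemaVisitor[int]):
    """Count the primitive (leaf) types in a pyarrow schema."""

    def schema(self, schema: pa.Schema, struct_result: int) -> int:
        return struct_result

    def struct(self, struct: pa.StructType, field_results: List[int]) -> int:
        return sum(field_results)

    def field(self, field: pa.Field, field_result: int) -> int:
        return field_result

    def list(self, list_type: pa.ListType, element_result: int) -> int:
        return element_result

    def map(self, map_type: pa.MapType, key_result: int, value_result: int) -> int:
        return key_result + value_result

    def primitive(self, primitive: pa.DataType) -> int:
        return 1


example_schema = pa.schema(
    [
        ("id", pa.int64()),
        ("tags", pa.list_(pa.string())),
        ("attrs", pa.map_(pa.string(), pa.float64())),
    ]
)
assert visit_pyarrow(example_schema, LeafCounter()) == 4  # int64, string, string, float64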