pyarrow

FileIO implementation for reading and writing table files that uses pyarrow.fs.

This module contains a FileIO implementation that relies on the filesystem interface provided by PyArrow. It uses PyArrow's from_uri method to infer the correct filesystem type, which in principle lets the supported storage types grow naturally with the pyarrow library.
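
For orientation, a minimal sketch of using this module's PyArrowFileIO to write, read back, and delete a local file (the temporary path and payload are illustrative; new_input, new_output, and delete are documented below):

import os
import tempfile

from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()  # properties configure S3/GCS/HDFS clients when needed

path = os.path.join(tempfile.mkdtemp(), "example.txt")  # hypothetical local location
with io.new_output(path).create(overwrite=True) as out:  # pyarrow OutputStream
    out.write(b"hello pyiceberg")

with io.new_input(path).open() as inp:  # seekable pyarrow NativeFile
    print(inp.read())  # b'hello pyiceberg'

io.delete(path)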

ArrowScan

Source code in pyiceberg/io/pyarrow.py
class ArrowScan:
    _table_metadata: TableMetadata
    _io: FileIO
    _fs: FileSystem
    _projected_schema: Schema
    _bound_row_filter: BooleanExpression
    _case_sensitive: bool
    _limit: Optional[int]
    """Scan the Iceberg Table and create an Arrow construct.

    Attributes:
        _table_metadata: Current table metadata of the Iceberg table
        _io: PyIceberg FileIO implementation from which to fetch the io properties
        _fs: PyArrow FileSystem to use to read the files
        _projected_schema: Iceberg Schema to project onto the data files
        _bound_row_filter: Schema bound row expression to filter the data with
        _case_sensitive: Case sensitivity when looking up column names
        _limit: Limit the number of records.
    """

    def __init__(
        self,
        table_metadata: TableMetadata,
        io: FileIO,
        projected_schema: Schema,
        row_filter: BooleanExpression,
        case_sensitive: bool = True,
        limit: Optional[int] = None,
    ) -> None:
        self._table_metadata = table_metadata
        self._io = io
        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
        self._projected_schema = projected_schema
        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
        self._case_sensitive = case_sensitive
        self._limit = limit

    @property
    def _use_large_types(self) -> bool:
        """Whether to represent data as large arrow types.

        Defaults to True.
        """
        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)

    @property
    def _projected_field_ids(self) -> Set[int]:
        """Set of field IDs that should be projected from the data files."""
        return {
            id
            for id in self._projected_schema.field_ids
            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
        }.union(extract_field_ids(self._bound_row_filter))

    def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
        """Scan the Iceberg table and return a pa.Table.

        Returns a pa.Table with data from the Iceberg table by resolving the
        right columns that match the current table schema. Only data that
        matches the provided row_filter expression is returned.

        Args:
            tasks: FileScanTasks representing the data files and delete files to read from.

        Returns:
            A PyArrow table. Total number of rows will be capped if specified.

        Raises:
            ResolveError: When a required field cannot be found in the file
            ValueError: When a field type in the file cannot be projected to the schema type
        """
        deletes_per_file = _read_all_delete_files(self._fs, tasks)
        executor = ExecutorFactory.get_or_create()

        def _table_from_scan_task(task: FileScanTask) -> pa.Table:
            batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
            if len(batches) > 0:
                return pa.Table.from_batches(batches)
            else:
                return None

        futures = [
            executor.submit(
                _table_from_scan_task,
                task,
            )
            for task in tasks
        ]
        total_row_count = 0
        # for consistent ordering, we need to maintain future order
        futures_index = {f: i for i, f in enumerate(futures)}
        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
        for future in concurrent.futures.as_completed(futures):
            completed_futures.add(future)
            if table_result := future.result():
                total_row_count += len(table_result)
            # stop early if limit is satisfied
            if self._limit is not None and total_row_count >= self._limit:
                break

        # by now, we've either completed all tasks or satisfied the limit
        if self._limit is not None:
            _ = [f.cancel() for f in futures if not f.done()]

        tables = [f.result() for f in completed_futures if f.result()]

        if len(tables) < 1:
            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))

        result = pa.concat_tables(tables, promote_options="permissive")

        if self._limit is not None:
            return result.slice(0, self._limit)

        return result

    def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
        """Scan the Iceberg table and return an Iterator[pa.RecordBatch].

        Returns an Iterator of pa.RecordBatch with data from the Iceberg table
        by resolving the right columns that match the current table schema.
        Only data that matches the provided row_filter expression is returned.

        Args:
            tasks: FileScanTasks representing the data files and delete files to read from.

        Returns:
            An Iterator of PyArrow RecordBatches.
            Total number of rows will be capped if specified.

        Raises:
            ResolveError: When a required field cannot be found in the file
            ValueError: When a field type in the file cannot be projected to the schema type
        """
        deletes_per_file = _read_all_delete_files(self._fs, tasks)
        return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)

    def _record_batches_from_scan_tasks_and_deletes(
        self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
    ) -> Iterator[pa.RecordBatch]:
        total_row_count = 0
        for task in tasks:
            if self._limit is not None and total_row_count >= self._limit:
                break
            batches = _task_to_record_batches(
                self._fs,
                task,
                self._bound_row_filter,
                self._projected_schema,
                self._projected_field_ids,
                deletes_per_file.get(task.file.file_path),
                self._case_sensitive,
                self._table_metadata.name_mapping(),
                self._use_large_types,
            )
            for batch in batches:
                if self._limit is not None:
                    if total_row_count >= self._limit:
                        break
                    elif total_row_count + len(batch) >= self._limit:
                        batch = batch.slice(0, self._limit - total_row_count)
                yield batch
                total_row_count += len(batch)
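
As a usage sketch (the catalog and table names below are illustrative, and in everyday use tbl.scan(...).to_arrow() drives this same class), ArrowScan is typically fed the FileScanTasks planned by a table scan:

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import ArrowScan

catalog = load_catalog("default")          # hypothetical catalog name
tbl = catalog.load_table("examples.taxi")  # hypothetical table identifier
scan = tbl.scan(row_filter=AlwaysTrue(), limit=1_000)

arrow_scan = ArrowScan(
    table_metadata=tbl.metadata,
    io=tbl.io,
    projected_schema=scan.projection(),
    row_filter=AlwaysTrue(),
    limit=1_000,
)
table = arrow_scan.to_table(scan.plan_files())  # pa.Table, capped at the limit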

to_record_batches(tasks)

Scan the Iceberg table and return an Iterator[pa.RecordBatch].

Returns an Iterator of pa.RecordBatch with data from the Iceberg table by resolving the right columns that match the current table schema. Only data that matches the provided row_filter expression is returned.

Parameters:

    tasks (Iterable[FileScanTask]): FileScanTasks representing the data files and delete files to read from. Required.

Returns:

    Iterator[RecordBatch]: An Iterator of PyArrow RecordBatches. Total number of rows will be capped if specified.

Raises:

    ResolveError: When a required field cannot be found in the file.
    ValueError: When a field type in the file cannot be projected to the schema type.

Source code in pyiceberg/io/pyarrow.py
def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
    """Scan the Iceberg table and return an Iterator[pa.RecordBatch].

    Returns an Iterator of pa.RecordBatch with data from the Iceberg table
    by resolving the right columns that match the current table schema.
    Only data that matches the provided row_filter expression is returned.

    Args:
        tasks: FileScanTasks representing the data files and delete files to read from.

    Returns:
        An Iterator of PyArrow RecordBatches.
        Total number of rows will be capped if specified.

    Raises:
        ResolveError: When a required field cannot be found in the file
        ValueError: When a field type in the file cannot be projected to the schema type
    """
    deletes_per_file = _read_all_delete_files(self._fs, tasks)
    return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
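
Building on the ArrowScan sketch above, to_record_batches is the lower-memory alternative to to_table: batches are yielded as files are read instead of materializing a single pa.Table.

# arrow_scan and scan as constructed in the ArrowScan sketch above
for batch in arrow_scan.to_record_batches(scan.plan_files()):
    # each item is a projected, filtered pa.RecordBatch
    print(batch.num_rows)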

to_table(tasks)

Scan the Iceberg table and return a pa.Table.

Returns a pa.Table with data from the Iceberg table by resolving the right columns that match the current table schema. Only data that matches the provided row_filter expression is returned.

Parameters:

    tasks (Iterable[FileScanTask]): FileScanTasks representing the data files and delete files to read from. Required.

Returns:

    Table: A PyArrow table. Total number of rows will be capped if specified.

Raises:

    ResolveError: When a required field cannot be found in the file.
    ValueError: When a field type in the file cannot be projected to the schema type.

Source code in pyiceberg/io/pyarrow.py
def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
    """Scan the Iceberg table and return a pa.Table.

    Returns a pa.Table with data from the Iceberg table by resolving the
    right columns that match the current table schema. Only data that
    matches the provided row_filter expression is returned.

    Args:
        tasks: FileScanTasks representing the data files and delete files to read from.

    Returns:
        A PyArrow table. Total number of rows will be capped if specified.

    Raises:
        ResolveError: When a required field cannot be found in the file
        ValueError: When a field type in the file cannot be projected to the schema type
    """
    deletes_per_file = _read_all_delete_files(self._fs, tasks)
    executor = ExecutorFactory.get_or_create()

    def _table_from_scan_task(task: FileScanTask) -> pa.Table:
        batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
        if len(batches) > 0:
            return pa.Table.from_batches(batches)
        else:
            return None

    futures = [
        executor.submit(
            _table_from_scan_task,
            task,
        )
        for task in tasks
    ]
    total_row_count = 0
    # for consistent ordering, we need to maintain future order
    futures_index = {f: i for i, f in enumerate(futures)}
    completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
    for future in concurrent.futures.as_completed(futures):
        completed_futures.add(future)
        if table_result := future.result():
            total_row_count += len(table_result)
        # stop early if limit is satisfied
        if self._limit is not None and total_row_count >= self._limit:
            break

    # by now, we've either completed all tasks or satisfied the limit
    if self._limit is not None:
        _ = [f.cancel() for f in futures if not f.done()]

    tables = [f.result() for f in completed_futures if f.result()]

    if len(tables) < 1:
        return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))

    result = pa.concat_tables(tables, promote_options="permissive")

    if self._limit is not None:
        return result.slice(0, self._limit)

    return result

PyArrowFile

Bases: InputFile, OutputFile

A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.

Parameters:

    location (str): A URI or a path to a local file. Required.

Attributes:

    location (str): The URI or path to a local file for a PyArrowFile instance.

Examples:

>>> from pyiceberg.io.pyarrow import PyArrowFile
>>> # input_file = PyArrowFile("s3://foo/bar.txt")
>>> # Read the contents of the PyArrowFile instance
>>> # Make sure that you have permissions to read/write
>>> # file_content = input_file.open().read()
>>> # output_file = PyArrowFile("s3://baz/qux.txt")
>>> # Write bytes to a file
>>> # Make sure that you have permissions to read/write
>>> # output_file.create().write(b'foobytes')
Source code in pyiceberg/io/pyarrow.py
class PyArrowFile(InputFile, OutputFile):
    """A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.

    Args:
        location (str): A URI or a path to a local file.

    Attributes:
        location(str): The URI or path to a local file for a PyArrowFile instance.

    Examples:
        >>> from pyiceberg.io.pyarrow import PyArrowFile
        >>> # input_file = PyArrowFile("s3://foo/bar.txt")
        >>> # Read the contents of the PyArrowFile instance
        >>> # Make sure that you have permissions to read/write
        >>> # file_content = input_file.open().read()

        >>> # output_file = PyArrowFile("s3://baz/qux.txt")
        >>> # Write bytes to a file
        >>> # Make sure that you have permissions to read/write
        >>> # output_file.create().write(b'foobytes')
    """

    _filesystem: FileSystem
    _path: str
    _buffer_size: int

    def __init__(self, location: str, path: str, fs: FileSystem, buffer_size: int = ONE_MEGABYTE):
        self._filesystem = fs
        self._path = path
        self._buffer_size = buffer_size
        super().__init__(location=location)

    def _file_info(self) -> FileInfo:
        """Retrieve a pyarrow.fs.FileInfo object for the location.

        Raises:
            PermissionError: If the file at self.location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        try:
            file_info = self._filesystem.get_file_info(self._path)
        except OSError as e:
            if e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot get file info, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error

        if file_info.type == FileType.NotFound:
            raise FileNotFoundError(f"Cannot get file info, file not found: {self.location}")
        return file_info

    def __len__(self) -> int:
        """Return the total length of the file, in bytes."""
        file_info = self._file_info()
        return file_info.size

    def exists(self) -> bool:
        """Check whether the location exists."""
        try:
            self._file_info()  # raises FileNotFoundError if it does not exist
            return True
        except FileNotFoundError:
            return False

    def open(self, seekable: bool = True) -> InputStream:
        """Open the location using a PyArrow FileSystem inferred from the location.

        Args:
            seekable: If the stream should support seek, or if it is consumed sequentially.

        Returns:
            pyarrow.lib.NativeFile: A NativeFile instance for the file located at `self.location`.

        Raises:
            FileNotFoundError: If the file at self.location does not exist.
            PermissionError: If the file at self.location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        try:
            if seekable:
                input_file = self._filesystem.open_input_file(self._path)
            else:
                input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
        except FileNotFoundError:
            raise
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 2 or "Path does not exist" in str(e):
                raise FileNotFoundError(f"Cannot open file, does not exist: {self.location}") from e
            elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot open file, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
        return input_file

    def create(self, overwrite: bool = False) -> OutputStream:
        """Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

        Args:
            overwrite (bool): Whether to overwrite the file if it already exists.

        Returns:
            pyarrow.lib.NativeFile: A NativeFile instance for the file located at self.location.

        Raises:
            FileExistsError: If the file already exists at `self.location` and `overwrite` is False.

        Note:
            This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False,
            a check is first performed to verify that the file does not exist. This is not thread-safe and
            a possibility does exist that the file can be created by a concurrent process after the existence
            check yet before the output stream is created. In such a case, the default pyarrow behavior will
            truncate the contents of the existing file when opening the output stream.
        """
        try:
            if not overwrite and self.exists() is True:
                raise FileExistsError(f"Cannot create file, already exists: {self.location}")
            output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot create file, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
        return output_file

    def to_input_file(self) -> PyArrowFile:
        """Return a new PyArrowFile for the location of an existing PyArrowFile instance.

        This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single
        PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns
        a copy of the same instance.
        """
        return self
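
The doctest in the class docstring is commented out because it targets S3; a runnable local-filesystem sketch (with an illustrative temporary path) looks like the following. PyArrowFile instances are normally obtained through PyArrowFileIO.new_input / new_output rather than constructed directly.

import os
import tempfile

from pyiceberg.io.pyarrow import PyArrowFileIO

path = os.path.join(tempfile.mkdtemp(), "bar.txt")  # hypothetical local location

output_file = PyArrowFileIO().new_output(path)  # a PyArrowFile
with output_file.create() as stream:
    stream.write(b"foobytes")

input_file = output_file.to_input_file()  # same instance, viewed as an InputFile
print(len(input_file))       # 8
print(input_file.exists())   # True
with input_file.open() as stream:
    print(stream.read())     # b'foobytes'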

__len__()

Return the total length of the file, in bytes.

Source code in pyiceberg/io/pyarrow.py
def __len__(self) -> int:
    """Return the total length of the file, in bytes."""
    file_info = self._file_info()
    return file_info.size

create(overwrite=False)

Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

Parameters:

    overwrite (bool): Whether to overwrite the file if it already exists. Defaults to False.

Returns:

    OutputStream: A pyarrow.lib.NativeFile instance for the file located at self.location.

Raises:

    FileExistsError: If the file already exists at self.location and overwrite is False.

Note

This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False, a check is first performed to verify that the file does not exist. This is not thread-safe and a possibility does exist that the file can be created by a concurrent process after the existence check yet before the output stream is created. In such a case, the default pyarrow behavior will truncate the contents of the existing file when opening the output stream.

Source code in pyiceberg/io/pyarrow.py
def create(self, overwrite: bool = False) -> OutputStream:
    """Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

    Args:
        overwrite (bool): Whether to overwrite the file if it already exists.

    Returns:
        pyarrow.lib.NativeFile: A NativeFile instance for the file located at self.location.

    Raises:
        FileExistsError: If the file already exists at `self.location` and `overwrite` is False.

    Note:
        This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False,
        a check is first performed to verify that the file does not exist. This is not thread-safe and
        a possibility does exist that the file can be created by a concurrent process after the existence
        check yet before the output stream is created. In such a case, the default pyarrow behavior will
        truncate the contents of the existing file when opening the output stream.
    """
    try:
        if not overwrite and self.exists() is True:
            raise FileExistsError(f"Cannot create file, already exists: {self.location}")
        output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot create file, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error
    return output_file

exists()

Check whether the location exists.

Source code in pyiceberg/io/pyarrow.py
def exists(self) -> bool:
    """Check whether the location exists."""
    try:
        self._file_info()  # raises FileNotFoundError if it does not exist
        return True
    except FileNotFoundError:
        return False

open(seekable=True)

Open the location using a PyArrow FileSystem inferred from the location.

Parameters:

    seekable (bool): If the stream should support seek, or if it is consumed sequentially. Defaults to True.

Returns:

    InputStream: A pyarrow.lib.NativeFile instance for the file located at self.location.

Raises:

    FileNotFoundError: If the file at self.location does not exist.
    PermissionError: If the file at self.location cannot be accessed due to a permission error such as an AWS error code 15.

Source code in pyiceberg/io/pyarrow.py
def open(self, seekable: bool = True) -> InputStream:
    """Open the location using a PyArrow FileSystem inferred from the location.

    Args:
        seekable: If the stream should support seek, or if it is consumed sequentially.

    Returns:
        pyarrow.lib.NativeFile: A NativeFile instance for the file located at `self.location`.

    Raises:
        FileNotFoundError: If the file at self.location does not exist.
        PermissionError: If the file at self.location cannot be accessed due to a permission error such as
            an AWS error code 15.
    """
    try:
        if seekable:
            input_file = self._filesystem.open_input_file(self._path)
        else:
            input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
    except FileNotFoundError:
        raise
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 2 or "Path does not exist" in str(e):
            raise FileNotFoundError(f"Cannot open file, does not exist: {self.location}") from e
        elif e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot open file, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error
    return input_file

to_input_file()

Return a new PyArrowFile for the location of an existing PyArrowFile instance.

This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns a copy of the same instance.

Source code in pyiceberg/io/pyarrow.py
def to_input_file(self) -> PyArrowFile:
    """Return a new PyArrowFile for the location of an existing PyArrowFile instance.

    This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single
    PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns
    a copy of the same instance.
    """
    return self

PyArrowFileIO

Bases: FileIO

Source code in pyiceberg/io/pyarrow.py
class PyArrowFileIO(FileIO):
    fs_by_scheme: Callable[[str, Optional[str]], FileSystem]

    def __init__(self, properties: Properties = EMPTY_DICT):
        self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = lru_cache(self._initialize_fs)
        super().__init__(properties=properties)

    @staticmethod
    def parse_location(location: str) -> Tuple[str, str, str]:
        """Return the path without the scheme."""
        uri = urlparse(location)
        if not uri.scheme:
            return "file", uri.netloc, os.path.abspath(location)
        elif uri.scheme in ("hdfs", "viewfs"):
            return uri.scheme, uri.netloc, uri.path
        else:
            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"

    def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
        if scheme in {"s3", "s3a", "s3n"}:
            from pyarrow.fs import S3FileSystem

            client_kwargs: Dict[str, Any] = {
                "endpoint_override": self.properties.get(S3_ENDPOINT),
                "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
                "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
                "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
                "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
            }

            if proxy_uri := self.properties.get(S3_PROXY_URI):
                client_kwargs["proxy_options"] = proxy_uri

            if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
                client_kwargs["connect_timeout"] = float(connect_timeout)

            if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
                client_kwargs["role_arn"] = role_arn

            if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
                client_kwargs["session_name"] = session_name

            return S3FileSystem(**client_kwargs)
        elif scheme in ("hdfs", "viewfs"):
            from pyarrow.fs import HadoopFileSystem

            hdfs_kwargs: Dict[str, Any] = {}
            if netloc:
                return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
            if host := self.properties.get(HDFS_HOST):
                hdfs_kwargs["host"] = host
            if port := self.properties.get(HDFS_PORT):
                # port should be an integer type
                hdfs_kwargs["port"] = int(port)
            if user := self.properties.get(HDFS_USER):
                hdfs_kwargs["user"] = user
            if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
                hdfs_kwargs["kerb_ticket"] = kerb_ticket

            return HadoopFileSystem(**hdfs_kwargs)
        elif scheme in {"gs", "gcs"}:
            from pyarrow.fs import GcsFileSystem

            gcs_kwargs: Dict[str, Any] = {}
            if access_token := self.properties.get(GCS_TOKEN):
                gcs_kwargs["access_token"] = access_token
            if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
                gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
            if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
                gcs_kwargs["default_bucket_location"] = bucket_location
            if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT):
                if self.properties.get(GCS_ENDPOINT):
                    deprecation_message(
                        deprecated_in="0.8.0",
                        removed_in="0.9.0",
                        help_message=f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead",
                    )
                url_parts = urlparse(endpoint)
                gcs_kwargs["scheme"] = url_parts.scheme
                gcs_kwargs["endpoint_override"] = url_parts.netloc

            return GcsFileSystem(**gcs_kwargs)
        elif scheme == "file":
            return PyArrowLocalFileSystem()
        else:
            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

    def new_input(self, location: str) -> PyArrowFile:
        """Get a PyArrowFile instance to read bytes from the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            PyArrowFile: A PyArrowFile instance for the given location.
        """
        scheme, netloc, path = self.parse_location(location)
        return PyArrowFile(
            fs=self.fs_by_scheme(scheme, netloc),
            location=location,
            path=path,
            buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
        )

    def new_output(self, location: str) -> PyArrowFile:
        """Get a PyArrowFile instance to write bytes to the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            PyArrowFile: A PyArrowFile instance for the given location.
        """
        scheme, netloc, path = self.parse_location(location)
        return PyArrowFile(
            fs=self.fs_by_scheme(scheme, netloc),
            location=location,
            path=path,
            buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
        )

    def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
        """Delete the file at the given location.

        Args:
            location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an OutputFile instance is provided,
                the location attribute for that instance is used as the location to delete.

        Raises:
            FileNotFoundError: When the file at the provided location does not exist.
            PermissionError: If the file at the provided location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
        scheme, netloc, path = self.parse_location(str_location)
        fs = self.fs_by_scheme(scheme, netloc)

        try:
            fs.delete_file(path)
        except FileNotFoundError:
            raise
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 2 or "Path does not exist" in str(e):
                raise FileNotFoundError(f"Cannot delete file, does not exist: {location}") from e
            elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot delete file, access denied: {location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error

    def __getstate__(self) -> Dict[str, Any]:
        """Create a dictionary of the PyArrowFileIO fields used when pickling."""
        fileio_copy = copy(self.__dict__)
        fileio_copy["fs_by_scheme"] = None
        return fileio_copy

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Deserialize the state into a PyArrowFileIO instance."""
        self.__dict__ = state
        self.fs_by_scheme = lru_cache(self._initialize_fs)
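
_initialize_fs above maps FileIO properties onto the matching pyarrow filesystem constructor. A minimal sketch of pointing the S3 scheme at an S3-compatible store (the endpoint, credentials, and object key are placeholders):

from pyiceberg.io import S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY
from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO(
    properties={
        S3_ENDPOINT: "http://localhost:9000",
        S3_ACCESS_KEY_ID: "admin",
        S3_SECRET_ACCESS_KEY: "password",
        S3_REGION: "us-east-1",
    }
)

# The first s3:// location seen builds an S3FileSystem from these properties;
# fs_by_scheme caches it per (scheme, netloc) via lru_cache.
input_file = io.new_input("s3://warehouse/data/00000-0-data.parquet")  # hypothetical key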

__getstate__()

Create a dictionary of the PyArrowFileIO fields used when pickling.

Source code in pyiceberg/io/pyarrow.py
def __getstate__(self) -> Dict[str, Any]:
    """Create a dictionary of the PyArrowFileIO fields used when pickling."""
    fileio_copy = copy(self.__dict__)
    fileio_copy["fs_by_scheme"] = None
    return fileio_copy

__setstate__(state)

Deserialize the state into a PyArrowFileIO instance.

Source code in pyiceberg/io/pyarrow.py
def __setstate__(self, state: Dict[str, Any]) -> None:
    """Deserialize the state into a PyArrowFileIO instance."""
    self.__dict__ = state
    self.fs_by_scheme = lru_cache(self._initialize_fs)

delete(location)

Delete the file at the given location.

Parameters:

    location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an OutputFile instance is provided, the location attribute for that instance is used as the location to delete. Required.

Raises:

    FileNotFoundError: When the file at the provided location does not exist.
    PermissionError: If the file at the provided location cannot be accessed due to a permission error such as an AWS error code 15.

Source code in pyiceberg/io/pyarrow.py
def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
    """Delete the file at the given location.

    Args:
        location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an OutputFile instance is provided,
            the location attribute for that instance is used as the location to delete.

    Raises:
        FileNotFoundError: When the file at the provided location does not exist.
        PermissionError: If the file at the provided location cannot be accessed due to a permission error such as
            an AWS error code 15.
    """
    str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
    scheme, netloc, path = self.parse_location(str_location)
    fs = self.fs_by_scheme(scheme, netloc)

    try:
        fs.delete_file(path)
    except FileNotFoundError:
        raise
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 2 or "Path does not exist" in str(e):
            raise FileNotFoundError(f"Cannot delete file, does not exist: {location}") from e
        elif e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot delete file, access denied: {location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error

new_input(location)

Get a PyArrowFile instance to read bytes from the file at the given location.

Parameters:

    location (str): A URI or a path to a local file. Required.

Returns:

    PyArrowFile: A PyArrowFile instance for the given location.

Source code in pyiceberg/io/pyarrow.py
def new_input(self, location: str) -> PyArrowFile:
    """Get a PyArrowFile instance to read bytes from the file at the given location.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        PyArrowFile: A PyArrowFile instance for the given location.
    """
    scheme, netloc, path = self.parse_location(location)
    return PyArrowFile(
        fs=self.fs_by_scheme(scheme, netloc),
        location=location,
        path=path,
        buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
    )

new_output(location)

Get a PyArrowFile instance to write bytes to the file at the given location.

Parameters:

    location (str): A URI or a path to a local file. Required.

Returns:

    PyArrowFile: A PyArrowFile instance for the given location.

Source code in pyiceberg/io/pyarrow.py
def new_output(self, location: str) -> PyArrowFile:
    """Get a PyArrowFile instance to write bytes to the file at the given location.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        PyArrowFile: A PyArrowFile instance for the given location.
    """
    scheme, netloc, path = self.parse_location(location)
    return PyArrowFile(
        fs=self.fs_by_scheme(scheme, netloc),
        location=location,
        path=path,
        buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
    )

parse_location(location) staticmethod

Parse a location string into a (scheme, netloc, path) tuple.

Source code in pyiceberg/io/pyarrow.py
@staticmethod
def parse_location(location: str) -> Tuple[str, str, str]:
    """Return the path without the scheme."""
    uri = urlparse(location)
    if not uri.scheme:
        return "file", uri.netloc, os.path.abspath(location)
    elif uri.scheme in ("hdfs", "viewfs"):
        return uri.scheme, uri.netloc, uri.path
    else:
        return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
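
The tuples produced by parse_location follow directly from the branches above, for example:

from pyiceberg.io.pyarrow import PyArrowFileIO

PyArrowFileIO.parse_location("s3://bucket/key/file.parquet")
# ('s3', 'bucket', 'bucket/key/file.parquet')

PyArrowFileIO.parse_location("hdfs://namenode:8020/warehouse/tbl")
# ('hdfs', 'namenode:8020', '/warehouse/tbl')

PyArrowFileIO.parse_location("relative/file.parquet")
# ('file', '', <absolute path under the current working directory>)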

PyArrowSchemaVisitor

Bases: Generic[T], ABC

Source code in pyiceberg/io/pyarrow.py
class PyArrowSchemaVisitor(Generic[T], ABC):
    def before_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a field."""

    def after_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a field."""

    def before_list_element(self, element: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting an element within a ListType."""

    def after_list_element(self, element: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting an element within a ListType."""

    def before_map_key(self, key: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a key within a MapType."""

    def after_map_key(self, key: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a key within a MapType."""

    def before_map_value(self, value: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a value within a MapType."""

    def after_map_value(self, value: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a value within a MapType."""

    @abstractmethod
    def schema(self, schema: pa.Schema, struct_result: T) -> T:
        """Visit a schema."""

    @abstractmethod
    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
        """Visit a struct."""

    @abstractmethod
    def field(self, field: pa.Field, field_result: T) -> T:
        """Visit a field."""

    @abstractmethod
    def list(self, list_type: pa.ListType, element_result: T) -> T:
        """Visit a list."""

    @abstractmethod
    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
        """Visit a map."""

    @abstractmethod
    def primitive(self, primitive: pa.DataType) -> T:
        """Visit a primitive type."""

after_field(field)

Override this method to perform an action immediately after visiting a field.

Source code in pyiceberg/io/pyarrow.py
def after_field(self, field: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a field."""

after_list_element(element)

Override this method to perform an action immediately after visiting an element within a ListType.

Source code in pyiceberg/io/pyarrow.py
def after_list_element(self, element: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting an element within a ListType."""

after_map_key(key)

Override this method to perform an action immediately after visiting a key within a MapType.

Source code in pyiceberg/io/pyarrow.py
def after_map_key(self, key: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a key within a MapType."""

after_map_value(value)

Override this method to perform an action immediately after visiting a value within a MapType.

Source code in pyiceberg/io/pyarrow.py
def after_map_value(self, value: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a value within a MapType."""

before_field(field)

Override this method to perform an action immediately before visiting a field.

Source code in pyiceberg/io/pyarrow.py
def before_field(self, field: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a field."""

before_list_element(element)

Override this method to perform an action immediately before visiting an element within a ListType.

Source code in pyiceberg/io/pyarrow.py
def before_list_element(self, element: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting an element within a ListType."""

before_map_key(key)

Override this method to perform an action immediately before visiting a key within a MapType.

Source code in pyiceberg/io/pyarrow.py
def before_map_key(self, key: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a key within a MapType."""

before_map_value(value)

Override this method to perform an action immediately before visiting a value within a MapType.

Source code in pyiceberg/io/pyarrow.py
def before_map_value(self, value: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a value within a MapType."""

field(field, field_result) abstractmethod

Visit a field.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def field(self, field: pa.Field, field_result: T) -> T:
    """Visit a field."""

list(list_type, element_result) abstractmethod

Visit a list.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def list(self, list_type: pa.ListType, element_result: T) -> T:
    """Visit a list."""

map(map_type, key_result, value_result) abstractmethod

Visit a map.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
    """Visit a map."""

primitive(primitive) abstractmethod

Visit a primitive type.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def primitive(self, primitive: pa.DataType) -> T:
    """Visit a primitive type."""

schema(schema, struct_result) abstractmethod

Visit a schema.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def schema(self, schema: pa.Schema, struct_result: T) -> T:
    """Visit a schema."""

struct(struct, field_results) abstractmethod

Visit a struct.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
    """Visit a struct."""

compute_statistics_plan(schema, table_properties)

Compute the statistics plan for all columns.

The resulting list is assumed to have the same length and same order as the columns in the pyarrow table. This allows the list to map from the column index to the Iceberg column ID. For each element, the desired metrics collection that was provided by the user in the configuration is computed and then adjusted according to the data type of the column. For nested columns the minimum and maximum values are not computed. Truncation is only applied to text or binary strings.

Parameters:

    schema (Schema): The table schema for which to compute the statistics plan. Required.
    table_properties (Dict[str, str]): The Iceberg table metadata properties (from pyiceberg.table.metadata.TableMetadata). They are required to compute the mapping of column position to Iceberg schema type ID, and they set the mode for column metrics collection. Required.
Source code in pyiceberg/io/pyarrow.py
def compute_statistics_plan(
    schema: Schema,
    table_properties: Dict[str, str],
) -> Dict[int, StatisticsCollector]:
    """
    Compute the statistics plan for all columns.

    The resulting list is assumed to have the same length and same order as the columns in the pyarrow table.
    This allows the list to map from the column index to the Iceberg column ID.
    For each element, the desired metrics collection that was provided by the user in the configuration
    is computed and then adjusted according to the data type of the column. For nested columns the minimum
    and maximum values are not computed. Truncation is only applied to text or binary strings.

    Args:
        table_properties (from pyiceberg.table.metadata.TableMetadata): The Iceberg table metadata properties.
            They are required to compute the mapping of column position to iceberg schema type id. It's also
            used to set the mode for column metrics collection
    """
    stats_cols = pre_order_visit(schema, PyArrowStatisticsCollector(schema, table_properties))
    result: Dict[int, StatisticsCollector] = {}
    for stats_col in stats_cols:
        result[stats_col.field_id] = stats_col
    return result
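
A small sketch of building a plan for a two-column schema; the field IDs are illustrative, and write.metadata.metrics.default / write.metadata.metrics.column.<name> are the standard Iceberg table properties for metrics modes:

from pyiceberg.io.pyarrow import compute_statistics_plan
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
)

# Default mode truncate(16); truncate the bounds of "name" to 8 characters instead
plan = compute_statistics_plan(
    schema,
    {
        "write.metadata.metrics.default": "truncate(16)",
        "write.metadata.metrics.column.name": "truncate(8)",
    },
)
# plan is a Dict[int, StatisticsCollector] keyed by Iceberg field ID: {1: ..., 2: ...}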

data_file_statistics_from_parquet_metadata(parquet_metadata, stats_columns, parquet_column_mapping)

Compute and return DataFileStatistics that includes the following.

  • record_count
  • column_sizes
  • value_counts
  • null_value_counts
  • nan_value_counts
  • column_aggregates
  • split_offsets

Parameters:

    parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object. Required.
    stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to set the mode for column metrics collection. Required.
    parquet_column_mapping (Dict[str, int]): The mapping of the parquet column path to the Iceberg field ID. Required.
Source code in pyiceberg/io/pyarrow.py
def data_file_statistics_from_parquet_metadata(
    parquet_metadata: pq.FileMetaData,
    stats_columns: Dict[int, StatisticsCollector],
    parquet_column_mapping: Dict[str, int],
) -> DataFileStatistics:
    """
    Compute and return DataFileStatistics that includes the following.

    - record_count
    - column_sizes
    - value_counts
    - null_value_counts
    - nan_value_counts
    - column_aggregates
    - split_offsets

    Args:
        parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
        stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
            set the mode for column metrics collection
        parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID
    """
    column_sizes: Dict[int, int] = {}
    value_counts: Dict[int, int] = {}
    split_offsets: List[int] = []

    null_value_counts: Dict[int, int] = {}
    nan_value_counts: Dict[int, int] = {}

    col_aggs = {}

    invalidate_col: Set[int] = set()
    for r in range(parquet_metadata.num_row_groups):
        # References:
        # https://github.com/apache/iceberg/blob/fc381a81a1fdb8f51a0637ca27cd30673bd7aad3/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L232
        # https://github.com/apache/parquet-mr/blob/ac29db4611f86a07cc6877b416aa4b183e09b353/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java#L184

        row_group = parquet_metadata.row_group(r)

        data_offset = row_group.column(0).data_page_offset
        dictionary_offset = row_group.column(0).dictionary_page_offset

        if row_group.column(0).has_dictionary_page and dictionary_offset < data_offset:
            split_offsets.append(dictionary_offset)
        else:
            split_offsets.append(data_offset)

        for pos in range(parquet_metadata.num_columns):
            column = row_group.column(pos)
            field_id = parquet_column_mapping[column.path_in_schema]

            stats_col = stats_columns[field_id]

            column_sizes.setdefault(field_id, 0)
            column_sizes[field_id] += column.total_compressed_size

            if stats_col.mode == MetricsMode(MetricModeTypes.NONE):
                continue

            value_counts[field_id] = value_counts.get(field_id, 0) + column.num_values

            if column.is_stats_set:
                try:
                    statistics = column.statistics

                    if statistics.has_null_count:
                        null_value_counts[field_id] = null_value_counts.get(field_id, 0) + statistics.null_count

                    if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS):
                        continue

                    if field_id not in col_aggs:
                        col_aggs[field_id] = StatsAggregator(
                            stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
                        )

                    col_aggs[field_id].update_min(statistics.min)
                    col_aggs[field_id].update_max(statistics.max)

                except pyarrow.lib.ArrowNotImplementedError as e:
                    invalidate_col.add(field_id)
                    logger.warning(e)
            else:
                invalidate_col.add(field_id)
                logger.warning("PyArrow statistics missing for column %d when writing file", pos)

    split_offsets.sort()

    for field_id in invalidate_col:
        col_aggs.pop(field_id, None)
        null_value_counts.pop(field_id, None)

    return DataFileStatistics(
        record_count=parquet_metadata.num_rows,
        column_sizes=column_sizes,
        value_counts=value_counts,
        null_value_counts=null_value_counts,
        nan_value_counts=nan_value_counts,
        column_aggregates=col_aggs,
        split_offsets=split_offsets,
    )

parquet_path_to_id_mapping(schema)

Compute the mapping of parquet column path to Iceberg ID.

For each column, the parquet file metadata has a path_in_schema attribute that follows a specific naming scheme for nested columns. This function computes a mapping of the full paths to the corresponding Iceberg IDs.

Parameters:

    schema (Schema): The current table schema. Required.
Source code in pyiceberg/io/pyarrow.py
def parquet_path_to_id_mapping(
    schema: Schema,
) -> Dict[str, int]:
    """
    Compute the mapping of parquet column path to Iceberg ID.

    For each column, the parquet file metadata has a path_in_schema attribute that follows
    a specific naming scheme for nested columns. This function computes a mapping of
    the full paths to the corresponding Iceberg IDs.

    Args:
        schema (pyiceberg.schema.Schema): The current table schema.
    """
    result: Dict[str, int] = {}
    for pair in pre_order_visit(schema, ID2ParquetPathVisitor()):
        result[pair.parquet_path] = pair.field_id
    return result
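
Tying the statistics helpers together, a hedged sketch of collecting DataFileStatistics from the footer of a Parquet file that was written with Iceberg field IDs (the file path is hypothetical; the schema and properties stand in for those of the target table):

import pyarrow.parquet as pq

from pyiceberg.io.pyarrow import (
    compute_statistics_plan,
    data_file_statistics_from_parquet_metadata,
    parquet_path_to_id_mapping,
)
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
)
table_properties = {"write.metadata.metrics.default": "truncate(16)"}

parquet_metadata = pq.read_metadata("data/00000-0-data.parquet")  # hypothetical data file

statistics = data_file_statistics_from_parquet_metadata(
    parquet_metadata=parquet_metadata,
    stats_columns=compute_statistics_plan(schema, table_properties),
    parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
print(statistics.record_count, statistics.split_offsets)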

project_batches(tasks, table_metadata, io, row_filter, projected_schema, case_sensitive=True, limit=None)

Resolve the right columns based on the identifier.

Parameters:

    tasks (Iterable[FileScanTask]): FileScanTasks representing the data files and delete files to read from. Required.
    table_metadata (TableMetadata): The table metadata of the table that's being queried. Required.
    io (FileIO): A FileIO to open streams to the object store. Required.
    row_filter (BooleanExpression): The expression for filtering rows. Required.
    projected_schema (Schema): The output schema. Required.
    case_sensitive (bool): Case sensitivity when looking up column names. Defaults to True.
    limit (Optional[int]): Limit the number of records. Defaults to None.

Raises:

    ResolveError: When an incompatible query is done.

Source code in pyiceberg/io/pyarrow.py
@deprecated(
    deprecated_in="0.8.0",
    removed_in="0.9.0",
    help_message="project_table is deprecated. Use ArrowScan.to_record_batches instead.",
)
def project_batches(
    tasks: Iterable[FileScanTask],
    table_metadata: TableMetadata,
    io: FileIO,
    row_filter: BooleanExpression,
    projected_schema: Schema,
    case_sensitive: bool = True,
    limit: Optional[int] = None,
) -> Iterator[pa.RecordBatch]:
    """Resolve the right columns based on the identifier.

    Args:
        tasks (Iterable[FileScanTask]): A URI or a path to a local file.
        table_metadata (TableMetadata): The table metadata of the table that's being queried
        io (FileIO): A FileIO to open streams to the object store
        row_filter (BooleanExpression): The expression for filtering rows.
        projected_schema (Schema): The output schema.
        case_sensitive (bool): Case sensitivity when looking up column names.
        limit (Optional[int]): Limit the number of records.

    Raises:
        ResolveError: When an incompatible query is done.
    """
    scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
    if isinstance(io, PyArrowFileIO):
        fs = io.fs_by_scheme(scheme, netloc)
    else:
        try:
            from pyiceberg.io.fsspec import FsspecFileIO

            if isinstance(io, FsspecFileIO):
                from pyarrow.fs import PyFileSystem

                fs = PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
            else:
                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
        except ModuleNotFoundError as e:
            # When FsSpec is not installed
            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e

    use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)

    bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)

    projected_field_ids = {
        id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
    }.union(extract_field_ids(bound_row_filter))

    deletes_per_file = _read_all_delete_files(fs, tasks)

    total_row_count = 0

    for task in tasks:
        # stop early if limit is satisfied
        if limit is not None and total_row_count >= limit:
            break
        batches = _task_to_record_batches(
            fs,
            task,
            bound_row_filter,
            projected_schema,
            projected_field_ids,
            deletes_per_file.get(task.file.file_path),
            case_sensitive,
            table_metadata.name_mapping(),
            use_large_types,
        )
        for batch in batches:
            if limit is not None:
                if total_row_count >= limit:
                    break
                elif total_row_count + len(batch) >= limit:
                    batch = batch.slice(0, limit - total_row_count)
            yield batch
            total_row_count += len(batch)

project_table(tasks, table_metadata, io, row_filter, projected_schema, case_sensitive=True, limit=None)

Resolve the right columns based on the identifier.

Parameters:

    tasks (Iterable[FileScanTask]): FileScanTasks representing the data files and delete files to read from. Required.
    table_metadata (TableMetadata): The table metadata of the table that's being queried. Required.
    io (FileIO): A FileIO to open streams to the object store. Required.
    row_filter (BooleanExpression): The expression for filtering rows. Required.
    projected_schema (Schema): The output schema. Required.
    case_sensitive (bool): Case sensitivity when looking up column names. Defaults to True.
    limit (Optional[int]): Limit the number of records. Defaults to None.

Raises:

    ResolveError: When an incompatible query is done.

Source code in pyiceberg/io/pyarrow.py
@deprecated(
    deprecated_in="0.8.0",
    removed_in="0.9.0",
    help_message="project_table is deprecated. Use ArrowScan.to_table instead.",
)
def project_table(
    tasks: Iterable[FileScanTask],
    table_metadata: TableMetadata,
    io: FileIO,
    row_filter: BooleanExpression,
    projected_schema: Schema,
    case_sensitive: bool = True,
    limit: Optional[int] = None,
) -> pa.Table:
    """Resolve the right columns based on the identifier.

    Args:
        tasks (Iterable[FileScanTask]): A URI or a path to a local file.
        table_metadata (TableMetadata): The table metadata of the table that's being queried
        io (FileIO): A FileIO to open streams to the object store
        row_filter (BooleanExpression): The expression for filtering rows.
        projected_schema (Schema): The output schema.
        case_sensitive (bool): Case sensitivity when looking up column names.
        limit (Optional[int]): Limit the number of records.

    Raises:
        ResolveError: When an incompatible query is done.
    """
    scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
    if isinstance(io, PyArrowFileIO):
        fs = io.fs_by_scheme(scheme, netloc)
    else:
        try:
            from pyiceberg.io.fsspec import FsspecFileIO

            if isinstance(io, FsspecFileIO):
                from pyarrow.fs import PyFileSystem

                fs = PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
            else:
                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
        except ModuleNotFoundError as e:
            # When FsSpec is not installed
            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e

    use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)

    bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)

    projected_field_ids = {
        id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
    }.union(extract_field_ids(bound_row_filter))

    deletes_per_file = _read_all_delete_files(fs, tasks)
    executor = ExecutorFactory.get_or_create()
    futures = [
        executor.submit(
            _task_to_table,
            fs,
            task,
            bound_row_filter,
            projected_schema,
            projected_field_ids,
            deletes_per_file.get(task.file.file_path),
            case_sensitive,
            table_metadata.name_mapping(),
            use_large_types,
        )
        for task in tasks
    ]
    total_row_count = 0
    # for consistent ordering, we need to maintain future order
    futures_index = {f: i for i, f in enumerate(futures)}
    completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
    for future in concurrent.futures.as_completed(futures):
        completed_futures.add(future)
        if table_result := future.result():
            total_row_count += len(table_result)
        # stop early if limit is satisfied
        if limit is not None and total_row_count >= limit:
            break

    # by now, we've either completed all tasks or satisfied the limit
    if limit is not None:
        _ = [f.cancel() for f in futures if not f.done()]

    tables = [f.result() for f in completed_futures if f.result()]

    if len(tables) < 1:
        return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema, include_field_ids=False))

    result = pa.concat_tables(tables, promote_options="permissive")

    if limit is not None:
        return result.slice(0, limit)

    return result

visit_pyarrow(obj, visitor)

Apply a pyarrow schema visitor to any point within a schema.

The function traverses the schema in post-order fashion.

Parameters:

    obj (Union[DataType, Schema]): An instance of a pyarrow Schema or DataType. Required.
    visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of the generic PyArrowSchemaVisitor base class. Required.

Raises:

    NotImplementedError: If attempting to visit an unrecognized object type.

Source code in pyiceberg/io/pyarrow.py
@singledispatch
def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisitor[T]) -> T:
    """Apply a pyarrow schema visitor to any point within a schema.

    The function traverses the schema in post-order fashion.

    Args:
        obj (Union[pa.DataType, pa.Schema]): An instance of a Schema or an IcebergType.
        visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of the generic PyArrowSchemaVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unrecognized object type.
    """
    raise NotImplementedError(f"Cannot visit non-type: {obj}")
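
As a minimal sketch of the visitor contract (the visitor class and sample schema are illustrative), a concrete PyArrowSchemaVisitor that collects all field names of a pyarrow schema via visit_pyarrow:

from typing import List

import pyarrow as pa

from pyiceberg.io.pyarrow import PyArrowSchemaVisitor, visit_pyarrow


class FieldNameCollector(PyArrowSchemaVisitor[List[str]]):
    """Illustrative visitor: collect the names of all fields, children before parents."""

    def schema(self, schema: pa.Schema, struct_result: List[str]) -> List[str]:
        return struct_result

    def struct(self, struct: pa.StructType, field_results: List[List[str]]) -> List[str]:
        return [name for result in field_results for name in result]

    def field(self, field: pa.Field, field_result: List[str]) -> List[str]:
        return field_result + [field.name]

    def list(self, list_type: pa.ListType, element_result: List[str]) -> List[str]:
        return element_result

    def map(self, map_type: pa.MapType, key_result: List[str], value_result: List[str]) -> List[str]:
        return key_result + value_result

    def primitive(self, primitive: pa.DataType) -> List[str]:
        return []


schema = pa.schema(
    [
        pa.field("id", pa.int64()),
        pa.field("point", pa.struct([pa.field("x", pa.float64())])),
    ]
)
print(visit_pyarrow(schema, FieldNameCollector()))  # ['id', 'x', 'point']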