
pyarrow

FileIO implementation for reading and writing table files that uses pyarrow.fs.

This file contains a FileIO implementation built on the filesystem interface provided by PyArrow. It relies on PyArrow's from_uri method, which infers the correct filesystem type to use. In principle, this allows the supported storage types to grow naturally with the pyarrow library.
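For illustration, this is roughly what the underlying inference looks like; the bucket and object names below are placeholders, and constructing an S3 filesystem may require credentials and region configuration in your environment.

from pyarrow import fs

# FileSystem.from_uri returns a (filesystem, path) tuple; the filesystem
# class is inferred from the URI scheme (S3FileSystem for s3://,
# GcsFileSystem for gs://, LocalFileSystem for plain paths, and so on).
filesystem, path = fs.FileSystem.from_uri("s3://my-bucket/warehouse/data.parquet")
print(type(filesystem).__name__)  # S3FileSystem
print(path)                       # my-bucket/warehouse/data.parquet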

ArrowScan

Source code in pyiceberg/io/pyarrow.py
class ArrowScan:
    _table_metadata: TableMetadata
    _io: FileIO
    _projected_schema: Schema
    _bound_row_filter: BooleanExpression
    _case_sensitive: bool
    _limit: Optional[int]
    """Scan the Iceberg Table and create an Arrow construct.

    Attributes:
        _table_metadata: Current table metadata of the Iceberg table
        _io: PyIceberg FileIO implementation from which to fetch the io properties
        _projected_schema: Iceberg Schema to project onto the data files
        _bound_row_filter: Schema bound row expression to filter the data with
        _case_sensitive: Case sensitivity when looking up column names
        _limit: Limit the number of records.
    """

    def __init__(
        self,
        table_metadata: TableMetadata,
        io: FileIO,
        projected_schema: Schema,
        row_filter: BooleanExpression,
        case_sensitive: bool = True,
        limit: Optional[int] = None,
    ) -> None:
        self._table_metadata = table_metadata
        self._io = io
        self._projected_schema = projected_schema
        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
        self._case_sensitive = case_sensitive
        self._limit = limit

    @property
    def _use_large_types(self) -> bool:
        """Whether to represent data as large arrow types.

        Defaults to True.
        """
        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)

    @property
    def _projected_field_ids(self) -> Set[int]:
        """Set of field IDs that should be projected from the data files."""
        return {
            id
            for id in self._projected_schema.field_ids
            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
        }.union(extract_field_ids(self._bound_row_filter))

    def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
        """Scan the Iceberg table and return a pa.Table.

        Returns a pa.Table with data from the Iceberg table by resolving the
        right columns that match the current table schema. Only data that
        matches the provided row_filter expression is returned.

        Args:
            tasks: FileScanTasks representing the data files and delete files to read from.

        Returns:
            A PyArrow table. Total number of rows will be capped if specified.

        Raises:
            ResolveError: When a required field cannot be found in the file
            ValueError: When a field type in the file cannot be projected to the schema type
        """
        deletes_per_file = _read_all_delete_files(self._io, tasks)
        executor = ExecutorFactory.get_or_create()

        def _table_from_scan_task(task: FileScanTask) -> pa.Table:
            batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
            if len(batches) > 0:
                return pa.Table.from_batches(batches)
            else:
                return None

        futures = [
            executor.submit(
                _table_from_scan_task,
                task,
            )
            for task in tasks
        ]
        total_row_count = 0
        # for consistent ordering, we need to maintain future order
        futures_index = {f: i for i, f in enumerate(futures)}
        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
        for future in concurrent.futures.as_completed(futures):
            completed_futures.add(future)
            if table_result := future.result():
                total_row_count += len(table_result)
            # stop early if limit is satisfied
            if self._limit is not None and total_row_count >= self._limit:
                break

        # by now, we've either completed all tasks or satisfied the limit
        if self._limit is not None:
            _ = [f.cancel() for f in futures if not f.done()]

        tables = [f.result() for f in completed_futures if f.result()]

        if len(tables) < 1:
            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))

        result = pa.concat_tables(tables, promote_options="permissive")

        if self._limit is not None:
            return result.slice(0, self._limit)

        return result

    def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
        """Scan the Iceberg table and return an Iterator[pa.RecordBatch].

        Returns an Iterator of pa.RecordBatch with data from the Iceberg table
        by resolving the right columns that match the current table schema.
        Only data that matches the provided row_filter expression is returned.

        Args:
            tasks: FileScanTasks representing the data files and delete files to read from.

        Returns:
            An Iterator of PyArrow RecordBatches.
            Total number of rows will be capped if specified.

        Raises:
            ResolveError: When a required field cannot be found in the file
            ValueError: When a field type in the file cannot be projected to the schema type
        """
        deletes_per_file = _read_all_delete_files(self._io, tasks)
        return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)

    def _record_batches_from_scan_tasks_and_deletes(
        self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
    ) -> Iterator[pa.RecordBatch]:
        total_row_count = 0
        for task in tasks:
            if self._limit is not None and total_row_count >= self._limit:
                break
            batches = _task_to_record_batches(
                _fs_from_file_path(self._io, task.file.file_path),
                task,
                self._bound_row_filter,
                self._projected_schema,
                self._projected_field_ids,
                deletes_per_file.get(task.file.file_path),
                self._case_sensitive,
                self._table_metadata.name_mapping(),
                self._use_large_types,
                self._table_metadata.spec(),
            )
            for batch in batches:
                if self._limit is not None:
                    if total_row_count >= self._limit:
                        break
                    elif total_row_count + len(batch) >= self._limit:
                        batch = batch.slice(0, self._limit - total_row_count)
                yield batch
                total_row_count += len(batch)

_limit = limit instance-attribute

Scan the Iceberg Table and create an Arrow construct.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `_table_metadata` | `TableMetadata` | Current table metadata of the Iceberg table |
| `_io` | `FileIO` | PyIceberg FileIO implementation from which to fetch the io properties |
| `_projected_schema` | `Schema` | Iceberg Schema to project onto the data files |
| `_bound_row_filter` | `BooleanExpression` | Schema bound row expression to filter the data with |
| `_case_sensitive` | `bool` | Case sensitivity when looking up column names |
| `_limit` | `Optional[int]` | Limit the number of records. |
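
A minimal construction sketch, assuming a table already loaded through a catalog; the catalog and table identifiers below are placeholders, and in day-to-day use table.scan(...).to_arrow() wires this up for you.

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import ArrowScan

catalog = load_catalog("default")                 # placeholder catalog name
table = catalog.load_table("examples.nyc_taxi")   # placeholder table identifier

scan = ArrowScan(
    table_metadata=table.metadata,
    io=table.io,
    projected_schema=table.schema(),
    row_filter=AlwaysTrue(),
    case_sensitive=True,
    limit=1_000,
)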

_projected_field_ids property

Set of field IDs that should be projected from the data files.

_use_large_types property

Whether to represent data as large arrow types.

Defaults to True.
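
Since the flag is read from the FileIO properties, it can be turned off when the FileIO is created; a small sketch, using the PYARROW_USE_LARGE_TYPES_ON_READ constant referenced above as the property key:

from pyiceberg.io.pyarrow import PYARROW_USE_LARGE_TYPES_ON_READ, PyArrowFileIO

# Read Arrow data with regular (non-large) string/binary/list types.
io = PyArrowFileIO(properties={PYARROW_USE_LARGE_TYPES_ON_READ: "false"})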

to_record_batches(tasks)

Scan the Iceberg table and return an Iterator[pa.RecordBatch].

Returns an Iterator of pa.RecordBatch with data from the Iceberg table by resolving the right columns that match the current table schema. Only data that matches the provided row_filter expression is returned.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tasks` | `Iterable[FileScanTask]` | FileScanTasks representing the data files and delete files to read from. | required |

Returns:

| Type | Description |
| --- | --- |
| `Iterator[RecordBatch]` | An Iterator of PyArrow RecordBatches. Total number of rows will be capped if specified. |

Raises:

| Type | Description |
| --- | --- |
| `ResolveError` | When a required field cannot be found in the file |
| `ValueError` | When a field type in the file cannot be projected to the schema type |
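
Continuing the construction sketch above, a hedged example of streaming batches instead of materialising a full table; `table` and `scan` are the placeholders from that sketch.

# Plan the file scan tasks from the loaded table and stream record batches.
tasks = table.scan().plan_files()

total_rows = 0
for batch in scan.to_record_batches(tasks):
    total_rows += batch.num_rows  # each item is a pa.RecordBatch
print(total_rows)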

Source code in pyiceberg/io/pyarrow.py
def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
    """Scan the Iceberg table and return an Iterator[pa.RecordBatch].

    Returns an Iterator of pa.RecordBatch with data from the Iceberg table
    by resolving the right columns that match the current table schema.
    Only data that matches the provided row_filter expression is returned.

    Args:
        tasks: FileScanTasks representing the data files and delete files to read from.

    Returns:
        An Iterator of PyArrow RecordBatches.
        Total number of rows will be capped if specified.

    Raises:
        ResolveError: When a required field cannot be found in the file
        ValueError: When a field type in the file cannot be projected to the schema type
    """
    deletes_per_file = _read_all_delete_files(self._io, tasks)
    return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)

to_table(tasks)

Scan the Iceberg table and return a pa.Table.

Returns a pa.Table with data from the Iceberg table by resolving the right columns that match the current table schema. Only data that matches the provided row_filter expression is returned.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tasks` | `Iterable[FileScanTask]` | FileScanTasks representing the data files and delete files to read from. | required |

Returns:

| Type | Description |
| --- | --- |
| `Table` | A PyArrow table. Total number of rows will be capped if specified. |

Raises:

| Type | Description |
| --- | --- |
| `ResolveError` | When a required field cannot be found in the file |
| `ValueError` | When a field type in the file cannot be projected to the schema type |
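
With the same placeholders, to_table gathers the matching rows into a single pa.Table; when a limit was passed to the ArrowScan, the result is sliced to at most that many rows.

result = scan.to_table(table.scan().plan_files())
assert len(result) <= 1_000   # the limit passed to ArrowScan above
print(result.schema)          # matches the projected Iceberg schema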

Source code in pyiceberg/io/pyarrow.py
def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
    """Scan the Iceberg table and return a pa.Table.

    Returns a pa.Table with data from the Iceberg table by resolving the
    right columns that match the current table schema. Only data that
    matches the provided row_filter expression is returned.

    Args:
        tasks: FileScanTasks representing the data files and delete files to read from.

    Returns:
        A PyArrow table. Total number of rows will be capped if specified.

    Raises:
        ResolveError: When a required field cannot be found in the file
        ValueError: When a field type in the file cannot be projected to the schema type
    """
    deletes_per_file = _read_all_delete_files(self._io, tasks)
    executor = ExecutorFactory.get_or_create()

    def _table_from_scan_task(task: FileScanTask) -> pa.Table:
        batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
        if len(batches) > 0:
            return pa.Table.from_batches(batches)
        else:
            return None

    futures = [
        executor.submit(
            _table_from_scan_task,
            task,
        )
        for task in tasks
    ]
    total_row_count = 0
    # for consistent ordering, we need to maintain future order
    futures_index = {f: i for i, f in enumerate(futures)}
    completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
    for future in concurrent.futures.as_completed(futures):
        completed_futures.add(future)
        if table_result := future.result():
            total_row_count += len(table_result)
        # stop early if limit is satisfied
        if self._limit is not None and total_row_count >= self._limit:
            break

    # by now, we've either completed all tasks or satisfied the limit
    if self._limit is not None:
        _ = [f.cancel() for f in futures if not f.done()]

    tables = [f.result() for f in completed_futures if f.result()]

    if len(tables) < 1:
        return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))

    result = pa.concat_tables(tables, promote_options="permissive")

    if self._limit is not None:
        return result.slice(0, self._limit)

    return result

PyArrowFile

Bases: InputFile, OutputFile

A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `location` | `str` | A URI or a path to a local file. | required |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `location` | `str` | The URI or path to a local file for a PyArrowFile instance. |

Examples:

>>> from pyiceberg.io.pyarrow import PyArrowFile
>>> # input_file = PyArrowFile("s3://foo/bar.txt")
>>> # Read the contents of the PyArrowFile instance
>>> # Make sure that you have permissions to read/write
>>> # file_content = input_file.open().read()
>>> # output_file = PyArrowFile("s3://baz/qux.txt")
>>> # Write bytes to a file
>>> # Make sure that you have permissions to read/write
>>> # output_file.create().write(b'foobytes')
Source code in pyiceberg/io/pyarrow.py
class PyArrowFile(InputFile, OutputFile):
    """A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.

    Args:
        location (str): A URI or a path to a local file.

    Attributes:
        location(str): The URI or path to a local file for a PyArrowFile instance.

    Examples:
        >>> from pyiceberg.io.pyarrow import PyArrowFile
        >>> # input_file = PyArrowFile("s3://foo/bar.txt")
        >>> # Read the contents of the PyArrowFile instance
        >>> # Make sure that you have permissions to read/write
        >>> # file_content = input_file.open().read()

        >>> # output_file = PyArrowFile("s3://baz/qux.txt")
        >>> # Write bytes to a file
        >>> # Make sure that you have permissions to read/write
        >>> # output_file.create().write(b'foobytes')
    """

    _filesystem: FileSystem
    _path: str
    _buffer_size: int

    def __init__(self, location: str, path: str, fs: FileSystem, buffer_size: int = ONE_MEGABYTE):
        self._filesystem = fs
        self._path = path
        self._buffer_size = buffer_size
        super().__init__(location=location)

    def _file_info(self) -> FileInfo:
        """Retrieve a pyarrow.fs.FileInfo object for the location.

        Raises:
            PermissionError: If the file at self.location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        try:
            file_info = self._filesystem.get_file_info(self._path)
        except OSError as e:
            if e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot get file info, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error

        if file_info.type == FileType.NotFound:
            raise FileNotFoundError(f"Cannot get file info, file not found: {self.location}")
        return file_info

    def __len__(self) -> int:
        """Return the total length of the file, in bytes."""
        file_info = self._file_info()
        return file_info.size

    def exists(self) -> bool:
        """Check whether the location exists."""
        try:
            self._file_info()  # raises FileNotFoundError if it does not exist
            return True
        except FileNotFoundError:
            return False

    def open(self, seekable: bool = True) -> InputStream:
        """Open the location using a PyArrow FileSystem inferred from the location.

        Args:
            seekable: If the stream should support seek, or if it is consumed sequentially.

        Returns:
            pyarrow.lib.NativeFile: A NativeFile instance for the file located at `self.location`.

        Raises:
            FileNotFoundError: If the file at self.location does not exist.
            PermissionError: If the file at self.location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        try:
            if seekable:
                input_file = self._filesystem.open_input_file(self._path)
            else:
                input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
        except FileNotFoundError:
            raise
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 2 or "Path does not exist" in str(e):
                raise FileNotFoundError(f"Cannot open file, does not exist: {self.location}") from e
            elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot open file, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
        return input_file

    def create(self, overwrite: bool = False) -> OutputStream:
        """Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

        Args:
            overwrite (bool): Whether to overwrite the file if it already exists.

        Returns:
            pyarrow.lib.NativeFile: A NativeFile instance for the file located at self.location.

        Raises:
            FileExistsError: If the file already exists at `self.location` and `overwrite` is False.

        Note:
            This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False,
            a check is first performed to verify that the file does not exist. This is not thread-safe and
            a possibility does exist that the file can be created by a concurrent process after the existence
            check yet before the output stream is created. In such a case, the default pyarrow behavior will
            truncate the contents of the existing file when opening the output stream.
        """
        try:
            if not overwrite and self.exists() is True:
                raise FileExistsError(f"Cannot create file, already exists: {self.location}")
            output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot create file, access denied: {self.location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error
        return output_file

    def to_input_file(self) -> PyArrowFile:
        """Return a new PyArrowFile for the location of an existing PyArrowFile instance.

        This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single
        PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns
        a copy of the same instance.
        """
        return self

__len__()

Return the total length of the file, in bytes.

Source code in pyiceberg/io/pyarrow.py
def __len__(self) -> int:
    """Return the total length of the file, in bytes."""
    file_info = self._file_info()
    return file_info.size

_file_info()

Retrieve a pyarrow.fs.FileInfo object for the location.

Raises:

| Type | Description |
| --- | --- |
| `PermissionError` | If the file at self.location cannot be accessed due to a permission error such as an AWS error code 15. |

Source code in pyiceberg/io/pyarrow.py
def _file_info(self) -> FileInfo:
    """Retrieve a pyarrow.fs.FileInfo object for the location.

    Raises:
        PermissionError: If the file at self.location cannot be accessed due to a permission error such as
            an AWS error code 15.
    """
    try:
        file_info = self._filesystem.get_file_info(self._path)
    except OSError as e:
        if e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot get file info, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error

    if file_info.type == FileType.NotFound:
        raise FileNotFoundError(f"Cannot get file info, file not found: {self.location}")
    return file_info

create(overwrite=False)

Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `overwrite` | `bool` | Whether to overwrite the file if it already exists. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `OutputStream` | A `pyarrow.lib.NativeFile` instance for the file located at self.location. |

Raises:

| Type | Description |
| --- | --- |
| `FileExistsError` | If the file already exists at self.location and overwrite is False. |

Note

This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False, a check is first performed to verify that the file does not exist. This is not thread-safe and a possibility does exist that the file can be created by a concurrent process after the existence check yet before the output stream is created. In such a case, the default pyarrow behavior will truncate the contents of the existing file when opening the output stream.
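
A small usage sketch under these semantics; the S3 location is a placeholder and writing requires appropriate credentials.

from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()
output_file = io.new_output("s3://my-bucket/tmp/example.txt")  # placeholder URI

# overwrite=False (the default) raises FileExistsError if the file exists,
# subject to the race condition described in the note above.
with output_file.create(overwrite=True) as stream:
    stream.write(b"foobytes")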

Source code in pyiceberg/io/pyarrow.py
def create(self, overwrite: bool = False) -> OutputStream:
    """Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

    Args:
        overwrite (bool): Whether to overwrite the file if it already exists.

    Returns:
        pyarrow.lib.NativeFile: A NativeFile instance for the file located at self.location.

    Raises:
        FileExistsError: If the file already exists at `self.location` and `overwrite` is False.

    Note:
        This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False,
        a check is first performed to verify that the file does not exist. This is not thread-safe and
        a possibility does exist that the file can be created by a concurrent process after the existence
        check yet before the output stream is created. In such a case, the default pyarrow behavior will
        truncate the contents of the existing file when opening the output stream.
    """
    try:
        if not overwrite and self.exists() is True:
            raise FileExistsError(f"Cannot create file, already exists: {self.location}")
        output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot create file, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error
    return output_file

exists()

Check whether the location exists.

Source code in pyiceberg/io/pyarrow.py
def exists(self) -> bool:
    """Check whether the location exists."""
    try:
        self._file_info()  # raises FileNotFoundError if it does not exist
        return True
    except FileNotFoundError:
        return False

open(seekable=True)

Open the location using a PyArrow FileSystem inferred from the location.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `seekable` | `bool` | If the stream should support seek, or if it is consumed sequentially. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `InputStream` | A `pyarrow.lib.NativeFile` instance for the file located at self.location. |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If the file at self.location does not exist. |
| `PermissionError` | If the file at self.location cannot be accessed due to a permission error such as an AWS error code 15. |
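
A short sketch of a sequential read; with seekable=False the file is opened as a buffered input stream, which can be cheaper when the content is consumed in a single pass (the location is a placeholder).

from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()
input_file = io.new_input("s3://my-bucket/tmp/example.txt")  # placeholder URI

if input_file.exists():
    with input_file.open(seekable=False) as stream:  # buffered, sequential stream
        data = stream.read()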

Source code in pyiceberg/io/pyarrow.py
def open(self, seekable: bool = True) -> InputStream:
    """Open the location using a PyArrow FileSystem inferred from the location.

    Args:
        seekable: If the stream should support seek, or if it is consumed sequentially.

    Returns:
        pyarrow.lib.NativeFile: A NativeFile instance for the file located at `self.location`.

    Raises:
        FileNotFoundError: If the file at self.location does not exist.
        PermissionError: If the file at self.location cannot be accessed due to a permission error such as
            an AWS error code 15.
    """
    try:
        if seekable:
            input_file = self._filesystem.open_input_file(self._path)
        else:
            input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
    except FileNotFoundError:
        raise
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 2 or "Path does not exist" in str(e):
            raise FileNotFoundError(f"Cannot open file, does not exist: {self.location}") from e
        elif e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot open file, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error
    return input_file

to_input_file()

Return a new PyArrowFile for the location of an existing PyArrowFile instance.

This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns a copy of the same instance.

Source code in pyiceberg/io/pyarrow.py
def to_input_file(self) -> PyArrowFile:
    """Return a new PyArrowFile for the location of an existing PyArrowFile instance.

    This method is included to abide by the OutputFile abstract base class. Since this implementation uses a single
    PyArrowFile class (as opposed to separate InputFile and OutputFile implementations), this method effectively returns
    a copy of the same instance.
    """
    return self

PyArrowFileIO

Bases: FileIO
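
A hedged configuration sketch; the property keys below are the standard pyiceberg S3 settings resolved by _initialize_s3_fs, and the endpoint and credentials are placeholders (for example a local MinIO instance).

from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO(
    properties={
        "s3.endpoint": "http://localhost:9000",  # placeholder endpoint
        "s3.access-key-id": "admin",             # placeholder credentials
        "s3.secret-access-key": "password",
        "s3.region": "us-east-1",
    }
)
input_file = io.new_input("s3://warehouse/data/00000-0-data.parquet")  # placeholder path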

Source code in pyiceberg/io/pyarrow.py
class PyArrowFileIO(FileIO):
    fs_by_scheme: Callable[[str, Optional[str]], FileSystem]

    def __init__(self, properties: Properties = EMPTY_DICT):
        self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = lru_cache(self._initialize_fs)
        super().__init__(properties=properties)

    @staticmethod
    def parse_location(location: str) -> Tuple[str, str, str]:
        """Return the path without the scheme."""
        uri = urlparse(location)
        if not uri.scheme:
            return "file", uri.netloc, os.path.abspath(location)
        elif uri.scheme in ("hdfs", "viewfs"):
            return uri.scheme, uri.netloc, uri.path
        else:
            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"

    def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
        """Initialize FileSystem for different scheme."""
        if scheme in {"oss"}:
            return self._initialize_oss_fs()

        elif scheme in {"s3", "s3a", "s3n"}:
            return self._initialize_s3_fs(netloc)

        elif scheme in {"hdfs", "viewfs"}:
            return self._initialize_hdfs_fs(scheme, netloc)

        elif scheme in {"gs", "gcs"}:
            return self._initialize_gcs_fs()

        elif scheme in {"file"}:
            return self._initialize_local_fs()

        else:
            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

    def _initialize_oss_fs(self) -> FileSystem:
        from pyarrow.fs import S3FileSystem

        client_kwargs: Dict[str, Any] = {
            "endpoint_override": self.properties.get(S3_ENDPOINT),
            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
            "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
        }

        if proxy_uri := self.properties.get(S3_PROXY_URI):
            client_kwargs["proxy_options"] = proxy_uri

        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
            client_kwargs["connect_timeout"] = float(connect_timeout)

        if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT):
            client_kwargs["request_timeout"] = float(request_timeout)

        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
            client_kwargs["role_arn"] = role_arn

        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
            client_kwargs["session_name"] = session_name

        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)

        return S3FileSystem(**client_kwargs)

    def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
        from pyarrow.fs import S3FileSystem

        provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION)

        # Do this when we don't provide the region at all, or when we explicitly enable it
        if provided_region is None or property_as_bool(self.properties, S3_RESOLVE_REGION, False) is True:
            # Resolve region from netloc(bucket), fallback to user-provided region
            # Only supported by buckets hosted by S3
            bucket_region = _cached_resolve_s3_region(bucket=netloc) or provided_region
            if provided_region is not None and bucket_region != provided_region:
                logger.warning(
                    f"PyArrow FileIO overriding S3 bucket region for bucket {netloc}: "
                    f"provided region {provided_region}, actual region {bucket_region}"
                )
        else:
            bucket_region = provided_region

        client_kwargs: Dict[str, Any] = {
            "endpoint_override": self.properties.get(S3_ENDPOINT),
            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
            "region": bucket_region,
        }

        if proxy_uri := self.properties.get(S3_PROXY_URI):
            client_kwargs["proxy_options"] = proxy_uri

        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
            client_kwargs["connect_timeout"] = float(connect_timeout)

        if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT):
            client_kwargs["request_timeout"] = float(request_timeout)

        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
            client_kwargs["role_arn"] = role_arn

        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
            client_kwargs["session_name"] = session_name

        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)

        return S3FileSystem(**client_kwargs)

    def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
        from pyarrow.fs import HadoopFileSystem

        hdfs_kwargs: Dict[str, Any] = {}
        if netloc:
            return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
        if host := self.properties.get(HDFS_HOST):
            hdfs_kwargs["host"] = host
        if port := self.properties.get(HDFS_PORT):
            # port should be an integer type
            hdfs_kwargs["port"] = int(port)
        if user := self.properties.get(HDFS_USER):
            hdfs_kwargs["user"] = user
        if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
            hdfs_kwargs["kerb_ticket"] = kerb_ticket

        return HadoopFileSystem(**hdfs_kwargs)

    def _initialize_gcs_fs(self) -> FileSystem:
        from pyarrow.fs import GcsFileSystem

        gcs_kwargs: Dict[str, Any] = {}
        if access_token := self.properties.get(GCS_TOKEN):
            gcs_kwargs["access_token"] = access_token
        if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
            gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
        if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
            gcs_kwargs["default_bucket_location"] = bucket_location
        if endpoint := self.properties.get(GCS_SERVICE_HOST):
            url_parts = urlparse(endpoint)
            gcs_kwargs["scheme"] = url_parts.scheme
            gcs_kwargs["endpoint_override"] = url_parts.netloc

        return GcsFileSystem(**gcs_kwargs)

    def _initialize_local_fs(self) -> FileSystem:
        return PyArrowLocalFileSystem()

    def new_input(self, location: str) -> PyArrowFile:
        """Get a PyArrowFile instance to read bytes from the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            PyArrowFile: A PyArrowFile instance for the given location.
        """
        scheme, netloc, path = self.parse_location(location)
        return PyArrowFile(
            fs=self.fs_by_scheme(scheme, netloc),
            location=location,
            path=path,
            buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
        )

    def new_output(self, location: str) -> PyArrowFile:
        """Get a PyArrowFile instance to write bytes to the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            PyArrowFile: A PyArrowFile instance for the given location.
        """
        scheme, netloc, path = self.parse_location(location)
        return PyArrowFile(
            fs=self.fs_by_scheme(scheme, netloc),
            location=location,
            path=path,
            buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
        )

    def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
        """Delete the file at the given location.

        Args:
            location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an OutputFile instance is provided,
                the location attribute for that instance is used as the location to delete.

        Raises:
            FileNotFoundError: When the file at the provided location does not exist.
            PermissionError: If the file at the provided location cannot be accessed due to a permission error such as
                an AWS error code 15.
        """
        str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
        scheme, netloc, path = self.parse_location(str_location)
        fs = self.fs_by_scheme(scheme, netloc)

        try:
            fs.delete_file(path)
        except FileNotFoundError:
            raise
        except PermissionError:
            raise
        except OSError as e:
            if e.errno == 2 or "Path does not exist" in str(e):
                raise FileNotFoundError(f"Cannot delete file, does not exist: {location}") from e
            elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                raise PermissionError(f"Cannot delete file, access denied: {location}") from e
            raise  # pragma: no cover - If some other kind of OSError, raise the raw error

    def __getstate__(self) -> Dict[str, Any]:
        """Create a dictionary of the PyArrowFileIO fields used when pickling."""
        fileio_copy = copy(self.__dict__)
        fileio_copy["fs_by_scheme"] = None
        return fileio_copy

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Deserialize the state into a PyArrowFileIO instance."""
        self.__dict__ = state
        self.fs_by_scheme = lru_cache(self._initialize_fs)

__getstate__()

Create a dictionary of the PyArrowFileIO fields used when pickling.

Source code in pyiceberg/io/pyarrow.py
def __getstate__(self) -> Dict[str, Any]:
    """Create a dictionary of the PyArrowFileIO fields used when pickling."""
    fileio_copy = copy(self.__dict__)
    fileio_copy["fs_by_scheme"] = None
    return fileio_copy

__setstate__(state)

Deserialize the state into a PyArrowFileIO instance.

Source code in pyiceberg/io/pyarrow.py
def __setstate__(self, state: Dict[str, Any]) -> None:
    """Deserialize the state into a PyArrowFileIO instance."""
    self.__dict__ = state
    self.fs_by_scheme = lru_cache(self._initialize_fs)

_initialize_fs(scheme, netloc=None)

Initialize FileSystem for different scheme.

Source code in pyiceberg/io/pyarrow.py
def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
    """Initialize FileSystem for different scheme."""
    if scheme in {"oss"}:
        return self._initialize_oss_fs()

    elif scheme in {"s3", "s3a", "s3n"}:
        return self._initialize_s3_fs(netloc)

    elif scheme in {"hdfs", "viewfs"}:
        return self._initialize_hdfs_fs(scheme, netloc)

    elif scheme in {"gs", "gcs"}:
        return self._initialize_gcs_fs()

    elif scheme in {"file"}:
        return self._initialize_local_fs()

    else:
        raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

delete(location)

Delete the file at the given location.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `location` | `Union[str, InputFile, OutputFile]` | The URI to the file; if an InputFile or OutputFile instance is provided, its location attribute is used as the location to delete. | required |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | When the file at the provided location does not exist. |
| `PermissionError` | If the file at the provided location cannot be accessed due to a permission error such as an AWS error code 15. |
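
A brief sketch; delete accepts either a plain location string or an InputFile/OutputFile instance (the URI is a placeholder).

from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()
try:
    io.delete("s3://my-bucket/tmp/example.txt")  # placeholder URI
except FileNotFoundError:
    pass  # nothing to clean up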

Source code in pyiceberg/io/pyarrow.py
def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
    """Delete the file at the given location.

    Args:
        location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an OutputFile instance is provided,
            the location attribute for that instance is used as the location to delete.

    Raises:
        FileNotFoundError: When the file at the provided location does not exist.
        PermissionError: If the file at the provided location cannot be accessed due to a permission error such as
            an AWS error code 15.
    """
    str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
    scheme, netloc, path = self.parse_location(str_location)
    fs = self.fs_by_scheme(scheme, netloc)

    try:
        fs.delete_file(path)
    except FileNotFoundError:
        raise
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 2 or "Path does not exist" in str(e):
            raise FileNotFoundError(f"Cannot delete file, does not exist: {location}") from e
        elif e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot delete file, access denied: {location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error

new_input(location)

Get a PyArrowFile instance to read bytes from the file at the given location.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `location` | `str` | A URI or a path to a local file. | required |

Returns:

| Type | Description |
| --- | --- |
| `PyArrowFile` | A PyArrowFile instance for the given location. |

Source code in pyiceberg/io/pyarrow.py
def new_input(self, location: str) -> PyArrowFile:
    """Get a PyArrowFile instance to read bytes from the file at the given location.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        PyArrowFile: A PyArrowFile instance for the given location.
    """
    scheme, netloc, path = self.parse_location(location)
    return PyArrowFile(
        fs=self.fs_by_scheme(scheme, netloc),
        location=location,
        path=path,
        buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
    )

new_output(location)

Get a PyArrowFile instance to write bytes to the file at the given location.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `location` | `str` | A URI or a path to a local file. | required |

Returns:

| Type | Description |
| --- | --- |
| `PyArrowFile` | A PyArrowFile instance for the given location. |

Source code in pyiceberg/io/pyarrow.py
def new_output(self, location: str) -> PyArrowFile:
    """Get a PyArrowFile instance to write bytes to the file at the given location.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        PyArrowFile: A PyArrowFile instance for the given location.
    """
    scheme, netloc, path = self.parse_location(location)
    return PyArrowFile(
        fs=self.fs_by_scheme(scheme, netloc),
        location=location,
        path=path,
        buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
    )

parse_location(location) staticmethod

Parse a location string into a (scheme, netloc, path) tuple.

Source code in pyiceberg/io/pyarrow.py
@staticmethod
def parse_location(location: str) -> Tuple[str, str, str]:
    """Return the path without the scheme."""
    uri = urlparse(location)
    if not uri.scheme:
        return "file", uri.netloc, os.path.abspath(location)
    elif uri.scheme in ("hdfs", "viewfs"):
        return uri.scheme, uri.netloc, uri.path
    else:
        return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
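
For illustration, the tuples produced for a few representative locations (paths without a scheme are made absolute and mapped to the local filesystem):

from pyiceberg.io.pyarrow import PyArrowFileIO

PyArrowFileIO.parse_location("s3://bucket/key/data.parquet")
# -> ("s3", "bucket", "bucket/key/data.parquet")

PyArrowFileIO.parse_location("hdfs://namenode:8020/warehouse/t/data.parquet")
# -> ("hdfs", "namenode:8020", "/warehouse/t/data.parquet")

PyArrowFileIO.parse_location("/tmp/data.parquet")
# -> ("file", "", "/tmp/data.parquet")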

PyArrowSchemaVisitor

Bases: Generic[T], ABC
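
As an illustrative sketch, a toy visitor that flattens a pa.Schema into the list of its primitive type names; it assumes the module's visit_pyarrow dispatcher walks schemas, structs, lists, maps and primitives and calls back into the visitor.

from typing import List

import pyarrow as pa

from pyiceberg.io.pyarrow import PyArrowSchemaVisitor, visit_pyarrow


class CollectPrimitiveNames(PyArrowSchemaVisitor[List[str]]):
    """Toy visitor: collect the names of all primitive types in a schema."""

    def schema(self, schema: pa.Schema, struct_result: List[str]) -> List[str]:
        return struct_result

    def struct(self, struct: pa.StructType, field_results: List[List[str]]) -> List[str]:
        return [name for result in field_results for name in result]

    def field(self, field: pa.Field, field_result: List[str]) -> List[str]:
        return field_result

    def list(self, list_type: pa.ListType, element_result: List[str]) -> List[str]:
        return element_result

    def map(self, map_type: pa.MapType, key_result: List[str], value_result: List[str]) -> List[str]:
        return key_result + value_result

    def primitive(self, primitive: pa.DataType) -> List[str]:
        return [str(primitive)]


schema = pa.schema([pa.field("id", pa.int64()), pa.field("tags", pa.list_(pa.string()))])
print(visit_pyarrow(schema, CollectPrimitiveNames()))  # e.g. ['int64', 'string']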

Source code in pyiceberg/io/pyarrow.py
class PyArrowSchemaVisitor(Generic[T], ABC):
    def before_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a field."""

    def after_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a field."""

    def before_list_element(self, element: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting an element within a ListType."""

    def after_list_element(self, element: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting an element within a ListType."""

    def before_map_key(self, key: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a key within a MapType."""

    def after_map_key(self, key: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a key within a MapType."""

    def before_map_value(self, value: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a value within a MapType."""

    def after_map_value(self, value: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a value within a MapType."""

    @abstractmethod
    def schema(self, schema: pa.Schema, struct_result: T) -> T:
        """Visit a schema."""

    @abstractmethod
    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
        """Visit a struct."""

    @abstractmethod
    def field(self, field: pa.Field, field_result: T) -> T:
        """Visit a field."""

    @abstractmethod
    def list(self, list_type: pa.ListType, element_result: T) -> T:
        """Visit a list."""

    @abstractmethod
    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
        """Visit a map."""

    @abstractmethod
    def primitive(self, primitive: pa.DataType) -> T:
        """Visit a primitive type."""

after_field(field)

Override this method to perform an action immediately after visiting a field.

Source code in pyiceberg/io/pyarrow.py
def after_field(self, field: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a field."""

after_list_element(element)

Override this method to perform an action immediately after visiting an element within a ListType.

Source code in pyiceberg/io/pyarrow.py
def after_list_element(self, element: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting an element within a ListType."""

after_map_key(key)

Override this method to perform an action immediately after visiting a key within a MapType.

Source code in pyiceberg/io/pyarrow.py
def after_map_key(self, key: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a key within a MapType."""

after_map_value(value)

Override this method to perform an action immediately after visiting a value within a MapType.

Source code in pyiceberg/io/pyarrow.py
def after_map_value(self, value: pa.Field) -> None:
    """Override this method to perform an action immediately after visiting a value within a MapType."""

before_field(field)

Override this method to perform an action immediately before visiting a field.

Source code in pyiceberg/io/pyarrow.py
def before_field(self, field: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a field."""

before_list_element(element)

Override this method to perform an action immediately before visiting an element within a ListType.

Source code in pyiceberg/io/pyarrow.py
def before_list_element(self, element: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting an element within a ListType."""

before_map_key(key)

Override this method to perform an action immediately before visiting a key within a MapType.

Source code in pyiceberg/io/pyarrow.py
def before_map_key(self, key: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a key within a MapType."""

before_map_value(value)

Override this method to perform an action immediately before visiting a value within a MapType.

Source code in pyiceberg/io/pyarrow.py
def before_map_value(self, value: pa.Field) -> None:
    """Override this method to perform an action immediately before visiting a value within a MapType."""

field(field, field_result) abstractmethod

Visit a field.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def field(self, field: pa.Field, field_result: T) -> T:
    """Visit a field."""

list(list_type, element_result) abstractmethod

Visit a list.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def list(self, list_type: pa.ListType, element_result: T) -> T:
    """Visit a list."""

map(map_type, key_result, value_result) abstractmethod

Visit a map.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
    """Visit a map."""

primitive(primitive) abstractmethod

Visit a primitive type.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def primitive(self, primitive: pa.DataType) -> T:
    """Visit a primitive type."""

schema(schema, struct_result) abstractmethod

Visit a schema.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def schema(self, schema: pa.Schema, struct_result: T) -> T:
    """Visit a schema."""

struct(struct, field_results) abstractmethod

Visit a struct.

Source code in pyiceberg/io/pyarrow.py
@abstractmethod
def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
    """Visit a struct."""

UnsupportedPyArrowTypeException

Bases: Exception

Cannot convert PyArrow type to corresponding Iceberg type.

Source code in pyiceberg/io/pyarrow.py
class UnsupportedPyArrowTypeException(Exception):
    """Cannot convert PyArrow type to corresponding Iceberg type."""

    def __init__(self, field: pa.Field, *args: Any):
        self.field = field
        super().__init__(*args)

_ConvertToIceberg

Bases: PyArrowSchemaVisitor[Union[IcebergType, Schema]]

Converts PyArrowSchema to Iceberg Schema. Applies the IDs from name_mapping if provided.
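
A hedged conversion sketch, assuming the module's pyarrow_to_schema helper (which drives this visitor) and field IDs carried in the PARQUET:field_id metadata key:

import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema

arrow_schema = pa.schema(
    [
        pa.field("id", pa.int64(), nullable=False, metadata={"PARQUET:field_id": "1"}),
        pa.field("name", pa.string(), metadata={"PARQUET:field_id": "2"}),
    ]
)

iceberg_schema = pyarrow_to_schema(arrow_schema)
print(iceberg_schema)  # roughly: 1: id: required long, 2: name: optional string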

Source code in pyiceberg/io/pyarrow.py
class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
    """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from name_mapping if provided."""

    _field_names: List[str]

    def __init__(self, downcast_ns_timestamp_to_us: bool = False) -> None:
        self._field_names = []
        self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us

    def _field_id(self, field: pa.Field) -> int:
        if (field_id := _get_field_id(field)) is not None:
            return field_id
        else:
            raise ValueError(f"Cannot convert {field} to Iceberg Field as field_id is empty.")

    def schema(self, schema: pa.Schema, struct_result: StructType) -> Schema:
        return Schema(*struct_result.fields)

    def struct(self, struct: pa.StructType, field_results: List[NestedField]) -> StructType:
        return StructType(*field_results)

    def field(self, field: pa.Field, field_result: IcebergType) -> NestedField:
        field_id = self._field_id(field)
        field_doc = doc_str.decode() if (field.metadata and (doc_str := field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
        field_type = field_result
        return NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc)

    def list(self, list_type: pa.ListType, element_result: IcebergType) -> ListType:
        element_field = list_type.value_field
        self._field_names.append(LIST_ELEMENT_NAME)
        element_id = self._field_id(element_field)
        self._field_names.pop()
        return ListType(element_id, element_result, element_required=not element_field.nullable)

    def map(self, map_type: pa.MapType, key_result: IcebergType, value_result: IcebergType) -> MapType:
        key_field = map_type.key_field
        self._field_names.append(MAP_KEY_NAME)
        key_id = self._field_id(key_field)
        self._field_names.pop()
        value_field = map_type.item_field
        self._field_names.append(MAP_VALUE_NAME)
        value_id = self._field_id(value_field)
        self._field_names.pop()
        return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable)

    def primitive(self, primitive: pa.DataType) -> PrimitiveType:
        if pa.types.is_boolean(primitive):
            return BooleanType()
        elif pa.types.is_integer(primitive):
            width = primitive.bit_width
            if width <= 32:
                return IntegerType()
            elif width <= 64:
                return LongType()
            else:
                # Does not exist (yet)
                raise TypeError(f"Unsupported integer type: {primitive}")
        elif pa.types.is_float32(primitive):
            return FloatType()
        elif pa.types.is_float64(primitive):
            return DoubleType()
        elif isinstance(primitive, pa.Decimal128Type):
            primitive = cast(pa.Decimal128Type, primitive)
            return DecimalType(primitive.precision, primitive.scale)
        elif pa.types.is_string(primitive) or pa.types.is_large_string(primitive):
            return StringType()
        elif pa.types.is_date32(primitive):
            return DateType()
        elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
            return TimeType()
        elif pa.types.is_timestamp(primitive):
            primitive = cast(pa.TimestampType, primitive)
            if primitive.unit in ("s", "ms", "us"):
                # Supported types, will be upcast automatically to 'us'
                pass
            elif primitive.unit == "ns":
                if self._downcast_ns_timestamp_to_us:
                    logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
                else:
                    raise TypeError(
                        "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.",
                    )
            else:
                raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}")

            if primitive.tz in UTC_ALIASES:
                return TimestamptzType()
            elif primitive.tz is None:
                return TimestampType()

        elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive):
            return BinaryType()
        elif pa.types.is_fixed_size_binary(primitive):
            primitive = cast(pa.FixedSizeBinaryType, primitive)
            return FixedType(primitive.byte_width)

        raise TypeError(f"Unsupported type: {primitive}")

    def before_field(self, field: pa.Field) -> None:
        self._field_names.append(field.name)

    def after_field(self, field: pa.Field) -> None:
        self._field_names.pop()

    def before_list_element(self, element: pa.Field) -> None:
        self._field_names.append(LIST_ELEMENT_NAME)

    def after_list_element(self, element: pa.Field) -> None:
        self._field_names.pop()

    def before_map_key(self, key: pa.Field) -> None:
        self._field_names.append(MAP_KEY_NAME)

    def after_map_key(self, element: pa.Field) -> None:
        self._field_names.pop()

    def before_map_value(self, value: pa.Field) -> None:
        self._field_names.append(MAP_VALUE_NAME)

    def after_map_value(self, element: pa.Field) -> None:
        self._field_names.pop()

_ConvertToIcebergWithoutIDs

Bases: _ConvertToIceberg

Converts PyArrowSchema to Iceberg Schema with all -1 ids.

The schema generated through this visitor should always be used in conjunction with the new_table_metadata function to assign new field IDs in order. This is currently used only when creating an Iceberg Schema from a PyArrow schema for a new Iceberg table.

Source code in pyiceberg/io/pyarrow.py
class _ConvertToIcebergWithoutIDs(_ConvertToIceberg):
    """
    Converts PyArrowSchema to Iceberg Schema with all -1 ids.

    The schema generated through this visitor should always be
    used in conjunction with `new_table_metadata` function to
    assign new field ids in order. This is currently used only
    when creating an Iceberg Schema from a PyArrow schema when
    creating a new Iceberg table.
    """

    def _field_id(self, field: pa.Field) -> int:
        return -1

_NullNaNUnmentionedTermsCollector

Bases: BoundBooleanExpressionVisitor[None]

Source code in pyiceberg/io/pyarrow.py
class _NullNaNUnmentionedTermsCollector(BoundBooleanExpressionVisitor[None]):
    # BoundTerms which have either is_null or is_not_null appearing at least once in the boolean expr.
    is_null_or_not_bound_terms: set[BoundTerm[Any]]
    # The remaining BoundTerms appearing in the boolean expr.
    null_unmentioned_bound_terms: set[BoundTerm[Any]]
    # BoundTerms which have either is_nan or is_not_nan appearing at least once in the boolean expr.
    is_nan_or_not_bound_terms: set[BoundTerm[Any]]
    # The remaining BoundTerms appearing in the boolean expr.
    nan_unmentioned_bound_terms: set[BoundTerm[Any]]

    def __init__(self) -> None:
        super().__init__()
        self.is_null_or_not_bound_terms = set()
        self.null_unmentioned_bound_terms = set()
        self.is_nan_or_not_bound_terms = set()
        self.nan_unmentioned_bound_terms = set()

    def _handle_explicit_is_null_or_not(self, term: BoundTerm[Any]) -> None:
        """Handle the predicate case where either is_null or is_not_null is included."""
        if term in self.null_unmentioned_bound_terms:
            self.null_unmentioned_bound_terms.remove(term)
        self.is_null_or_not_bound_terms.add(term)

    def _handle_null_unmentioned(self, term: BoundTerm[Any]) -> None:
        """Handle the predicate case where neither is_null or is_not_null is included."""
        if term not in self.is_null_or_not_bound_terms:
            self.null_unmentioned_bound_terms.add(term)

    def _handle_explicit_is_nan_or_not(self, term: BoundTerm[Any]) -> None:
        """Handle the predicate case where either is_nan or is_not_nan is included."""
        if term in self.nan_unmentioned_bound_terms:
            self.nan_unmentioned_bound_terms.remove(term)
        self.is_nan_or_not_bound_terms.add(term)

    def _handle_nan_unmentioned(self, term: BoundTerm[Any]) -> None:
        """Handle the predicate case where neither is_nan or is_not_nan is included."""
        if term not in self.is_nan_or_not_bound_terms:
            self.nan_unmentioned_bound_terms.add(term)

    def visit_in(self, term: BoundTerm[Any], literals: Set[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_nan_unmentioned(term)

    def visit_not_in(self, term: BoundTerm[Any], literals: Set[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_nan_unmentioned(term)

    def visit_is_nan(self, term: BoundTerm[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_explicit_is_nan_or_not(term)

    def visit_not_nan(self, term: BoundTerm[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_explicit_is_nan_or_not(term)

    def visit_is_null(self, term: BoundTerm[Any]) -> None:
        self._handle_explicit_is_null_or_not(term)
        self._handle_nan_unmentioned(term)

    def visit_not_null(self, term: BoundTerm[Any]) -> None:
        self._handle_explicit_is_null_or_not(term)
        self._handle_nan_unmentioned(term)

    def visit_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_nan_unmentioned(term)

    def visit_not_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_nan_unmentioned(term)

    def visit_greater_than_or_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_nan_unmentioned(term)

    def visit_greater_than(self, term: BoundTerm[Any], literal: Literal[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_nan_unmentioned(term)

    def visit_less_than(self, term: BoundTerm[Any], literal: Literal[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_nan_unmentioned(term)

    def visit_less_than_or_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_nan_unmentioned(term)

    def visit_starts_with(self, term: BoundTerm[Any], literal: Literal[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_nan_unmentioned(term)

    def visit_not_starts_with(self, term: BoundTerm[Any], literal: Literal[Any]) -> None:
        self._handle_null_unmentioned(term)
        self._handle_nan_unmentioned(term)

    def visit_true(self) -> None:
        return

    def visit_false(self) -> None:
        return

    def visit_not(self, child_result: None) -> None:
        return

    def visit_and(self, left_result: None, right_result: None) -> None:
        return

    def visit_or(self, left_result: None, right_result: None) -> None:
        return

    def collect(
        self,
        expr: BooleanExpression,
    ) -> None:
        """Collect the bound references categorized by having at least one is_null or is_not_null in the expr and the remaining."""
        boolean_expression_visit(expr, self)

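A minimal sketch of how the collector buckets terms (the schema and filter below are illustrative):

from pyiceberg.expressions import And, EqualTo, IsNull
from pyiceberg.expressions.visitors import bind
from pyiceberg.io.pyarrow import _NullNaNUnmentionedTermsCollector
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, NestedField, StringType

schema = Schema(
    NestedField(field_id=1, name="name", field_type=StringType(), required=False),
    NestedField(field_id=2, name="score", field_type=DoubleType(), required=False),
)
bound_expr = bind(schema, And(IsNull("name"), EqualTo("score", 1.0)), case_sensitive=True)

collector = _NullNaNUnmentionedTermsCollector()
collector.collect(bound_expr)
# "name" appears with is_null         -> collector.is_null_or_not_bound_terms
# "score" never appears with is_null  -> collector.null_unmentioned_bound_terms
# neither term appears with is_nan    -> collector.nan_unmentioned_bound_terms
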
_handle_explicit_is_nan_or_not(term)

Handle the predicate case where either is_nan or is_not_nan is included.

Source code in pyiceberg/io/pyarrow.py
def _handle_explicit_is_nan_or_not(self, term: BoundTerm[Any]) -> None:
    """Handle the predicate case where either is_nan or is_not_nan is included."""
    if term in self.nan_unmentioned_bound_terms:
        self.nan_unmentioned_bound_terms.remove(term)
    self.is_nan_or_not_bound_terms.add(term)

_handle_explicit_is_null_or_not(term)

Handle the predicate case where either is_null or is_not_null is included.

Source code in pyiceberg/io/pyarrow.py
def _handle_explicit_is_null_or_not(self, term: BoundTerm[Any]) -> None:
    """Handle the predicate case where either is_null or is_not_null is included."""
    if term in self.null_unmentioned_bound_terms:
        self.null_unmentioned_bound_terms.remove(term)
    self.is_null_or_not_bound_terms.add(term)

_handle_nan_unmentioned(term)

Handle the predicate case where neither is_nan nor is_not_nan is included.

Source code in pyiceberg/io/pyarrow.py
def _handle_nan_unmentioned(self, term: BoundTerm[Any]) -> None:
    """Handle the predicate case where neither is_nan or is_not_nan is included."""
    if term not in self.is_nan_or_not_bound_terms:
        self.nan_unmentioned_bound_terms.add(term)

_handle_null_unmentioned(term)

Handle the predicate case where neither is_null nor is_not_null is included.

Source code in pyiceberg/io/pyarrow.py
def _handle_null_unmentioned(self, term: BoundTerm[Any]) -> None:
    """Handle the predicate case where neither is_null or is_not_null is included."""
    if term not in self.is_null_or_not_bound_terms:
        self.null_unmentioned_bound_terms.add(term)

collect(expr)

Collect the bound references, categorized into terms that appear with at least one is_null or is_not_null in the expression and the remaining terms.

Source code in pyiceberg/io/pyarrow.py
def collect(
    self,
    expr: BooleanExpression,
) -> None:
    """Collect the bound references categorized by having at least one is_null or is_not_null in the expr and the remaining."""
    boolean_expression_visit(expr, self)

_check_pyarrow_schema_compatible(requested_schema, provided_schema, downcast_ns_timestamp_to_us=False)

Check if the requested_schema is compatible with provided_schema.

Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type.

Raises:

  ValueError: If the schemas are not compatible.

Source code in pyiceberg/io/pyarrow.py
def _check_pyarrow_schema_compatible(
    requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False
) -> None:
    """
    Check if the `requested_schema` is compatible with `provided_schema`.

    Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type.

    Raises:
        ValueError: If the schemas are not compatible.
    """
    name_mapping = requested_schema.name_mapping
    try:
        provided_schema = pyarrow_to_schema(
            provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )
    except ValueError as e:
        provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
        additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
        raise ValueError(
            f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
        ) from e
    _check_schema_compatible(requested_schema, provided_schema)

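A minimal sketch of a compatible pair of schemas (field names and types are illustrative); an Arrow table with extra or mismatching columns would raise ValueError instead.

import pyarrow as pa

from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

requested = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=False))
provided = pa.schema([pa.field("id", pa.int64(), nullable=True)])

# Passes silently: "id" resolves through the name mapping and int64 maps to long.
_check_pyarrow_schema_compatible(requested, provided)
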
_dataframe_to_data_files(table_metadata, df, io, write_uuid=None, counter=None)

Convert a PyArrow table into a DataFile.

Returns:

  Iterable[DataFile]: An iterable that supplies datafiles that represent the table.

Source code in pyiceberg/io/pyarrow.py
def _dataframe_to_data_files(
    table_metadata: TableMetadata,
    df: pa.Table,
    io: FileIO,
    write_uuid: Optional[uuid.UUID] = None,
    counter: Optional[itertools.count[int]] = None,
) -> Iterable[DataFile]:
    """Convert a PyArrow table into a DataFile.

    Returns:
        An iterable that supplies datafiles that represent the table.
    """
    from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties, WriteTask

    counter = counter or itertools.count(0)
    write_uuid = write_uuid or uuid.uuid4()
    target_file_size: int = property_as_int(  # type: ignore  # The property is set with non-None value.
        properties=table_metadata.properties,
        property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
        default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
    )
    name_mapping = table_metadata.schema().name_mapping
    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)

    if table_metadata.spec().is_unpartitioned():
        yield from write_file(
            io=io,
            table_metadata=table_metadata,
            tasks=iter(
                [
                    WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
                    for batches in bin_pack_arrow_table(df, target_file_size)
                ]
            ),
        )
    else:
        partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
        yield from write_file(
            io=io,
            table_metadata=table_metadata,
            tasks=iter(
                [
                    WriteTask(
                        write_uuid=write_uuid,
                        task_id=next(counter),
                        record_batches=batches,
                        partition_key=partition.partition_key,
                        schema=task_schema,
                    )
                    for partition in partitions
                    for batches in bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
                ]
            ),
        )

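A hedged sketch, assuming tbl is an already-loaded pyiceberg Table and df is a pa.Table whose schema matches the table schema (both names are placeholders):

from pyiceberg.io.pyarrow import _dataframe_to_data_files

# The generator writes Parquet files via write_file and yields one DataFile per
# written file; materializing it triggers the writes.
data_files = list(_dataframe_to_data_files(table_metadata=tbl.metadata, df=df, io=tbl.io))
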
_determine_partitions(spec, schema, arrow_table)

Based on the iceberg table partition spec, filter the arrow table into partitions with their keys.

Example:

Input: An arrow table with partition key of ['n_legs', 'year'] and with data of
{'year': [2020, 2022, 2022, 2021, 2022, 2022, 2022, 2019, 2021],
 'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100],
 'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", "Horse", "Brittle stars", "Centipede"]}.

The algorithm:
  - We determine the set of unique partition keys
  - Then we produce a set of partitions by filtering on each of the combinations
  - We combine the chunks to create a copy to avoid GIL congestion on the original table

Source code in pyiceberg/io/pyarrow.py
def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table) -> List[_TablePartition]:
    """Based on the iceberg table partition spec, filter the arrow table into partitions with their keys.

    Example:
    Input:
    An arrow table with partition key of ['n_legs', 'year'] and with data of
    {'year': [2020, 2022, 2022, 2021, 2022, 2022, 2022, 2019, 2021],
     'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100],
     'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", "Horse","Brittle stars", "Centipede"]}.
    The algorithm:
    - We determine the set of unique partition keys
    - Then we produce a set of partitions by filtering on each of the combinations
    - We combine the chunks to create a copy to avoid GIL congestion on the original table
    """
    # Assign unique names to columns where the partition transform has been applied
    # to avoid conflicts
    partition_fields = [f"_partition_{field.name}" for field in spec.fields]

    for partition, name in zip(spec.fields, partition_fields):
        source_field = schema.find_field(partition.source_id)
        arrow_table = arrow_table.append_column(
            name, partition.transform.pyarrow_transform(source_field.field_type)(arrow_table[source_field.name])
        )

    unique_partition_fields = arrow_table.select(partition_fields).group_by(partition_fields).aggregate([])

    table_partitions = []
    # TODO: As a next step, we could also play around with yielding instead of materializing the full list
    for unique_partition in unique_partition_fields.to_pylist():
        partition_key = PartitionKey(
            field_values=[
                PartitionFieldValue(field=field, value=unique_partition[name])
                for field, name in zip(spec.fields, partition_fields)
            ],
            partition_spec=spec,
            schema=schema,
        )
        filtered_table = arrow_table.filter(
            functools.reduce(
                operator.and_,
                [
                    pc.field(partition_field_name) == unique_partition[partition_field_name]
                    if unique_partition[partition_field_name] is not None
                    else pc.field(partition_field_name).is_null()
                    for field, partition_field_name in zip(spec.fields, partition_fields)
                ],
            )
        )
        filtered_table = filtered_table.drop_columns(partition_fields)

        # The combine_chunks seems to be counter-intuitive to do, but it actually returns
        # fresh buffers that don't interfere with each other when it is written out to file
        table_partitions.append(
            _TablePartition(partition_key=partition_key, arrow_table_partition=filtered_table.combine_chunks())
        )

    return table_partitions

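A minimal sketch with a single identity partition on year (schema, spec, and data are illustrative); each unique year value yields one _TablePartition.

import pyarrow as pa

from pyiceberg.io.pyarrow import _determine_partitions
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import IntegerType, NestedField, StringType

schema = Schema(
    NestedField(field_id=1, name="year", field_type=IntegerType(), required=False),
    NestedField(field_id=2, name="animal", field_type=StringType(), required=False),
)
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="year"))
table = pa.table({
    "year": pa.array([2020, 2021, 2021], type=pa.int32()),
    "animal": ["Flamingo", "Dog", "Horse"],
})

partitions = _determine_partitions(spec=spec, schema=schema, arrow_table=table)
# Two partitions: year=2020 (1 row) and year=2021 (2 rows), each carrying its
# PartitionKey and a combined-chunk copy of the matching rows.
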
_expression_to_complementary_pyarrow(expr)

Complementary filter conversion function of expression_to_pyarrow.

expression_to_pyarrow(Not(expr)) could not be used to achieve this complementary effect, because ~ on a pyarrow.compute.Expression does not handle null values.

Source code in pyiceberg/io/pyarrow.py
def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> pc.Expression:
    """Complementary filter conversion function of expression_to_pyarrow.

    Could not use expression_to_pyarrow(Not(expr)) to achieve this complementary effect because ~ in pyarrow.compute.Expression does not handle null.
    """
    collector = _NullNaNUnmentionedTermsCollector()
    collector.collect(expr)

    # Convert the set of terms to a sorted list so that layout of the expression to build is deterministic.
    null_unmentioned_bound_terms: List[BoundTerm[Any]] = sorted(
        collector.null_unmentioned_bound_terms, key=lambda term: term.ref().field.name
    )
    nan_unmentioned_bound_terms: List[BoundTerm[Any]] = sorted(
        collector.nan_unmentioned_bound_terms, key=lambda term: term.ref().field.name
    )

    preserve_expr: BooleanExpression = Not(expr)
    for term in null_unmentioned_bound_terms:
        preserve_expr = Or(preserve_expr, BoundIsNull(term=term))
    for term in nan_unmentioned_bound_terms:
        preserve_expr = Or(preserve_expr, BoundIsNaN(term=term))
    return expression_to_pyarrow(preserve_expr)

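A minimal sketch (schema and predicate are illustrative): the complement of score > 10.0 also preserves rows where score is null or NaN, which a plain ~ on the converted pyarrow expression would drop.

from pyiceberg.expressions import GreaterThan
from pyiceberg.expressions.visitors import bind
from pyiceberg.io.pyarrow import _expression_to_complementary_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, NestedField

schema = Schema(NestedField(field_id=1, name="score", field_type=DoubleType(), required=False))
bound_expr = bind(schema, GreaterThan("score", 10.0), case_sensitive=True)

# A pyarrow.compute.Expression selecting the rows NOT matched by the filter,
# including rows where "score" is null or NaN.
complement = _expression_to_complementary_pyarrow(bound_expr)
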
_get_column_projection_values(file, projected_schema, partition_spec, file_project_field_ids)

Apply column projection rules to the file schema.

Source code in pyiceberg/io/pyarrow.py
def _get_column_projection_values(
    file: DataFile, projected_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int]
) -> Tuple[bool, Dict[str, Any]]:
    """Apply Column Projection rules to File Schema."""
    project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids)
    should_project_columns = len(project_schema_diff) > 0
    projected_missing_fields: Dict[str, Any] = {}

    if not should_project_columns:
        return False, {}

    partition_schema: StructType
    accessors: Dict[int, Accessor]

    if partition_spec is not None:
        partition_schema = partition_spec.partition_type(projected_schema)
        accessors = build_position_accessors(partition_schema)
    else:
        return False, {}

    for field_id in project_schema_diff:
        for partition_field in partition_spec.fields_by_source_id(field_id):
            if isinstance(partition_field.transform, IdentityTransform):
                accessor = accessors.get(partition_field.field_id)

                if accessor is None:
                    continue

                # The partition field may not exist in the partition record of the data file.
                # This can happen when new partition fields are introduced after the file was written.
                try:
                    if partition_value := accessor.get(file.partition):
                        projected_missing_fields[partition_field.name] = partition_value
                except IndexError:
                    continue

    return True, projected_missing_fields

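A hedged sketch, assuming task is a FileScanTask from a scan of table tbl, and file_field_ids stands for the set of field IDs actually present in the data file (all three names are placeholders; file_field_ids would be derived elsewhere, for example from the file's Parquet schema):

from pyiceberg.io.pyarrow import _get_column_projection_values

should_project, projected_missing_fields = _get_column_projection_values(
    file=task.file,
    projected_schema=tbl.schema(),
    partition_spec=tbl.spec(),
    file_project_field_ids=file_field_ids,
)
# For a projected field that is missing from the file but identity-partitioned,
# projected_missing_fields maps the partition field name to the constant value
# to attach as a column when the file is read.
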
compute_statistics_plan(schema, table_properties)

Compute the statistics plan for all columns.

The resulting list is assumed to have the same length and the same order as the columns in the pyarrow table. This allows the list to map from the column index to the Iceberg column ID. For each element, the desired metrics collection that was provided by the user in the configuration is computed and then adjusted according to the data type of the column. For nested columns the minimum and maximum values are not computed, and truncation is only applied to text or binary strings.

Parameters:

  table_properties (from pyiceberg.table.metadata.TableMetadata, required): The Iceberg table metadata properties. They are required to compute the mapping of column position to Iceberg schema type ID. They are also used to set the mode for column metrics collection.
Source code in pyiceberg/io/pyarrow.py
def compute_statistics_plan(
    schema: Schema,
    table_properties: Dict[str, str],
) -> Dict[int, StatisticsCollector]:
    """
    Compute the statistics plan for all columns.

    The resulting list is assumed to have the same length and same order as the columns in the pyarrow table.
    This allows the list to map from the column index to the Iceberg column ID.
    For each element, the desired metrics collection that was provided by the user in the configuration
    is computed and then adjusted according to the data type of the column. For nested columns the minimum
    and maximum values are not computed. And truncation is only applied to text of binary strings.

    Args:
        table_properties (from pyiceberg.table.metadata.TableMetadata): The Iceberg table metadata properties.
            They are required to compute the mapping of column position to iceberg schema type id. It's also
            used to set the mode for column metrics collection
    """
    stats_cols = pre_order_visit(schema, PyArrowStatisticsCollector(schema, table_properties))
    result: Dict[int, StatisticsCollector] = {}
    for stats_col in stats_cols:
        result[stats_col.field_id] = stats_col
    return result

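A minimal sketch (schema and properties are illustrative) that switches the default metrics mode to counts via the write.metadata.metrics.default table property:

from pyiceberg.io.pyarrow import compute_statistics_plan
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
)
plan = compute_statistics_plan(schema, {"write.metadata.metrics.default": "counts"})
# plan maps each leaf field ID to its StatisticsCollector, e.g. plan[1].mode.
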
data_file_statistics_from_parquet_metadata(parquet_metadata, stats_columns, parquet_column_mapping)

Compute and return DataFileStatistics that includes the following.

  • record_count
  • column_sizes
  • value_counts
  • null_value_counts
  • nan_value_counts
  • column_aggregates
  • split_offsets

Parameters:

  parquet_metadata (pyarrow.parquet.FileMetaData, required): A pyarrow metadata object.
  stats_columns (Dict[int, StatisticsCollector], required): The statistics gathering plan. It is required to set the mode for column metrics collection.
  parquet_column_mapping (Dict[str, int], required): The mapping of the parquet column path to the field ID.
Source code in pyiceberg/io/pyarrow.py
def data_file_statistics_from_parquet_metadata(
    parquet_metadata: pq.FileMetaData,
    stats_columns: Dict[int, StatisticsCollector],
    parquet_column_mapping: Dict[str, int],
) -> DataFileStatistics:
    """
    Compute and return DataFileStatistics that includes the following.

    - record_count
    - column_sizes
    - value_counts
    - null_value_counts
    - nan_value_counts
    - column_aggregates
    - split_offsets

    Args:
        parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
        stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
            set the mode for column metrics collection
        parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID
    """
    column_sizes: Dict[int, int] = {}
    value_counts: Dict[int, int] = {}
    split_offsets: List[int] = []

    null_value_counts: Dict[int, int] = {}
    nan_value_counts: Dict[int, int] = {}

    col_aggs = {}

    invalidate_col: Set[int] = set()
    for r in range(parquet_metadata.num_row_groups):
        # References:
        # https://github.com/apache/iceberg/blob/fc381a81a1fdb8f51a0637ca27cd30673bd7aad3/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L232
        # https://github.com/apache/parquet-mr/blob/ac29db4611f86a07cc6877b416aa4b183e09b353/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java#L184

        row_group = parquet_metadata.row_group(r)

        data_offset = row_group.column(0).data_page_offset
        dictionary_offset = row_group.column(0).dictionary_page_offset

        if row_group.column(0).has_dictionary_page and dictionary_offset < data_offset:
            split_offsets.append(dictionary_offset)
        else:
            split_offsets.append(data_offset)

        for pos in range(parquet_metadata.num_columns):
            column = row_group.column(pos)
            field_id = parquet_column_mapping[column.path_in_schema]

            stats_col = stats_columns[field_id]

            column_sizes.setdefault(field_id, 0)
            column_sizes[field_id] += column.total_compressed_size

            if stats_col.mode == MetricsMode(MetricModeTypes.NONE):
                continue

            value_counts[field_id] = value_counts.get(field_id, 0) + column.num_values

            if column.is_stats_set:
                try:
                    statistics = column.statistics

                    if statistics.has_null_count:
                        null_value_counts[field_id] = null_value_counts.get(field_id, 0) + statistics.null_count

                    if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS):
                        continue

                    if field_id not in col_aggs:
                        col_aggs[field_id] = StatsAggregator(
                            stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
                        )

                    col_aggs[field_id].update_min(statistics.min)
                    col_aggs[field_id].update_max(statistics.max)

                except pyarrow.lib.ArrowNotImplementedError as e:
                    invalidate_col.add(field_id)
                    logger.warning(e)
            else:
                invalidate_col.add(field_id)
                logger.warning("PyArrow statistics missing for column %d when writing file", pos)

    split_offsets.sort()

    for field_id in invalidate_col:
        col_aggs.pop(field_id, None)
        null_value_counts.pop(field_id, None)

    return DataFileStatistics(
        record_count=parquet_metadata.num_rows,
        column_sizes=column_sizes,
        value_counts=value_counts,
        null_value_counts=null_value_counts,
        nan_value_counts=nan_value_counts,
        column_aggregates=col_aggs,
        split_offsets=split_offsets,
    )

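A hedged sketch, assuming parquet_path points to a Parquet file whose column paths line up with the Iceberg schema named schema (both names are placeholders):

import pyarrow.parquet as pq

from pyiceberg.io.pyarrow import (
    compute_statistics_plan,
    data_file_statistics_from_parquet_metadata,
    parquet_path_to_id_mapping,
)

metadata = pq.read_metadata(parquet_path)
stats = data_file_statistics_from_parquet_metadata(
    parquet_metadata=metadata,
    stats_columns=compute_statistics_plan(schema, table_properties={}),
    parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
# stats.record_count, stats.null_value_counts, stats.column_aggregates, ...
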
parquet_path_to_id_mapping(schema)

Compute the mapping of parquet column path to Iceberg ID.

For each column, the parquet file metadata has a path_in_schema attribute that follows a specific naming scheme for nested columns. This function computes a mapping of the full paths to the corresponding Iceberg IDs.

Parameters:

  schema (Schema, required): The current table schema.
Source code in pyiceberg/io/pyarrow.py
def parquet_path_to_id_mapping(
    schema: Schema,
) -> Dict[str, int]:
    """
    Compute the mapping of parquet column path to Iceberg ID.

    For each column, the parquet file metadata has a path_in_schema attribute that follows
    a specific naming scheme for nested columnds. This function computes a mapping of
    the full paths to the corresponding Iceberg IDs.

    Args:
        schema (pyiceberg.schema.Schema): The current table schema.
    """
    result: Dict[str, int] = {}
    for pair in pre_order_visit(schema, ID2ParquetPathVisitor()):
        result[pair.parquet_path] = pair.field_id
    return result

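A minimal sketch with a flat schema (illustrative); nested fields would appear under dotted parquet paths.

from pyiceberg.io.pyarrow import parquet_path_to_id_mapping
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=False),
)
parquet_path_to_id_mapping(schema)  # {'id': 1, 'name': 2}
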
visit_pyarrow(obj, visitor)

Apply a pyarrow schema visitor to any point within a schema.

The function traverses the schema in post-order fashion.

Parameters:

  obj (Union[pa.DataType, pa.Schema], required): An instance of a pa.Schema or a pa.DataType.
  visitor (PyArrowSchemaVisitor[T], required): An instance of an implementation of the generic PyArrowSchemaVisitor base class.

Raises:

  NotImplementedError: If attempting to visit an unrecognized object type.

Source code in pyiceberg/io/pyarrow.py
@singledispatch
def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisitor[T]) -> T:
    """Apply a pyarrow schema visitor to any point within a schema.

    The function traverses the schema in post-order fashion.

    Args:
        obj (Union[pa.DataType, pa.Schema]): An instance of a Schema or an IcebergType.
        visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of the generic PyarrowSchemaVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unrecognized object type.
    """
    raise NotImplementedError(f"Cannot visit non-type: {obj}")
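
A minimal sketch of the dispatch behavior (the visitor choice is illustrative): visiting a pa.Schema with one of the visitors documented above succeeds, while an unrecognized object falls through to this base implementation.

import pyarrow as pa

from pyiceberg.io.pyarrow import _ConvertToIcebergWithoutIDs, visit_pyarrow

arrow_schema = pa.schema([pa.field("id", pa.int64(), nullable=False)])
iceberg_schema = visit_pyarrow(arrow_schema, _ConvertToIcebergWithoutIDs())

# Anything other than a pa.Schema / pa.DataType falls through to the base
# implementation above and raises NotImplementedError:
# visit_pyarrow("not a schema", _ConvertToIcebergWithoutIDs())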