Skip to content

fsspec

A FileIO implementation for reading and writing table files using fsspec-compatible filesystems.

FsspecFileIO

Bases: FileIO

A FileIO implementation that uses fsspec.

Source code in pyiceberg/io/fsspec.py
class FsspecFileIO(FileIO):
    """FileIO implementation that dispatches to fsspec filesystems by URI scheme.

    Filesystem objects are created on demand from a scheme registry and memoized
    separately in each thread.
    """

    def __init__(self, properties: Properties):
        # Private copy of the module-level registry so per-instance changes do not leak back into it.
        self._scheme_to_fs = dict(SCHEME_TO_FS)
        # Holds the per-thread memoized filesystem lookup created lazily by get_fs.
        self._thread_locals = threading.local()
        super().__init__(properties=properties)

    def new_input(self, location: str) -> FsspecInputFile:
        """Return an FsspecInputFile for reading bytes from ``location``.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            FsspecInputFile: An input file bound to the filesystem registered for the location's URI scheme.
        """
        scheme = urlparse(location).scheme
        filesystem = self.get_fs(scheme)
        return FsspecInputFile(location=location, fs=filesystem)

    def new_output(self, location: str) -> FsspecOutputFile:
        """Return an FsspecOutputFile for writing bytes to ``location``.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            FsspecOutputFile: An output file bound to the filesystem registered for the location's URI scheme.
        """
        scheme = urlparse(location).scheme
        filesystem = self.get_fs(scheme)
        return FsspecOutputFile(location=location, fs=filesystem)

    def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
        """Remove the file at ``location``.

        Args:
            location (Union[str, InputFile, OutputFile]): The URI of the file to delete. When an InputFile or
                OutputFile is given, its ``location`` attribute is used instead.
        """
        target = location.location if isinstance(location, (InputFile, OutputFile)) else location
        self.get_fs(urlparse(target).scheme).rm(target)

    def get_fs(self, scheme: str) -> AbstractFileSystem:
        """Return the filesystem for ``scheme``, memoized separately in each thread."""
        cached_lookup = getattr(self._thread_locals, "get_fs_cached", None)
        if cached_lookup is None:
            # First lookup on this thread: wrap the uncached factory call in an LRU cache.
            cached_lookup = lru_cache(self._get_fs)
            self._thread_locals.get_fs_cached = cached_lookup
        return cached_lookup(scheme)

    def _get_fs(self, scheme: str) -> AbstractFileSystem:
        """Build a new filesystem for ``scheme`` from the registry, without caching.

        Raises:
            ValueError: If no filesystem factory is registered for ``scheme``.
        """
        if scheme in self._scheme_to_fs:
            factory = self._scheme_to_fs[scheme]
            return factory(self.properties)
        raise ValueError(f"No registered filesystem for scheme: {scheme}")

    def __getstate__(self) -> Dict[str, Any]:
        """Return the FsspecFileIO fields to pickle, leaving out the thread-local cache."""
        state = dict(self.__dict__)
        # threading.local objects cannot be pickled; __setstate__ recreates this attribute.
        state.pop("_thread_locals")
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore a pickled FsspecFileIO and give it a fresh, empty thread-local cache."""
        self.__dict__ = state
        self._thread_locals = threading.local()

__getstate__()

Create a dictionary of the FsspecFileIO fields used when pickling.

Source code in pyiceberg/io/fsspec.py
def __getstate__(self) -> Dict[str, Any]:
    """Return the FsspecFileIO fields to pickle, leaving out the thread-local cache."""
    state = dict(self.__dict__)
    # threading.local objects cannot be pickled; __setstate__ recreates this attribute.
    state.pop("_thread_locals")
    return state

__setstate__(state)

Deserialize the state into an FsspecFileIO instance.

Source code in pyiceberg/io/fsspec.py
def __setstate__(self, state: Dict[str, Any]) -> None:
    """Deserialize the state into an FsspecFileIO instance.

    Args:
        state: The field dictionary produced by ``__getstate__``, which omits ``_thread_locals``.
    """
    # Adopt the pickled fields wholesale. This must come first because it replaces the whole __dict__.
    self.__dict__ = state
    # The thread-local filesystem cache is stripped before pickling, so start with a fresh, empty one.
    self._thread_locals = threading.local()

delete(location)

Delete the file at the given location.

Parameters:

Name Type Description Default
location Union[str, InputFile, OutputFile]

The URI of the file to delete. If an InputFile or OutputFile instance is provided, its location attribute is used as the location to delete.

required
Source code in pyiceberg/io/fsspec.py
def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
    """Remove the file at ``location``.

    Args:
        location (Union[str, InputFile, OutputFile]): The URI of the file to delete. When an InputFile or
            OutputFile is given, its ``location`` attribute is used instead.
    """
    target = location.location if isinstance(location, (InputFile, OutputFile)) else location
    self.get_fs(urlparse(target).scheme).rm(target)

get_fs(scheme)

Get a filesystem for a specific scheme, cached per thread.

Source code in pyiceberg/io/fsspec.py
def get_fs(self, scheme: str) -> AbstractFileSystem:
    """Return the filesystem for ``scheme``, memoized separately in each thread."""
    cached_lookup = getattr(self._thread_locals, "get_fs_cached", None)
    if cached_lookup is None:
        # First lookup on this thread: wrap the uncached factory call in an LRU cache.
        cached_lookup = lru_cache(self._get_fs)
        self._thread_locals.get_fs_cached = cached_lookup
    return cached_lookup(scheme)

new_input(location)

Get an FsspecInputFile instance to read bytes from the file at the given location.

Parameters:

Name Type Description Default
location str

A URI or a path to a local file.

required

Returns:

Name Type Description
FsspecInputFile FsspecInputFile

An FsspecInputFile instance for the given location.

Source code in pyiceberg/io/fsspec.py
def new_input(self, location: str) -> FsspecInputFile:
    """Return an FsspecInputFile for reading bytes from ``location``.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        FsspecInputFile: An input file bound to the filesystem registered for the location's URI scheme.
    """
    scheme = urlparse(location).scheme
    filesystem = self.get_fs(scheme)
    return FsspecInputFile(location=location, fs=filesystem)

new_output(location)

Get an FsspecOutputFile instance to write bytes to the file at the given location.

Parameters:

Name Type Description Default
location str

A URI or a path to a local file.

required

Returns:

Name Type Description
FsspecOutputFile FsspecOutputFile

An FsspecOutputFile instance for the given location.

Source code in pyiceberg/io/fsspec.py
def new_output(self, location: str) -> FsspecOutputFile:
    """Return an FsspecOutputFile for writing bytes to ``location``.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        FsspecOutputFile: An output file bound to the filesystem registered for the location's URI scheme.
    """
    scheme = urlparse(location).scheme
    filesystem = self.get_fs(scheme)
    return FsspecOutputFile(location=location, fs=filesystem)

FsspecInputFile

Bases: InputFile

An input file implementation for the FsspecFileIO.

Parameters:

Name Type Description Default
location str

A URI to a file location.

required
fs AbstractFileSystem

An fsspec filesystem instance.

required
Source code in pyiceberg/io/fsspec.py
class FsspecInputFile(InputFile):
    """An input file implementation for the FsspecFileIO.

    Args:
        location (str): A URI to a file location.
        fs (AbstractFileSystem): An fsspec filesystem instance.
    """

    def __init__(self, location: str, fs: AbstractFileSystem):
        self._fs = fs
        super().__init__(location=location)

    def __len__(self) -> int:
        """Return the total length of the file, in bytes.

        Raises:
            RuntimeError: If the filesystem info for the location contains no size.
        """
        object_info = self._fs.info(self.location)
        # The size may appear under either "Size" or "size". Compare against None rather than relying on
        # truthiness so that an empty (0-byte) file reports 0 instead of raising.
        for key in ("Size", "size"):
            size = object_info.get(key)
            if size is not None:
                return size
        raise RuntimeError(f"Cannot retrieve object info: {self.location}")

    def exists(self) -> bool:
        """Check whether the location exists, as reported by the filesystem's ``lexists``."""
        return self._fs.lexists(self.location)

    def open(self, seekable: bool = True) -> InputStream:
        """Create an input stream for reading the contents of the file.

        Args:
            seekable: Whether the stream should support seeking, or whether it is consumed sequentially.
                NOTE(review): currently not forwarded to fsspec; the file is always opened with mode "rb".

        Returns:
            OpenFile: An fsspec compliant file-like object.

        Raises:
            FileNotFoundError: If the file does not exist. The raised error always names the missing
                location in its ``filename`` attribute.
        """
        try:
            return self._fs.open(self.location, "rb")
        except FileNotFoundError as e:
            if e.filename:
                # Already names the missing file: re-raise it unchanged rather than chaining it to itself.
                raise
            # Give callers a consistent error that carries the missing location.
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self.location) from e

__len__()

Return the total length of the file, in bytes.

Source code in pyiceberg/io/fsspec.py
def __len__(self) -> int:
    """Return the total length of the file, in bytes.

    Raises:
        RuntimeError: If the filesystem info for the location contains no size.
    """
    object_info = self._fs.info(self.location)
    # The size may appear under either "Size" or "size". Compare against None rather than relying on
    # truthiness so that an empty (0-byte) file reports 0 instead of raising.
    for key in ("Size", "size"):
        size = object_info.get(key)
        if size is not None:
            return size
    raise RuntimeError(f"Cannot retrieve object info: {self.location}")

exists()

Check whether the location exists.

Source code in pyiceberg/io/fsspec.py
def exists(self) -> bool:
    """Return whether something exists at this file's location, per the filesystem's ``lexists``."""
    filesystem = self._fs
    return filesystem.lexists(self.location)

open(seekable=True)

Create an input stream for reading the contents of the file.

Parameters:

Name Type Description Default
seekable bool

Whether the stream should support seeking, or whether it is consumed sequentially.

True

Returns:

Name Type Description
OpenFile InputStream

An fsspec compliant file-like object.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

Source code in pyiceberg/io/fsspec.py
def open(self, seekable: bool = True) -> InputStream:
    """Create an input stream for reading the contents of the file.

    Args:
        seekable: Whether the stream should support seeking, or whether it is consumed sequentially.
            NOTE(review): currently not forwarded to fsspec; the file is always opened with mode "rb".

    Returns:
        OpenFile: An fsspec compliant file-like object.

    Raises:
        FileNotFoundError: If the file does not exist. The raised error always names the missing
            location in its ``filename`` attribute.
    """
    try:
        return self._fs.open(self.location, "rb")
    except FileNotFoundError as e:
        if e.filename:
            # Already names the missing file: re-raise it unchanged rather than chaining it to itself.
            raise
        # Give callers a consistent error that carries the missing location.
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self.location) from e

FsspecOutputFile

Bases: OutputFile

An output file implementation for the FsspecFileIO.

Parameters:

Name Type Description Default
location str

A URI to a file location.

required
fs AbstractFileSystem

An fsspec filesystem instance.

required
Source code in pyiceberg/io/fsspec.py
class FsspecOutputFile(OutputFile):
    """An output file implementation for the FsspecFileIO.

    Args:
        location (str): A URI to a file location.
        fs (AbstractFileSystem): An fsspec filesystem instance.
    """

    def __init__(self, location: str, fs: AbstractFileSystem):
        self._fs = fs
        super().__init__(location=location)

    def __len__(self) -> int:
        """Return the total length of the file, in bytes.

        Raises:
            RuntimeError: If the filesystem info for the location contains no size.
        """
        object_info = self._fs.info(self.location)
        # The size may appear under either "Size" or "size". Compare against None rather than relying on
        # truthiness so that an empty (0-byte) file reports 0 instead of raising.
        for key in ("Size", "size"):
            size = object_info.get(key)
            if size is not None:
                return size
        raise RuntimeError(f"Cannot retrieve object info: {self.location}")

    def exists(self) -> bool:
        """Check whether the location exists, as reported by the filesystem's ``lexists``."""
        return self._fs.lexists(self.location)

    def create(self, overwrite: bool = False) -> OutputStream:
        """Create an output stream for writing the contents of the file.

        Args:
            overwrite (bool): Whether to overwrite the file if it already exists.

        Returns:
            OpenFile: An fsspec compliant file-like object.

        Raises:
            FileExistsError: If the file already exists at the location and overwrite is set to False.

        Note:
            If overwrite is set to False, a check is first performed to verify that the file does not exist.
            This check is not atomic: a concurrent process may create the file after the existence check but
            before the output stream is opened. In that case, opening the stream with mode "wb" will
            truncate the existing file.
        """
        if not overwrite and self.exists():
            raise FileExistsError(f"Cannot create file, file already exists: {self.location}")
        return self._fs.open(self.location, "wb")

    def to_input_file(self) -> FsspecInputFile:
        """Return a new FsspecInputFile for the location at `self.location`, sharing this filesystem."""
        return FsspecInputFile(location=self.location, fs=self._fs)

__len__()

Return the total length of the file, in bytes.

Source code in pyiceberg/io/fsspec.py
def __len__(self) -> int:
    """Return the total length of the file, in bytes.

    Raises:
        RuntimeError: If the filesystem info for the location contains no size.
    """
    object_info = self._fs.info(self.location)
    # The size may appear under either "Size" or "size". Compare against None rather than relying on
    # truthiness so that an empty (0-byte) file reports 0 instead of raising.
    for key in ("Size", "size"):
        size = object_info.get(key)
        if size is not None:
            return size
    raise RuntimeError(f"Cannot retrieve object info: {self.location}")

create(overwrite=False)

Create an output stream for writing the contents of the file.

Parameters:

Name Type Description Default
overwrite bool

Whether to overwrite the file if it already exists.

False

Returns:

Name Type Description
OpenFile OutputStream

An fsspec compliant file-like object.

Raises:

Type Description
FileExistsError

If the file already exists at the location and overwrite is set to False.

Note

If overwrite is set to False, a check is first performed to verify that the file does not exist. This check is not atomic: a concurrent process may create the file after the existence check but before the output stream is opened. In that case, opening the output stream will truncate the existing file.

Source code in pyiceberg/io/fsspec.py
def create(self, overwrite: bool = False) -> OutputStream:
    """Create an output stream for writing the contents of the file.

    Args:
        overwrite (bool): Whether to overwrite the file if it already exists.

    Returns:
        OpenFile: An fsspec compliant file-like object.

    Raises:
        FileExistsError: If the file already exists at the location and overwrite is set to False.

    Note:
        If overwrite is set to False, a check is first performed to verify that the file does not exist.
        This check is not atomic: a concurrent process may create the file after the existence check but
        before the output stream is opened. In that case, opening the stream with mode "wb" will
        truncate the existing file.
    """
    if not overwrite and self.exists():
        raise FileExistsError(f"Cannot create file, file already exists: {self.location}")
    return self._fs.open(self.location, "wb")

exists()

Check whether the location exists.

Source code in pyiceberg/io/fsspec.py
def exists(self) -> bool:
    """Return whether something exists at this file's location, per the filesystem's ``lexists``."""
    filesystem = self._fs
    return filesystem.lexists(self.location)

to_input_file()

Return a new FsspecInputFile for the location at self.location.

Source code in pyiceberg/io/fsspec.py
def to_input_file(self) -> FsspecInputFile:
    """Return an FsspecInputFile for ``self.location`` that shares this output file's filesystem."""
    location = self.location
    filesystem = self._fs
    return FsspecInputFile(location=location, fs=filesystem)

S3RequestSigner

Bases: ABC

Abstract base class for S3 request signers.

Source code in pyiceberg/io/fsspec.py
class S3RequestSigner(abc.ABC):
    """Abstract base for callables that sign outgoing S3 requests.

    Subclasses must implement ``__call__``, which receives the request to sign.
    """

    properties: Properties

    def __init__(self, properties: Properties) -> None:
        # Configuration properties made available to signer implementations.
        self.properties = properties

    @abc.abstractmethod
    def __call__(self, request: "AWSRequest", **_: Any) -> None:
        """Sign ``request``.

        The return type is None, so implementations are expected to update ``request`` in place.
        """

S3V4RestSigner

Bases: S3RequestSigner

An S3 request signer that uses an external REST signing service to sign requests.

Source code in pyiceberg/io/fsspec.py
class S3V4RestSigner(S3RequestSigner):
    """An S3 request signer that uses an external REST signing service to sign requests.

    The request's method, region, URL and headers are POSTed to the signing endpoint. The headers and URI
    returned by the service are then applied to the request in place.
    """

    _session: requests.Session

    def __init__(self, properties: Properties) -> None:
        super().__init__(properties)
        # Reuse one HTTP session (and its connection pool) across signing calls.
        self._session = requests.Session()

    def __call__(self, request: "AWSRequest", **_: Any) -> None:
        """Sign ``request`` in place using the REST signing service.

        Args:
            request: The AWS request to sign; its headers and URL are updated in place.

        Raises:
            SignError: If the signing service responds with an HTTP error status.
            KeyError: If neither the signer URI property nor the URI property is set.
        """
        # Prefer the dedicated signer URI and fall back to URI only when it is absent. Looking the
        # fallback up lazily means a missing URI is not an error when the signer URI is configured.
        if S3_SIGNER_URI in self.properties:
            signer_url = self.properties[S3_SIGNER_URI]
        else:
            signer_url = self.properties[URI]
        signer_url = signer_url.rstrip("/")
        signer_endpoint = self.properties.get(S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT)

        signer_headers = {}
        if token := self.properties.get(TOKEN):
            signer_headers = {"Authorization": f"Bearer {token}"}
        signer_headers.update(get_header_properties(self.properties))

        signer_body = {
            "method": request.method,
            "region": request.context["client_region"],
            "uri": request.url,
            # Each header value is sent wrapped in a single-element list.
            "headers": {key: [val] for key, val in request.headers.items()},
        }

        response = self._session.post(f"{signer_url}/{signer_endpoint.strip()}", headers=signer_headers, json=signer_body)
        try:
            response.raise_for_status()
            response_json = response.json()
        except HTTPError as e:
            raise SignError(f"Failed to sign request {response.status_code}: {signer_body}") from e

        # Apply the signed headers (multi-valued headers are joined with ", ") and the returned URI.
        for key, value in response_json["headers"].items():
            request.headers.add_header(key, ", ".join(value))

        request.url = response_json["uri"]