Skip to content

fsspec

FileIO implementation for reading and writing table files that uses fsspec compatible filesystems.

FsspecFileIO

Bases: FileIO

A FileIO implementation that uses fsspec.

Source code in pyiceberg/io/fsspec.py
class FsspecFileIO(FileIO):
    """A FileIO implementation that uses fsspec."""

    def __init__(self, properties: Properties):
        self._scheme_to_fs = {}
        self._scheme_to_fs.update(SCHEME_TO_FS)
        self.get_fs: Callable[[str], AbstractFileSystem] = lru_cache(self._get_fs)
        super().__init__(properties=properties)

    def new_input(self, location: str) -> FsspecInputFile:
        """Get an FsspecInputFile instance to read bytes from the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            FsspecInputFile: An FsspecInputFile instance for the given location.
        """
        uri = urlparse(location)
        fs = self.get_fs(uri.scheme)
        return FsspecInputFile(location=location, fs=fs)

    def new_output(self, location: str) -> FsspecOutputFile:
        """Get an FsspecOutputFile instance to write bytes to the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            FsspecOutputFile: An FsspecOutputFile instance for the given location.
        """
        uri = urlparse(location)
        fs = self.get_fs(uri.scheme)
        return FsspecOutputFile(location=location, fs=fs)

    def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
        """Delete the file at the given location.

        Args:
            location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an
                OutputFile instance is provided, the location attribute for that instance is used as the location
                to delete.
        """
        if isinstance(location, (InputFile, OutputFile)):
            str_location = location.location  # Use InputFile or OutputFile location
        else:
            str_location = location

        uri = urlparse(str_location)
        fs = self.get_fs(uri.scheme)
        fs.rm(str_location)

    def _get_fs(self, scheme: str) -> AbstractFileSystem:
        """Get a filesystem for a specific scheme."""
        if scheme not in self._scheme_to_fs:
            raise ValueError(f"No registered filesystem for scheme: {scheme}")
        return self._scheme_to_fs[scheme](self.properties)

delete(location)

Delete the file at the given location.

Parameters:

Name Type Description Default
location Union[str, InputFile, OutputFile]

The URI to the file--if an InputFile instance or an OutputFile instance is provided, the location attribute for that instance is used as the location to delete.

required
Source code in pyiceberg/io/fsspec.py
def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
    """Delete the file at the given location.

    Args:
        location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an
            OutputFile instance is provided, the location attribute for that instance is used as the location
            to delete.
    """
    if isinstance(location, (InputFile, OutputFile)):
        str_location = location.location  # Use InputFile or OutputFile location
    else:
        str_location = location

    uri = urlparse(str_location)
    fs = self.get_fs(uri.scheme)
    fs.rm(str_location)

new_input(location)

Get an FsspecInputFile instance to read bytes from the file at the given location.

Parameters:

Name Type Description Default
location str

A URI or a path to a local file.

required

Returns:

Name Type Description
FsspecInputFile FsspecInputFile

An FsspecInputFile instance for the given location.

Source code in pyiceberg/io/fsspec.py
def new_input(self, location: str) -> FsspecInputFile:
    """Get an FsspecInputFile instance to read bytes from the file at the given location.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        FsspecInputFile: An FsspecInputFile instance for the given location.
    """
    uri = urlparse(location)
    fs = self.get_fs(uri.scheme)
    return FsspecInputFile(location=location, fs=fs)

new_output(location)

Get an FsspecOutputFile instance to write bytes to the file at the given location.

Parameters:

Name Type Description Default
location str

A URI or a path to a local file.

required

Returns:

Name Type Description
FsspecOutputFile FsspecOutputFile

An FsspecOutputFile instance for the given location.

Source code in pyiceberg/io/fsspec.py
def new_output(self, location: str) -> FsspecOutputFile:
    """Get an FsspecOutputFile instance to write bytes to the file at the given location.

    Args:
        location (str): A URI or a path to a local file.

    Returns:
        FsspecOutputFile: An FsspecOutputFile instance for the given location.
    """
    uri = urlparse(location)
    fs = self.get_fs(uri.scheme)
    return FsspecOutputFile(location=location, fs=fs)

FsspecInputFile

Bases: InputFile

An input file implementation for the FsspecFileIO.

Parameters:

Name Type Description Default
location str

A URI to a file location.

required
fs AbstractFileSystem

An fsspec filesystem instance.

required
Source code in pyiceberg/io/fsspec.py
class FsspecInputFile(InputFile):
    """An input file implementation for the FsspecFileIO.

    Args:
        location (str): A URI to a file location.
        fs (AbstractFileSystem): An fsspec filesystem instance.
    """

    def __init__(self, location: str, fs: AbstractFileSystem):
        self._fs = fs
        super().__init__(location=location)

    def __len__(self) -> int:
        """Return the total length of the file, in bytes."""
        object_info = self._fs.info(self.location)
        if size := object_info.get("Size"):
            return size
        elif size := object_info.get("size"):
            return size
        raise RuntimeError(f"Cannot retrieve object info: {self.location}")

    def exists(self) -> bool:
        """Check whether the location exists."""
        return self._fs.lexists(self.location)

    def open(self, seekable: bool = True) -> InputStream:
        """Create an input stream for reading the contents of the file.

        Args:
            seekable: If the stream should support seek, or if it is consumed sequential.

        Returns:
            OpenFile: An fsspec compliant file-like object.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        try:
            return self._fs.open(self.location, "rb")
        except FileNotFoundError as e:
            # To have a consistent error handling experience, make sure exception contains missing file location.
            raise e if e.filename else FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self.location) from e

__len__()

Return the total length of the file, in bytes.

Source code in pyiceberg/io/fsspec.py
def __len__(self) -> int:
    """Return the total length of the file, in bytes."""
    object_info = self._fs.info(self.location)
    if size := object_info.get("Size"):
        return size
    elif size := object_info.get("size"):
        return size
    raise RuntimeError(f"Cannot retrieve object info: {self.location}")

exists()

Check whether the location exists.

Source code in pyiceberg/io/fsspec.py
def exists(self) -> bool:
    """Check whether the location exists."""
    return self._fs.lexists(self.location)

open(seekable=True)

Create an input stream for reading the contents of the file.

Parameters:

Name Type Description Default
seekable bool

If the stream should support seek, or if it is consumed sequential.

True

Returns:

Name Type Description
OpenFile InputStream

An fsspec compliant file-like object.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

Source code in pyiceberg/io/fsspec.py
def open(self, seekable: bool = True) -> InputStream:
    """Create an input stream for reading the contents of the file.

    Args:
        seekable: If the stream should support seek, or if it is consumed sequential.

    Returns:
        OpenFile: An fsspec compliant file-like object.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    try:
        return self._fs.open(self.location, "rb")
    except FileNotFoundError as e:
        # To have a consistent error handling experience, make sure exception contains missing file location.
        raise e if e.filename else FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self.location) from e

FsspecOutputFile

Bases: OutputFile

An output file implementation for the FsspecFileIO.

Parameters:

Name Type Description Default
location str

A URI to a file location.

required
fs AbstractFileSystem

An fsspec filesystem instance.

required
Source code in pyiceberg/io/fsspec.py
class FsspecOutputFile(OutputFile):
    """An output file implementation for the FsspecFileIO.

    Args:
        location (str): A URI to a file location.
        fs (AbstractFileSystem): An fsspec filesystem instance.
    """

    def __init__(self, location: str, fs: AbstractFileSystem):
        self._fs = fs
        super().__init__(location=location)

    def __len__(self) -> int:
        """Return the total length of the file, in bytes."""
        object_info = self._fs.info(self.location)
        if size := object_info.get("Size"):
            return size
        elif size := object_info.get("size"):
            return size
        raise RuntimeError(f"Cannot retrieve object info: {self.location}")

    def exists(self) -> bool:
        """Check whether the location exists."""
        return self._fs.lexists(self.location)

    def create(self, overwrite: bool = False) -> OutputStream:
        """Create an output stream for reading the contents of the file.

        Args:
            overwrite (bool): Whether to overwrite the file if it already exists.

        Returns:
            OpenFile: An fsspec compliant file-like object.

        Raises:
            FileExistsError: If the file already exists at the location and overwrite is set to False.

        Note:
            If overwrite is set to False, a check is first performed to verify that the file does not exist.
            This is not thread-safe and a possibility does exist that the file can be created by a concurrent
            process after the existence check yet before the output stream is created. In such a case, the default
            behavior will truncate the contents of the existing file when opening the output stream.
        """
        if not overwrite and self.exists():
            raise FileExistsError(f"Cannot create file, file already exists: {self.location}")
        return self._fs.open(self.location, "wb")

    def to_input_file(self) -> FsspecInputFile:
        """Return a new FsspecInputFile for the location at `self.location`."""
        return FsspecInputFile(location=self.location, fs=self._fs)

__len__()

Return the total length of the file, in bytes.

Source code in pyiceberg/io/fsspec.py
def __len__(self) -> int:
    """Return the total length of the file, in bytes."""
    object_info = self._fs.info(self.location)
    if size := object_info.get("Size"):
        return size
    elif size := object_info.get("size"):
        return size
    raise RuntimeError(f"Cannot retrieve object info: {self.location}")

create(overwrite=False)

Create an output stream for reading the contents of the file.

Parameters:

Name Type Description Default
overwrite bool

Whether to overwrite the file if it already exists.

False

Returns:

Name Type Description
OpenFile OutputStream

An fsspec compliant file-like object.

Raises:

Type Description
FileExistsError

If the file already exists at the location and overwrite is set to False.

Note

If overwrite is set to False, a check is first performed to verify that the file does not exist. This is not thread-safe and a possibility does exist that the file can be created by a concurrent process after the existence check yet before the output stream is created. In such a case, the default behavior will truncate the contents of the existing file when opening the output stream.

Source code in pyiceberg/io/fsspec.py
def create(self, overwrite: bool = False) -> OutputStream:
    """Create an output stream for reading the contents of the file.

    Args:
        overwrite (bool): Whether to overwrite the file if it already exists.

    Returns:
        OpenFile: An fsspec compliant file-like object.

    Raises:
        FileExistsError: If the file already exists at the location and overwrite is set to False.

    Note:
        If overwrite is set to False, a check is first performed to verify that the file does not exist.
        This is not thread-safe and a possibility does exist that the file can be created by a concurrent
        process after the existence check yet before the output stream is created. In such a case, the default
        behavior will truncate the contents of the existing file when opening the output stream.
    """
    if not overwrite and self.exists():
        raise FileExistsError(f"Cannot create file, file already exists: {self.location}")
    return self._fs.open(self.location, "wb")

exists()

Check whether the location exists.

Source code in pyiceberg/io/fsspec.py
def exists(self) -> bool:
    """Check whether the location exists."""
    return self._fs.lexists(self.location)

to_input_file()

Return a new FsspecInputFile for the location at self.location.

Source code in pyiceberg/io/fsspec.py
def to_input_file(self) -> FsspecInputFile:
    """Return a new FsspecInputFile for the location at `self.location`."""
    return FsspecInputFile(location=self.location, fs=self._fs)