Skip to content

locations

LocationProvider

Bases: ABC

A base class for location providers that provide file locations for a table's write tasks.

Parameters:

Name Type Description Default
table_location str

The table's base storage location.

required
table_properties Properties

The table's properties.

required
Source code in pyiceberg/table/locations.py
class LocationProvider(ABC):
    """A base class for location providers that supply file locations for a table's write tasks.

    Args:
        table_location (str): The table's base storage location.
        table_properties (Properties): The table's properties.
    """

    table_location: str
    table_properties: Properties

    data_path: str
    metadata_path: str

    def __init__(self, table_location: str, table_properties: Properties):
        self.table_location = table_location
        self.table_properties = table_properties

        # Imported locally to avoid a circular import at module load time.
        from pyiceberg.table import TableProperties

        custom_data_path = table_properties.get(TableProperties.WRITE_DATA_PATH)
        self.data_path = custom_data_path.rstrip("/") if custom_data_path else f"{self.table_location.rstrip('/')}/data"

        custom_metadata_path = table_properties.get(TableProperties.WRITE_METADATA_PATH)
        self.metadata_path = (
            custom_metadata_path.rstrip("/") if custom_metadata_path else f"{self.table_location.rstrip('/')}/metadata"
        )

    @abstractmethod
    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
        """Construct a fully-qualified data file location for the given filename.

        Args:
            data_file_name (str): The name of the data file.
            partition_key (Optional[PartitionKey]): The data file's partition key; None when the data is unpartitioned.

        Returns:
            str: A fully-qualified location URI for the data file.
        """

    def new_table_metadata_file_location(self, new_version: int = 0) -> str:
        """Construct a fully-qualified metadata file location for a new table version.

        Args:
            new_version (int): Version number of the metadata file; must be non-negative.

        Returns:
            str: A fully-qualified URI for the new table metadata file.

        Raises:
            ValueError: If the version is negative.
        """
        if new_version < 0:
            raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")

        return self.new_metadata_location(f"{new_version:05d}-{uuid.uuid4()}.metadata.json")

    def new_metadata_location(self, metadata_file_name: str) -> str:
        """Construct a fully-qualified metadata file location for the given filename.

        Args:
            metadata_file_name (str): Name of the metadata file.

        Returns:
            str: A fully-qualified location URI for the metadata file.
        """
        return "/".join((self.metadata_path, metadata_file_name))

new_data_location(data_file_name, partition_key=None) abstractmethod

Return a fully-qualified data file location for the given filename.

Parameters:

Name Type Description Default
data_file_name str

The name of the data file.

required
partition_key Optional[PartitionKey]

The data file's partition key. If None, the data is not partitioned.

None

Returns:

Name Type Description
str str

A fully-qualified location URI for the data file.

Source code in pyiceberg/table/locations.py
@abstractmethod
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
    """Construct a fully-qualified data file location for the given filename.

    Args:
        data_file_name (str): Name of the data file.
        partition_key (Optional[PartitionKey]): Partition key for the file; None when the data is unpartitioned.

    Returns:
        str: A fully-qualified location URI for the data file.
    """

new_metadata_location(metadata_file_name)

Return a fully-qualified metadata file location for the given filename.

Parameters:

Name Type Description Default
metadata_file_name str

Name of the metadata file.

required

Returns:

Name Type Description
str str

A fully-qualified location URI for the metadata file.

Source code in pyiceberg/table/locations.py
def new_metadata_location(self, metadata_file_name: str) -> str:
    """Construct a fully-qualified metadata file location for the given filename.

    Args:
        metadata_file_name (str): Name of the metadata file.

    Returns:
        str: A fully-qualified location URI for the metadata file.
    """
    return "/".join((self.metadata_path, metadata_file_name))

new_table_metadata_file_location(new_version=0)

Return a fully-qualified metadata file location for a new table version.

Parameters:

Name Type Description Default
new_version int

Version number of the metadata file.

0

Returns:

Name Type Description
str str

A fully-qualified URI for the new table metadata file.

Raises:

Type Description
ValueError

If the version is negative.

Source code in pyiceberg/table/locations.py
def new_table_metadata_file_location(self, new_version: int = 0) -> str:
    """Construct a fully-qualified metadata file location for a new table version.

    Args:
        new_version (int): Version number of the metadata file; must be non-negative.

    Returns:
        str: A fully-qualified URI for the new table metadata file.

    Raises:
        ValueError: If the version is negative.
    """
    if new_version < 0:
        raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")

    # Zero-padded version plus a random UUID keeps metadata file names unique and sortable.
    return self.new_metadata_location(f"{new_version:05d}-{uuid.uuid4()}.metadata.json")

ObjectStoreLocationProvider

Bases: LocationProvider

Source code in pyiceberg/table/locations.py
class ObjectStoreLocationProvider(LocationProvider):
    """A location provider that prefixes data file locations with hash-derived entropy directories.

    The entropy prefix spreads files across paths, which object stores can use to
    avoid request hot-spotting on a shared key prefix.
    """

    # Number of low-order murmur3 hash bits used for the entropy path.
    HASH_BINARY_STRING_BITS = 20
    # Characters per entropy directory and number of such directories.
    ENTROPY_DIR_LENGTH = 4
    ENTROPY_DIR_DEPTH = 3

    _include_partition_paths: bool

    def __init__(self, table_location: str, table_properties: Properties):
        super().__init__(table_location, table_properties)
        # Imported locally to avoid a circular import at module load time.
        from pyiceberg.table import TableProperties

        self._include_partition_paths = property_as_bool(
            self.table_properties,
            TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS,
            TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT,
        )

    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
        if partition_key and self._include_partition_paths:
            # Fold the partition path into the file name so the hash below covers it too.
            return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}")

        entropy = self._compute_hash(data_file_name)

        if self._include_partition_paths:
            return f"{self.data_path}/{entropy}/{data_file_name}"
        return f"{self.data_path}/{entropy}-{data_file_name}"

    @staticmethod
    def _compute_hash(data_file_name: str) -> str:
        """Return the low HASH_BINARY_STRING_BITS bits of the filename's murmur3 hash, split into directories."""
        bits = ObjectStoreLocationProvider.HASH_BINARY_STRING_BITS
        # Mask off sign-extension; the sentinel bit above the kept bits preserves
        # leading zeroes that `bin` would otherwise strip.
        sentinel = 1 << bits
        masked = mmh3.hash(data_file_name) & (sentinel - 1) | sentinel
        return ObjectStoreLocationProvider._dirs_from_hash(bin(masked)[-bits:])

    @staticmethod
    def _dirs_from_hash(file_hash: str) -> str:
        """Divide the hash into directories (ENTROPY_DIR_DEPTH of width ENTROPY_DIR_LENGTH) for optimized orphan removal."""
        width = ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH
        entropy_length = ObjectStoreLocationProvider.ENTROPY_DIR_DEPTH * width

        segments = [file_hash[pos : pos + width] for pos in range(0, entropy_length, width)]
        # Any leftover hash characters become a final, possibly shorter, segment.
        if len(file_hash) > entropy_length:
            segments.append(file_hash[entropy_length:])

        return "/".join(segments)

_dirs_from_hash(file_hash) staticmethod

Divides the hash into directories (ENTROPY_DIR_DEPTH directories of ENTROPY_DIR_LENGTH characters each) to optimize the orphan-file removal operation.

Source code in pyiceberg/table/locations.py
@staticmethod
def _dirs_from_hash(file_hash: str) -> str:
    """Divide the hash into directories (ENTROPY_DIR_DEPTH of width ENTROPY_DIR_LENGTH) for optimized orphan removal."""
    width = ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH
    entropy_length = ObjectStoreLocationProvider.ENTROPY_DIR_DEPTH * width

    segments = [file_hash[pos : pos + width] for pos in range(0, entropy_length, width)]
    # Any leftover hash characters become a final, possibly shorter, segment.
    if len(file_hash) > entropy_length:
        segments.append(file_hash[entropy_length:])

    return "/".join(segments)