Skip to content

config

Config

Source code in pyiceberg/utils/config.py
class Config:
    config: RecursiveDict

    def __init__(self) -> None:
        config = self._from_configuration_files() or {}
        config = merge_config(config, self._from_environment_variables(config))
        self.config = FrozenDict(**config)

    @staticmethod
    def _from_configuration_files() -> Optional[RecursiveDict]:
        """Load the first configuration file that its finds.

        Will first look in the PYICEBERG_HOME env variable,
        and then in the home directory.
        """

        def _load_yaml(directory: Optional[str]) -> Optional[RecursiveDict]:
            if directory:
                path = os.path.join(directory, PYICEBERG_YML)
                if os.path.isfile(path):
                    with open(path, encoding=UTF8) as f:
                        yml_str = f.read()
                    file_config = strictyaml.load(yml_str).data
                    file_config_lowercase = _lowercase_dictionary_keys(file_config)
                    return file_config_lowercase
            return None

        # Give priority to the PYICEBERG_HOME directory
        if pyiceberg_home_config := _load_yaml(os.environ.get(PYICEBERG_HOME)):
            return pyiceberg_home_config
        # Look into the home directory
        if pyiceberg_home_config := _load_yaml(os.path.expanduser("~")):
            return pyiceberg_home_config
        # Didn't find a config
        return None

    @staticmethod
    def _from_environment_variables(config: RecursiveDict) -> RecursiveDict:
        """Read the environment variables, to check if there are any prepended by PYICEBERG_.

        Args:
            config: Existing configuration that's being amended with configuration from environment variables.

        Returns:
            Amended configuration.
        """

        def set_property(_config: RecursiveDict, path: List[str], config_value: str) -> None:
            while len(path) > 0:
                element = path.pop(0)
                if len(path) == 0:
                    # We're at the end
                    _config[element] = config_value
                else:
                    # We have to go deeper
                    if element not in _config:
                        _config[element] = {}
                    if isinstance(_config[element], dict):
                        _config = _config[element]  # type: ignore
                    else:
                        raise ValueError(
                            f"Incompatible configurations, merging dict with a value: {'.'.join(path)}, value: {config_value}"
                        )

        for env_var, config_value in os.environ.items():
            # Make it lowercase to make it case-insensitive
            env_var_lower = env_var.lower()
            if env_var_lower.startswith(PYICEBERG.lower()):
                key = env_var_lower[len(PYICEBERG) :]
                parts = key.split("__", maxsplit=2)
                parts_normalized = [part.replace("__", ".").replace("_", "-") for part in parts]
                set_property(config, parts_normalized, config_value)

        return config

    def get_default_catalog_name(self) -> str:
        """Return the default catalog name.

        Returns: The name of the default catalog in `default-catalog`.
                 Returns `default` when the key cannot be found in the config file.
        """
        if default_catalog_name := self.config.get(DEFAULT_CATALOG):
            if not isinstance(default_catalog_name, str):
                raise ValueError(f"Default catalog name should be a str: {default_catalog_name}")
            return default_catalog_name
        return DEFAULT

    def get_catalog_config(self, catalog_name: str) -> Optional[RecursiveDict]:
        if CATALOG in self.config:
            catalog_name_lower = catalog_name.lower()
            catalogs = self.config[CATALOG]
            if not isinstance(catalogs, dict):
                raise ValueError(f"Catalog configurations needs to be an object: {catalog_name}")
            if catalog_name_lower in catalogs:
                catalog_conf = catalogs[catalog_name_lower]
                assert isinstance(catalog_conf, dict), f"Configuration path catalogs.{catalog_name_lower} needs to be an object"
                return catalog_conf
        return None

    def get_int(self, key: str) -> Optional[int]:
        if (val := self.config.get(key)) is not None:
            try:
                return int(val)  # type: ignore
            except ValueError as err:
                raise ValueError(f"{key} should be an integer or left unset. Current value: {val}") from err
        return None

    def get_bool(self, key: str) -> Optional[bool]:
        if (val := self.config.get(key)) is not None:
            try:
                return strtobool(val)  # type: ignore
            except ValueError as err:
                raise ValueError(f"{key} should be a boolean or left unset. Current value: {val}") from err
        return None

get_default_catalog_name()

Return the default catalog name.

The name of the default catalog in `default-catalog`.

Type Description
str

Returns default when the key cannot be found in the config file.

Source code in pyiceberg/utils/config.py
def get_default_catalog_name(self) -> str:
    """Return the default catalog name.

    Returns: The name of the default catalog in `default-catalog`.
             Returns `default` when the key cannot be found in the config file.
    """
    if default_catalog_name := self.config.get(DEFAULT_CATALOG):
        if not isinstance(default_catalog_name, str):
            raise ValueError(f"Default catalog name should be a str: {default_catalog_name}")
        return default_catalog_name
    return DEFAULT

merge_config(lhs, rhs)

Merge right-hand side into the left-hand side.

Source code in pyiceberg/utils/config.py
def merge_config(lhs: RecursiveDict, rhs: RecursiveDict) -> RecursiveDict:
    """Merge right-hand side into the left-hand side."""
    new_config = lhs.copy()
    for rhs_key, rhs_value in rhs.items():
        if rhs_key in new_config:
            lhs_value = new_config[rhs_key]
            if isinstance(lhs_value, dict) and isinstance(rhs_value, dict):
                # If they are both dicts, then we have to go deeper
                new_config[rhs_key] = merge_config(lhs_value, rhs_value)
            else:
                # Take the non-null value, with precedence on rhs
                new_config[rhs_key] = rhs_value or lhs_value
        else:
            # New key
            new_config[rhs_key] = rhs_value

    return new_config