Skip to content

concurrent

Concurrency concepts that support efficient multi-threading.

ExecutorFactory

Source code in pyiceberg/utils/concurrent.py
class ExecutorFactory:
    """Hand out one lazily created thread-pool executor per process.

    The executor is cached on the class itself.  It is rebuilt whenever the
    current process id differs from the one recorded when it was created.
    """

    # Cached executor belonging to the process recorded in ``_instance_pid``.
    _instance: Optional[Executor] = None
    # Process id in which ``_instance`` was created; None before first use.
    _instance_pid: Optional[int] = None

    @staticmethod
    def max_workers() -> Optional[int]:
        """Return the max number of workers configured.

        Reads the ``max-workers`` configuration key.  The result is passed
        straight to ``ThreadPoolExecutor``, where ``None`` selects the
        standard-library default (presumably returned when the key is unset).
        """
        config = Config()
        return config.get_int("max-workers")

    @staticmethod
    def get_or_create() -> Executor:
        """Return the same executor in each call.

        The executor is built on first use and reused afterwards.  If the call
        happens in a different process than the one that built the cached
        executor (e.g. after a fork), the cached instance is discarded and a
        fresh one is created.
        """
        current_pid = os.getpid()
        # A ThreadPoolExecutor cannot be shared across processes: an inherited
        # executor may be in an invalid state and tasks submitted to it would
        # never start.  The stale executor is dropped without calling
        # shutdown() on it.
        if current_pid != ExecutorFactory._instance_pid:
            ExecutorFactory._instance_pid = current_pid
            ExecutorFactory._instance = None

        if ExecutorFactory._instance is None:
            ExecutorFactory._instance = ThreadPoolExecutor(
                max_workers=ExecutorFactory.max_workers()
            )

        return ExecutorFactory._instance

get_or_create() staticmethod

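Return the same executor on every call within a process. If the current process id differs from the one that created the cached executor (for example after a fork), a new executor is created.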

Source code in pyiceberg/utils/concurrent.py
@staticmethod
def get_or_create() -> Executor:
    """Return the same executor in each call.

    The executor is built on first use and reused afterwards.  If the call
    happens in a different process than the one that built the cached
    executor (e.g. after a fork), the cached instance is discarded and a
    fresh one is created.
    """
    current_pid = os.getpid()
    # A ThreadPoolExecutor cannot be shared across processes: an inherited
    # executor may be in an invalid state and tasks submitted to it would
    # never start.  The stale executor is dropped without calling shutdown()
    # on it.
    if current_pid != ExecutorFactory._instance_pid:
        ExecutorFactory._instance_pid = current_pid
        ExecutorFactory._instance = None

    if ExecutorFactory._instance is None:
        ExecutorFactory._instance = ThreadPoolExecutor(
            max_workers=ExecutorFactory.max_workers()
        )

    return ExecutorFactory._instance

max_workers() staticmethod

Return the max number of workers configured.

Source code in pyiceberg/utils/concurrent.py
@staticmethod
def max_workers() -> Optional[int]:
    """Return the max number of workers configured.

    Reads the ``max-workers`` configuration key.  A ``None`` result, when
    handed to ``ThreadPoolExecutor``, selects the standard-library default
    (presumably returned when the key is unset).
    """
    config = Config()
    return config.get_int("max-workers")