Skip to content

sorting

UpdateSortOrder

Bases: UpdateTableMetadata['UpdateSortOrder']

Source code in pyiceberg/table/update/sorting.py
class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]):
    """Stage a new default sort order for a table as part of a transaction.

    Sort fields are collected, in call order, through :meth:`asc` and
    :meth:`desc`. On commit the collected fields are turned into a sort order
    and emitted as table updates, guarded by a requirement on the table's
    current default sort order id.
    """

    _transaction: Transaction
    _last_assigned_order_id: Optional[int]
    _case_sensitive: bool
    _fields: List[SortField]

    def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None:
        """Start with no sort fields staged.

        Args:
            transaction: Transaction whose table metadata is read and updated.
            case_sensitive: Whether column names are resolved case-sensitively.
        """
        super().__init__(transaction)
        self._case_sensitive = case_sensitive
        self._last_assigned_order_id = None
        self._fields = []

    def _column_name_to_id(self, column_name: str) -> int:
        """Resolve a column name to its field id in the table's schema."""
        schema = self._transaction.table_metadata.schema()
        field = schema.find_field(name_or_id=column_name, case_sensitive=self._case_sensitive)
        return field.field_id

    def _add_sort_field(
        self,
        source_id: int,
        transform: Transform[Any, Any],
        direction: SortDirection,
        null_order: NullOrder,
    ) -> UpdateSortOrder:
        """Append one sort field to the staged list and return ``self`` for chaining."""
        sort_field = SortField(
            source_id=source_id,
            transform=transform,
            direction=direction,
            null_order=null_order,
        )
        self._fields.append(sort_field)
        return self

    def _reuse_or_create_sort_order_id(self) -> int:
        """Pick the order id for the staged fields.

        Returns the id of the first existing sort order whose fields equal the
        staged fields. Otherwise returns a fresh id: one past the largest id
        seen, and never lower than ``INITIAL_SORT_ORDER_ID``.
        """
        next_order_id = INITIAL_SORT_ORDER_ID
        for existing in self._transaction.table_metadata.sort_orders:
            if existing.fields == self._fields:
                return existing.order_id
            # Keep the candidate strictly above every id encountered so far.
            next_order_id = max(next_order_id, existing.order_id + 1)
        return next_order_id

    def asc(
        self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
    ) -> UpdateSortOrder:
        """Stage an ascending sort field on the named column.

        Args:
            source_column_name: Column to sort by; resolved to its field id.
            transform: Transform applied to the column for sorting.
            null_order: Placement of nulls; defaults to ``NullOrder.NULLS_LAST``.

        Returns:
            This instance, so calls can be chained.
        """
        column_id = self._column_name_to_id(source_column_name)
        return self._add_sort_field(
            source_id=column_id,
            transform=transform,
            direction=SortDirection.ASC,
            null_order=null_order,
        )

    def desc(
        self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
    ) -> UpdateSortOrder:
        """Stage a descending sort field on the named column.

        Args:
            source_column_name: Column to sort by; resolved to its field id.
            transform: Transform applied to the column for sorting.
            null_order: Placement of nulls; defaults to ``NullOrder.NULLS_LAST``.

        Returns:
            This instance, so calls can be chained.
        """
        column_id = self._column_name_to_id(source_column_name)
        return self._add_sort_field(
            source_id=column_id,
            transform=transform,
            direction=SortDirection.DESC,
            null_order=null_order,
        )

    def _apply(self) -> SortOrder:
        """Build the sort order from the staged fields.

        Returns ``UNSORTED_SORT_ORDER`` when nothing has been staged.
        """
        if not self._fields:
            return UNSORTED_SORT_ORDER
        order_id = self._reuse_or_create_sort_order_id()
        return SortOrder(*self._fields, order_id=order_id)

    def _commit(self) -> UpdatesAndRequirements:
        """Translate the staged sort order into table updates and requirements.

        Returns:
            A ``(updates, requirements)`` pair. The requirement asserts that the
            table's default sort order id is still the one observed here.
        """
        new_sort_order = self._apply()
        new_order_id = new_sort_order.order_id
        table_metadata = self._transaction.table_metadata
        current_default_id = table_metadata.default_sort_order_id

        updates: Tuple[TableUpdate, ...]
        if current_default_id == new_order_id or table_metadata.sort_order_by_id(new_order_id) is not None:
            # The order is already known to the table: only repoint the default.
            updates = (SetDefaultSortOrderUpdate(sort_order_id=new_order_id),)
        else:
            # Unknown order: register it, then make it the default.
            # NOTE(review): -1 presumably means "the sort order added just before" -
            # confirm against the Iceberg REST catalog spec.
            self._last_assigned_order_id = new_order_id
            updates = (AddSortOrderUpdate(sort_order=new_sort_order), SetDefaultSortOrderUpdate(sort_order_id=-1))

        requirements: Tuple[TableRequirement, ...] = (
            AssertDefaultSortOrderId(default_sort_order_id=current_default_id),
        )
        return updates, requirements

asc(source_column_name, transform, null_order=NullOrder.NULLS_LAST)

Add a sort field with ascending order.

Source code in pyiceberg/table/update/sorting.py
def asc(
    self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
) -> UpdateSortOrder:
    """Stage an ascending sort field on the named column.

    The column name is resolved to a field id, then a sort field with
    ``SortDirection.ASC`` is added using the given transform and null order.
    Returns the result of ``_add_sort_field``.
    """
    column_id = self._column_name_to_id(source_column_name)
    return self._add_sort_field(
        source_id=column_id,
        transform=transform,
        direction=SortDirection.ASC,
        null_order=null_order,
    )

desc(source_column_name, transform, null_order=NullOrder.NULLS_LAST)

Add a sort field with descending order.

Source code in pyiceberg/table/update/sorting.py
def desc(
    self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
) -> UpdateSortOrder:
    """Stage a descending sort field on the named column.

    The column name is resolved to a field id, then a sort field with
    ``SortDirection.DESC`` is added using the given transform and null order.
    Returns the result of ``_add_sort_field``.
    """
    column_id = self._column_name_to_id(source_column_name)
    return self._add_sort_field(
        source_id=column_id,
        transform=transform,
        direction=SortDirection.DESC,
        null_order=null_order,
    )