Return a table with the rows that need to be updated in the target table, matched on the join columns.
The source and target tables are joined on the identifier columns, and the joined rows are checked for changed non-key values.
The changed rows are selected and their columns are renamed back to the source column names.
Source code in pyiceberg/table/upsert_util.py
def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols: list[str]) -> pa.Table:
    """Return the rows from ``source_table`` that need to be updated in ``target_table``.

    The tables are inner-joined on the identifier columns in ``join_cols``; joined
    rows whose non-key columns differ between source and target are selected, and
    the surviving columns are renamed back to the source column names.

    Args:
        source_table: Table holding the incoming rows.
        target_table: Table holding the current rows.
        join_cols: Identifier column names used to match rows between the tables.

    Returns:
        A table (cast to ``target_table.schema``) containing the source-side values
        of every matched row whose non-key columns differ from the target. Empty
        when the target is empty or when every column is a join column.

    Raises:
        ValueError: If the target table contains duplicate rows for the join key.
    """
    all_columns = set(source_table.column_names)
    join_cols_set = set(join_cols)
    non_key_cols = all_columns - join_cols_set

    if has_duplicate_rows(target_table, join_cols):
        raise ValueError("Target table has duplicate rows, aborting upsert")

    if len(target_table) == 0:
        # When the target table is empty, there is nothing to update :)
        return source_table.schema.empty_table()

    if not non_key_cols:
        # Every column is part of the key: matched rows cannot differ, so there is
        # nothing to update. This also guards the reduce() below, which raises
        # TypeError when given an empty sequence and no initializer.
        return source_table.schema.empty_table()

    # A row needs updating if ANY of its non-key columns differ between the
    # source (-lhs) and target (-rhs) sides of the join.
    diff_expr = functools.reduce(
        operator.or_,
        [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols],
    )

    return (
        source_table
        # We already know that the schema is compatible, this is to fix large_ types
        .cast(target_table.schema)
        .join(target_table, keys=list(join_cols_set), join_type="inner", left_suffix="-lhs", right_suffix="-rhs")
        .filter(diff_expr)
        .drop_columns([f"{col}-rhs" for col in non_key_cols])
        .rename_columns({f"{col}-lhs" if col not in join_cols else col: col for col in source_table.column_names})
        # Finally cast to the original schema since it doesn't carry nullability:
        # https://github.com/apache/arrow/issues/45557
    ).cast(target_table.schema)