Skip to content

Python API

PyIceberg is based around catalogs to load tables. First step is to instantiate a catalog that loads tables. Let's use the following configuration to define a catalog called prod:

    uri: http://rest-catalog/ws/
    credential: t-1234:secret

This information must be placed inside a file called .pyiceberg.yaml located either in the $HOME or %USERPROFILE% directory (depending on whether the operating system is Unix-based or Windows-based, respectively) or in the $PYICEBERG_HOME directory (if the corresponding environment variable is set).

For more details on possible configurations refer to the specific page.

Then load the prod catalog:

from pyiceberg.catalog import load_catalog

catalog = load_catalog(
        "uri": "",
        "s3.endpoint": "",
        "py-io-impl": "",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",

If the catalog has not been initialized before, you need to run:


Let's create a namespace:


And then list them:

ns = catalog.list_namespaces()

assert ns == [("docs_example",)]

And then list tables in the namespace:


Create a table

To create a table from a catalog:

from pyiceberg.schema import Schema
from pyiceberg.types import (

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
                field_id=4, name="created_by", field_type=StringType(), required=False

from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"

from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

# Sort on the symbol
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))


Load a table

Catalog table

Loading the bids table:

table = catalog.load_table("docs_example.bids")
# Equivalent to:
table = catalog.load_table(("docs_example", "bids"))
# The tuple syntax can be used if the namespace or table contains a dot.

This returns a Table that represents an Iceberg table that can be queried and altered.

Static table

To load a table directly from a metadata file (i.e., without using a catalog), you can use a StaticTable as follows:

from pyiceberg.table import StaticTable

static_table = StaticTable.from_metadata(

The static-table is considered read-only.

Schema evolution

PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (can be overriden).

In the examples below, the .update_schema() is called from the table itself.

with table.update_schema() as update:
    update.add_column("some_field", IntegerType(), "doc")

You can also initiate a transaction if you want to make more changes than just evolving the schema:

with table.transaction() as transaction:
    with transaction.update_schema() as update_schema:
        update.add_column("some_other_field", IntegerType(), "doc")
    # ... Update properties etc

Add column

Using add_column you can add a column, without having to worry about the field-id:

with table.update_schema() as update:
    update.add_column("retries", IntegerType(), "Number of retries to place the bid")
    # In a struct
    update.add_column("details.confirmed_by", StringType(), "Name of the exchange")

Rename column

Renaming a field in an Iceberg table is simple:

with table.update_schema() as update:
    update.rename_column("retries", "num_retries")
    # This will rename `confirmed_by` to `exchange`
    update.rename_column("properties.confirmed_by", "exchange")

Move column

Move a field inside of struct:

with table.update_schema() as update:
    update.move_after("bid", "ask")
    # This will move `confirmed_by` before `exchange`
    update.move_before("details.created_by", "")

Update column

Update a fields' type, description or required.

with table.update_schema() as update:
    # Promote a float to a double
    update.update_column("bid", field_type=DoubleType())
    # Make a field optional
    update.update_column("symbol", required=False)
    # Update the documentation
    update.update_column("symbol", doc="Name of the share on the exchange")

Be careful, some operations are not compatible, but can still be done at your own risk by setting allow_incompatible_changes:

with table.update_schema(allow_incompatible_changes=True) as update:
    # Incompatible change, cannot require an optional field
    update.update_column("symbol", required=True)

Delete column

Delete a field, careful this is a incompatible change (readers/writers might expect this field):

with table.update_schema(allow_incompatible_changes=True) as update:

Table properties

Set and remove properties through the Transaction API:

with table.transaction() as transaction:

assert == {"abc": "def"}

with table.transaction() as transaction:

assert == {}

Or, without context manager:

table = table.transaction().set_properties(abc="def").commit_transaction()

assert == {"abc": "def"}

table = table.transaction().remove_properties("abc").commit_transaction()

assert == {}

Query the data

To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")

scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),

# Or filter using a string predicate
scan = table.scan(
    row_filter="trip_distance > 10.0",

[task.file.file_path for task in scan.plan_files()]

The low level API plan_files methods returns a set of tasks that provide the files that might contain matching rows:


In this case it is up to the engine itself to filter the file itself. Below, to_arrow() and to_duckdb() that already do this for you.

Apache Arrow


This requires pyarrow to be installed.

Using PyIceberg it is filter out data from a huge table and pull it into a PyArrow table:

    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),

This will return a PyArrow table:

VendorID: int64
tpep_pickup_datetime: timestamp[us, tz=+00:00]
tpep_dropoff_datetime: timestamp[us, tz=+00:00]
VendorID: [[2,1,2,1,1,...,2,2,2,2,2],[2,1,1,1,2,...,1,1,2,1,2],...,[2,2,2,2,2,...,2,6,6,2,2],[2,2,2,2,2,...,2,2,2,2,2]]
tpep_pickup_datetime: [[2021-04-01 00:28:05.000000,...,2021-04-30 23:44:25.000000]]
tpep_dropoff_datetime: [[2021-04-01 00:47:59.000000,...,2021-05-01 00:14:47.000000]]

This will only pull in the files that that might contain matching rows.



This requires pandas to be installed.

PyIceberg makes it easy to filter out data from a huge table and pull it into a Pandas dataframe locally. This will only fetch the relevant Parquet files for the query and apply the filter. This will reduce IO and therefore improve performance and reduce cost.

    row_filter="trip_distance >= 10.0",
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),

This will return a Pandas dataframe:

        VendorID      tpep_pickup_datetime     tpep_dropoff_datetime
0              2 2021-04-01 00:28:05+00:00 2021-04-01 00:47:59+00:00
1              1 2021-04-01 00:39:01+00:00 2021-04-01 00:57:39+00:00
2              2 2021-04-01 00:14:42+00:00 2021-04-01 00:42:59+00:00
3              1 2021-04-01 00:17:17+00:00 2021-04-01 00:43:38+00:00
4              1 2021-04-01 00:24:04+00:00 2021-04-01 00:56:20+00:00
...          ...                       ...                       ...
116976         2 2021-04-30 23:56:18+00:00 2021-05-01 00:29:13+00:00
116977         2 2021-04-30 23:07:41+00:00 2021-04-30 23:37:18+00:00
116978         2 2021-04-30 23:38:28+00:00 2021-05-01 00:12:04+00:00
116979         2 2021-04-30 23:33:00+00:00 2021-04-30 23:59:00+00:00
116980         2 2021-04-30 23:44:25+00:00 2021-05-01 00:14:47+00:00

[116981 rows x 3 columns]

It is recommended to use Pandas 2 or later, because it stores the data in an Apache Arrow backend which avoids copies of data.



This requires DuckDB to be installed.

A table scan can also be converted into a in-memory DuckDB table:

con = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),

Using the cursor that we can run queries on the DuckDB table:

        "SELECT tpep_dropoff_datetime - tpep_pickup_datetime AS duration FROM distant_taxi_trips LIMIT 4"



This requires Ray to be installed.

A table scan can also be converted into a Ray dataset:

ray_dataset = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),

This will return a Ray dataset:

        VendorID: int64,
        tpep_pickup_datetime: timestamp[us, tz=UTC],
        tpep_dropoff_datetime: timestamp[us, tz=UTC]

Using Ray Dataset API to interact with the dataset:

        "VendorID": 2,
        "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 23, 50),
        "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 0, 34, 31),
        "VendorID": 2,
        "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 5, 3),
        "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 16, 10, 18),