Skip to content

output

ConsoleOutput

Bases: Output

Writes to the console.

Source code in pyiceberg/cli/output.py
class ConsoleOutput(Output):
    """Writes to the console."""

    verbose: bool

    def __init__(self, **properties: Any) -> None:
        self.verbose = properties.get("verbose", False)

    @property
    def _table(self) -> RichTable:
        return RichTable.grid(padding=(0, 2))

    def exception(self, ex: Exception) -> None:
        if self.verbose:
            Console(stderr=True).print_exception()
        else:
            Console(stderr=True).print(ex)

    def identifiers(self, identifiers: List[Identifier]) -> None:
        table = self._table
        for identifier in identifiers:
            table.add_row(".".join(identifier))

        Console().print(table)

    def describe_table(self, table: Table) -> None:
        metadata = table.metadata
        table_properties = self._table

        for key, value in metadata.properties.items():
            table_properties.add_row(key, value)

        schema_tree = Tree(f"Schema, id={table.metadata.current_schema_id}")
        for field in table.schema().fields:
            schema_tree.add(str(field))

        snapshot_tree = Tree("Snapshots")
        for snapshot in metadata.snapshots:
            manifest_list_str = f": {snapshot.manifest_list}" if snapshot.manifest_list else ""
            snapshot_tree.add(f"Snapshot {snapshot.snapshot_id}, schema {snapshot.schema_id}{manifest_list_str}")

        output_table = self._table
        output_table.add_row("Table format version", str(metadata.format_version))
        output_table.add_row("Metadata location", table.metadata_location)
        output_table.add_row("Table UUID", str(table.metadata.table_uuid))
        output_table.add_row("Last Updated", str(metadata.last_updated_ms))
        output_table.add_row("Partition spec", str(table.spec()))
        output_table.add_row("Sort order", str(table.sort_order()))
        output_table.add_row("Current schema", schema_tree)
        output_table.add_row("Current snapshot", str(table.current_snapshot()))
        output_table.add_row("Snapshots", snapshot_tree)
        output_table.add_row("Properties", table_properties)
        Console().print(output_table)

    def files(self, table: Table, history: bool) -> None:
        if history:
            snapshots = table.metadata.snapshots
        else:
            if snapshot := table.current_snapshot():
                snapshots = [snapshot]
            else:
                snapshots = []

        snapshot_tree = Tree(f"Snapshots: {'.'.join(table.identifier)}")
        io = table.io

        for snapshot in snapshots:
            manifest_list_str = f": {snapshot.manifest_list}" if snapshot.manifest_list else ""
            list_tree = snapshot_tree.add(f"Snapshot {snapshot.snapshot_id}, schema {snapshot.schema_id}{manifest_list_str}")

            manifest_list = snapshot.manifests(io)
            for manifest in manifest_list:
                manifest_tree = list_tree.add(f"Manifest: {manifest.manifest_path}")
                for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=False):
                    manifest_tree.add(f"Datafile: {manifest_entry.data_file.file_path}")
        Console().print(snapshot_tree)

    def describe_properties(self, properties: Properties) -> None:
        output_table = self._table
        for k, v in properties.items():
            output_table.add_row(k, v)
        Console().print(output_table)

    def text(self, response: str) -> None:
        Console().print(response)

    def schema(self, schema: Schema) -> None:
        output_table = self._table
        for field in schema.fields:
            output_table.add_row(field.name, str(field.field_type), field.doc or "")
        Console().print(output_table)

    def spec(self, spec: PartitionSpec) -> None:
        Console().print(str(spec))

    def uuid(self, uuid: Optional[UUID]) -> None:
        Console().print(str(uuid) if uuid else "missing")

    def version(self, version: str) -> None:
        Console().print(version)

    def describe_refs(self, ref_details: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
        refs_table = RichTable(title="Snapshot Refs")
        refs_table.add_column("Ref")
        refs_table.add_column("Type")
        refs_table.add_column("Max ref age ms")
        refs_table.add_column("Min snapshots to keep")
        refs_table.add_column("Max snapshot age ms")
        for name, type, ref_detail in ref_details:
            refs_table.add_row(
                name, type, ref_detail["max_ref_age_ms"], ref_detail["min_snapshots_to_keep"], ref_detail["max_snapshot_age_ms"]
            )
        Console().print(refs_table)

JsonOutput

Bases: Output

Writes json to stdout.

Source code in pyiceberg/cli/output.py
class JsonOutput(Output):
    """Writes json to stdout."""

    verbose: bool

    def __init__(self, **properties: Any) -> None:
        self.verbose = properties.get("verbose", False)

    def _out(self, d: Any) -> None:
        print(json.dumps(d))

    def exception(self, ex: Exception) -> None:
        self._out({"type": ex.__class__.__name__, "message": str(ex)})

    def identifiers(self, identifiers: List[Identifier]) -> None:
        self._out([".".join(identifier) for identifier in identifiers])

    def describe_table(self, table: Table) -> None:
        class FauxTable(IcebergBaseModel):
            """Just to encode it using Pydantic."""

            identifier: Identifier
            metadata_location: str
            metadata: TableMetadata

        print(
            FauxTable(
                identifier=table.identifier, metadata=table.metadata, metadata_location=table.metadata_location
            ).model_dump_json()
        )

    def describe_properties(self, properties: Properties) -> None:
        self._out(properties)

    def text(self, response: str) -> None:
        print(json.dumps(response))

    def schema(self, schema: Schema) -> None:
        print(schema.model_dump_json())

    def files(self, table: Table, history: bool) -> None:
        pass

    def spec(self, spec: PartitionSpec) -> None:
        print(spec.model_dump_json())

    def uuid(self, uuid: Optional[UUID]) -> None:
        self._out({"uuid": str(uuid) if uuid else "missing"})

    def version(self, version: str) -> None:
        self._out({"version": version})

    def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
        self._out([
            {"name": name, "type": type, detail_key: detail_val}
            for name, type, detail in refs
            for detail_key, detail_val in detail.items()
        ])

Output

Bases: ABC

Output interface for exporting.

Source code in pyiceberg/cli/output.py
class Output(ABC):
    """Output interface for exporting."""

    @abstractmethod
    def exception(self, ex: Exception) -> None: ...

    @abstractmethod
    def identifiers(self, identifiers: List[Identifier]) -> None: ...

    @abstractmethod
    def describe_table(self, table: Table) -> None: ...

    @abstractmethod
    def files(self, table: Table, history: bool) -> None: ...

    @abstractmethod
    def describe_properties(self, properties: Properties) -> None: ...

    @abstractmethod
    def text(self, response: str) -> None: ...

    @abstractmethod
    def schema(self, schema: Schema) -> None: ...

    @abstractmethod
    def spec(self, spec: PartitionSpec) -> None: ...

    @abstractmethod
    def uuid(self, uuid: Optional[UUID]) -> None: ...

    @abstractmethod
    def version(self, version: str) -> None: ...

    @abstractmethod
    def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None: ...