Iceberg (dagster-iceberg)
This library provides an integration with the Iceberg table format.
For more information on getting started, see the Dagster & Iceberg documentation.
I/O Managers
- dagster_iceberg.io_manager.arrow.PyArrowIcebergIOManager IOManagerDefinition [source]
- preview
This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.
An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using PyArrow.
Examples:
import pandas as pd
import pyarrow as pa
from dagster import Definitions, asset
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)

resources = {
    "io_manager": PyArrowIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}

@asset
def iris_dataset() -> pa.Table:
    # Return the table so the I/O manager can write it to Iceberg.
    return pa.Table.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )

defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”. The I/O manager will check if the namespace exists in the Iceberg catalog. It does not automatically create the namespace if it does not exist.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pa.Table:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pa.Table):
    # my_table will just contain the data from column "a"
    ...
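
The schema fallback described earlier in this entry can be sketched in plain Python. This is an illustration only, not the library's implementation: `resolve_schema` and its parameters are hypothetical names, treating the `namespace` argument as the configured schema is an assumption, and so is the use of the second-to-last asset key component as the schema (the convention in Dagster's database I/O managers). The relative precedence of the steps is also assumed; the text only names the fallbacks.

```python
from typing import Mapping, Optional, Sequence

DEFAULT_SCHEMA = "public"  # default named in the text above


def resolve_schema(
    configured: Optional[str] = None,
    asset_key_path: Optional[Sequence[str]] = None,
    output_metadata: Optional[Mapping[str, str]] = None,
) -> str:
    """Hypothetical helper mirroring the documented fallbacks."""
    # 1. An explicitly provided schema wins (assumed to be the
    #    `namespace` argument in the examples).
    if configured:
        return configured
    # 2. For ops, a "schema" entry in the output metadata is used.
    if output_metadata and output_metadata.get("schema"):
        return output_metadata["schema"]
    # 3. For assets, assume the key component just before the
    #    table name acts as the schema.
    if asset_key_path and len(asset_key_path) > 1:
        return asset_key_path[-2]
    # 4. Nothing was provided: fall back to the default.
    return DEFAULT_SCHEMA


if __name__ == "__main__":
    print(resolve_schema(configured="dagster", asset_key_path=["iris_dataset"]))
    print(resolve_schema(output_metadata={"schema": "my_schema"}))
    print(resolve_schema(asset_key_path=["iris_dataset"]))
```

Whichever value is resolved, the namespace must already exist in the catalog, since the I/O manager does not create it.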
- dagster_iceberg.io_manager.daft.DaftIcebergIOManager IOManagerDefinition [source]
- preview
This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.
An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using Daft.
Examples:
import daft as da
import pandas as pd
from dagster import Definitions, asset
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.daft import DaftIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)

resources = {
    "io_manager": DaftIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}

@asset
def iris_dataset() -> da.DataFrame:
    return da.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )

defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”. The I/O manager will check if the namespace exists in the Iceberg catalog. It does not automatically create the namespace if it does not exist.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> da.DataFrame:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: da.DataFrame):
    # my_table will just contain the data from column "a"
    ...
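
To make the effect of the “columns” metadata concrete, here is a small standard-library sketch of a column projection. `select_columns` and the dict-of-lists table are hypothetical stand-ins for illustration; the I/O manager works on real Iceberg tables and its internals are not shown here.

```python
from typing import Dict, List, Sequence

# A toy column-oriented table: column name -> list of values.
Table = Dict[str, List[object]]


def select_columns(table: Table, columns: Sequence[str]) -> Table:
    """Return a new table holding only the requested columns."""
    missing = [name for name in columns if name not in table]
    if missing:
        raise KeyError(f"unknown columns: {missing}")
    # Preserve the order in which the columns were requested.
    return {name: table[name] for name in columns}


if __name__ == "__main__":
    upstream = {"a": [1, 2, 3], "b": ["x", "y", "z"]}
    # Mirrors metadata={"columns": ["a"]} on the input definition.
    downstream_input = select_columns(upstream, ["a"])
    print(list(downstream_input))
```

The downstream function in the example above would receive only column "a" in the same way, without having to load the rest of the table.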
- dagster_iceberg.io_manager.pandas.PandasIcebergIOManager IOManagerDefinition [source]
- preview
This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.
An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using pandas.
Examples:
import pandas as pd
from dagster import Definitions, asset
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)

resources = {
    "io_manager": PandasIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}

@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”. The I/O manager will check if the namespace exists in the Iceberg catalog. It does not automatically create the namespace if it does not exist.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...
- dagster_iceberg.io_manager.polars.PolarsIcebergIOManager IOManagerDefinition [source]
- preview
This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.
An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using Polars.
Examples:
import polars as pl
from dagster import Definitions, asset
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.polars import PolarsIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)

resources = {
    "io_manager": PolarsIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}

@asset
def iris_dataset() -> pl.DataFrame:
    return pl.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        has_header=False,
        new_columns=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”. The I/O manager will check if the namespace exists in the Iceberg catalog. It does not automatically create the namespace if it does not exist.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pl.DataFrame:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pl.DataFrame):
    # my_table will just contain the data from column "a"
    ...
- dagster_iceberg.io_manager.spark.SparkIcebergIOManager IOManagerDefinition [source]
- preview
This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.
An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using PySpark.
This I/O manager is only designed to work with Spark Connect.
Example:
from dagster import Definitions, asset
from dagster_iceberg.io_manager.spark import SparkIcebergIOManager
from pyspark.sql import SparkSession
from pyspark.sql.connect.dataframe import DataFrame

resources = {
    "io_manager": SparkIcebergIOManager(
        catalog_name="test",
        namespace="dagster",
        remote_url="spark://localhost",
    )
}

@asset
def iris_dataset() -> DataFrame:
    spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
    return spark.read.csv(
        "https://docs.dagster.io/assets/iris.csv",
        schema=(
            "sepal_length_cm FLOAT, "
            "sepal_width_cm FLOAT, "
            "petal_length_cm FLOAT, "
            "petal_width_cm FLOAT, "
            "species STRING"
        ),
    )

defs = Definitions(assets=[iris_dataset], resources=resources)
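
Spark Connect clients address the server with a connection string of the form sc://host:port, as in the SparkSession.builder.remote("sc://localhost") call above. The sketch below shows one way to sanity-check such a string before building a session. `parse_spark_connect_url` is a hypothetical helper, not part of dagster-iceberg or PySpark, and 15002 is assumed to be the default Spark Connect port. Whether `remote_url` on the I/O manager expects the same form is not stated here.

```python
from typing import Tuple
from urllib.parse import urlparse

# Assumed default port of a Spark Connect server.
DEFAULT_SPARK_CONNECT_PORT = 15002


def parse_spark_connect_url(url: str) -> Tuple[str, int]:
    """Hypothetical helper: validate an sc:// URL, return (host, port)."""
    parsed = urlparse(url)
    if parsed.scheme != "sc":
        raise ValueError(f"expected an sc:// URL, got scheme {parsed.scheme!r}")
    if not parsed.hostname:
        raise ValueError("Spark Connect URL is missing a host")
    # Fall back to the assumed default when no port is given.
    return parsed.hostname, parsed.port or DEFAULT_SPARK_CONNECT_PORT


if __name__ == "__main__":
    host, port = parse_spark_connect_url("sc://localhost")
    print(host, port)
```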
Resources
- dagster_iceberg.resource.IcebergTableResource ResourceDefinition [source]
- preview
This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.
Resource for interacting with a PyIceberg table.
Example:
from dagster import Definitions, asset
from dagster_iceberg import IcebergTableResource
from dagster_iceberg.config import IcebergCatalogConfig

@asset
def my_table(iceberg_table: IcebergTableResource):
    # Load the Iceberg table and convert it to a pandas DataFrame.
    df = iceberg_table.load().to_pandas()

warehouse_path = "/path/to/warehouse"

defs = Definitions(
    assets=[my_table],
    resources={
        "iceberg_table": IcebergTableResource(
            name="my_catalog",
            config=IcebergCatalogConfig(
                properties={
                    "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                    "warehouse": f"file://{warehouse_path}",
                }
            ),
            table="my_table",
            namespace="my_namespace",
        )
    },
)
Config
class dagster_iceberg.config.IcebergCatalogConfig [source]
- preview
This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.
Configuration for Iceberg Catalogs.
See the Catalogs section for configuration options.
You can configure the Iceberg I/O manager in one of three ways:
- Using a .pyiceberg.yaml configuration file.
- Through environment variables.
- Using the IcebergCatalogConfig configuration object.
For more information about the first two configuration options, see Setting Configuration Values.
Example:
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

warehouse_path = "/path/to/warehouse"

io_manager = PyArrowIcebergIOManager(
    name="my_catalog",
    config=IcebergCatalogConfig(
        properties={
            "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
            "warehouse": f"file://{warehouse_path}",
        }
    ),
    namespace="my_namespace",
)
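
The environment-variable option listed above can be sketched as follows. This assumes PyIceberg's naming convention of PYICEBERG_CATALOG__<CATALOG>__<PROPERTY>, with "." in a property name written as a double underscore and "-" as a single underscore. Verify that convention against Setting Configuration Values before relying on it. `catalog_env_vars` is a hypothetical helper, and the catalog name "test" is chosen to avoid underscores, which the convention would reinterpret.

```python
from typing import Dict, Mapping


def catalog_env_vars(catalog_name: str, properties: Mapping[str, str]) -> Dict[str, str]:
    """Hypothetical helper: map catalog properties to env variable names."""
    prefix = f"PYICEBERG_CATALOG__{catalog_name.upper()}__"
    env = {}
    for key, value in properties.items():
        # Assumed convention: "." -> "__" (nesting), "-" -> "_".
        suffix = key.replace(".", "__").replace("-", "_").upper()
        env[prefix + suffix] = value
    return env


if __name__ == "__main__":
    warehouse_path = "/path/to/warehouse"
    env = catalog_env_vars(
        "test",
        {
            "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
            "warehouse": f"file://{warehouse_path}",
        },
    )
    # Print export statements that could be pasted into a shell.
    for name, value in env.items():
        print(f'export {name}="{value}"')
```

Keeping the values in the environment leaves paths and credentials out of the code, at the cost of making the configuration less visible than an explicit IcebergCatalogConfig object.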
Base Classes
class dagster_iceberg.io_manager.base.IcebergIOManager [source]
- preview
This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.
Base class for an I/O manager definition that reads inputs from and writes outputs to Iceberg tables.
Examples:
import pandas as pd
import pyarrow as pa
from dagster import Definitions, asset
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)

resources = {
    "io_manager": PyArrowIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}

@asset
def iris_dataset() -> pa.Table:
    # Return the table so the I/O manager can write it to Iceberg.
    return pa.Table.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )

defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”. The I/O manager will check if the namespace exists in the Iceberg catalog. It does not automatically create the namespace if it does not exist.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pa.Table:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pa.Table):
    # my_table will just contain the data from column "a"
    ...
class dagster_iceberg.handler.IcebergBaseTypeHandler [source]
- preview
This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.
Base class for a type handler that reads inputs from and writes outputs to Iceberg tables.