Dagster Pipes subprocess reference
This reference shows usage of Dagster Pipes with other entities in the Dagster system. For a step-by-step walkthrough, refer to the Dagster Pipes tutorial.
Specifying environment variables and extras
When launching the subprocess, you may want to make environment variables or additional parameters available in the external process. Extras are arbitrary, user-defined parameters made available on the context object in the external process.
- External code in external_code.py
- Dagster code in dagster_code.py
In the external code, you can access extras via the `PipesContext` object:
import os

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # get all extras provided by Dagster asset
    print(context.extras)
    # get the value of an extra
    print(context.get_extra("foo"))
    # get env var
    print(os.environ["MY_ENV_VAR_IN_SUBPROCESS"])


if __name__ == "__main__":
    # open_dagster_pipes() establishes communication with the orchestrating
    # Dagster process before main() runs
    with open_dagster_pipes():
        main()
The `run` method of the `PipesSubprocessClient` resource also accepts `env` and `extras`, which allow you to specify environment variables and extra arguments when executing the subprocess:
Note: We're using `os.environ` in this example, but Dagster recommends using `dg.EnvVar` in production; see the sketch after the example below.
import shutil

import dagster as dg


@dg.asset
def subprocess_asset(
    context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
    cmd = [shutil.which("python"), dg.file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd,
        context=context,
        extras={"foo": "bar"},
        env={
            "MY_ENV_VAR_IN_SUBPROCESS": "my_value",
        },
    ).get_materialize_result()


defs = dg.Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
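As a minimal sketch of the production pattern mentioned in the note above, you can resolve the value with `dg.EnvVar` instead of hardcoding it. The asset name and the `MY_ENV_VAR` variable below are illustrative; the sketch assumes `MY_ENV_VAR` is set in the orchestration environment:

import shutil

import dagster as dg


@dg.asset
def subprocess_asset_with_env_var(
    context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
    cmd = [shutil.which("python"), dg.file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd,
        context=context,
        env={
            # EnvVar reads MY_ENV_VAR from the orchestration environment at
            # runtime rather than baking the value into the definition
            "MY_ENV_VAR_IN_SUBPROCESS": dg.EnvVar("MY_ENV_VAR").get_value(),
        },
    ).get_materialize_result()

As before, the asset would be registered in a `Definitions` object alongside the `PipesSubprocessClient` resource.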
Working with @asset_check
Sometimes, you may not want to materialize an asset, but instead want to report a data quality check result. When your asset has data quality checks defined with `@dg.asset_check`:
- External code in external_code.py
- Dagster code in dagster_code.py
From the external code, you can report to Dagster that an asset check has been performed via `PipesContext`. Note that `asset_key` in this case is required, and must match the asset key defined in `@dg.asset_check`:
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster
    context.report_asset_check(
        asset_key="my_asset",
        passed=bool(orders_df["item_id"].notnull().all()),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    with open_dagster_pipes():
        main()
On Dagster's side, the `PipesClientCompletedInvocation` object returned from `PipesSubprocessClient.run` includes a `get_asset_check_result` method, which you can use to access the `AssetCheckResult` event reported by the subprocess:
import shutil

import dagster as dg


@dg.asset
def my_asset(): ...


@dg.asset_check(asset="my_asset")
def no_empty_order_check(
    context: dg.AssetCheckExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient,
) -> dg.AssetCheckResult:
    cmd = [
        shutil.which("python"),
        dg.file_relative_path(__file__, "external_code.py"),
    ]
    return pipes_subprocess_client.run(
        command=cmd, context=context.op_execution_context
    ).get_asset_check_result()


defs = dg.Definitions(
    assets=[my_asset],
    asset_checks=[no_empty_order_check],
    resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
Working with multi-assets
Sometimes, a single API call may result in multiple tables being updated, or a single script may compute multiple assets. In these cases, you can use Dagster Pipes to report back on multiple assets at once.
- External code in external_code.py
- Dagster code in dagster_code.py
Note: When working with multi-assets, `report_asset_materialization` may only be called once per unique asset key. If it's called more than once, an error similar to the following will surface:

Calling {method} with asset key {asset_key} is undefined. Asset has already been materialized, so no additional data can be reported for it

Instead, you'll need to set the `asset_key` parameter on each call to `report_asset_materialization`:
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame(
        {"order_id": [1, 2, 3], "item_id": [432, 878, 102], "user_id": ["a", "b", "a"]}
    )
    total_orders = len(orders_df)
    total_users = orders_df["user_id"].nunique()
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster. asset_key is required when
    # there are multiple assets
    context.report_asset_materialization(
        asset_key="orders", metadata={"total_orders": total_orders}
    )
    context.report_asset_materialization(
        asset_key="users", metadata={"total_users": total_users}
    )


if __name__ == "__main__":
    with open_dagster_pipes():
        main()
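For contrast, here is a minimal sketch of the pattern the note above warns about: reporting against the same asset key twice. The second call raises the error quoted earlier. The asset key and metadata values are illustrative:

from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    context = PipesContext.get()
    context.report_asset_materialization(
        asset_key="orders", metadata={"total_orders": 3}
    )
    # a second report against the same asset key raises:
    # "Calling report_asset_materialization with asset key orders is undefined.
    # Asset has already been materialized, so no additional data can be
    # reported for it"
    context.report_asset_materialization(
        asset_key="orders", metadata={"late_update": 1}
    )


if __name__ == "__main__":
    with open_dagster_pipes():
        main()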
In the Dagster code, you can use `@dg.multi_asset` to define a single function that materializes multiple assets. The `PipesClientCompletedInvocation` object returned from `PipesSubprocessClient.run` includes a `get_results` method, which you can use to access all of the events, such as multiple `AssetMaterialization`s and `AssetCheckResult`s, reported by the subprocess:
import shutil

import dagster as dg


@dg.multi_asset(specs=[dg.AssetSpec("orders"), dg.AssetSpec("users")])
def subprocess_asset(
    context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
):
    cmd = [
        shutil.which("python"),
        dg.file_relative_path(__file__, "external_code.py"),
    ]
    return pipes_subprocess_client.run(command=cmd, context=context).get_results()


defs = dg.Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
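If you want to inspect or log the reported events rather than returning them unchanged, a sketch along these lines could work. It assumes the subprocess reports only materializations (so each entry carries an asset key and metadata); the asset name `subprocess_asset_logged` is illustrative:

import shutil

import dagster as dg


@dg.multi_asset(specs=[dg.AssetSpec("orders"), dg.AssetSpec("users")])
def subprocess_asset_logged(
    context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
):
    cmd = [shutil.which("python"), dg.file_relative_path(__file__, "external_code.py")]
    results = pipes_subprocess_client.run(command=cmd, context=context).get_results()
    for result in results:
        # each entry is a MaterializeResult carrying the asset key and
        # metadata reported by the subprocess
        context.log.info(f"{result.asset_key}: {result.metadata}")
    return results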
Passing custom data
Sometimes, you may want to pass data back from the external process for use in the orchestration code for purposes other than reporting directly to Dagster, such as creating an output. In this example, we use custom messages to create an I/O-managed output that is returned from the asset.
- External code in external_code.py
- Dagster code in dagster_code.py
In the external code, we send messages using `report_custom_message`. The message can be any data that is JSON-serializable.
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    # get the Dagster Pipes context
    context = PipesContext.get()
    # compute the full orders data
    orders = pd.DataFrame(
        {
            "order_id": [1, 2, 3],
            "item_id": [321, 654, 987],
            "order_details": [..., ..., ...],  # imagine large data,
            # and more columns
        }
    )
    # send a smaller table to be I/O managed by Dagster and passed to downstream assets
    summary_table = pd.DataFrame(orders[["order_id", "item_id"]])
    context.report_custom_message(summary_table.to_dict())
    context.report_asset_materialization(metadata={"total_orders": len(orders)})


if __name__ == "__main__":
    with open_dagster_pipes():
        main()
In the Dagster code, we receive the custom messages using `get_custom_messages`:
import shutil

import pandas as pd

import dagster as dg


@dg.asset
def subprocess_asset(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient,
) -> dg.Output[pd.DataFrame]:
    cmd = [shutil.which("python"), dg.file_relative_path(__file__, "external_code.py")]
    result = pipes_subprocess_client.run(
        command=cmd,
        context=context,
    )

    # a small summary table gets reported as a custom message
    messages = result.get_custom_messages()
    if len(messages) != 1:
        raise Exception("summary not reported")

    summary_df = pd.DataFrame(messages[0])

    # grab any reported metadata off of the materialize result
    metadata = result.get_materialize_result().metadata

    # return the summary table to be loaded by Dagster for downstream assets
    return dg.Output(
        value=summary_df,
        metadata=metadata,
    )


defs = dg.Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
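To illustrate the last point, a downstream asset can declare `subprocess_asset` as an input and have the I/O manager hand it the summary DataFrame. This is a sketch: the asset name `orders_summary_report` is hypothetical, and you would also add it to the `Definitions` object's assets list:

import pandas as pd

import dagster as dg


@dg.asset
def orders_summary_report(subprocess_asset: pd.DataFrame) -> dg.MaterializeResult:
    # the I/O manager loads the DataFrame returned by subprocess_asset
    return dg.MaterializeResult(metadata={"row_count": len(subprocess_asset)})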