Customizing existing component types
dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.
You can customize the behavior of a component beyond what is available in the component.yaml file by creating a subclass of the component type.
There are two ways you can customize a component:
- For one-off customizations, you can create a local component type, defined in a Python file in the same directory as your component.yaml file. Customarily, this local component type is defined in a file named component.py in the component directory.
- For customizations which may be reused across multiple components, you can create a global component type, defined in a Python file in the lib directory. This requires that your project is a dg plugin (projects scaffolded using the dg CLI are automatically plugins).
Creating a customized component type
We'll use the SlingReplicationCollectionComponent as an example. First, we'll scaffold a project with the dg CLI:
dg scaffold project my-project \
&& cd my-project/src \
&& uv add dagster-sling \
&& dg scaffold dagster_sling.SlingReplicationCollectionComponent my_sling_sync
tree my_project/defs
my_project/defs
├── __init__.py
└── my_sling_sync
    ├── component.yaml
    └── replication.yaml
2 directories, 3 files
Local component type
To define a local component type, you can create a subclass of your desired component in a file named component.py in the same directory as your component.yaml file:
from dagster_sling import SlingReplicationCollectionComponent


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
    """Customized Sling component."""
Then, we update the type field in our component.yaml file to reference this new component type. It should be the fully qualified name of the type:
type: my_project.defs.my_sling_sync.component.CustomSlingReplicationComponent

attributes:
  replications:
    - path: replication.yaml
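To make "fully qualified name" concrete: the value is a dotted module path followed by the class name. The sketch below shows one way such a string can be resolved to a class in plain Python. The helper resolve_type is hypothetical and is not part of dg; how dg itself loads component types may differ. A standard-library class stands in for the component so the snippet runs without a Dagster project.

```python
import importlib


def resolve_type(qualified_name: str) -> type:
    """Resolve 'package.module.ClassName' to the class object.

    Hypothetical helper for illustration only; not part of dg or Dagster.
    """
    # Everything before the last dot is the module path; the rest is the class.
    module_path, _, class_name = qualified_name.rpartition(".")
    if not module_path:
        raise ValueError(f"expected a dotted path, got {qualified_name!r}")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# A standard-library class stands in for a component type here.
ordered_dict_cls = resolve_type("collections.OrderedDict")
```

By the same split, my_project.defs.my_sling_sync.component.CustomSlingReplicationComponent names the class CustomSlingReplicationComponent in the module my_project.defs.my_sling_sync.component, which is the component.py file in the component directory.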
tree my_project
my_project
├── __init__.py
├── definitions.py
├── defs
│   ├── __init__.py
│   └── my_sling_sync
│       ├── component.py
│       ├── component.yaml
│       └── replication.yaml
└── lib
    └── __init__.py
4 directories, 7 files
Global component type
To define a global component type, you can use the dg CLI to scaffold a new component type:
dg scaffold component-type CustomSlingReplicationComponent
Creating a Dagster component type at /.../my-project/src/my_project/lib/custom_sling_replication_component.py.
Scaffolded files for Dagster component type at /.../my-project/src/my_project/lib/custom_sling_replication_component.py.
tree my_project
my_project
├── __init__.py
├── definitions.py
├── defs
│   ├── __init__.py
│   └── my_sling_sync
│       ├── component.yaml
│       └── replication.yaml
└── lib
    ├── __init__.py
    └── custom_sling_replication_component.py
4 directories, 7 files
We can modify the generated component type by editing the custom_sling_replication_component.py file in the lib directory:
from dagster_sling import SlingReplicationCollectionComponent


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
    """Customized Sling component."""
Finally, we update the type field in our component.yaml file to reference this new component type.
type: my_project.lib.CustomSlingReplicationComponent

attributes:
  replications:
    - path: replication.yaml
Once we have created our component type subclass, we can customize its behavior by overriding methods from the parent class.
Customizing execution
For components which define executable assets, it is customary for the component type to implement an execute method, which can be overridden to customize execution behavior.
For example, we can modify our custom subclass of SlingReplicationCollectionComponent to add a debug log message during execution:
from collections.abc import Iterator

from dagster_sling import (
    SlingReplicationCollectionComponent,
    SlingReplicationSpecModel,
    SlingResource,
)

import dagster as dg


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
    def execute(
        self,
        context: dg.AssetExecutionContext,
        sling: SlingResource,
        replication_spec_model: SlingReplicationSpecModel,
    ) -> Iterator:
        # Emit a marker line so the custom code path is easy to spot in the run logs.
        context.log.info("*******************CUSTOM*************************")
        # Run the replication through the Sling resource, passing debug=True.
        return sling.replicate(context=context, debug=True)
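When the goal is only to add behavior around the parent's logic, an override can also delegate to the parent implementation with super() instead of re-implementing it. The sketch below shows that pattern with plain Python stand-ins. BaseComponent, its execute signature, and the event strings are hypothetical and do not mirror the real SlingReplicationCollectionComponent API.

```python
from collections.abc import Iterator


class BaseComponent:
    """Hypothetical stand-in for a component type that defines execute()."""

    def execute(self, stream_names: list[str]) -> Iterator[str]:
        # Pretend each stream is replicated and reported as an event.
        for name in stream_names:
            yield f"replicated {name}"


class CustomComponent(BaseComponent):
    """Subclass that records a log line, then defers to the parent."""

    def __init__(self) -> None:
        self.log: list[str] = []

    def execute(self, stream_names: list[str]) -> Iterator[str]:
        self.log.append("custom execute called")
        # Delegate the real work to the parent implementation.
        yield from super().execute(stream_names)


component = CustomComponent()
events = list(component.execute(["users", "orders"]))
```

Because the parent still does the work, the subclass keeps picking up any changes made to the parent's execute method.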
Adding component-level templating scope
By default, the Jinja scopes available for use in a component's YAML file are:
- env: A function that allows you to access environment variables.
- automation_condition: A scope allowing you to access all static constructors of the AutomationCondition class.
It can be useful to add additional scope options to your component type. For example, you may have a custom automation condition that you'd like to use in your component.
To do so, you can define a function that returns an AutomationCondition and define a get_additional_scope method on your subclass:
from collections.abc import Mapping
from typing import Any

from dagster_sling import SlingReplicationCollectionComponent

import dagster as dg


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
    @classmethod
    def get_additional_scope(cls) -> Mapping[str, Any]:
        # Cron-based condition that is suppressed while a run is in progress.
        def _custom_cron(cron_schedule: str) -> dg.AutomationCondition:
            return (
                dg.AutomationCondition.on_cron(cron_schedule)
                & ~dg.AutomationCondition.in_progress()
            )

        # The key is the name the function is exposed under in component.yaml.
        return {"custom_cron": _custom_cron}
This can then be used in your component.yaml file:
Local component type:
type: my_project.defs.my_sling_sync.component.CustomSlingReplicationComponent

attributes:
  replications:
    - path: replication.yaml
  asset_post_processors:
    - attributes:
        automation_condition: "{{ custom_cron('@daily') }}"
Global component type:
type: my_project.lib.CustomSlingReplicationComponent

attributes:
  replications:
    - path: replication.yaml
  asset_post_processors:
    - attributes:
        automation_condition: "{{ custom_cron('@daily') }}"
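Conceptually, the mapping returned by get_additional_scope is added to the names that a template expression can see. The toy sketch below illustrates that idea with plain Python. It is not Jinja and it is not Dagster's resolution code: DEFAULT_SCOPE, render_expression, and the string returned by custom_cron are all hypothetical stand-ins.

```python
import os
from collections.abc import Mapping
from typing import Any

# Hypothetical default scope: just an env() lookup, loosely mirroring the list above.
DEFAULT_SCOPE: dict[str, Any] = {"env": lambda key: os.environ.get(key)}


def custom_cron(cron_schedule: str) -> str:
    # Stand-in: the real function would return an AutomationCondition.
    return f"on_cron({cron_schedule}) & ~in_progress()"


def render_expression(template: str, extra_scope: Mapping[str, Any]) -> Any:
    """Evaluate a single '{{ ... }}' expression against the merged scope."""
    # Names from the additional scope are layered on top of the defaults.
    scope = {**DEFAULT_SCOPE, **extra_scope}
    expression = template.strip().removeprefix("{{").removesuffix("}}").strip()
    # eval() keeps the toy short; never use it on untrusted input.
    return eval(expression, {"__builtins__": {}}, scope)


result = render_expression(
    "{{ custom_cron('@daily') }}", {"custom_cron": custom_cron}
)
```

Without the extra scope entry, the name custom_cron would be undefined when the expression is evaluated, which is why the method has to expose it under the same name used in the YAML file.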