
dbt patterns and best practices

This guide covers advanced patterns and best practices for integrating dbt with Dagster, helping you build more maintainable data pipelines.

Preventing concurrent dbt snapshots

dbt snapshots track changes to data over time by comparing current data to previous snapshots. Running snapshots concurrently can corrupt these tables, so it's critical to ensure only one snapshot operation runs at a time.

Option 1: Separate snapshots from other models

Create separate dbt component definitions to isolate snapshots from your regular dbt models. First, scaffold two dbt components:

# Create component for regular models
dg scaffold defs dagster_dbt.DbtProjectComponent dbt_models

# Create component for snapshots
dg scaffold defs dagster_dbt.DbtProjectComponent dbt_snapshots

Configure the regular models component to exclude snapshots:

my_project/defs/dbt_models/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
  exclude: "resource_type:snapshot"

Configure the snapshots component with concurrency control:

my_project/defs/dbt_snapshots/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
  select: "resource_type:snapshot"

post_processing:
  assets:
    - target: "*"
      attributes:
        pool: "dbt-snapshots"

Option 2: Configure concurrency pools

Configure your Dagster instance to create pools with maximum concurrency of 1. Add this configuration to your dagster.yaml (for Dagster Open Source) or deployment settings (for Dagster+):

dagster.yaml
concurrency:
  pools:
    # Applies to every pool that has no explicit limit of its own.
    default_limit: 1
    granularity: 'op'

Then set the pool limit for the snapshot pool:

# Set pool limit using CLI
dagster instance concurrency set dbt-snapshots 1

Option 3: Manage multiple snapshot groups with Dagster components

For large projects with many snapshots, you can create multiple snapshot groups while still preventing concurrency issues within each group. Create separate Dagster components for different business domains:

# Create component for sales snapshots
dg scaffold defs dagster_dbt.DbtProjectComponent dbt_snapshots_sales

# Create component for inventory snapshots
dg scaffold defs dagster_dbt.DbtProjectComponent dbt_snapshots_inventory

Sales snapshots component:

my_project/defs/dbt_snapshots_sales/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
  select: "resource_type:snapshot,path:snapshots/sales/*"

post_processing:
  assets:
    - target: "*"
      attributes:
        pool: "sales-snapshots"

Inventory snapshots component:

my_project/defs/dbt_snapshots_inventory/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
  select: "resource_type:snapshot,path:snapshots/inventory/*"

post_processing:
  assets:
    - target: "*"
      attributes:
        pool: "inventory-snapshots"

Set a limit of 1 on each domain's pool, for example with dagster instance concurrency set sales-snapshots 1 and dagster instance concurrency set inventory-snapshots 1. This lets snapshots from different business domains run in parallel while preventing concurrent execution within each domain, which reduces the risk of corruption while keeping reasonable performance.
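To see why per-domain pools with a limit of 1 allow parallelism across domains but not within one, here is a conceptual simulation in plain Python. One threading.Semaphore per pool stands in for the pool limit; this is not how Dagster enforces pools internally, and the snapshot names are hypothetical.

```python
import threading
import time
from collections import defaultdict

# Hypothetical pool limits mirroring the per-domain pools above (limit 1 each).
POOL_LIMITS = {"sales-snapshots": 1, "inventory-snapshots": 1}

# One semaphore per pool caps how many tasks in that pool run at the same time.
semaphores = {name: threading.Semaphore(limit) for name, limit in POOL_LIMITS.items()}

lock = threading.Lock()
running = defaultdict(int)  # tasks currently running, per pool
peak = defaultdict(int)  # highest observed concurrency, per pool
peak_total = 0  # highest observed concurrency across all pools


def run_snapshot(pool: str, name: str) -> None:
    global peak_total
    with semaphores[pool]:
        with lock:
            running[pool] += 1
            peak[pool] = max(peak[pool], running[pool])
            peak_total = max(peak_total, sum(running.values()))
        time.sleep(0.05)  # stand-in for the snapshot's actual work
        with lock:
            running[pool] -= 1


tasks = [
    ("sales-snapshots", "orders_snapshot"),
    ("sales-snapshots", "customers_snapshot"),
    ("inventory-snapshots", "stock_snapshot"),
    ("inventory-snapshots", "warehouses_snapshot"),
]
threads = [threading.Thread(target=run_snapshot, args=task) for task in tasks]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Within a pool the peak never exceeds 1, while tasks from the two pools are free to overlap, which is the trade-off the per-domain layout is designed for.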

Organizing dbt assets into groups

By default, all dbt assets land in a single dbt group. To split them into meaningful groups, subclass DagsterDbtTranslator and override get_group_name. Common grouping strategies:

  • Model directory (e.g., marts/, staging/, intermediate/) for layer-based pipelines.
  • dbt tags (e.g., finance, marketing) when you already use tags to organize models.
  • meta configuration for explicit per-model overrides defined in dbt source control.
src/<project_name>/defs/dbt_assets.py
from typing import Any, Mapping, Optional

import dagster as dg
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

# MANIFEST_PATH is assumed to point at your dbt project's manifest.json (defined elsewhere).


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        # Rules are checked in order; the first match wins.

        # Group by meta config first, so an explicit per-model override always applies.
        meta = dbt_resource_props.get("config", {}).get("meta", {})
        if "dagster_group" in meta:
            return meta["dagster_group"]

        # Group by model directory.
        model_path = dbt_resource_props.get("original_file_path", "")
        if "marts/" in model_path:
            return "marts"
        if "staging/" in model_path:
            return "staging"
        if "intermediate/" in model_path:
            return "intermediate"

        # Group by tag.
        tags = dbt_resource_props.get("tags", [])
        if "finance" in tags:
            return "finance"
        if "marketing" in tags:
            return "marketing"

        # Fallback for models that match no rule.
        return "dbt_models"


@dbt_assets(
    manifest=MANIFEST_PATH,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def grouped_dbt_assets(context: dg.AssetExecutionContext, dbt_cli: DbtCliResource):
    yield from dbt_cli.cli(["build"], context=context).stream()

Pick one strategy or combine them. Rules are evaluated in order and the first match wins, so put the rule that should take precedence first; any model that matches no rule falls back to the default group returned at the end.
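Because precedence between strategies is easy to get wrong, it can help to keep the rules in a plain function that you can unit-test without loading a dbt manifest. The sketch below is a hypothetical pick_group helper that mirrors the get_group_name logic; it checks the meta override before the directory and tag rules, and the sample resource properties are made up.

```python
from typing import Any, Mapping

DEFAULT_GROUP = "dbt_models"


def pick_group(props: Mapping[str, Any]) -> str:
    """Hypothetical stand-in for get_group_name; returns the first matching group."""
    # 1. Explicit per-model override from dbt meta config.
    meta = props.get("config", {}).get("meta", {})
    if "dagster_group" in meta:
        return meta["dagster_group"]

    # 2. Model directory.
    path = props.get("original_file_path", "")
    for layer in ("marts", "staging", "intermediate"):
        if f"{layer}/" in path:
            return layer

    # 3. dbt tags.
    tags = props.get("tags", [])
    for tag in ("finance", "marketing"):
        if tag in tags:
            return tag

    # 4. Fallback when nothing matches.
    return DEFAULT_GROUP


# Made-up resource properties exercising each rule.
examples = {
    "override": {
        "original_file_path": "models/marts/revenue.sql",
        "tags": ["finance"],
        "config": {"meta": {"dagster_group": "exec_reporting"}},
    },
    "mart": {"original_file_path": "models/marts/revenue.sql", "tags": ["finance"]},
    "tagged": {"original_file_path": "models/adhoc/spend.sql", "tags": ["marketing"]},
    "other": {"original_file_path": "models/adhoc/misc.sql"},
}
groups = {name: pick_group(props) for name, props in examples.items()}
```

The "mart" example lands in marts even though it is tagged finance, because the directory rule is checked before the tag rule; reorder the blocks if tags should win instead.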

Running tagged dbt tests

dbt tests are not Dagster assets. They are operations that run against assets, so build_dbt_asset_selection with a tag filters the assets that have that tag, not the tests.

To run only the tests with a specific tag (including source tests), invoke the dbt CLI directly through DbtCliResource and use dbt's selection syntax:

src/<project_name>/defs/dbt_tests.py
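import dagster as dg
from dagster_dbt import DbtCliResource


@dg.op(required_resource_keys={"dbt"})
def run_tagged_tests(context: dg.OpExecutionContext) -> None:
    """Run only tests with specific tags using the dbt CLI directly."""
    dbt = context.resources.dbt
    # wait() blocks until dbt finishes and raises if the dbt command fails.
    # The invocation is not returned, because the default IO manager would
    # try to pickle it as the op's output.
    dbt.cli(["test", "--select", "tag:hourly_tests"], context=context).wait()


@dg.job(resource_defs={"dbt": DbtCliResource(project_dir="path/to/dbt")})
def hourly_tests_job():
    run_tagged_tests()


hourly_test_schedule = dg.ScheduleDefinition(
    name="hourly_tests_schedule",
    cron_schedule="0 * * * *",  # at minute 0 of every hour
    job=hourly_tests_job,
    execution_timezone="UTC",
)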

The asset-based approach, using build_dbt_asset_selection with tags, is the right tool for selecting models. Because it selects at the asset level, however, it cannot select tests on sources, or tagged tests attached to untagged models. Use the CLI pattern above when you need to select the tests themselves.
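The gap between the two approaches can be modeled with a few sets. This is a simplified approximation, not dagster-dbt or dbt code: it assumes that tag-based asset selection runs the tests attached to the selected models, and that dbt test --select tag:... uses dbt's default eager indirect selection (tests tagged directly, plus tests on tagged models). The project, models, and tests are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class DbtTest:
    name: str
    attached_to: str  # the model or source the test runs against
    tags: set = field(default_factory=set)


# Hypothetical project: only the "orders" model carries the hourly_tests tag.
TAG = "hourly_tests"
tagged_models = {"orders"}
tests = [
    DbtTest("not_null_orders_id", "orders"),  # untagged test on a tagged model
    DbtTest("unique_customers_id", "customers", {TAG}),  # tagged test on an untagged model
    DbtTest("source_not_null_raw_events_id", "source:raw.events", {TAG}),  # source test
]


def select_via_assets(tagged: set, all_tests: list) -> set:
    """Approximation of tag-based asset selection: tests come along with their model."""
    return {t.name for t in all_tests if t.attached_to in tagged}


def select_via_cli(tag: str, tagged: set, all_tests: list) -> set:
    """Approximation of `dbt test --select tag:<tag>` with eager indirect selection."""
    return {t.name for t in all_tests if tag in t.tags or t.attached_to in tagged}


via_assets = select_via_assets(tagged_models, tests)
via_cli = select_via_cli(TAG, tagged_models, tests)
missed_by_assets = via_cli - via_assets
```

In this model, missed_by_assets contains the tagged test on the untagged customers model and the source test, which are the two cases the CLI pattern covers.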

Running specific dbt models from the UI

To materialize a specific subset of dbt models on demand, navigate to your code location's Assets tab and use the asset selection input. The input accepts selection syntax similar to dbt's -s (--select) flag:

  • Model names directly: model1 model2
  • dbt selection syntax: tag:staging, +model, model+
  • Filter by group, tag, or asset key

This is the typical path for ad-hoc runs (recovering from outages, materializing a small set of new models) without writing a custom job.

Macro changes are not detected by code_version_changed

AutomationCondition.code_version_changed() does not detect changes to dbt macros. Code versions for dbt assets are derived from each model's raw_code (raw_sql in older manifests) in manifest.json. Macros and ephemeral models are not imported into the Dagster asset graph, so their content is not part of the code version calculation.
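A small illustration of why a macro edit is invisible to a code version built from raw_code: the model's raw_code contains only the macro call, not the macro body. The SHA-256 hashing here is an illustrative stand-in, not dagster-dbt's actual code version computation, and version_with_macros is a hypothetical alternative, not a Dagster feature.

```python
import hashlib

# Hypothetical model and macro sources. The model's raw_code holds only the macro call.
model_raw_code = "select {{ cents_to_dollars('amount') }} as amount_usd from {{ ref('payments') }}"
macro_v1 = "{% macro cents_to_dollars(col) %}({{ col }} / 100)::numeric(16, 2){% endmacro %}"
macro_v2 = "{% macro cents_to_dollars(col) %}round({{ col }} / 100, 2){% endmacro %}"


def version_from_raw_code(raw_code: str) -> str:
    """Illustrative code version computed from the model's raw_code only."""
    return hashlib.sha256(raw_code.encode("utf-8")).hexdigest()


def version_with_macros(raw_code: str, macro_bodies: list) -> str:
    """Hypothetical variant that also hashes the bodies of macros the model calls."""
    digest = hashlib.sha256(raw_code.encode("utf-8"))
    for body in macro_bodies:
        digest.update(body.encode("utf-8"))
    return digest.hexdigest()


# Two deploys: the second changes only the macro body, not the model file.
deploys = [(model_raw_code, macro_v1), (model_raw_code, macro_v2)]
raw_only = [version_from_raw_code(raw) for raw, _ in deploys]
with_macros = [version_with_macros(raw, [macro]) for raw, macro in deploys]
```

The raw_code-only versions are identical across both deploys, while folding in the macro bodies makes the change visible.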

If you change a macro that downstream models depend on, the consuming models' code versions do not change, and the automation condition will not mark them stale. Until dagster#22566 is resolved, treat macro changes as a manual trigger:

  • Identify which models depend on the changed macros.
  • Manually materialize those assets after deploying the macro change.
  • For workflows where this happens often, consider a custom automation condition that watches macro file modifications, or fall back to a time-based trigger.
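For the first step, manifest.json already records which macros each node calls under depends_on.macros, and macros record the macros they call the same way. A minimal sketch, assuming the manifest is at dbt's default target/manifest.json and that you pass macro unique IDs (of the form macro.<package>.<name>); models_using_macros and the example manifest are hypothetical.

```python
import json
from collections import deque


def load_manifest(path: str = "target/manifest.json") -> dict:
    """Load manifest.json from dbt's default target directory."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def models_using_macros(manifest: dict, changed_macros: set) -> set:
    """Unique IDs of models and snapshots that call a changed macro, directly or via other macros."""
    macros = manifest.get("macros", {})

    # Expand the changed set with every macro that (transitively) calls a changed macro.
    affected = set(changed_macros)
    queue = deque(changed_macros)
    while queue:
        current = queue.popleft()
        for macro_id, macro in macros.items():
            calls = macro.get("depends_on", {}).get("macros", [])
            if macro_id not in affected and current in calls:
                affected.add(macro_id)
                queue.append(macro_id)

    # Keep models and snapshots whose own macro dependencies intersect the affected set.
    return {
        node_id
        for node_id, node in manifest.get("nodes", {}).items()
        if node.get("resource_type") in {"model", "snapshot"}
        and affected.intersection(node.get("depends_on", {}).get("macros", []))
    }


# Hypothetical manifest excerpt: format_money calls cents_to_dollars; payments calls format_money.
example_manifest = {
    "macros": {
        "macro.shop.cents_to_dollars": {"depends_on": {"macros": []}},
        "macro.shop.format_money": {"depends_on": {"macros": ["macro.shop.cents_to_dollars"]}},
    },
    "nodes": {
        "model.shop.payments": {"resource_type": "model", "depends_on": {"macros": ["macro.shop.format_money"]}},
        "model.shop.orders": {"resource_type": "model", "depends_on": {"macros": []}},
        "test.shop.not_null_payments_id": {"resource_type": "test", "depends_on": {"macros": ["macro.shop.cents_to_dollars"]}},
    },
}
example_result = models_using_macros(example_manifest, {"macro.shop.cents_to_dollars"})
```

Against a real project you would call models_using_macros(load_manifest(), {...}) with the IDs of the macros you changed. This sketch only follows macro calls; a model that selects from an ephemeral model using the macro is not caught.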

Recovering from snapshot SQL compilation errors after package updates

dbt snapshots can start failing in all environments at once after dbt deps pulls a new version of a transitive package that changes a macro the snapshot-related models compile against. Symptoms include syntax errors at positions that look unrelated to your code (for example, Snowflake reporting syntax error line 15 at position 21 unexpected ')'), and local environments that keep working until their packages are reinstalled.

Reset the dbt environment to clear cached dependencies:

dbt clean && dbt deps

Then re-run the snapshots. To prevent recurrence, pin package versions in packages.yml rather than letting them float, and test snapshots in a development environment after any package upgrade before promoting to production.
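A lightweight pre-deploy check can catch floating versions before they reach production. A sketch, assuming you have already parsed packages.yml (for example with PyYAML) into the list under its packages key; it only checks hub packages, treats anything other than an exact X.Y.Z string as floating, and the example entries are illustrative.

```python
import re

# An exact pin such as "1.1.1". Ranges (">=1.0.0") and lists of constraints float.
EXACT_VERSION = re.compile(r"^\d+\.\d+\.\d+$")


def unpinned_packages(packages: list) -> list:
    """Names of hub packages whose version is missing or is not an exact X.Y.Z pin."""
    floating = []
    for entry in packages:
        if "package" not in entry:
            continue  # git and local packages are outside this sketch's scope
        version = entry.get("version")
        if not isinstance(version, str) or not EXACT_VERSION.match(version):
            floating.append(entry["package"])
    return floating


# Illustrative entries, shaped like the parsed packages list in packages.yml.
example_packages = [
    {"package": "dbt-labs/dbt_utils", "version": "1.1.1"},
    {"package": "calogica/dbt_expectations", "version": [">=0.10.0", "<0.11.0"]},
    {"local": "../shared_macros"},
]
floating_packages = unpinned_packages(example_packages)
```

In CI, you might fail the build whenever floating_packages is non-empty.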

Recovering from infrastructure interruptions

To recover gracefully from infrastructure interruptions, such as a Kubernetes node eviction or a pod termination, use the FROM_ASSET_FAILURE run retry strategy and set dagster/retry_on_asset_or_op_failure to false. With these settings, Dagster retries runs that fail because the run worker crashed or was terminated, but not runs that fail because an asset or op raised an error. On retry, Dagster uses the asset materialization records persisted in the event log to exclude assets that already materialized, so you can recover without persisting dbt artifacts. See Configuring run retries.

src/my_project/jobs.py
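import dagster as dg

# asset_one, asset_two, and asset_three are assumed to be assets defined elsewhere.

asset_job = dg.define_asset_job(
    name="asset_job",
    selection=dg.AssetSelection.assets(asset_one, asset_two, asset_three),
    tags={
        # Retry a failed run up to three times.
        "dagster/max_retries": "3",
        # On retry, skip assets that already materialized in the failed run.
        "dagster/retry_strategy": "FROM_ASSET_FAILURE",
        # Retry only infrastructure failures, not failures raised by assets or ops.
        "dagster/retry_on_asset_or_op_failure": "false",
    },
)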
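Conceptually, a FROM_ASSET_FAILURE retry re-executes the job's selection minus the assets the failed run already materialized, in dependency order. The sketch models that with graphlib.TopologicalSorter from the Python standard library; Dagster computes the real plan itself from the event log, and the dependency graph here (a simple chain through the three assets) is hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency graph for asset_job: each asset maps to its upstream assets.
DEPS = {
    "asset_one": set(),
    "asset_two": {"asset_one"},
    "asset_three": {"asset_two"},
}


def retry_plan(deps: dict, materialized: set) -> list:
    """Assets a retried run would execute, in dependency order, skipping materialized ones.

    `materialized` stands in for the materialization records that the failed run
    left in the event log.
    """
    order = TopologicalSorter(deps).static_order()
    return [asset for asset in order if asset not in materialized]


# The pod is evicted after asset_one materializes, partway through asset_two.
plan = retry_plan(DEPS, materialized={"asset_one"})
```

Only asset_two and asset_three run again, which is why no dbt artifacts from the interrupted run are needed to resume.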