Data retention for partitioned assets

Dagster does not automatically clean up partition metadata when the underlying data is deleted, and there is no built-in retention policy that coordinates data deletion with partition management. To enforce a retention window, pair a data-deletion sensor with a partition-synchronization sensor, both operating on the same dynamic partitions definition.

This pattern keeps the asset view in the Dagster UI accurate without accumulating partition metadata for data that no longer exists.

When to use this pattern

Reach for this pattern when you have:

  • Accumulating partition data that is past its useful retention window.
  • A partition view in Dagster that shows partitions for data that has already been deleted.
  • Misalignment between actual data availability and Dagster's partition view.

Set up dynamic partitions

Define your asset with a DynamicPartitionsDefinition. Dynamic partitions are required because they let you add and remove partition keys at runtime to reflect storage state:

src/<project_name>/defs/assets.py
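import dagster as dg


@dg.asset(
    partitions_def=dg.DynamicPartitionsDefinition(name="my_data_partitions"),
)
def my_partitioned_asset(context: dg.AssetExecutionContext):
    # The partition key identifies which slice of data this run materializes.
    partition_key = context.partition_key
    # process_data_for_partition is a placeholder for your own processing logic.
    return process_data_for_partition(partition_key)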

Add a retention sensor

The retention sensor enforces the maximum age. It walks all current partition keys, deletes the underlying data for any partition older than your retention window, and removes the partition from Dagster. The example assumes that every partition key is a date in YYYY-MM-DD format:

src/<project_name>/defs/sensors.py
from datetime import datetime, timedelta

import dagster as dg


@dg.sensor(minimum_interval_seconds=86400)  # Daily
def data_retention_sensor(context: dg.SensorEvaluationContext):
    """Delete partition data and keys older than the retention window."""
    retention_days = 90
    cutoff_date = datetime.now() - timedelta(days=retention_days)

    # Assumes every partition key is a date string such as "2024-01-31";
    # strptime raises ValueError for any key that does not match.
    partitions_to_delete = []
    for partition_key in context.instance.get_dynamic_partitions("my_data_partitions"):
        partition_date = datetime.strptime(partition_key, "%Y-%m-%d")
        if partition_date < cutoff_date:
            partitions_to_delete.append(partition_key)

    for partition_key in partitions_to_delete:
        # Your storage-specific deletion (S3, filesystem, database, etc.)
        delete_partition_data(partition_key)
        context.instance.delete_dynamic_partition("my_data_partitions", partition_key)
        context.log.info(f"Deleted partition: {partition_key}")

    return dg.SkipReason(
        f"Processed {len(partitions_to_delete)} partitions for deletion"
    )
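The cutoff check in the sensor above can be pulled out into a pure function, which makes it testable without a Dagster instance. The following is a minimal sketch rather than anything provided by Dagster: `expired_partition_keys` is a hypothetical helper, and it assumes keys use the same `%Y-%m-%d` format as the sensor. It also skips keys that fail to parse instead of raising, which is an illustrative design choice you may prefer to tighten.

```python
from datetime import datetime, timedelta


def expired_partition_keys(keys, now, retention_days=90, key_format="%Y-%m-%d"):
    """Return the keys whose date is strictly older than the retention cutoff.

    Hypothetical helper: keys that do not match key_format are ignored.
    """
    cutoff = now - timedelta(days=retention_days)
    expired = []
    for key in keys:
        try:
            key_date = datetime.strptime(key, key_format)
        except ValueError:
            continue  # Not a date-shaped key; leave it alone.
        if key_date < cutoff:
            expired.append(key)
    return expired


example_keys = ["2024-01-01", "2024-03-15", "not-a-date"]
# With a 90-day window the cutoff is 2024-01-11, so only the first key expires.
print(expired_partition_keys(example_keys, now=datetime(2024, 4, 10)))
# → ['2024-01-01']
```

Passing `now` as an argument, rather than calling `datetime.now()` inside the function, keeps the result deterministic in tests.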

Add a synchronization sensor

The sync sensor reconciles Dagster's view with whatever partitions actually exist in storage. It adds keys for newly arrived data and removes keys whose underlying data was deleted out of band:

src/<project_name>/defs/sensors.py
import dagster as dg


@dg.sensor(minimum_interval_seconds=3600)  # Hourly
def partition_sync_sensor(context: dg.SensorEvaluationContext):
    """Keep Dagster partitions aligned with actual data."""
    dagster_partitions = set(
        context.instance.get_dynamic_partitions("my_data_partitions")
    )
    # Scan your storage; this placeholder must return a set of partition keys.
    actual_partitions = get_existing_data_partitions()

    # Keys present in storage but unknown to Dagster.
    to_add = actual_partitions - dagster_partitions
    for partition_key in to_add:
        context.instance.add_dynamic_partition("my_data_partitions", partition_key)
        context.log.info(f"Added partition: {partition_key}")

    # Keys known to Dagster whose data no longer exists in storage.
    to_remove = dagster_partitions - actual_partitions
    for partition_key in to_remove:
        context.instance.delete_dynamic_partition("my_data_partitions", partition_key)
        context.log.info(f"Removed orphaned partition: {partition_key}")

    return dg.SkipReason(f"Synced partitions: +{len(to_add)}, -{len(to_remove)}")
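The storage scan is left to you. As one possible sketch, the version below assumes a local filesystem layout with one subdirectory per partition key under a root directory. That layout, the `root` parameter, and the `plan_sync` helper are all assumptions made for illustration, not part of Dagster.

```python
from pathlib import Path


def get_existing_data_partitions(root):
    """Return the set of partition keys found under root.

    Assumption: each partition lives in its own subdirectory named after its key.
    Raises FileNotFoundError if root is missing, so that a failed scan is never
    mistaken for "no data exists".
    """
    root_path = Path(root)
    if not root_path.is_dir():
        raise FileNotFoundError(f"Partition root not found: {root}")
    return {child.name for child in root_path.iterdir() if child.is_dir()}


def plan_sync(dagster_partitions, actual_partitions):
    """Return (to_add, to_remove) as sorted lists for deterministic logging."""
    dagster_set = set(dagster_partitions)
    actual_set = set(actual_partitions)
    to_add = sorted(actual_set - dagster_set)
    to_remove = sorted(dagster_set - actual_set)
    return to_add, to_remove
```

Raising on a missing root matters here: if the scan silently returned an empty set, the sync sensor would treat every registered key as orphaned and remove all of them.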

Tip: Adding and removing dynamic partition keys is a metadata-only operation in Dagster+ and does not consume credits. For details on what does count toward credits, see Insights.

Alternative approaches

  • Schedule deletion of old files in your storage layer (S3 lifecycle rules, database TTL, etc.) and let the sync sensor reconcile Dagster's view.
  • Use the DagsterInstance API to clean up old run and event logs as a separate concern.
  • Configure tick retention in your dagster.yaml for schedules and sensors. For details, see Instance configuration: data retention.

Operating the sensors

Monitor both sensors so you notice when they stop running:

  • Alert if either sensor's last successful tick is older than expected.
  • Alert if partition counts grow unexpectedly, which can indicate the retention sensor is failing silently.
  • Alert if the sync sensor reports large numbers of orphaned partitions, which can indicate the storage layer changed without Dagster being notified.
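The checks above can be expressed as small predicates that your alerting code calls. This is a rough sketch under stated assumptions: the function names, the 15-minute grace period, and the 10% orphan threshold are hypothetical, and how you obtain the last successful tick time or the partition counts depends on your monitoring setup.

```python
from datetime import datetime, timedelta


def is_tick_stale(last_success, now, expected_interval, grace=timedelta(minutes=15)):
    """Return True if the last successful tick is older than interval plus grace."""
    if last_success is None:
        return True  # The sensor has never succeeded: treat it as stale.
    return now - last_success > expected_interval + grace


def too_many_orphans(removed_count, total_count, max_fraction=0.1):
    """Return True if the share of orphaned partitions exceeds max_fraction."""
    if total_count == 0:
        return removed_count > 0
    return removed_count / total_count > max_fraction
```

For the hourly sync sensor you might call `is_tick_stale(last_success, datetime.now(), timedelta(hours=1))`, and for the daily retention sensor pass `timedelta(days=1)` instead.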