Pipelines carry their own schedule. You can run them from cron, systemd,
a Kubernetes CronJob, GitHub Actions, or the bundled long-running
scheduler — same code, same topological order, same retry semantics.
Cron
@ematix.pipeline(target=Events, target_connection="warehouse", schedule="*/5 * * * *")
def ingest_events(conn): ...
Then from the shell:
flow run-due --module my_pipelines
run-due checks every pipeline's schedule against now() and runs each
one that matches. Invoke it from the host’s cron once a minute, so that
every minute a schedule can name gets checked.
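To make "schedule matches now()" concrete, here is a minimal sketch of a five-field cron matcher. It is not ematix-flow's implementation: `field_matches` and `cron_matches` are hypothetical names, month and weekday names are not supported, and all five fields are simply ANDed together (real cron treats day-of-month and day-of-week specially when both are restricted).

```python
from datetime import datetime


def field_matches(field: str, value: int, lo: int, hi: int) -> bool:
    """Match one cron field: '*', 'N', 'A-B', comma lists, and '/step' suffixes."""
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            start, end = (int(x) for x in base.split("-", 1))
        else:
            start = end = int(base)
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(expr: str, now: datetime) -> bool:
    """True if the five-field cron expression names the minute containing `now`."""
    minute, hour, day, month, weekday = expr.split()
    return (
        field_matches(minute, now.minute, 0, 59)
        and field_matches(hour, now.hour, 0, 23)
        and field_matches(day, now.day, 1, 31)
        and field_matches(month, now.month, 1, 12)
        # cron counts Sunday as 0; isoweekday() counts it as 7.
        and field_matches(weekday, now.isoweekday() % 7, 0, 6)
    )


print(cron_matches("*/5 * * * *", datetime(2024, 1, 1, 12, 10)))  # True
print(cron_matches("*/5 * * * *", datetime(2024, 1, 1, 12, 11)))  # False
```

A matcher of this shape only ever sees the minute it is called in, which is one reason to invoke the check once a minute: a coarser interval would skip over minutes that a schedule names.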
One-shot
flow run --module my_pipelines ingest_events
Runs the pipeline once, ignoring its schedule. Useful in CI or for manual backfills.
Programmatic
from my_pipelines import ingest_events
ingest_events.sync() # run now, synchronously
ingest_events.sync(force=True) # ignore watermark for one run
The same object you decorated is callable from any orchestrator.
Already on Airflow / Dagster / Prefect? Wrap .sync() in a task.
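A rough sketch of what "wrap .sync() in a task" can look like, kept orchestrator-agnostic. `StubPipeline` is a hypothetical stand-in so the example runs without ematix-flow installed, and `run_pipeline_task` is an illustrative name. Only the two call forms shown above, `.sync()` and `.sync(force=True)`, are used.

```python
import logging

logger = logging.getLogger("orchestrator")


class StubPipeline:
    """Hypothetical stand-in for a decorated pipeline; only .sync() is modeled."""

    def __init__(self, name):
        self.name = name
        self.calls = []  # records the `force` flag of each run

    def sync(self, force=False):
        self.calls.append(force)


def run_pipeline_task(pipeline, force=False):
    """Task body: run the pipeline once and let any error propagate."""
    logger.info("running pipeline (force=%s)", force)
    try:
        if force:
            pipeline.sync(force=True)  # ignore the watermark for this run
        else:
            pipeline.sync()
    except Exception:
        # Re-raise so the orchestrator marks the task as failed.
        logger.exception("pipeline run failed")
        raise


ingest_events = StubPipeline("ingest_events")
run_pipeline_task(ingest_events)
run_pipeline_task(ingest_events, force=True)
```

In Airflow, Dagster, or Prefect, a function like `run_pipeline_task` would be the callable you hand to that tool's own task or op decorator. Letting the exception propagate keeps retry and alerting decisions with the orchestrator.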
DAG dependencies
Pipelines can declare depends_on= edges. ematix-flow builds the DAG,
checks for cycles, and runs in topological order with
exponential-backoff retries between layers.
@ematix.pipeline(target=DailyRollup, schedule="0 1 * * *",
                 depends_on=[ingest_events, ingest_users])
def daily_rollup(conn):
    return "SELECT date_trunc('day', received_at) AS day, COUNT(*) AS c FROM analytics.events GROUP BY 1"
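The ordering and retry behaviour described above can be sketched with the standard library. This is an illustration, not ematix-flow's scheduler: `topological_layers` and `run_with_backoff` are hypothetical helpers, and the retry parameters (four attempts, delays doubling from one second) are assumptions.

```python
import time
from graphlib import TopologicalSorter


def topological_layers(deps):
    """Group pipelines into layers; each layer depends only on earlier layers.

    `deps` maps a pipeline name to the names it depends on.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph is circular
    layers = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        layers.append(ready)
        sorter.done(*ready)
    return layers


def run_with_backoff(task, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call task(); after each failure wait base_delay * 2**attempt, then retry."""
    for attempt in range(max_attempts):
        try:
            return task()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)


deps = {"daily_rollup": {"ingest_events", "ingest_users"}}
print(topological_layers(deps))
# [['ingest_events', 'ingest_users'], ['daily_rollup']]
```

Pipelines within one layer have no edges between them, so a runner is free to execute them in any order before moving on to the next layer. Passing `sleep` as a parameter keeps the retry helper testable without real waiting.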
Cycle detection runs at decoration time — a circular depends_on raises
immediately rather than at the next scheduled run.
Run history
Every run writes a row to the flow run-history store: pipeline name,
schedule, start/end, status, rows written, error message. Inspect with:
flow runs list --pipeline ingest_events --since 24h
flow runs show <run-id>
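For a sense of what such a store holds, here is a sketch of a run-history table with the fields listed above, plus a "since" filter like the one the CLI exposes. The SQLite backend, the table and column names, and the helpers `record_run` and `runs_since` are all assumptions for illustration; the actual storage format is not described here.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

# Assumed schema: one row per run, timestamps stored as epoch seconds.
SCHEMA = """
CREATE TABLE IF NOT EXISTS runs (
    run_id       INTEGER PRIMARY KEY AUTOINCREMENT,
    pipeline     TEXT NOT NULL,
    schedule     TEXT,
    started_at   REAL NOT NULL,
    ended_at     REAL,
    status       TEXT NOT NULL,
    rows_written INTEGER,
    error        TEXT
)
"""


def record_run(db, pipeline, schedule, started_at, ended_at, status,
               rows_written=None, error=None):
    """Insert one run row and return its run_id."""
    cur = db.execute(
        "INSERT INTO runs (pipeline, schedule, started_at, ended_at, status,"
        " rows_written, error) VALUES (?, ?, ?, ?, ?, ?, ?)",
        (pipeline, schedule, started_at.timestamp(), ended_at.timestamp(),
         status, rows_written, error),
    )
    db.commit()
    return cur.lastrowid


def runs_since(db, pipeline, since, now):
    """Return (run_id, status, rows_written, error) for runs started within `since`."""
    cutoff = (now - since).timestamp()
    return db.execute(
        "SELECT run_id, status, rows_written, error FROM runs"
        " WHERE pipeline = ? AND started_at >= ? ORDER BY started_at",
        (pipeline, cutoff),
    ).fetchall()


db = sqlite3.connect(":memory:")
db.execute(SCHEMA)
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
start = now - timedelta(hours=1)
record_run(db, "ingest_events", "*/5 * * * *", start,
           start + timedelta(seconds=30), "ok", rows_written=1200)
print(runs_since(db, "ingest_events", timedelta(hours=24), now))
```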
Next: Streaming pipelines.