Every pipeline declares a write mode with mode=. Four modes are built in.
append
The default. New rows are inserted into the target. No deduplication, no overwrites. Watermarks (see below) make incremental appends safe across restarts.
mode="append"
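Here is a minimal sketch of what an append does to a target, using an in-memory SQLite table as a stand-in. It illustrates the semantics only and is not ematix-flow's implementation; the `events` table and the `append` helper are hypothetical. Loading the same batch twice duplicates it, which is why appends rely on a watermark, not on the target, to avoid reprocessing.

```python
import sqlite3

def append(conn: sqlite3.Connection, rows: list) -> None:
    """Hypothetical append: plain INSERTs, no key checks, no overwrites."""
    with conn:  # commit on success, roll back on error
        conn.executemany("INSERT INTO events (event_id, name) VALUES (?, ?)", rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT, name TEXT)")

batch = [("e1", "signup"), ("e2", "login")]
append(conn, batch)
append(conn, batch)  # re-running the same batch duplicates every row

count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(count)  # 4
```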
truncate
Empties the target, then writes the new rows. Useful for fully refreshing a table that mirrors an upstream source of truth, much like a materialised view.
mode="truncate"
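The same idea as a SQLite sketch. SQLite has no TRUNCATE statement, so DELETE stands in for it, and both steps run in one transaction so other connections never observe a half-empty table. Whether ematix-flow wraps the empty-and-reload in a single transaction is not stated here, so treat that as an assumption, along with the hypothetical `daily_totals` table.

```python
import sqlite3

def truncate_and_load(conn: sqlite3.Connection, rows: list) -> None:
    """Hypothetical truncate mode: empty the target, then write, atomically."""
    with conn:  # DELETE and INSERTs commit together or not at all
        conn.execute("DELETE FROM daily_totals")  # stand-in for TRUNCATE
        conn.executemany("INSERT INTO daily_totals (day, total) VALUES (?, ?)", rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_totals (day TEXT, total INTEGER)")

truncate_and_load(conn, [("2024-01-01", 10), ("2024-01-02", 12)])
truncate_and_load(conn, [("2024-01-01", 11)])  # the refresh replaces old contents

rows = conn.execute("SELECT day, total FROM daily_totals").fetchall()
print(rows)  # [('2024-01-01', 11)]
```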
merge (a.k.a. scd1)
Upsert on the merge keys: rows whose keys already exist in the target are overwritten, and rows with new keys are inserted.
mode="merge" # uses the target's primary key by default
mode={"name": "merge", "keys": ["event_id", "tenant_id"]}
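In SQL terms a merge is an upsert on the merge keys. The sketch below uses SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` (available since SQLite 3.24) with the composite key from the dict form above. It shows the semantics only; the SQL ematix-flow actually emits for a given warehouse may differ, and the `events` schema is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (event_id TEXT, tenant_id TEXT, name TEXT, "
    "PRIMARY KEY (event_id, tenant_id))"
)

def merge(conn: sqlite3.Connection, rows: list) -> None:
    """Hypothetical merge (SCD1): insert new keys, overwrite existing ones."""
    with conn:
        conn.executemany(
            """
            INSERT INTO events (event_id, tenant_id, name) VALUES (?, ?, ?)
            ON CONFLICT (event_id, tenant_id) DO UPDATE SET name = excluded.name
            """,
            rows,
        )

merge(conn, [("e1", "t1", "signup"), ("e1", "t2", "signup")])
# ("e1", "t1") exists, so it is overwritten; ("e2", "t1") is new, so it is inserted.
merge(conn, [("e1", "t1", "signup_v2"), ("e2", "t1", "login")])

rows = conn.execute("SELECT * FROM events ORDER BY event_id, tenant_id").fetchall()
print(rows)
# [('e1', 't1', 'signup_v2'), ('e1', 't2', 'signup'), ('e2', 't1', 'login')]
```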
How merge keys are resolved (highest precedence first):
- Explicit keys=[...] on the mode= dict.
- Columns marked pk() on the target table.
- A single-column unique=True constraint.
If none resolve, the framework refuses to run the pipeline rather than silently doing the wrong thing.
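The precedence rules above can be sketched as a small resolver. `Column` and `Table` are hypothetical stand-ins for a target's schema (a `pk` flag for `pk()`, a `unique` flag for `unique=True`), not ematix-flow types. Treating several unique columns as ambiguous, and therefore unresolved, is an assumption about how "a single-column constraint" is read.

```python
from dataclasses import dataclass, field

@dataclass
class Column:
    name: str
    pk: bool = False      # hypothetical stand-in for a column declared with pk()
    unique: bool = False  # hypothetical stand-in for a single-column unique=True

@dataclass
class Table:
    columns: list = field(default_factory=list)

def resolve_merge_keys(mode, table: Table) -> list:
    """Return merge keys in precedence order, or raise if none resolve."""
    # 1. Explicit keys=[...] on the mode= dict.
    if isinstance(mode, dict) and mode.get("keys"):
        return list(mode["keys"])
    # 2. Primary-key columns on the target table.
    pk_cols = [c.name for c in table.columns if c.pk]
    if pk_cols:
        return pk_cols
    # 3. A unique column (assumption: more than one candidate is ambiguous).
    unique_cols = [c.name for c in table.columns if c.unique]
    if len(unique_cols) == 1:
        return unique_cols
    # Nothing resolved: refuse to run rather than guess.
    raise ValueError("cannot resolve merge keys; pass keys=[...] in mode=")

events = Table([Column("event_id", unique=True), Column("tenant_id"), Column("name")])
print(resolve_merge_keys("merge", events))  # ['event_id']
print(resolve_merge_keys({"name": "merge", "keys": ["event_id", "tenant_id"]}, events))
# ['event_id', 'tenant_id']
```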
scd2
Slowly changing dimension, Type 2. Rows carry valid_from, valid_to, and is_current columns. An update closes the currently open row and inserts a new current one, so the full history of each key is kept.
mode="scd2"
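A minimal in-memory sketch of the SCD2 bookkeeping for a single incoming row. The `scd2_apply` helper and the `id` business-key field are hypothetical. Two details are assumptions, not documented ematix-flow behaviour: an unchanged row is treated as a no-op, and an open row is marked with `valid_to=None` rather than a far-future sentinel date.

```python
from datetime import date

def scd2_apply(history: list, key: str, attrs: dict, as_of: date) -> None:
    """Hypothetical SCD2 update: close the open row for key, then open a new one."""
    current = next((r for r in history if r["id"] == key and r["is_current"]), None)
    if current is not None:
        # Assumption: if no tracked attribute changed, keep the open row as-is.
        if all(current.get(k) == v for k, v in attrs.items()):
            return
        # Close the open row at the time of the change.
        current["valid_to"] = as_of
        current["is_current"] = False
    # Insert the new current version; valid_to=None marks it open-ended (assumption).
    history.append(
        {"id": key, **attrs, "valid_from": as_of, "valid_to": None, "is_current": True}
    )

history = []
scd2_apply(history, "c1", {"tier": "free"}, date(2024, 1, 1))
scd2_apply(history, "c1", {"tier": "pro"}, date(2024, 3, 1))
scd2_apply(history, "c1", {"tier": "pro"}, date(2024, 4, 1))  # unchanged: no new row

for row in history:
    print(row["tier"], row["valid_from"], row["valid_to"], row["is_current"])
# free 2024-01-01 2024-03-01 False
# pro 2024-03-01 None True
```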
Incremental loads + watermarks
For sources with a monotonically increasing column (such as updated_at, event_time, or an auto-incrementing id), declare a watermark and let ematix-flow track where it left off:
@ematix.pipeline(
target=Events,
target_connection="warehouse",
schedule="*/5 * * * *",
mode="append",
watermark="received_at",
)
def ingest_events(conn, watermark):
return f"""
SELECT event_id, name, received_at
FROM raw.events
WHERE received_at > {watermark!r}
ORDER BY received_at
"""
Watermarks are persisted in the run-history store, so incremental loads are restart-safe: after a restart, the pipeline resumes from the last recorded watermark.
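To make the mechanism concrete, here is a self-contained sketch of a watermark-driven incremental append, with SQLite standing in for the source, the target, and the run-history store. The `load_increment` helper and the `raw_events`, `events`, and `watermarks` tables are hypothetical, not ematix-flow's internals. Because the watermark lives in a table rather than in process memory, each call picks up where the previous one left off, as a restarted process would. Two design choices worth noting: the watermark is bound as a query parameter instead of being formatted into the SQL string, and because all three tables share one database, the insert and the watermark update commit atomically. If the target and the run-history store were separate systems, a crash between the two commits could replay a batch.

```python
import sqlite3

def load_increment(conn: sqlite3.Connection, pipeline: str = "ingest_events") -> int:
    """Hypothetical incremental append driven by a persisted watermark."""
    row = conn.execute(
        "SELECT value FROM watermarks WHERE pipeline = ?", (pipeline,)
    ).fetchone()
    watermark = row[0] if row else None  # no watermark yet: first run loads everything

    # ISO-8601 strings in one format sort lexicographically in time order.
    new_rows = conn.execute(
        "SELECT event_id, name, received_at FROM raw_events "
        "WHERE ? IS NULL OR received_at > ? ORDER BY received_at",
        (watermark, watermark),
    ).fetchall()
    if not new_rows:
        return 0

    with conn:  # insert and watermark update commit (or roll back) together
        conn.executemany("INSERT INTO events VALUES (?, ?, ?)", new_rows)
        conn.execute(
            "INSERT INTO watermarks (pipeline, value) VALUES (?, ?) "
            "ON CONFLICT (pipeline) DO UPDATE SET value = excluded.value",
            (pipeline, new_rows[-1][2]),  # rows are ordered, so the last is the max
        )
    return len(new_rows)

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE raw_events (event_id TEXT, name TEXT, received_at TEXT);
    CREATE TABLE events (event_id TEXT, name TEXT, received_at TEXT);
    CREATE TABLE watermarks (pipeline TEXT PRIMARY KEY, value TEXT);
""")
conn.executemany("INSERT INTO raw_events VALUES (?, ?, ?)", [
    ("e1", "signup", "2024-01-01T10:00:00"),
    ("e2", "login", "2024-01-01T10:05:00"),
])
conn.commit()

first = load_increment(conn)   # 2: no watermark yet, so every row is new
second = load_increment(conn)  # 0: nothing is newer than the stored watermark
conn.execute("INSERT INTO raw_events VALUES ('e3', 'logout', '2024-01-01T10:09:00')")
conn.commit()
third = load_increment(conn)   # 1: only e3 is past the watermark
print(first, second, third)  # 2 0 1
```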