Streaming pipelines are the long-running cousin of batch pipelines. They consume from a streaming source, write to a target, and commit offsets only after the target write is durable, which gives at-least-once delivery end to end.
Decorator form
from ematix_flow import ematix

@ematix.connection
class kafka_prod:
    kind = "kafka"
    bootstrap_servers = "${KAFKA_BOOTSTRAP}"
    group_id = "ematix-flow"

@ematix.streaming_pipeline(
    source_connection="kafka_prod",
    source_topic="events.v1",
    target=Events,  # target table model, assumed to be defined elsewhere (not shown)
    target_connection="warehouse",
    dead_letter_topic="events.v1.dlq",
)
def consume_events(batch):
    return batch  # pass-through; transform in Python if needed
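The pass-through body can be replaced by any Python transform. The following is only a sketch: it assumes `batch` is a list of dict rows (the batch type is not stated here), and the field names `user_id` and `event_type` are hypothetical. It is shown without the decorator so it runs on its own.

```python
def normalize_events(batch):
    """Hypothetical transform: drop rows without a user_id, lowercase event_type.

    Assumes `batch` is a list of dicts; the real batch type may differ.
    """
    cleaned = []
    for row in batch:
        if row.get("user_id") is None:
            continue  # skip rows that cannot be attributed to a user
        # Copy the row so the input batch is left unmodified.
        cleaned.append({**row, "event_type": str(row.get("event_type", "")).lower()})
    return cleaned


rows = [{"user_id": 1, "event_type": "CLICK"}, {"event_type": "VIEW"}]
result = normalize_events(rows)
```

Whether a transform may return fewer rows than it received is also an assumption; check the pipeline's contract before filtering inside the function.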
Run it as a foreground process:
flow consume --module my_pipelines consume_events
Or run it under systemd, supervisord, or a Kubernetes Deployment.
TOML form
For ops teams that want pipelines out of Python entirely:
[streaming_pipelines.consume_events]
source = { connection = "kafka_prod", topic = "events.v1" }
target = { connection = "warehouse", table = "analytics.events" }
dead_letter_topic = "events.v1.dlq"
batch_size = 1000
Run it by pointing the CLI at the config file:
flow consume --config streaming.toml consume_events
Guarantees
- Manual offset commit / ack. ematix-flow calls commit_offsets() on the source only after a durable target write.
- At-least-once end to end is the default.
- Exactly-once. Kafka producer-side via transactions; consumer-coordinated via KafkaToKafkaEosPipeline.
- DLQ. App-level (dead_letter_topic routes failed rows to a separate target) and broker-level (RabbitMQ x-dead-letter-exchange, Pub/Sub dead_letter_policy).
- Schema Registry. Avro / Protobuf decode-encode via Confluent SR or Apicurio. Declare it as a connection and reference it from the Kafka connection.
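The commit-after-write ordering behind the at-least-once guarantee can be illustrated with an in-memory simulation. This is a sketch, not ematix-flow's implementation: `FakeSource`, `FakeTarget`, and `run_once` are hypothetical, and `commit_offsets` takes a count here only for simplicity. It also assumes the target upserts by a hypothetical `id` key, so redelivered records overwrite rather than duplicate.

```python
class FakeSource:
    """In-memory stand-in for a streaming source with a committed offset."""

    def __init__(self, records):
        self.records = records
        self.committed = 0  # index of the next record delivered after a restart

    def poll(self, max_records):
        return self.records[self.committed:self.committed + max_records]

    def commit_offsets(self, count):
        self.committed += count


class FakeTarget:
    """In-memory stand-in for a durable target that upserts by record id."""

    def __init__(self):
        self.rows = {}

    def write(self, batch):
        for record in batch:
            self.rows[record["id"]] = record


def run_once(source, target, batch_size, crash_before_commit=False):
    batch = source.poll(batch_size)
    if not batch:
        return 0
    target.write(batch)  # 1. durable write first
    if crash_before_commit:
        raise RuntimeError("simulated crash after write, before commit")
    source.commit_offsets(len(batch))  # 2. commit only after the write succeeded
    return len(batch)


source = FakeSource([{"id": i, "value": i * 10} for i in range(5)])
target = FakeTarget()

try:
    run_once(source, target, batch_size=3, crash_before_commit=True)
except RuntimeError:
    pass  # the process "restarts"; the offset was never committed

# The same three records are redelivered, then the remaining two follow.
while run_once(source, target, batch_size=3):
    pass

print(len(target.rows), source.committed)
```

Because the crash happens after the write but before the commit, the first batch is delivered twice. No record is lost, and duplicates are harmless only because the target write is idempotent, which is why at-least-once pipelines are usually paired with upserts or keyed writes.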