Streaming sources can carry SQL transforms, Python UDFs, and windowed aggregations — the same primitives as the batch side, running over a continuous stream of Arrow batches.
SQL transforms
@ematix.streaming_pipeline(
    source_connection="kafka_prod",
    source_topic="events.v1",
    target=DailyByTenant,
    target_connection="warehouse",
    transform="""
        SELECT
            tenant_id,
            DATE_TRUNC('day', received_at) AS day,
            COUNT(*) AS event_count
        FROM source
        GROUP BY tenant_id, day
    """,
)
def rollup(batch):
    return batch
source refers to the current streaming batch, which is registered as
a virtual table once per batch. The SQL runs in-process against Arrow:
no JVM, and no shuffle to a separate cluster.
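To make the query's semantics concrete, here is a minimal plain-Python sketch of what the rollup computes for one batch. It is not how the SQL is executed (that happens against Arrow); the rollup_batch helper and the list-of-dicts batch are hypothetical stand-ins used only for illustration.

```python
from collections import Counter
from datetime import datetime, timezone


def rollup_batch(rows):
    """Count events per (tenant_id, day) for one batch of rows.

    Hypothetical helper: mirrors the SELECT above, not ematix internals.
    """
    counts = Counter()
    for row in rows:
        # DATE_TRUNC('day', received_at): drop the time-of-day part.
        day = row["received_at"].replace(hour=0, minute=0, second=0, microsecond=0)
        counts[(row["tenant_id"], day)] += 1
    return [
        {"tenant_id": tenant, "day": day, "event_count": n}
        for (tenant, day), n in sorted(counts.items())
    ]


# Made-up sample batch: two events for tenant "a" on one day, one for "b".
batch = [
    {"tenant_id": "a", "received_at": datetime(2024, 1, 1, 9, 30, tzinfo=timezone.utc)},
    {"tenant_id": "a", "received_at": datetime(2024, 1, 1, 17, 5, tzinfo=timezone.utc)},
    {"tenant_id": "b", "received_at": datetime(2024, 1, 2, 0, 1, tzinfo=timezone.utc)},
]
result = rollup_batch(batch)
```

Each output row carries the same three columns the SELECT produces: tenant_id, day, and event_count.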
Python UDFs
Scalar:
import ematix_flow as ef

@ef.udf(return_type=ef.types.Float64)
def fahrenheit_to_celsius(f: float) -> float:
    return (f - 32) * 5 / 9
Aggregate (UDAF):
@ef.udaf(return_type=ef.types.Float64, state_type=ef.types.Struct(...))
class RunningAverage:
    def init(self): ...
    def update(self, state, x): ...
    def merge(self, a, b): ...
    def finalize(self, state): ...
Both are usable from transform="..." SQL or directly from Python.
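The four UDAF methods follow the usual init / update / merge / finalize lifecycle. The sketch below fills them in as plain Python, without the decorator, to show how partial states combine. It rests on assumptions: the state is taken to be a (total, count) pair because the real state_type is elided above, each method is assumed to return a new state rather than mutate in place, and the aggregate driver function is hypothetical.

```python
class RunningAverage:
    """Plain-Python stand-in for the UDAF above (no ematix decorator).

    Assumption: state is a (total, count) pair; the real state_type is elided.
    """

    def init(self):
        # Empty state: nothing summed, nothing counted.
        return (0.0, 0)

    def update(self, state, x):
        # Fold one input value into the state.
        total, count = state
        return (total + x, count + 1)

    def merge(self, a, b):
        # Combine two partial states, e.g. from separate batches.
        return (a[0] + b[0], a[1] + b[1])

    def finalize(self, state):
        # Turn the accumulated state into the final result.
        total, count = state
        return total / count if count else None


def aggregate(agg, values):
    """Hypothetical driver: run init, then update once per value."""
    state = agg.init()
    for v in values:
        state = agg.update(state, v)
    return state


agg = RunningAverage()
left = aggregate(agg, [1.0, 2.0])
right = aggregate(agg, [3.0, 6.0])
average = agg.finalize(agg.merge(left, right))
```

Merging the two partial states gives the same average as a single pass over all four values, which is the property merge has to preserve.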
Windows
Tumbling, hopping, and session windows are declared on the pipeline:
@ematix.streaming_pipeline(
    source_connection="kafka_prod",
    source_topic="events.v1",
    target=PerMinute,
    target_connection="warehouse",
    window={"kind": "tumbling", "size": "1 minute", "time_column": "received_at"},
    transform="""
        SELECT
            window_start,
            tenant_id,
            COUNT(*) AS n
        FROM source
        GROUP BY window_start, tenant_id
    """,
)
def per_minute(batch):
    return batch
Session windows accept gap= instead of size=. Hopping windows
accept both size= and slide=.
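For readers new to the three window kinds, the following sketch shows how a timestamp maps to windows under each one. It is a generic illustration in plain Python, not ematix's implementation: the function names are hypothetical, windows are assumed to be aligned to the Unix epoch, and a session is assumed to continue while the pause between events is at most gap.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin for window starts


def tumbling_start(ts, size):
    """Start of the single tumbling window that contains ts."""
    return ts - (ts - EPOCH) % size


def hopping_starts(ts, size, slide):
    """Starts of every hopping window [start, start + size) that contains ts."""
    start = ts - (ts - EPOCH) % slide  # latest window starting at or before ts
    starts = []
    while start + size > ts:
        starts.append(start)
        start -= slide
    return sorted(starts)


def session_windows(timestamps, gap):
    """Group timestamps into sessions, splitting where the pause exceeds gap."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # close enough: extend the current session
        else:
            sessions.append([ts])  # pause too long: start a new session
    return sessions


ts = datetime(2024, 1, 1, 12, 0, 42)
one_minute = timedelta(minutes=1)
half_minute = timedelta(seconds=30)

tumbling = tumbling_start(ts, one_minute)
hopping = hopping_starts(ts, one_minute, half_minute)
sessions = session_windows(
    [datetime(2024, 1, 1, 12, 0, 0), datetime(2024, 1, 1, 12, 0, 20), datetime(2024, 1, 1, 12, 5, 0)],
    half_minute,
)
```

A tumbling window places each event in exactly one window. A hopping window with a slide smaller than its size places an event in several overlapping windows. Session boundaries depend only on the spacing between events, which is why a session takes a gap rather than a size.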
That’s the full surface. Head to Specs for the “why bother” half.