
Emitters

Emitters are specialized operators that split the current stream into two or more phases, each of which processes events asynchronously.

A basic stream is composed of a source and a sink, and events are processed synchronously in batches. When the sink has processed all events in a batch, it waits for the downstream system to confirm them and then acknowledges the batch to its source, which provides at-least-once delivery guarantees. Emitters allow processing to proceed even though the sink has not yet received some or all of the messages from a batch. This enables patterns such as:

  • Windowed aggregation and emission on time/count triggers
  • Request/response fan-out, where an emitter batches multiple events before emitting
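The batch-then-acknowledge cycle of a basic stream can be modeled in a few lines of plain Java. This is a minimal sketch, not the VoltDB stream API: `InMemorySource`, `AsyncSink`, and `runOnce` are hypothetical names, the source is an in-memory list, and a failed downstream write is simulated. It shows why acknowledging only after the sink confirms gives at-least-once delivery: an unconfirmed batch is simply served again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AtLeastOnceBatchSketch {

    /** Hypothetical in-memory source that re-serves everything after the last acknowledged offset. */
    static class InMemorySource {
        private final List<String> log;
        private int committed = 0; // events before this index have been acknowledged

        InMemorySource(List<String> log) { this.log = log; }

        List<String> poll(int maxBatchSize) {
            int end = Math.min(log.size(), committed + maxBatchSize);
            return new ArrayList<>(log.subList(committed, end));
        }

        void ack(int batchSize) { committed += batchSize; }

        int committedOffset() { return committed; }
    }

    /** Hypothetical sink: the returned future completes once the downstream system confirms the write. */
    interface AsyncSink {
        CompletableFuture<Void> write(List<String> batch);
    }

    /** Processes one batch synchronously and acks it only after the sink confirms. */
    static void runOnce(InMemorySource source, AsyncSink sink, int maxBatchSize) {
        List<String> batch = source.poll(maxBatchSize);
        if (batch.isEmpty()) {
            return;
        }
        List<String> processed = new ArrayList<>();
        for (String event : batch) {
            processed.add(event.toUpperCase()); // stand-in for synchronous processors
        }
        try {
            sink.write(processed).join(); // wait for downstream confirmation
            source.ack(batch.size());     // acknowledge only after confirmation
        } catch (CompletionException e) {
            // No ack: the same batch is served again on the next poll.
        }
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource(List.of("a", "b", "c"));
        List<String> delivered = new ArrayList<>();
        int[] attempts = {0};
        AsyncSink flakySink = batch -> {
            if (++attempts[0] == 1) { // the first write fails
                return CompletableFuture.failedFuture(new IllegalStateException("downstream unavailable"));
            }
            delivered.addAll(batch);
            return CompletableFuture.completedFuture(null);
        };
        runOnce(source, flakySink, 2); // write fails, nothing acknowledged
        runOnce(source, flakySink, 2); // a and b are redelivered and acknowledged
        runOnce(source, flakySink, 2); // c
        System.out.println(delivered + " committed=" + source.committedOffset());
    }
}
```

Because the source only advances on `ack`, the failed first attempt costs nothing but a retry. An emitter breaks exactly this coupling: it lets the source move on before the final sink has confirmed.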

An emitter creates a new source for every configured trigger, and matching events are emitted to that source. The processing phases are decoupled from one another. An emitter can synchronize these phases, but in many cases data still buffered in the emitter can be lost on failure, because the upstream source has already committed it.
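A count trigger that buffers events inside an emitter can be sketched in plain Java to make this trade-off concrete. This is a hypothetical model, not the VoltDB emitter API: `CountWindowEmitter` sums integer events into fixed-size windows, and a `LinkedBlockingQueue` stands in for the new source created for the trigger. The upstream position is treated as committed as soon as each event is accepted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CountTriggerEmitterSketch {

    /** Hypothetical count-triggered emitter that sums integers into windows of a fixed size. */
    static class CountWindowEmitter {
        private final int windowSize;
        private final List<Integer> buffer = new ArrayList<>(); // in memory only, never persisted
        final BlockingQueue<Integer> downstream = new LinkedBlockingQueue<>(); // the next phase's source

        CountWindowEmitter(int windowSize) { this.windowSize = windowSize; }

        /** Accepts one upstream event and returns immediately, so upstream can commit it. */
        void accept(int event) {
            buffer.add(event);
            if (buffer.size() == windowSize) { // the count trigger fires
                int sum = buffer.stream().mapToInt(Integer::intValue).sum();
                downstream.add(sum);           // hand the aggregate to the next phase
                buffer.clear();
            }
        }

        int buffered() { return buffer.size(); }
    }

    public static void main(String[] args) {
        CountWindowEmitter emitter = new CountWindowEmitter(3);
        int upstreamCommitted = 0;
        for (int event : new int[] {1, 2, 3, 4, 5}) {
            emitter.accept(event);
            upstreamCommitted++; // upstream commits without waiting for the window to close
        }
        // The window [1, 2, 3] was emitted; 4 and 5 are buffered but already committed upstream.
        System.out.println("emitted=" + emitter.downstream + " buffered=" + emitter.buffered()
                + " committed=" + upstreamCommitted);
        // A crash at this point would lose 4 and 5: the upstream source will not redeliver them.
    }
}
```

Synchronizing the phases would mean delaying the upstream commit until the window containing an event has been emitted, which trades throughput for the guarantee shown in the basic stream.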

As with processors, an emitter can transform an upstream event into a new shape before emitting it to the next operator. The main difference is that a processor must emit synchronously, whereas an emitter decides when the event is passed to the next stage.
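The difference in contract can be illustrated with plain Java types. In this hypothetical sketch (not the VoltDB API), a processor is a `java.util.function.Function` whose result is its return value, while the `Emitter` interface, an assumed shape, receives a callback that it may invoke later from another thread. `upperCaseAsync` stands in for any emitter that waits on slow work before emitting.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

public class ProcessorVsEmitterSketch {

    /** Hypothetical emitter contract: gets a callback and may invoke it later, from any thread. */
    interface Emitter<I, O> {
        void accept(I event, Consumer<O> downstream);
    }

    /** Upper-cases each event on a worker thread and emits the result once it is ready. */
    static Emitter<String, String> upperCaseAsync(Executor pool) {
        return (event, downstream) ->
                CompletableFuture.supplyAsync(() -> event.toUpperCase(), pool) // stands in for slow work
                        .thenAccept(downstream);                               // emit when the value exists
    }

    public static void main(String[] args) throws InterruptedException {
        // A processor transforms synchronously: the result is the return value.
        Function<String, Integer> processor = String::length;
        int processed = processor.apply("event");

        // An emitter also transforms, but decides when to pass the event on.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> received = new CopyOnWriteArrayList<>();
        upperCaseAsync(pool).accept("event", received::add); // may return before anything is emitted
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("processor=" + processed + " emitter=" + received);
    }
}
```

Passing a callback instead of returning a value is what lets an emitter batch, wait for a remote response, or hold events until a trigger fires.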

The following example uses a VoltDB emitter that calls a procedure in batches and asynchronously pushes the responses downstream:

package org.voltdb.stream.example;

import org.voltdb.stream.api.Sinks;
import org.voltdb.stream.api.pipeline.VoltPipeline;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.plugin.volt.api.BatchedVoltProcedureCallConfigBuilder;
import org.voltdb.stream.plugin.volt.api.VoltEmitters;
import org.voltdb.stream.plugin.volt.api.VoltProcedureTrigger;

public class LongRunningPipelineWithBulkProcedureCall implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
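        // Emitter that calls the PassThroughProcedure stored procedure in batches.
        // The Volt client resource name is elided in this example.
        BatchedVoltProcedureCallConfigBuilder emitter = VoltEmitters
                .batchedAsyncProcedureCall("PassThroughProcedure")
                .withVoltClientResourceName(...);

        stream
                .withName("long running stream")
                .consumeFromSource(...)
                // Wrap each incoming value as the procedure's parameter array.
                .processWith(uuid -> new Object[]{uuid})
                // End the first phase: events are handed to the emitter.
                .terminateWithEmitter(emitter)
                // Start the second phase: emit one event per procedure response.
                .emit(VoltProcedureTrigger.onEachResponse)
                // Advance to the first row of the response and return its first column as a String.
                .processWith(table -> {
                    table.advanceRow();
                    return (String) table.get(0);
                })
                .terminateWithSink(Sinks.stdout());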
    }
}