Tumbling-count-window¶
The tumbling-count-window creates a per-worker tumbling count window. In this implementation, a window aggregate is stored only in the machine's memory.
The aggregate is emitted to the configured sink via an emit point.
Each event is transformed into a request based on the VoltAggregateBuilder definition.
Lastly, a key can be provided so that each incoming event is routed to the right local window and aggregated there.
Note that the window is a local, per-worker keyed aggregate, not a machine-wide keyed aggregate.
If a downstream system reports an error, this tumbling-count-window will re-process events.
stream
    .aggregateWithWindow(Windows.<Event>tumblingCountWindow()
        .withMaxEventsPerWindow(500)
        .withKeyExtractor(Event::getUserId)
        .withAggregateDefinition(builder -> builder
            .count()
            .sum("amount", Event::getAmount)
            .min("amount", Event::getAmount)
            .max("amount", Event::getAmount)
            .distinct("id", (EventToStringExtractor<Event>) Event::getId)
        )
        .withExceptionHandler((records, context, throwable) -> {
            // handle error
        })
    )
    .emit(WindowTrigger.atWindowEnd)
...
emitter:
  tumbling-count-window:
    maxEventsPerWindow: 500
    keyExtractor: "com.example.Event::getUserId"
    aggregateDefinition:
      from: "com.example.Event"
      builder:
        - type: count
        - type: sum (amount)
        - type: min (amount)
        - type: max (amount)
        - type: distinct (id)
    triggers:
      - trigger: org.voltdb.stream.plugin.window.api.WindowTrigger#atWindowEnd
...
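To make the routing and closing behavior described above concrete, the following is a minimal plain-Java sketch of a keyed tumbling count window. It does not use the plugin API: `KeyedCountWindowSketch`, `Aggregate`, and `ClosedWindow` are hypothetical names introduced for illustration, and the real implementation may differ in how it stores and emits aggregates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these types belong to the VoltSP API.
final class KeyedCountWindowSketch {

    // Running aggregate for a single key: event count and sum of amounts.
    static final class Aggregate {
        long count;
        double sum;
    }

    // A closed window as it would be handed to an emit point.
    record ClosedWindow(String key, long count, double sum) {}

    private final int maxEventsPerWindow;
    private final Map<String, Aggregate> openWindows = new HashMap<>();
    private final List<ClosedWindow> emitted = new ArrayList<>();

    KeyedCountWindowSketch(int maxEventsPerWindow) {
        if (maxEventsPerWindow <= 0) {
            throw new IllegalArgumentException("maxEventsPerWindow must be greater than 0");
        }
        this.maxEventsPerWindow = maxEventsPerWindow;
    }

    // Routes the event to the window for its key and closes that window once it is full.
    void accept(String key, double amount) {
        Aggregate aggregate = openWindows.computeIfAbsent(key, k -> new Aggregate());
        aggregate.count++;
        aggregate.sum += amount;
        if (aggregate.count == maxEventsPerWindow) {
            emitted.add(new ClosedWindow(key, aggregate.count, aggregate.sum));
            // Tumbling: the next event for this key starts a fresh window.
            openWindows.remove(key);
        }
    }

    List<ClosedWindow> emitted() {
        return emitted;
    }

    int openWindowCount() {
        return openWindows.size();
    }

    public static void main(String[] args) {
        KeyedCountWindowSketch window = new KeyedCountWindowSketch(2);
        window.accept("alice", 10.0);
        window.accept("bob", 1.0);
        window.accept("alice", 5.0); // second event for "alice" closes her window
        System.out.println(window.emitted());
        // prints [ClosedWindow[key=alice, count=2, sum=15.0]]
    }
}
```

Because the state lives in a local map, two workers that both receive events for the same key would each keep their own window in this sketch, which mirrors the per-worker scope noted above.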
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-window-api</artifactId>
    <version>1.8.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-window-api', version: '1.8.0'
Properties¶
maxEventsPerWindow¶
Defines how many events this window can accumulate before it is closed. Must be greater than 0.
Type: number
keyExtractor¶
Optional. If set, the window aggregates only events that share the same key value. The key must conform to the Java equals and hashCode contract.
Type: object
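The equals and hashCode requirement can be illustrated with a small plain-Java sketch. It assumes, as a simplification, that windows are looked up in a hash-based map; `UserRegion`, `BrokenKey`, and `windowsFor` are hypothetical names and are not part of the plugin.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key types for illustration; they are not part of the plugin.
final class WindowKeyContractSketch {

    // A record derives equals and hashCode from its components.
    record UserRegion(String userId, String region) {}

    // A plain class that keeps the identity-based equals and hashCode from Object.
    static final class BrokenKey {
        final String userId;

        BrokenKey(String userId) {
            this.userId = userId;
        }
    }

    // Counts how many distinct map entries (windows) the two keys end up in.
    static int windowsFor(Object first, Object second) {
        Map<Object, Integer> windows = new HashMap<>();
        windows.merge(first, 1, Integer::sum);
        windows.merge(second, 1, Integer::sum);
        return windows.size();
    }

    public static void main(String[] args) {
        // Logically equal record keys share one window.
        System.out.println(windowsFor(new UserRegion("u1", "eu"), new UserRegion("u1", "eu"))); // prints 1
        // Keys without value-based equality are split across two windows.
        System.out.println(windowsFor(new BrokenKey("u1"), new BrokenKey("u1"))); // prints 2
    }
}
```

Strings, boxed primitives, and records are usually safe choices for a key because they already implement value-based equality.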
aggregateDefinition¶
The aggregate is defined by the provided builder. Required.
Type: object
timeConfig¶
Configuration for processing-time or event-time handling.
Type: object
Fields of timeConfig:
timeConfig.eventTimeExtractor¶
If provided, the time is sourced from the event itself. Without it, time comes from the worker's wall clock at the moment of processing.
Required when idleTimeoutClock is EVENT_TIME (event-time idle needs event timestamps).
Type: object
timeConfig.idleTimeout¶
Inactivity timeout after which a window (or session) is emitted and closed.
While the pipeline is busy, quiet keys are detected as part of normal batch processing and emitted once they cross this threshold. While the pipeline is fully silent, idle cleanup is driven by the periodic pipeline heartbeat (60s).
If idleTimeout is not set, idle cleanup is disabled and the window only closes via its primary trigger (count, time span, session gap, ...).
The clock used to measure silence is controlled by idleTimeoutClock.
Type: object
timeConfig.idleTimeoutClock¶
Specifies the clock used to measure inactivity for idleTimeout:
- PROCESSING_TIME - silence measured in real time. Quiet keys flush after idleTimeout of real time, regardless of event-time progression. Suitable for live streams where the idle SLA is real time.
- EVENT_TIME - silence measured in event time (requires eventTimeExtractor). Quiet keys flush only as the watermark advances by idleTimeout. Suitable for replays / historical processing where results must be deterministic.
Type: object
Supported values: processing_time, event_time.
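The difference between the two clocks shows up most clearly during a replay, where hours of event time can pass in milliseconds of real time. The sketch below is a simplified plain-Java model, not the plugin's implementation: `IdleTimeoutSketch` and its methods are hypothetical, and the watermark is assumed to be the highest event time seen so far.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the enum mirrors the documented values, the rest is illustrative.
final class IdleTimeoutSketch {

    enum IdleTimeoutClock { PROCESSING_TIME, EVENT_TIME }

    private final IdleTimeoutClock clock;
    private final long idleTimeoutMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();
    private long watermark = Long.MIN_VALUE; // assumption: highest event time observed so far

    IdleTimeoutSketch(IdleTimeoutClock clock, long idleTimeoutMillis) {
        this.clock = clock;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Records activity for a key using the timestamp that the configured clock selects.
    void onEvent(String key, long eventTimeMillis, long wallClockMillis) {
        watermark = Math.max(watermark, eventTimeMillis);
        lastSeen.put(key, clock == IdleTimeoutClock.EVENT_TIME ? eventTimeMillis : wallClockMillis);
    }

    // Removes and returns the keys that have been silent for at least idleTimeout.
    List<String> flushIdle(long wallClockMillis) {
        long now = clock == IdleTimeoutClock.EVENT_TIME ? watermark : wallClockMillis;
        List<String> idle = new ArrayList<>();
        lastSeen.entrySet().removeIf(entry -> {
            boolean expired = now - entry.getValue() >= idleTimeoutMillis;
            if (expired) {
                idle.add(entry.getKey());
            }
            return expired;
        });
        return idle;
    }

    // Replays two events that are 10 s apart in event time but 2 ms apart in real time.
    static List<String> replay(IdleTimeoutClock clock) {
        IdleTimeoutSketch tracker = new IdleTimeoutSketch(clock, 5_000);
        tracker.onEvent("a", 0, 1_000);
        tracker.onEvent("b", 10_000, 1_002);
        return tracker.flushIdle(1_003);
    }

    public static void main(String[] args) {
        System.out.println(replay(IdleTimeoutClock.EVENT_TIME));      // prints [a]
        System.out.println(replay(IdleTimeoutClock.PROCESSING_TIME)); // prints []
    }
}
```

With EVENT_TIME the result depends only on the timestamps carried by the events, so repeating the replay gives the same flushes. With PROCESSING_TIME the result depends on how fast the events happen to be processed.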
exceptionHandler¶
Custom exception handler enabling interception of all errors related to this emitter.
Type: object
Available Triggers¶
org.voltdb.stream.api.pipeline.emitter.ExceptionTrigger.onError¶
When used, an emitter will create a sub stream of all events that were consumed.
org.voltdb.stream.plugin.window.api.WindowTrigger.atWindowEnd¶
Triggered after a window has been closed.
org.voltdb.stream.plugin.window.api.WindowTrigger.atAggregate¶
Triggered after each event has been aggregated. An emitted aggregate is a snapshot of the aggregate after the event has been applied.
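The two window triggers differ in how often they fire. As a rough plain-Java model (the `TriggerSketch` class and its `Trigger` enum are hypothetical stand-ins, not the plugin's WindowTrigger type), the following feeds events through an unkeyed count window and records the running count each time the chosen trigger would fire.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of trigger frequency; not the plugin's WindowTrigger API.
final class TriggerSketch {

    enum Trigger { AT_WINDOW_END, AT_AGGREGATE }

    // Returns the running count captured each time the trigger fires.
    static List<Integer> emittedCounts(Trigger trigger, int maxEventsPerWindow, int events) {
        List<Integer> emitted = new ArrayList<>();
        int count = 0;
        for (int i = 0; i < events; i++) {
            count++;
            boolean windowFull = count == maxEventsPerWindow;
            if (trigger == Trigger.AT_AGGREGATE) {
                emitted.add(count); // snapshot after every aggregated event
            } else if (windowFull) {
                emitted.add(count); // final aggregate, once per closed window
            }
            if (windowFull) {
                count = 0; // tumbling: start the next window empty
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emittedCounts(Trigger.AT_AGGREGATE, 3, 5));  // prints [1, 2, 3, 1, 2]
        System.out.println(emittedCounts(Trigger.AT_WINDOW_END, 3, 5)); // prints [3]
    }
}
```

In this model the last two events remain in an open window, so the window-end trigger has not fired for them yet.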
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.
Metrics¶
Window metrics¶
Metric enum: org.voltdb.stream.plugin.window.WindowMetric
| Prometheus name | Type | Description |
|---|---|---|
| voltsp_aggregations_unique_keys_total | gauge | Current number of unique aggregate keys held by a local window. |
| voltsp_aggregations_created_total | counter | Number of aggregate windows created. |
| voltsp_aggregations_emitted_total | counter | Number of aggregate snapshots emitted by a local window. |
| voltsp_emitted_total | counter | Number of events emitted by a local window. This can be errors, original events, late events, etc. |
| voltsp_idle_keys_total | counter | Number of idle aggregate keys removed by a local window. |
| voltsp_late_event_total | counter | Number of late events observed by a local window. |