Time and Watermarks in VoltSP Window Aggregations¶
This page explains how time-based windows (SessionWindow, SlidingWindow and TumblingTimeWindow) handle time.
The time handling can be controlled by timeConfig to meet the level of correctness and latency that your pipeline requires.
For example in Java:
// given a stream of events
package org.acme;
public record Event(String userId, Instant timestamp) {}
// define a stream that aggregates events by user and emits them every hour
stream
...
.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
.withWindowSpan(Duration.ofHours(1))
.withKeyExtractor(Event::userId)
// controls the time handling
.withTimeConfigBuilder(builder -> builder
.withEventTimeExtractor(Event::timestamp)
.withIdleTimeout(Duration.ofMinutes(5))
)
.withAggregateDefinition(builder -> builder
.count()
...
)
.withExceptionHandler((records, context, throwable) -> {
...
})
)
.emit(WindowTrigger.atWindowEnd)
...
In YAML:
version: 1
name: "WindowedPipeline"
source:
  ...
emitter:
  tumbling-time-window:
    windowSpan: 1h
    keyExtractor: org.acme.Event::userId
    timeConfig:
      eventTimeExtractor: "org.acme.Event::timestamp"
      idleTimeout: 5m
    aggregateDefinition:
      from: org.acme.Event
      builder:
        - type: count
        ...
    triggers:
      - trigger: org.voltdb.stream.plugin.window.api.WindowTrigger#atWindowEnd
processors:
  ...
sink:
  ...
Overview¶
When events flow into a windowed aggregation, the window has to make two decisions for every event:
- Which window does this event belong to? — based on the event's timestamp.
- When is a window ready to emit? — based on how far time has progressed in the stream.
You shape those decisions with a set of settings:
- eventTimeExtractor: optional; controls where event timestamps come from. If not set, time is sourced from the worker's wall clock at processing time.
- watermarkStrategy: optional; controls how aggressively the stream's notion of time advances.
- delayWindowClose: optional; controls how long to keep a window open after it would otherwise close. This setting absorbs late or slightly out-of-order events. The default is 0 (no delay).
- idleTimeout: required for a session window, optional for other types of windows; how long a window may sit without new events before it is closed and emitted.
- idleTimeoutClock: optional; controls which clock measures the silence (wall clock or event time).
The Processing Time¶
The default time strategy is Processing Time. The time is sourced from the worker's local clock at the moment of processing, hence the name. Use it when results should reflect when the system saw the event, not when it happened. It is suitable for monitoring, throughput-style metrics, or systems with no reliable event timestamp.
Events cannot be late or out of order. The window emits exactly when the real time has elapsed or when the window is marked as idle.
In Java:
stream
...
.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
.withWindowSpan(Duration.ofHours(1))
.withKeyExtractor(Event::userId)
...
)
.emit(WindowTrigger.atWindowEnd)
...
In YAML:
version: 1
name: "WindowedPipeline"
source:
  ...
emitter:
  tumbling-time-window:
    windowSpan: 1h
    keyExtractor: "org.acme.Event::userId"
    ...
The Event Time¶
In this strategy the source of time is detached from the local clock. Use it when correctness depends on the real-world time of the event, for example billing, fraud detection, or aggregations over user-perceived intervals. It is required when events can arrive out of order or are replayed from history.
To source a time from an event, set the eventTimeExtractor.
In Java:
stream
...
.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
...
.withTimeConfigBuilder(builder -> builder
.withEventTimeExtractor(Event::timestamp)
)
...
)
.emit(WindowTrigger.atWindowEnd)
...
In YAML:
version: 1
name: "WindowedPipeline"
source:
  ...
emitter:
  tumbling-time-window:
    timeConfig:
      eventTimeExtractor: "org.acme.Event::timestamp"
    ...
Sourcing time for Idle timeout¶
By default, when a custom time extractor is set, the time is sourced from the event for both aggregation and emission heuristics.
This behavior can be overridden by setting idleTimeoutClock to PROCESSING_TIME, in which case the window detects idleness based on real time.
In Java:
.withTimeConfigBuilder(config -> config
.withEventTimeExtractor(Event::timestamp)
.withIdleTimeout(Duration.ofMinutes(5))
.withIdleTimeoutClock(TimeSource.EVENT_TIME))
In YAML:
timeConfig:
  eventTimeExtractor: "org.acme.Event::timestamp"
  idleTimeout: 5m
  idleTimeoutClock: EVENT_TIME
Visualizing the difference¶
Given a Tumbling Time Window of 10-second span, consider four events e1..e4 with these characteristics:
| Event | Event time (when created) | Processing time (arrival time) |
|---|---|---|
| e1 | 2s | 3s |
| e2 | 4s | 5s |
| e3 | 8s | 13s |
| e4 | 15s | 9s |
Note that:
- e4 arrives before e3.
- e4's event time is later than its arrival time (e.g. the worker's and the external source's clocks are slightly out of sync).
Under the Processing Time strategy, the worker's wall clock at the moment of processing is treated as the event's timestamp.
With a 10-second tumbling window ([0s,10s), [10s,20s), …), events are therefore bucketed by their arrival time:
arrival: 0 e1 e2 e4 e3 | 20s
├────●──────●───────────────●──────────●────┤────►
window [0s,10s): { e1, e2, e4 } ← closes at wall clock = 10s
window [10s,20s): { e3 } ← closes at wall clock = 20s
Under the Event Time strategy, when a time extractor is provided, events are bucketed by each event's own timestamp:
event time: 0 e1 e2 e3 e4 | 20s
├────●──────●───────────●─────────────●─────┤────►
window [0s,10s): { e1, e2, e3 } ← e3 is aggregated based on its timestamp
window [10s,20s): { e4 }
Note. The same input stream produces different aggregates depending on the time strategy. Also note that the event time strategy can mark an event as late. See Late events for details.
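The two bucketings above can be reproduced with a few lines of plain Java. This is only a sketch of tumbling-window assignment, assuming windows are aligned to zero; BucketingSketch, Sample and their methods are hypothetical names for illustration and are not part of the VoltSP API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: these types are not part of the VoltSP API.
class BucketingSketch {

    // One row of the table above: name, event time and arrival time in seconds.
    record Sample(String name, long eventTimeSec, long arrivalTimeSec) {}

    // Start of the tumbling window that contains the given timestamp
    // (assumes windows are aligned to 0s, as in the diagrams).
    static long windowStart(long timestampSec, long spanSec) {
        return Math.floorDiv(timestampSec, spanSec) * spanSec;
    }

    // Groups event names by window start, using either event time or arrival time.
    static Map<Long, List<String>> bucket(List<Sample> samples, long spanSec, boolean useEventTime) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Sample s : samples) {
            long ts = useEventTime ? s.eventTimeSec() : s.arrivalTimeSec();
            windows.computeIfAbsent(windowStart(ts, spanSec), k -> new ArrayList<>()).add(s.name());
        }
        return windows;
    }

    // The four events e1..e4 from the table above.
    static List<Sample> tableAbove() {
        return List.of(
            new Sample("e1", 2, 3),
            new Sample("e2", 4, 5),
            new Sample("e3", 8, 13),
            new Sample("e4", 15, 9));
    }

    public static void main(String[] args) {
        // prints: processing time: {0=[e1, e2, e4], 10=[e3]}
        System.out.println("processing time: " + bucket(tableAbove(), 10, false));
        // prints: event time:      {0=[e1, e2, e3], 10=[e4]}
        System.out.println("event time:      " + bucket(tableAbove(), 10, true));
    }
}
```

The only difference between the two runs is which timestamp is fed into windowStart, which mirrors the effect of setting or omitting eventTimeExtractor.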
Watermarks¶
A watermark is the stream's answer to "we don't expect events older than this anymore!" It is a single timestamp that only moves forward. A window becomes eligible to close once the watermark has passed its end.
An independent watermark is created for each event key under a single processing worker. In addition, each worker maintains a single global watermark to aid idleness detection.
Watermarks are used under both time sourcing strategies and determine when a window closes.
What differs is what feeds the watermark:
- Under event time strategy the watermark is updated from each event's extracted timestamp.
- Under processing time strategy the watermark is updated from the worker's wall clock at the moment of processing — so the watermark effectively tracks wall-clock time.
A few properties to keep in mind when reasoning about results:
- The watermark only moves forward. It will never go backwards, even if an event with an older timestamp arrives.
- It updates with traffic. The watermark advances as events are processed, not on a fixed schedule. If no events arrive, the watermark does not move.
- It is created by key. When the window is configured with a key extractor, each key has its own watermark. A slow source of a single key does not stall the rest of the stream, and a fast source of a single key does not prematurely close another key's window.
- When no key extractor is provided, all events share a single watermark.
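The properties listed above (forward-only, traffic-driven, per key) can be pictured with a small map-based sketch. It is an assumption-laden illustration rather than VoltSP's internal bookkeeping: KeyedWatermarkSketch, offer and current are hypothetical names, and -1 is an arbitrary marker for a key that has not been seen.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; VoltSP's internal bookkeeping is not described on this page.
class KeyedWatermarkSketch {
    private final Map<String, Long> watermarkByKey = new HashMap<>();

    // Offers a candidate timestamp for a key and returns that key's watermark afterwards.
    // Math::max keeps the larger value, so the watermark never moves backwards.
    long offer(String key, long candidateSec) {
        return watermarkByKey.merge(key, candidateSec, Math::max);
    }

    // Returns the key's watermark, or -1 when no event for that key has been seen.
    long current(String key) {
        return watermarkByKey.getOrDefault(key, -1L);
    }

    public static void main(String[] args) {
        KeyedWatermarkSketch w = new KeyedWatermarkSketch();
        w.offer("alice", 12);
        w.offer("alice", 8);  // older timestamp: alice's watermark stays at 12
        w.offer("bob", 3);    // a different key is tracked independently
        // prints: 12 3
        System.out.println(w.current("alice") + " " + w.current("bob"));
    }
}
```

Because the map is only touched inside offer, the watermark moves only when an event is processed, which matches the "updates with traffic" property.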
Watermark progression at a glance¶
VoltSP defines two progression strategies: MIN_EVENT_TIME and MAX_EVENT_TIME.
The diagrams below use the default MIN_EVENT_TIME strategy — the watermark advances to the smallest event time observed in each batch.
Suppose batch 1 contains three events with event times 2s, 4s and 9s:
batch 1
event time: 0───●────●────●─────────────────────────►
e1 e2 e3
2s 4s 9s
after batch 1 commits:
▲
watermark ────────┘ (= smallest event time in the batch = 2s)
Batch 2 brings three more events at 5s, 8s and 15s:
batch 1 batch 2
event time: 0───●────●────●────────────●────●────●──►
e1 e2 e3 e4 e5 e6
2s 4s 9s 5s 8s 15s
late check (each event's time vs. current watermark = 2s):
e4 = 5s → 5s > 2s ✓ accepted
e5 = 8s → 8s > 2s ✓ accepted
e6 = 15s → 15s > 2s ✓ accepted
after batch 2 commits:
▲
watermark moves to ──────────────────────┘ (= 5s, the smallest in batch 2)
Notice how MIN_EVENT_TIME keeps the watermark conservative — even
though batch 2 contains an event at 15s, the watermark only advances to
5s, because that is the earliest time still arriving from the source.
Anything older than 5s arriving from now on will be considered late:
later, a late e_late with event time 3s arrives:
event time: 0────●──────────►
e_late
3s
▲
watermark ────────────┘ (= 5s, from batch 2)
3s < 5s → e_late is late.
This is the core invariant: the watermark only advances, never retreats, so once it has moved past a timestamp every later arrival with an older timestamp is considered too late to participate in its window.
MIN_EVENT_TIME vs. MAX_EVENT_TIME¶
watermarkStrategy controls how aggressively the watermark advances as events flow in:
| Strategy | Behaviour |
|---|---|
| MIN_EVENT_TIME | Conservative. The watermark advances slowly. Fewer events are classified as late and fewer windows close at once. Recommended default. |
| MAX_EVENT_TIME | Aggressive. The watermark advances faster, so windows close sooner, but more late-arriving events risk being dropped. |
If you are not sure which one to pick, leave it on MIN_EVENT_TIME. It is the default.
The choice only matters when timestamps come from events.
Let's revisit how the same events are aggregated when watermarkStrategy is set to MAX_EVENT_TIME.
After the first batch of events:
batch 1
event time: 0───●────●────●─────────────────────────►
e1 e2 e3
2s 4s 9s
after batch 1 commits:
▲
watermark ──────────────────┘ (= 9s)
After the second batch of events:
batch 1 batch 2
event time: 0───●────●────●────────────●────●────●──►
e1 e2 e3 e4 e5 e6
2s 4s 9s 5s 8s 15s
late check (each event's time vs. current watermark = 9s):
e4 = 5s → 5s < 9s ❌ declined
e5 = 8s → 8s < 9s ❌ declined
e6 = 15s → 15s > 9s ✓ accepted
after batch 2 commits:
▲
watermark moves to ──────────────────────────────────┘ (= 15s)
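The two walkthroughs can be condensed into a short sketch that advances a watermark per batch under either strategy and applies the late check drawn in the diagrams. The names below (WatermarkStrategySketch, Strategy, advance, isLate) are hypothetical stand-ins, and the starting watermark of 0 is an assumption made for the example.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of per-batch watermark progression; not VoltSP code.
class WatermarkStrategySketch {
    // Local stand-in for the two strategies named in this page.
    enum Strategy { MIN_EVENT_TIME, MAX_EVENT_TIME }

    // Watermark after a batch commits: the batch's smallest or largest event time,
    // but never lower than the previous watermark.
    static long advance(long currentWatermarkSec, List<Long> batchEventTimesSec, Strategy strategy) {
        long candidate = strategy == Strategy.MIN_EVENT_TIME
            ? Collections.min(batchEventTimesSec)
            : Collections.max(batchEventTimesSec);
        return Math.max(currentWatermarkSec, candidate);
    }

    // Late check as drawn in the diagrams: event time before the current watermark.
    static boolean isLate(long eventTimeSec, long watermarkSec) {
        return eventTimeSec < watermarkSec;
    }

    public static void main(String[] args) {
        List<Long> batch1 = List.of(2L, 4L, 9L);
        List<Long> batch2 = List.of(5L, 8L, 15L);
        for (Strategy s : Strategy.values()) {
            long afterBatch1 = advance(0, batch1, s);
            long afterBatch2 = advance(afterBatch1, batch2, s);
            // e4 (5s) is checked against the watermark left by batch 1.
            System.out.println(s + ": " + afterBatch1 + "s then " + afterBatch2
                + "s, e4 late=" + isLate(5, afterBatch1));
        }
        // prints:
        // MIN_EVENT_TIME: 2s then 5s, e4 late=false
        // MAX_EVENT_TIME: 9s then 15s, e4 late=true
    }
}
```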
Late events¶
An event is considered late when its timestamp is before the current watermark. Late events are not aggregated into their nominal window. Instead, they are routed to the window's exception handler, if you configured one. Without an exception handler, late events are silently dropped.
The watermark used to detect late events already accounts for delayWindowClose (see below).
Future outliers¶
When the event time strategy is used, a remote source may emit events out of order. This can produce late events and also future events. Time-based windows create a future aggregate for each future event; VoltSP chooses correctness over slightly increased memory consumption. Once the watermark advances to the future time, the aggregate is emitted.
Delay (grace period)¶
delayWindowClose keeps a window open for a configurable extra duration
after it would otherwise be closed. With a delay of D, a window whose end
is at T will not close until time has progressed past T + D. This gives
slightly out-of-order events a chance to land in their intended window.
Recommended values are small — typically 1–10 seconds. The setting is meant
to absorb minor jitter in event arrival, not to compensate for upstream
outages. If your upstream is down for minutes at a time, fix the upstream
rather than inflating delayWindowClose.
Once a window closes, further events that would have belonged to it are treated as late events.
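The T + D rule can be written as a one-line predicate. The sketch below uses java.time types and hypothetical names (DelayedCloseSketch, canClose); treating a watermark exactly equal to T + D as "closed" is an assumption based on the worked example later in this page, not a documented guarantee.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the close condition; not VoltSP code.
class DelayedCloseSketch {

    // A window ending at T may close once the watermark has reached T + D.
    // Assumption: reaching T + D exactly already counts as closed.
    static boolean canClose(Instant windowEnd, Instant watermark, Duration delayWindowClose) {
        return !watermark.isBefore(windowEnd.plus(delayWindowClose));
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.ofEpochSecond(10);
        // No delay, watermark at 12s: prints true
        System.out.println(canClose(windowEnd, Instant.ofEpochSecond(12), Duration.ZERO));
        // 5s delay, watermark at 12s: prints false (needs 15s)
        System.out.println(canClose(windowEnd, Instant.ofEpochSecond(12), Duration.ofSeconds(5)));
        // 5s delay, watermark at 25s: prints true
        System.out.println(canClose(windowEnd, Instant.ofEpochSecond(25), Duration.ofSeconds(5)));
    }
}
```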
Interaction with watermarkStrategy¶
delayWindowClose and watermarkStrategy are two knobs on the same dial.
The effective watermark used to close windows and detect late
events is watermark − delayWindowClose. watermarkStrategy decides
how the watermark advances per batch:
- MIN_EVENT_TIME advances it to the earliest event in the batch.
- MAX_EVENT_TIME advances it to the latest event in the batch.
For a batch spanning S (= max event time − min event time), the two
strategies leave the effective watermark S apart. You can compensate
in either direction by adjusting the delay:
- MAX_EVENT_TIME with delayWindowClose = D + S is roughly as conservative as MIN_EVENT_TIME with delayWindowClose = D.
- MIN_EVENT_TIME with delayWindowClose = D − S (clamped at 0) is roughly as aggressive as MAX_EVENT_TIME with delayWindowClose = D.
So the choice between strategies is not binary: if you prefer
MAX_EVENT_TIME for its faster reaction to event-time progress but
worry about late events being dropped, raise delayWindowClose to
match the lateness tolerance you would have had under MIN_EVENT_TIME.
The trade-off is the same either way — accepting more late events
costs extra emission latency.
MIN_EVENT_TIME remains the recommended default because it tolerates
out-of-order events without needing knowledge of the batch span S,
which can vary with traffic.
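The compensation rule is simple arithmetic, sketched here for a single batch with event times 9s and 12s (so S = 3s). EffectiveWatermarkSketch and effective are hypothetical names; the clamp at zero is an assumption taken from the worked example, where a 2s watermark with a 5s delay is shown as an effective watermark of 0s.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical arithmetic sketch; not VoltSP code.
class EffectiveWatermarkSketch {

    // effective watermark = watermark - delayWindowClose, clamped at zero (assumption).
    static long effective(long watermarkSec, long delaySec) {
        return Math.max(0, watermarkSec - delaySec);
    }

    public static void main(String[] args) {
        List<Long> batch = List.of(12L, 9L);
        long min = Collections.min(batch);   // what MIN_EVENT_TIME would advance to
        long max = Collections.max(batch);   // what MAX_EVENT_TIME would advance to
        long span = max - min;               // S = 3
        long d = 2;                          // D, an arbitrary delay for the example

        long minWithD = effective(min, d);              // MIN_EVENT_TIME, delay D
        long maxWithDPlusS = effective(max, d + span);  // MAX_EVENT_TIME, delay D + S
        long maxWithD = effective(max, d);              // MAX_EVENT_TIME, delay D

        // prints: 7 7 10
        System.out.println(minWithD + " " + maxWithDPlusS + " " + maxWithD);
    }
}
```

The first two numbers match, which is the "roughly as conservative" equivalence; the third shows how far ahead MAX_EVENT_TIME runs when the delay is left unchanged.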
Idle timeout¶
idleTimeout lets a window close because nothing has happened for a while for a given key.
It is required for SessionWindow and optional for other windows, where it adds an extra trigger on top of their primary close condition.
When configured, the window emits any data it is holding for a key once that
key has been silent for at least idleTimeout. This works in two scenarios:
- While the pipeline is busy. If other keys keep receiving events, the window will notice quiet keys as part of its normal processing and emit them once they cross the threshold.
- While the pipeline is silent. If the whole pipeline stops receiving events, the window relies on the pipeline-wide heartbeat (see below) to fire idle cleanup. This prevents data from sitting in the window indefinitely when traffic stops altogether.
If you do not set idleTimeout, neither scenario applies and the window
will close only on its primary trigger (time span, session gap, count, …).
See Sourcing time for Idle timeout for the trade-off between real-time and event-time idle detection.
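Idle detection boils down to remembering when each key was last seen and sweeping for keys that have been silent for at least idleTimeout. The sketch below is a hypothetical illustration (IdleKeySketch, onEvent and expireIdle are invented names): the caller supplies "now", so the same sweep could be driven by regular processing, by a heartbeat, or by an event-time clock.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of per-key idle detection; not VoltSP code.
class IdleKeySketch {
    // TreeMap keeps keys sorted so the sweep result has a stable order.
    private final Map<String, Instant> lastSeenByKey = new TreeMap<>();
    private final Duration idleTimeout;

    IdleKeySketch(Duration idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    // Records that a key received an event at the given time.
    void onEvent(String key, Instant now) {
        lastSeenByKey.put(key, now);
    }

    // Removes and returns the keys that have been silent for at least idleTimeout.
    List<String> expireIdle(Instant now) {
        List<String> idle = new ArrayList<>();
        lastSeenByKey.entrySet().removeIf(e -> {
            boolean silent = !now.isBefore(e.getValue().plus(idleTimeout));
            if (silent) {
                idle.add(e.getKey());
            }
            return silent;
        });
        return idle;
    }

    public static void main(String[] args) {
        IdleKeySketch sketch = new IdleKeySketch(Duration.ofMinutes(5));
        Instant t0 = Instant.ofEpochSecond(0);
        sketch.onEvent("alice", t0);
        sketch.onEvent("bob", t0.plus(Duration.ofMinutes(4)));
        // prints: [alice]
        System.out.println(sketch.expireIdle(t0.plus(Duration.ofMinutes(5))));
        // prints: [bob]
        System.out.println(sketch.expireIdle(t0.plus(Duration.ofMinutes(9))));
    }
}
```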
Pipeline heartbeat¶
The pipeline heartbeat is a periodic wake-up signal sent to all sinks in the
pipeline at a fixed interval of 60 seconds. Windows use it to honour
idleTimeout even when no events are flowing.
Worked example: building up the configuration¶
This section walks one dataset through several configurations to show which problem each setting actually solves.
The data¶
Five events arrive at the worker. Their event times and the batch in which they arrive are:
| Event | Processing Time | Event time | Arrival batch | Note |
|---|---|---|---|---|
| e1 | 7s | 2s | 1 | |
| e2 | 7s | 5s | 1 | |
| e4 | 8s | 12s | 2 | arrives before e3 |
| e3 | 9s | 8s | 3 | |
| e5 | 10s | 25s | 4 | |
The window is a 10-second tumbling window: [0,10s), [10,20s), [20,30s), ….
Scenario 0 — Processing Time (default)¶
In Java:
.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
.withWindowSpan(Duration.ofSeconds(10))
.withKeyExtractor(Event::userId)
.withAggregateDefinition(builder -> builder
.count()
...
)
.withExceptionHandler((records, context, throwable) -> {
...
})
...
Events are bucketed by arrival, not by their real-world timestamps. Only e5 is aggregated into a second window.
| Batch Processed | Aggregates created | Is emitted | Watermark |
|---|---|---|---|
| 1 | 1 (e1, e2) | No | 7s |
| 2 | 1 (e1, e2, e4) | No | 8s |
| 3 | 1 (e1, e2, e4, e3) | No | 9s |
| 4 | 1 (e1, e2, e4, e3)<br>2 (e5) | Yes<br>No | 10s |
Scenario 1 — switch to Event Time¶
In Java:
.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
.withWindowSpan(Duration.ofSeconds(10))
.withKeyExtractor(Event::userId)
// introduce event time by adding a time extractor
.withTimeConfigBuilder(builder -> builder
.withEventTimeExtractor(Event::timestamp)
)
.withAggregateDefinition(builder -> ...)
...
Aggregates are now decided by each event's own timestamp.
e3 belongs to the [0,10s) aggregate but is already late, because the second batch moved the watermark to 12s:
| Batch Processed | Aggregates created | Is emitted | Watermark |
|---|---|---|---|
| 1 | 1 (e1, e2) | No | 2s |
| 2 | 1 (e1, e2)<br>2 (e4) | Yes<br>No | 12s |
| 3 | 2 (e4) | No | 12s |
| 4 | 2 (e4)<br>3 (e5) | Yes<br>No | 25s |
After batch 2 the watermark sits at 12s, so window [0,10s) is already
eligible to emit and does so with {e1, e2}. When e3 arrives in
batch 3 with event time 8s, the lateness check 8s < 12s holds, so e3 is marked late.
It is routed to the exception handler, or silently dropped if none is
configured.
Scenario 2 — add delayWindowClose¶
In Java:
.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
.withWindowSpan(Duration.ofSeconds(10))
.withKeyExtractor(Event::userId)
// introduce event time
.withTimeConfigBuilder(builder -> builder
.withEventTimeExtractor(Event::timestamp)
.withDelayWindowClose(Duration.ofSeconds(5))
)
.withAggregateDefinition(builder -> ...)
...
The watermark used for closing and lateness is now watermark − 5s:
| Batch Processed | Aggregates created | Is emitted | Watermark | Effective |
|---|---|---|---|---|
| 1 | 1 (e1, e2) | No | 2s | 0s |
| 2 | 1 (e1, e2)<br>2 (e4) | No<br>No | 12s | 7s |
| 3 | 1 (e1, e2, e3)<br>2 (e4) | No<br>No | 12s | 7s |
| 4 | 1 (e1, e2, e3)<br>2 (e4)<br>3 (e5) | Yes<br>Yes<br>No | 25s | 20s |
After batch 2 the effective watermark is 7s, so [0,10s) (end = 10s) stays open.
When e3 arrives in batch 3, the lateness check evaluates 8s < 7s and 8s < 10s, which is false, so e3 is not late and lands in [0,10s) where it belongs.
The window finally closes after batch 4 pushes the effective watermark
past 10s, and emits {e1, e2, e3}.
This is the correctness payoff of delayWindowClose: out-of-order
arrivals up to the configured delay no longer fall off the cliff.
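Scenario 1 and Scenario 2 can be replayed with a small single-key simulation. This is a sketch under stated assumptions, not VoltSP's implementation: it uses MIN_EVENT_TIME, starts the watermark at 0, clamps the effective watermark at 0, and treats an event as late when its window has already closed (window end at or before the effective watermark). WorkedExampleSketch, Ev, dataset and replay are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key replay of the worked example; not VoltSP code.
class WorkedExampleSketch {

    record Ev(String name, long eventTimeSec) {}

    // The four arrival batches from the data table above.
    static List<List<Ev>> dataset() {
        return List.of(
            List.of(new Ev("e1", 2), new Ev("e2", 5)),
            List.of(new Ev("e4", 12)),
            List.of(new Ev("e3", 8)),
            List.of(new Ev("e5", 25)));
    }

    // Replays the batches through a tumbling window and returns a log of late events and emissions.
    static List<String> replay(List<List<Ev>> batches, long spanSec, long delaySec) {
        List<String> log = new ArrayList<>();
        Map<Long, List<String>> open = new TreeMap<>();  // window start -> event names
        long watermark = 0;
        for (List<Ev> batch : batches) {
            if (batch.isEmpty()) {
                continue;
            }
            // Lateness is judged against the effective watermark left by earlier batches.
            long effective = Math.max(0, watermark - delaySec);
            long batchMin = Long.MAX_VALUE;
            for (Ev e : batch) {
                long start = Math.floorDiv(e.eventTimeSec(), spanSec) * spanSec;
                batchMin = Math.min(batchMin, e.eventTimeSec());
                if (start + spanSec <= effective) {
                    log.add("late " + e.name());  // its window has already closed
                } else {
                    open.computeIfAbsent(start, k -> new ArrayList<>()).add(e.name());
                }
            }
            // MIN_EVENT_TIME: advance to the batch minimum, never backwards.
            watermark = Math.max(watermark, batchMin);
            long closeUpTo = Math.max(0, watermark - delaySec);
            var it = open.entrySet().iterator();
            while (it.hasNext()) {
                var w = it.next();
                long end = w.getKey() + spanSec;
                if (end <= closeUpTo) {
                    log.add("emit [" + w.getKey() + "," + end + ") " + w.getValue());
                    it.remove();
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Scenario 1, no delay. prints: [emit [0,10) [e1, e2], late e3, emit [10,20) [e4]]
        System.out.println(replay(dataset(), 10, 0));
        // Scenario 2, 5s delay. prints: [emit [0,10) [e1, e2, e3], emit [10,20) [e4]]
        System.out.println(replay(dataset(), 10, 5));
    }
}
```

With the 5-second delay, e3 is aggregated instead of being reported late, at the cost of the first window being emitted only after batch 4.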
Scenario 3 — picking the watermark strategy¶
So far every batch from batch 2 onward held at most one event, so
MIN_EVENT_TIME and MAX_EVENT_TIME would behave identically here.
The strategies diverge only when a batch spans event time — when a
single batch contains both an early and a late event.
Let's add one more event, e6, to batch 2:
| Event | Event time | Arrival batch | Note |
|---|---|---|---|
| e1 | 2s | 1 | |
| e2 | 5s | 1 | |
| e4 | 12s | 2 | arrives before e3 |
| e6 | 9s | 2 | arrives before e3 |
| e3 | 8s | 3 | |
| e5 | 25s | 4 | |
Let's push those events through the window configured in Java:
.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
.withWindowSpan(Duration.ofSeconds(10))
.withKeyExtractor(Event::userId)
// introduce event time
.withTimeConfigBuilder(builder -> builder
.withEventTimeExtractor(Event::timestamp)
.withWatermarkStrategy(WatermarkStrategy.MAX_EVENT_TIME)
)
.withAggregateDefinition(builder -> ...)
...
| Batch Processed | Aggregates created | Is emitted | Watermark |
|---|---|---|---|
| 1 | 1 (e1, e2) | No | 5s |
| 2 | 1 (e1, e2, e6)<br>2 (e4) | Yes<br>No | 12s |
| 3 | 2 (e4) | No | 12s |
| 4 | 2 (e4)<br>3 (e5) | Yes<br>No | 25s |
When switching to MIN_EVENT_TIME, the aggregates are different:
| Batch Processed | Aggregates created | Is emitted | Watermark |
|---|---|---|---|
| 1 | 1 (e1, e2) | No | 2s |
| 2 | 1 (e1, e2, e6)<br>2 (e4) | No<br>No | 9s |
| 3 | 1 (e1, e2, e6, e3)<br>2 (e4) | No<br>No | 9s |
| 4 | 1 (e1, e2, e6, e3)<br>2 (e4)<br>3 (e5) | Yes<br>Yes<br>No | 25s |
Keeping MAX_EVENT_TIME but setting delayWindowClose to 3s, which is the event-time span of the second batch (12s − 9s), aggregates events in the same way as MIN_EVENT_TIME:
.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
.withWindowSpan(Duration.ofSeconds(10))
.withKeyExtractor(Event::userId)
// introduce event time
.withTimeConfigBuilder(builder -> builder
.withEventTimeExtractor(Event::timestamp)
.withWatermarkStrategy(WatermarkStrategy.MAX_EVENT_TIME)
.withDelayWindowClose(Duration.ofSeconds(3))
)
.withAggregateDefinition(builder -> ...)
...
| Batch Processed | Aggregates created | Is emitted | Watermark | Effective |
|---|---|---|---|---|
| 1 | 1 (e1, e2) | No | 5s | 2s |
| 2 | 1 (e1, e2, e6)<br>2 (e4) | No<br>No | 12s | 9s |
| 3 | 1 (e1, e2, e6, e3)<br>2 (e4) | No<br>No | 12s | 9s |
| 4 | 1 (e1, e2, e6, e3)<br>2 (e4)<br>3 (e5) | Yes<br>Yes<br>No | 25s | 22s |
- Under MIN_EVENT_TIME (watermark = 9s), e3 escapes lateness even without a delay.
- Under MAX_EVENT_TIME (watermark = 12s), e3 needs delay ≥ 3s.
Same data, same window — MAX_EVENT_TIME consumes 3 extra seconds of
delay budget, exactly the within-batch event-time spread (12s − 9s).
The wider the spread, the bigger the gap. See Interaction with watermarkStrategy for how to compensate explicitly.
MIN_EVENT_TIME is the recommended default for this reason: it absorbs within-batch out-of-order without requiring you to know the batch spread up-front.
Scenario 4 — handle quiet keys with idleTimeout¶
The configuration above relies on later traffic (e5) to push the watermark past 10s and 20s, so that the first and second aggregates can be emitted.
If e5 never arrived — say a particular user falls silent — [0,10s) would sit open forever.
idleTimeout rescues this case:
.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
.withWindowSpan(Duration.ofSeconds(10))
.withKeyExtractor(Event::userId)
// introduce event time
.withTimeConfigBuilder(builder -> builder
.withEventTimeExtractor(Event::timestamp)
.withWatermarkStrategy(WatermarkStrategy.MAX_EVENT_TIME)
.withDelayWindowClose(Duration.ofSeconds(3))
.withIdleTimeout(Duration.ofMinutes(5))
)
.withAggregateDefinition(builder -> ...)
...
After 5 minutes of silence the window emits whatever it holds for that key. The pipeline heartbeat keeps this honest even when all keys go silent. By default silence is measured against the worker's wall clock; see Sourcing time for Idle timeout to switch to event-time semantics.