
Time and Watermarks in VoltSP Window Aggregations

This page explains how time-based windows (SessionWindow, SlidingWindow and TumblingTimeWindow) handle time. You control time handling through timeConfig to strike the balance between correctness and latency that your pipeline requires.

For example in Java:

package org.acme;

import java.time.Instant;

// given a stream of events of this type
public record Event(String userId, Instant timestamp) {}

// define a stream that aggregates events by user and emits them every hour
stream
    ...
    .aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
        .withWindowSpan(Duration.ofHours(1))
        .withKeyExtractor(Event::userId)
        // controls the time handling
        .withTimeConfigBuilder(builder -> builder
            .withEventTimeExtractor(Event::timestamp)
            .withIdleTimeout(Duration.ofMinutes(5))
        )
        .withAggregateDefinition(builder -> builder
            .count()
            ...
        )
        .withExceptionHandler((records, context, throwable) -> {
            ...
        })
    )
    .emit(WindowTrigger.atWindowEnd)
    ...

In YAML:

version: 1
name: "WindowedPipeline"
source:
  ...
emitter:
  tumbling-time-window:
    windowSpan: 1h
    keyExtractor: "org.acme.Event::userId"
    timeConfig:
      eventTimeExtractor: "org.acme.Event::timestamp"
      idleTimeout: 5m
    aggregateDefinition:
      from: org.acme.Event
      builder:
      - type: count
        ...
    triggers:
    - trigger: org.voltdb.stream.plugin.window.api.WindowTrigger#atWindowEnd
      processors:
          ...
      sink:
          ...

Overview

When events flow into a windowed aggregation, the window has to make two decisions for every event:

  1. Which window does this event belong to? — based on the event's timestamp.
  2. When is a window ready to emit? — based on how far time has progressed in the stream.
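For a tumbling window, both decisions come down to simple arithmetic. The following is a minimal sketch, not VoltSP code. TumblingDecisions and its methods are hypothetical. The sketch assumes that windows are aligned to time 0, as in the [0s,10s), [10s,20s) examples on this page, and that a window is ready once the watermark has reached its end.

```java
/** Hypothetical helper showing the two per-event decisions for a tumbling window. */
final class TumblingDecisions {
    /** Decision 1: start (inclusive) of the window containing the timestamp, windows aligned to 0. */
    static long windowStart(long timestamp, long span) {
        // floorDiv (not /) keeps negative timestamps in the earlier window
        return Math.floorDiv(timestamp, span) * span;
    }

    /** End (exclusive) of the window containing the timestamp. */
    static long windowEnd(long timestamp, long span) {
        return windowStart(timestamp, span) + span;
    }

    /** Decision 2: a window may emit once the stream's time (watermark) has reached its end. */
    static boolean readyToEmit(long windowEnd, long watermark) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        long span = 10; // 10-second windows, times in seconds
        long t = 8;
        long end = windowEnd(t, span);
        System.out.println("t=8s -> [" + windowStart(t, span) + "s," + end + "s), ready at watermark 9s? "
                + readyToEmit(end, 9) + ", at 10s? " + readyToEmit(end, 10));
    }
}
```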

You shape those decisions with a set of settings:

  • eventTimeExtractor (optional): controls where event timestamps come from. If not set, time is sourced from the worker's wall clock at processing time.
  • watermarkStrategy (optional): controls how aggressively the stream's notion of time advances.
  • delayWindowClose (optional): controls how long to keep a window open after it would otherwise close. This setting absorbs late or slightly out-of-order events. The default is 0 (no delay).
  • idleTimeout (required for a session window, optional for other window types): how long a window may sit without new events before it is closed and emitted.
  • idleTimeoutClock (optional): controls which clock measures the silence (wall clock or event time).

The Processing Time

The default time strategy is Processing Time: time is sourced from the worker's local clock at the moment of processing, hence the name. Use it when results should reflect when the system saw an event, not when the event happened. It suits monitoring, throughput-style metrics, and systems with no reliable event timestamp.

Under this strategy, events can be neither late nor out of order. The window emits once wall-clock time has passed the window's end, or when the window is marked as idle.

In Java:

stream
    ...
    .aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
        .withWindowSpan(Duration.ofHours(1))
        .withKeyExtractor(Event::userId)
        ...
    )
    .emit(WindowTrigger.atWindowEnd)
    ...

In YAML:

version: 1
name: "WindowedPipeline"
source:
  ...
emitter:
  tumbling-time-window:
    windowSpan: 1h
    keyExtractor: "org.acme.Event::userId"
    ...

The Event Time

Under this strategy, time is decoupled from the local clock. Use it when correctness depends on the real-world time of each event, for example in billing, fraud detection, or aggregations over user-perceived intervals. It is required when events can arrive out of order or are replayed from history.

To source time from events, set eventTimeExtractor.

In Java:

stream
    ...
    .aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
        ...
        .withTimeConfigBuilder(builder -> builder
            .withEventTimeExtractor(Event::timestamp)
        )
        ...
    )
    .emit(WindowTrigger.atWindowEnd)
    ...

In YAML:

version: 1
name: "WindowedPipeline"
source:
    ...
emitter:
  tumbling-time-window:
    timeConfig:
      eventTimeExtractor: "org.acme.Event::timestamp"
    ...

Sourcing time for Idle timeout

By default, when a custom time extractor is set, time is sourced from the event both for aggregation and for the emit heuristics. You can override this by setting idleTimeoutClock to PROCESSING_TIME, in which case the window detects idleness based on real (wall-clock) time.

In Java:

.withTimeConfigBuilder(config -> config
    .withEventTimeExtractor(Event::timestamp)
    .withIdleTimeout(Duration.ofMinutes(5))
    .withIdleTimeoutClock(TimeSource.EVENT_TIME))

In YAML:

timeConfig:
  eventTimeExtractor: "org.acme.Event::timestamp"
  idleTimeout: 5m
  idleTimeoutClock: EVENT_TIME

Visualizing the difference

Given a tumbling time window with a 10-second span, consider four events, e1..e4, with these characteristics:

Event  Event time (when created)  Processing time (arrival time)
e1     2s                         3s
e2     4s                         5s
e3     8s                         13s
e4     15s                        9s

Note that:

  • e4 arrives before e3
  • e4's event time is later than its arrival time (e.g. because the worker's clock and the external source's clock are slightly out of sync).

Under Processing Time strategy, the worker's wall clock at the moment of processing is treated as the event's timestamp.
With a 10-second tumbling window ([0s,10s), [10s,20s), …), events are therefore bucketed by their arrival time:

arrival:    0    e1     e2              e4         e3   |    20s
            ├────●──────●───────────────●──────────●────┤────►

window [0s,10s):    { e1, e2, e4 }   ← closes at wall clock = 10s
window [10s,20s):   { e3 }           ← closes at wall clock = 20s

Under the Event Time strategy, when a time extractor is provided, events are bucketed by their own timestamps:

event time: 0    e1     e2          e3            e4    |    20s
            ├────●──────●───────────●─────────────●─────┤────►

window [0s,10s):    { e1, e2, e3 }   ← e3 is aggregated based on its timestamp
window [10s,20s):   { e4 }

Note: the same input stream produces different aggregates depending on the time strategy. Also note that under the event time strategy an event can be marked as late. See Late events for details.
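To make the bucketing concrete, the sketch below replays e1..e4 in arrival order and groups them into 10-second windows by either clock. BucketingDemo and its Ev record are hypothetical illustrations. The sketch deliberately ignores watermarks and lateness. It only shows which window each event's timestamp maps to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToLongFunction;

/** Hypothetical demo: the same arrivals bucketed by processing time vs. event time. */
final class BucketingDemo {
    /** An event with its event time and arrival (processing) time, both in seconds. */
    record Ev(String name, long eventTime, long arrivalTime) {}

    /** Groups events into tumbling windows (keyed by window start) using the given clock. */
    static Map<Long, List<String>> bucket(List<Ev> events, ToLongFunction<Ev> clock, long span) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Ev e : events) {
            long start = Math.floorDiv(clock.applyAsLong(e), span) * span;
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(e.name());
        }
        return windows;
    }

    public static void main(String[] args) {
        // listed in arrival order: e4 (arrives at 9s) comes before e3 (arrives at 13s)
        List<Ev> arrivals = List.of(
                new Ev("e1", 2, 3), new Ev("e2", 4, 5), new Ev("e4", 15, 9), new Ev("e3", 8, 13));
        System.out.println("processing time: " + bucket(arrivals, Ev::arrivalTime, 10));
        System.out.println("event time:      " + bucket(arrivals, Ev::eventTime, 10));
    }
}
```

Running it prints {0=[e1, e2, e4], 10=[e3]} for processing time and {0=[e1, e2, e3], 10=[e4]} for event time, mirroring the two diagrams.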

Watermarks

A watermark is the stream's answer to "we don't expect events older than this anymore!" It is a single timestamp that only moves forward. A window becomes eligible to close once the watermark has passed its end.

Each processing worker keeps an independent watermark for every event key, plus a single worker-wide global watermark that aids idleness detection.

Watermarks are used under both time-sourcing strategies and determine when a windowed aggregation closes.
What differs is what feeds the watermark:

  • Under event time strategy the watermark is updated from each event's extracted timestamp.
  • Under processing time strategy the watermark is updated from the worker's wall clock at the moment of processing — so the watermark effectively tracks wall-clock time.

A few properties to keep in mind when reasoning about results:

  • The watermark only moves forward. It will never go backwards, even if an event with an older timestamp arrives.
  • It updates with traffic. The watermark advances as events are processed, not on a fixed schedule. If no events arrive, the watermark does not move.
  • It is maintained per key. When the window is configured with a key extractor, each key has its own watermark. A slow source for one key does not stall the rest of the stream, and a fast source for one key does not prematurely close another key's window.
  • When no key extractor is provided, all events share a single watermark.
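The properties above can be sketched as a per-key map of monotonic timestamps. KeyedWatermarks is a hypothetical illustration, not VoltSP's internal data structure. A null key stands in for windows without a key extractor, which all share one watermark.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-key watermark tracker illustrating the properties listed above. */
final class KeyedWatermarks {
    // null key = window without a key extractor; HashMap permits a single null key
    private final Map<String, Long> byKey = new HashMap<>();

    /** Moves the key's watermark to candidate if that is later; it never moves backwards. */
    long advance(String key, long candidate) {
        return byKey.merge(key, candidate, Long::max);
    }

    /** Current watermark for a key, or Long.MIN_VALUE if the key has seen no traffic yet. */
    long current(String key) {
        return byKey.getOrDefault(key, Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        KeyedWatermarks wm = new KeyedWatermarks();
        wm.advance("alice", 12); // a fast key...
        wm.advance("bob", 3);    // ...does not drag a slow key forward
        wm.advance("alice", 5);  // an older timestamp does not move alice backwards
        System.out.println("alice=" + wm.current("alice") + ", bob=" + wm.current("bob"));
    }
}
```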

Watermark progression at a glance

VoltSP defines two progression strategies: MIN_EVENT_TIME and MAX_EVENT_TIME.

The diagrams below use the default MIN_EVENT_TIME strategy — the watermark advances to the smallest event time observed in each batch.

Suppose batch 1 contains three events with event times 2s, 4s and 9s:

                  batch 1
event time:   0───●────●────●─────────────────────────►
                  e1   e2   e3
                  2s   4s   9s

after batch 1 commits:
                  ▲
watermark ────────┘  (= smallest event time in the batch = 2s)

Batch 2 brings three more events at 5s, 8s and 15s:

                  batch 1                batch 2
event time:   0───●────●────●────────────●────●────●──►
                  e1   e2   e3           e4   e5   e6
                  2s   4s   9s           5s   8s   15s

late check (each event's time vs. current watermark = 2s):
  e4 = 5s   → 5s  > 2s   ✓ accepted
  e5 = 8s   → 8s  > 2s   ✓ accepted
  e6 = 15s  → 15s > 2s   ✓ accepted

after batch 2 commits:
                                         ▲
watermark moves to ──────────────────────┘  (= 5s, the smallest in batch 2)

Notice how MIN_EVENT_TIME keeps the watermark conservative — even though batch 2 contains an event at 15s, the watermark only advances to 5s, because that is the earliest time still arriving from the source. Anything older than 5s arriving from now on will be considered late:

later, a late e_late with event time 3s arrives:

event time:   0────●──────────►
                   e_late
                   3s
                      ▲
watermark ────────────┘  (= 5s, from batch 2)

3s < 5s → e_late is late.

This is the core invariant: the watermark only advances, never retreats, so once it has moved past a timestamp every later arrival with an older timestamp is considered too late to participate in its window.

MIN_EVENT_TIME vs. MAX_EVENT_TIME

watermarkStrategy controls how aggressively the watermark advances as events flow in:

Strategy        Behaviour
MIN_EVENT_TIME  Conservative. The watermark advances slowly. Fewer events are classified as late and fewer windows close at once. Recommended default.
MAX_EVENT_TIME  Aggressive. The watermark advances faster, so windows close sooner, but more late-arriving events risk being dropped.

If you are not sure which one to pick, leave it on MIN_EVENT_TIME. It is the default.

The choice only matters when timestamps come from events.
Let's revisit the same two batches with watermarkStrategy set to MAX_EVENT_TIME.

After the first batch of events:

                  batch 1
event time:   0───●────●────●─────────────────────────►
                  e1   e2   e3
                  2s   4s   9s

after batch 1 commits:
                            ▲
watermark ──────────────────┘  (= 9s)

After the second batch of events:

                  batch 1                batch 2
event time:   0───●────●────●────────────●────●────●──►
                  e1   e2   e3           e4   e5   e6
                  2s   4s   9s           5s   8s   15s

late check (each event's time vs. current watermark = 9s):
  e4 = 5s   → 5s  < 9s   ❌ declined
  e5 = 8s   → 8s  < 9s   ❌ declined
  e6 = 15s  → 15s > 9s   ✓ accepted

after batch 2 commits:
                                                     ▲
watermark moves to ──────────────────────────────────┘  (= 15s)
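Both walkthroughs can be reproduced with a small simulation. It makes the same assumptions as the diagrams:

  • Each event is checked against the watermark as it stood before its batch.
  • An event is late when its time is strictly below that watermark.
  • When the batch commits, the watermark moves to the batch minimum or maximum, and never moves backwards.

WatermarkProgression and its Strategy enum are hypothetical stand-ins for the watermarkStrategy setting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

/** Hypothetical simulation of watermark progression per batch; not VoltSP's implementation. */
final class WatermarkProgression {
    /** Mirrors the two watermarkStrategy values described on this page. */
    enum Strategy { MIN_EVENT_TIME, MAX_EVENT_TIME }

    private final Strategy strategy;
    private long watermark = Long.MIN_VALUE; // no watermark before the first batch

    WatermarkProgression(Strategy strategy) {
        this.strategy = strategy;
    }

    /**
     * Late-checks every event against the pre-batch watermark, then advances the watermark
     * (never backwards) as the batch commits. Returns the event times classified as late.
     */
    List<Long> processBatch(long... eventTimes) {
        List<Long> late = new ArrayList<>();
        for (long t : eventTimes) {
            if (t < watermark) {
                late.add(t);
            }
        }
        long candidate = strategy == Strategy.MIN_EVENT_TIME
                ? LongStream.of(eventTimes).min().orElse(watermark)
                : LongStream.of(eventTimes).max().orElse(watermark);
        watermark = Math.max(watermark, candidate);
        return late;
    }

    long watermark() {
        return watermark;
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            WatermarkProgression p = new WatermarkProgression(s);
            p.processBatch(2, 4, 9);                    // batch 1
            List<Long> late = p.processBatch(5, 8, 15); // batch 2
            System.out.println(s + ": late in batch 2 = " + late + ", watermark = " + p.watermark());
        }
    }
}
```

Under MIN_EVENT_TIME the run reports no late events and a watermark of 5. Under MAX_EVENT_TIME it reports 5 and 8 as late and a watermark of 15, matching the diagrams above.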

Late events

An event is considered late when its timestamp is before the current watermark. Late events do not aggregate into their nominal window. Instead, they are routed to the window's exception handler, if you configured one. Without an exception handler, late events are silently dropped.

The watermark used to detect late events already accounts for delayWindowClose (see below).

Future outliers

Under the event time strategy, a remote source may emit events out of order. This can produce late events, and also future events: events timestamped well ahead of the rest of the stream. Time-based windows create a separate future aggregate for each such event instead of dropping it, because VoltSP favors correctness over slightly lower memory consumption. Once the watermark advances past the end of that future window, the aggregate is emitted.
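With this approach, a future outlier costs one extra open aggregate rather than a dropped event. The sketch below models this with a sorted map of per-window counts. FutureAggregates is hypothetical. It assumes 10-second tumbling windows aligned to 0, and that a window emits once the watermark reaches its end.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch: one open count aggregate per 10-second window, including future ones. */
final class FutureAggregates {
    static final long SPAN = 10; // seconds

    private final TreeMap<Long, Integer> openCounts = new TreeMap<>(); // window start -> count

    /** Adds an on-time event; a timestamp far ahead of the watermark simply opens a future window. */
    void add(long eventTime) {
        openCounts.merge(Math.floorDiv(eventTime, SPAN) * SPAN, 1, Integer::sum);
    }

    /** Removes and returns every window whose end (start + SPAN) the watermark has reached. */
    Map<Long, Integer> emitUpTo(long watermark) {
        NavigableMap<Long, Integer> closed = openCounts.headMap(watermark - SPAN, true);
        Map<Long, Integer> emitted = new TreeMap<>(closed);
        closed.clear(); // headMap is a view, so this removes the windows from openCounts
        return emitted;
    }

    int openWindows() {
        return openCounts.size();
    }

    public static void main(String[] args) {
        FutureAggregates agg = new FutureAggregates();
        agg.add(3);
        agg.add(7);
        agg.add(42); // future outlier: its window [40s,50s) is kept rather than dropped
        System.out.println("watermark 12s emits " + agg.emitUpTo(12) + ", still open: " + agg.openWindows());
        System.out.println("watermark 50s emits " + agg.emitUpTo(50));
    }
}
```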

Delay (grace period)

delayWindowClose keeps a window open for a configurable extra duration after it would otherwise be closed. With a delay of D, a window whose end is at T will not close until time has progressed past T + D. This gives slightly out-of-order events a chance to land in their intended window.

Recommended values are small — typically 1–10 seconds. The setting is meant to absorb minor jitter in event arrival, not to compensate for upstream outages. If your upstream is down for minutes at a time, fix the upstream rather than inflating delayWindowClose.

Once a window closes, further events that would have belonged to it are treated as late events.
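The following sketch shows how delayWindowClose affects both decisions, together with the routing of late events described in Late events. DelayedClose is hypothetical. The LongConsumer passed to admit only stands in for the window's exception handler (the real handler receives records, a context and a throwable). Passing null models the no-handler case, where late events are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical sketch of how delayWindowClose shifts lateness and close decisions. */
final class DelayedClose {
    /** The watermark used for closing windows and detecting late events. */
    static long effective(long watermark, long delay) {
        return watermark - delay;
    }

    /** Late means the event time is before the effective watermark. */
    static boolean isLate(long eventTime, long watermark, long delay) {
        return eventTime < effective(watermark, delay);
    }

    /** A window ending at windowEnd closes once the watermark reaches windowEnd + delay. */
    static boolean closes(long windowEnd, long watermark, long delay) {
        return effective(watermark, delay) >= windowEnd;
    }

    /** Keeps on-time events; passes late ones to lateHandler, or drops them if it is null. */
    static List<Long> admit(long[] eventTimes, long watermark, long delay, LongConsumer lateHandler) {
        List<Long> onTime = new ArrayList<>();
        for (long t : eventTimes) {
            if (!isLate(t, watermark, delay)) {
                onTime.add(t);
            } else if (lateHandler != null) {
                lateHandler.accept(t);
            }
        }
        return onTime;
    }

    public static void main(String[] args) {
        // an event at 8s arriving after the watermark reached 12s, with and without a 5s delay
        System.out.println("no delay: late=" + isLate(8, 12, 0) + ", [0s,10s) closed=" + closes(10, 12, 0));
        System.out.println("5s delay: late=" + isLate(8, 12, 5) + ", [0s,10s) closed=" + closes(10, 12, 5));
        admit(new long[] {8, 14}, 12, 0, t -> System.out.println("to exception handler: " + t + "s"));
    }
}
```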

Interaction with watermarkStrategy

delayWindowClose and watermarkStrategy are two knobs on the same dial. The effective watermark used to close windows and detect late events is watermark − delayWindowClose. watermarkStrategy decides how the watermark advances per batch:

  • MIN_EVENT_TIME advances it to the earliest event in the batch.
  • MAX_EVENT_TIME advances it to the latest event in the batch.

For a batch spanning S (= max event time − min event time), the two strategies leave the effective watermark S apart. You can compensate in either direction by adjusting the delay:

  • MAX_EVENT_TIME with delayWindowClose = D + S is roughly as conservative as MIN_EVENT_TIME with delayWindowClose = D.
  • MIN_EVENT_TIME with delayWindowClose = D − S (clamped at 0) is roughly as aggressive as MAX_EVENT_TIME with delayWindowClose = D.

So the choice between strategies is not binary: if you prefer MAX_EVENT_TIME for its faster reaction to event-time progress but worry about late events being dropped, raise delayWindowClose to match the lateness tolerance you would have had under MIN_EVENT_TIME. The trade-off is the same either way — accepting more late events costs extra emission latency.

MIN_EVENT_TIME remains the recommended default because it tolerates out-of-order events without needing knowledge of the batch span S, which can vary with traffic.
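The compensation rules above are plain arithmetic on the effective watermark. The hypothetical StrategyDelayTradeOff sketch computes the effective watermark right after a batch commits, assuming the batch moves the watermark forward. It uses batch 2 of Scenario 3 in the worked example below, with event times 12s and 9s, so S = 3s.

```java
import java.util.Arrays;

/** Hypothetical arithmetic sketch of the watermarkStrategy / delayWindowClose trade-off. */
final class StrategyDelayTradeOff {
    enum Strategy { MIN_EVENT_TIME, MAX_EVENT_TIME }

    /** Effective watermark after a non-empty batch commits, assuming the watermark moves forward. */
    static long effectiveAfter(long[] batch, Strategy strategy, long delay) {
        long watermark = strategy == Strategy.MIN_EVENT_TIME
                ? Arrays.stream(batch).min().getAsLong()
                : Arrays.stream(batch).max().getAsLong();
        return watermark - delay;
    }

    /** Batch span S = max event time - min event time (non-empty batch). */
    static long span(long[] batch) {
        return Arrays.stream(batch).max().getAsLong() - Arrays.stream(batch).min().getAsLong();
    }

    /** Delay making MAX_EVENT_TIME roughly as conservative as MIN_EVENT_TIME with delay d. */
    static long maxDelayMatchingMin(long d, long s) {
        return d + s;
    }

    /** Delay making MIN_EVENT_TIME roughly as aggressive as MAX_EVENT_TIME with delay d. */
    static long minDelayMatchingMax(long d, long s) {
        return Math.max(0, d - s); // clamped at 0
    }

    public static void main(String[] args) {
        long[] batch = {12, 9}; // event times in seconds
        long s = span(batch);
        long compensated = maxDelayMatchingMin(0, s);
        System.out.println("MIN, delay 0s: " + effectiveAfter(batch, Strategy.MIN_EVENT_TIME, 0));
        System.out.println("MAX, delay " + compensated + "s: "
                + effectiveAfter(batch, Strategy.MAX_EVENT_TIME, compensated));
    }
}
```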

Idle timeout

idleTimeout lets a window close because nothing has happened for a while for a given key. It is required for SessionWindow and optional for other windows, where it adds an extra trigger on top of their primary close condition.

When configured, the window emits any data it is holding for a key once that key has been silent for at least idleTimeout. This works in two scenarios:

  • While the pipeline is busy. If other keys keep receiving events, the window will notice quiet keys as part of its normal processing and emit them once they cross the threshold.
  • While the pipeline is silent. If the whole pipeline stops receiving events, the window relies on the pipeline-wide heartbeat (see below) to fire idle cleanup. This prevents data from sitting in the window indefinitely when traffic stops altogether.

If you do not set idleTimeout, neither scenario applies and the window will close only on its primary trigger (time span, session gap, count, …).

See Sourcing time for Idle timeout for the trade-off between real-time and event-time idle detection.

Pipeline heartbeat

The pipeline heartbeat is a periodic wake-up signal sent to all sinks in the pipeline at a fixed interval of 60 seconds. Windows use it to honour idleTimeout even when no events are flowing.
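Idle detection itself can be sketched as a last-seen timestamp per key, checked both while events flow and on each heartbeat. IdleTracker is hypothetical, not VoltSP's implementation. The Instant passed to touch and evictIdle would come from whichever clock idleTimeoutClock selects.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of per-key idle detection; not VoltSP's implementation. */
final class IdleTracker {
    private final Duration idleTimeout;
    private final Map<String, Instant> lastSeen = new HashMap<>();

    IdleTracker(Duration idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /** Records activity for a key; 'at' comes from the clock chosen by idleTimeoutClock. */
    void touch(String key, Instant at) {
        lastSeen.merge(key, at, (prev, next) -> next.isAfter(prev) ? next : prev);
    }

    /**
     * Returns (sorted) and forgets the keys silent for at least idleTimeout as of 'now'. It can run
     * during normal processing or on a periodic heartbeat, so idle keys are flushed even when no
     * events arrive at all.
     */
    List<String> evictIdle(Instant now) {
        List<String> idle = new ArrayList<>();
        for (Iterator<Map.Entry<String, Instant>> it = lastSeen.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, Instant> e = it.next();
            if (Duration.between(e.getValue(), now).compareTo(idleTimeout) >= 0) {
                idle.add(e.getKey()); // the window would emit this key's held data here
                it.remove();
            }
        }
        idle.sort(null); // natural order, for stable output
        return idle;
    }

    public static void main(String[] args) {
        IdleTracker tracker = new IdleTracker(Duration.ofMinutes(5));
        Instant t0 = Instant.EPOCH;
        tracker.touch("alice", t0);
        tracker.touch("bob", t0.plus(Duration.ofMinutes(4)));
        // a heartbeat at t0 + 5 min finds only alice idle; bob is flushed by a later heartbeat
        System.out.println(tracker.evictIdle(t0.plus(Duration.ofMinutes(5))));
        System.out.println(tracker.evictIdle(t0.plus(Duration.ofMinutes(9))));
    }
}
```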

Worked example: building up the configuration

This section walks one dataset through several configurations to show which problem each setting actually solves.

The data

Five events arrive at the worker. Their processing times, event times and arrival batches are:

Event  Processing time  Event time  Arrival batch  Note
e1     7s               2s          1
e2     7s               5s          1
e4     8s               12s         2              arrives before e3
e3     9s               8s          3
e5     10s              25s         4

The window is a 10-second tumbling window: [0,10s), [10,20s), [20,30s), ….

Scenario 0 — Processing Time (default)

In Java:

.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
    .withWindowSpan(Duration.ofSeconds(10))   // 10-second tumbling window, as in the data above
    .withKeyExtractor(Event::userId)
    .withAggregateDefinition(builder -> builder
        .count()
        ...
    )
    .withExceptionHandler((records, context, throwable) -> {
        ...
    })
    ...

Events are bucketed by arrival, not by their real-world timestamps. Only e5 is aggregated into a second window.

Batch  Aggregate             Emitted  Watermark
1      1 (e1, e2)            No       7s
2      1 (e1, e2, e4)        No       8s
3      1 (e1, e2, e4, e3)    No       9s
4      1 (e1, e2, e4, e3)    Yes      10s
       2 (e5)                No

Scenario 1 — switch to Event Time

In Java:

.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
    .withWindowSpan(Duration.ofSeconds(10))
    .withKeyExtractor(Event::userId)
    // introduce event time by adding a time extractor
    .withTimeConfigBuilder(builder -> builder
        .withEventTimeExtractor(Event::timestamp)
    )
    .withAggregateDefinition(builder -> ...)
    ...

Events are now assigned to aggregates by their own timestamps. e3 belongs to the [0,10s) aggregate but is already late, because the second batch moved the watermark to 12s:

Batch  Aggregate       Emitted  Watermark
1      1 (e1, e2)      No       2s
2      1 (e1, e2)      Yes      12s
       2 (e4)          No
3      2 (e4)          No       12s
4      2 (e4)          Yes      25s
       3 (e5)          No

After batch 2 the watermark sits at 12s, so window [0,10s) is already eligible to emit and does so with {e1, e2}. When e3 arrives in batch 3 with event time 8s, its lateness check 8s < 12s holds, so e3 is marked late. It is routed to the exception handler, or silently dropped if none is configured.

Scenario 2 — add delayWindowClose

In Java:

.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
    .withWindowSpan(Duration.ofSeconds(10))
    .withKeyExtractor(Event::userId)
    // event time plus a 5-second grace period
    .withTimeConfigBuilder(builder -> builder
        .withEventTimeExtractor(Event::timestamp)
        .withDelayWindowClose(Duration.ofSeconds(5))
    )
    .withAggregateDefinition(builder -> ...)
    ...

The watermark used for closing and lateness is now watermark − 5s:

Batch  Aggregate         Emitted  Watermark  Effective
1      1 (e1, e2)        No       2s         0s
2      1 (e1, e2)        No       12s        7s
       2 (e4)            No
3      1 (e1, e2, e3)    No       12s        7s
       2 (e4)            No
4      1 (e1, e2, e3)    Yes      25s        20s
       2 (e4)            Yes
       3 (e5)            No

After batch 2 the effective watermark is 7s, so [0,10s) (end = 10s) stays open. When e3 arrives in batch 3, it is not late: its event time of 8s is not below the effective watermark of 7s, and its window [0,10s) is still open because 10s > 7s. e3 lands in [0,10s), where it belongs. The window finally closes after batch 4 pushes the effective watermark past 10s, and emits {e1, e2, e3}.

This is the correctness payoff of delayWindowClose: out-of-order arrivals up to the configured delay no longer fall off the cliff.
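Scenarios 1 and 2 can be replayed with a small single-key simulation. WorkedExample is a hypothetical sketch, not VoltSP code. It makes these assumptions:

  • 10-second tumbling windows and MIN_EVENT_TIME.
  • The lateness rule from Late events: an event is late if its time is below the effective watermark as it stood before its batch.
  • A window closes once the effective watermark reaches its end.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical single-key replay of Scenarios 1 and 2 (MIN_EVENT_TIME, 10s tumbling windows). */
final class WorkedExample {
    record Ev(String name, long eventTime) {}

    static final long SPAN = 10; // seconds

    static List<String> run(List<List<Ev>> batches, long delay) {
        TreeMap<Long, List<String>> open = new TreeMap<>(); // window start -> event names
        List<String> log = new ArrayList<>();
        Long watermark = null; // no watermark until the first batch commits
        for (List<Ev> batch : batches) {
            if (batch.isEmpty()) continue;
            long batchMin = Long.MAX_VALUE;
            for (Ev e : batch) {
                batchMin = Math.min(batchMin, e.eventTime());
                // late check against the effective watermark as it stood before this batch
                if (watermark != null && e.eventTime() < watermark - delay) {
                    log.add("late " + e.name());
                } else {
                    long start = Math.floorDiv(e.eventTime(), SPAN) * SPAN;
                    open.computeIfAbsent(start, k -> new ArrayList<>()).add(e.name());
                }
            }
            // MIN_EVENT_TIME: advance to the batch's earliest event time, never backwards
            watermark = watermark == null ? batchMin : Math.max(watermark, batchMin);
            long effective = watermark - delay;
            // emit, in order, every window whose end the effective watermark has reached
            Iterator<Map.Entry<Long, List<String>>> it = open.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<String>> w = it.next();
                long end = w.getKey() + SPAN;
                if (end > effective) break; // windows are sorted, so the rest stay open too
                log.add("emit [" + w.getKey() + "s," + end + "s) " + w.getValue());
                it.remove();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<List<Ev>> batches = List.of(
                List.of(new Ev("e1", 2), new Ev("e2", 5)), // batch 1
                List.of(new Ev("e4", 12)),                  // batch 2
                List.of(new Ev("e3", 8)),                   // batch 3
                List.of(new Ev("e5", 25)));                 // batch 4
        System.out.println("Scenario 1 (no delay): " + run(batches, 0));
        System.out.println("Scenario 2 (5s delay): " + run(batches, 5));
    }
}
```

With no delay, the log shows [0s,10s) emitting {e1, e2} before e3 is reported late. With a 5-second delay, e3 joins [0s,10s) and both windows emit after batch 4, matching the tables above.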

Scenario 3 — picking the watermark strategy

So far every batch from batch 2 onward held at most one event, so MIN_EVENT_TIME and MAX_EVENT_TIME would produce identical results here. The strategies diverge only when a batch spans event time, that is, when a single batch contains both an earlier and a later event.

Let's add one more event, e6 (event time 9s), which arrives in batch 2 together with e4:

Event  Event time  Arrival batch  Note
e1     2s          1
e2     5s          1
e4     12s         2              arrives before e3
e6     9s          2              arrives before e3
e3     8s          3
e5     25s         4

Let's push those events through the window configured in Java:

.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
    .withWindowSpan(Duration.ofSeconds(10))
    .withKeyExtractor(Event::userId)
    // event time with the aggressive watermark strategy
    .withTimeConfigBuilder(builder -> builder
        .withEventTimeExtractor(Event::timestamp)
        .withWatermarkStrategy(WatermarkStrategy.MAX_EVENT_TIME)
    )
    .withAggregateDefinition(builder -> ...)
    ...

Batch  Aggregate         Emitted  Watermark
1      1 (e1, e2)        No       5s
2      1 (e1, e2, e6)    Yes      12s
       2 (e4)            No
3      2 (e4)            No       12s
4      2 (e4)            Yes      25s
       3 (e5)            No

With MIN_EVENT_TIME (the default) instead, the aggregates differ:

Batch  Aggregate             Emitted  Watermark
1      1 (e1, e2)            No       2s
2      1 (e1, e2, e6)        No       9s
       2 (e4)                No
3      1 (e1, e2, e6, e3)    No       9s
       2 (e4)                No
4      1 (e1, e2, e6, e3)    Yes      25s
       2 (e4)                Yes
       3 (e5)                No

If you keep MAX_EVENT_TIME and set the delay to 3s (the event-time span of batch 2, 12s − 9s), events are aggregated the same way as under MIN_EVENT_TIME:

.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
    .withWindowSpan(Duration.ofSeconds(10))
    .withKeyExtractor(Event::userId)
    // aggressive watermark, compensated by a 3-second delay
    .withTimeConfigBuilder(builder -> builder
        .withEventTimeExtractor(Event::timestamp)
        .withWatermarkStrategy(WatermarkStrategy.MAX_EVENT_TIME)
        .withDelayWindowClose(Duration.ofSeconds(3))
    )
    .withAggregateDefinition(builder -> ...)
    ...

Batch  Aggregate             Emitted  Watermark  Effective
1      1 (e1, e2)            No       5s         2s
2      1 (e1, e2, e6)        No       12s        9s
       2 (e4)                No
3      1 (e1, e2, e6, e3)    No       12s        9s
       2 (e4)                No
4      1 (e1, e2, e6, e3)    Yes      25s        22s
       2 (e4)                Yes
       3 (e5)                No

  • Under MIN_EVENT_TIME (watermark = 9s), e3 escapes lateness even without a delay.
  • Under MAX_EVENT_TIME (watermark = 12s), e3 needs a delay of at least 3s.

Same data, same window — MAX_EVENT_TIME consumes 3 extra seconds of delay budget, exactly the within-batch event-time spread (12s − 9s). The wider the spread, the bigger the gap. See Interaction with watermarkStrategy for how to compensate explicitly.

MIN_EVENT_TIME is the recommended default for this reason: it absorbs out-of-order arrivals within a batch without requiring you to know the batch spread up front.

Scenario 4 — handle quiet keys with idleTimeout

The configuration above relies on later traffic (e5) to push the effective watermark past 10s and 20s so that the first and second aggregates can be emitted. If e5 never arrived (say, a particular user falls silent), [0,10s) and [10s,20s) would sit open forever. idleTimeout rescues this case:

.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
    .withWindowSpan(Duration.ofSeconds(10))
    .withKeyExtractor(Event::userId)
    // event time, compensated delay, and idle detection for quiet keys
    .withTimeConfigBuilder(builder -> builder
        .withEventTimeExtractor(Event::timestamp)
        .withWatermarkStrategy(WatermarkStrategy.MAX_EVENT_TIME)
        .withDelayWindowClose(Duration.ofSeconds(3))
        .withIdleTimeout(Duration.ofMinutes(5))
    )
    .withAggregateDefinition(builder -> ...)
    ...

After 5 minutes of silence the window emits whatever it holds for that key. The pipeline heartbeat keeps this honest even when all keys go silent. By default silence is measured against the worker's wall clock; see Sourcing time for Idle timeout to switch to event-time semantics.