
Session-window

The session-window emitter creates a per-worker session window. In this implementation, the window aggregate is stored only in the machine's local memory. The aggregate is emitted to the configured sink through an emit point.

A session window groups events that arrive close together in time. When no events arrive for the gap duration, the session closes. Because a session that receives events continuously would never reach that gap, a maximum duration can also be configured to force it to close. Each event is transformed into an aggregate based on VoltAggregateBuilder.
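The gap and maximum-duration rules can be sketched for a single key as follows. This is an illustration, not the plugin's code: the class SessionSplitter is hypothetical, timestamps are assumed to be epoch milliseconds already in order, and the boundary comparisons (>=) are an assumption about exactly where a session ends.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: splits one key's ordered event timestamps (epoch ms) into sessions.
// A session closes when the gap since the previous event reaches idleTimeoutMs, or when
// the next event falls at or after sessionStart + maximumDurationMs (assumed boundaries).
final class SessionSplitter {
    static List<List<Long>> split(List<Long> sortedTimestamps, long idleTimeoutMs, long maximumDurationMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long sessionStart = 0;
        long lastSeen = 0;
        for (long ts : sortedTimestamps) {
            boolean gapReached = !current.isEmpty() && ts - lastSeen >= idleTimeoutMs;
            boolean tooLong = !current.isEmpty() && ts - sessionStart >= maximumDurationMs;
            if (gapReached || tooLong) {
                sessions.add(current);          // close the running session
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                sessionStart = ts;              // this event opens a new session
            }
            current.add(ts);
            lastSeen = ts;
        }
        if (!current.isEmpty()) {
            sessions.add(current);              // flush the last open session
        }
        return sessions;
    }
}
```

With a 5-minute gap and a 10-minute maximum, a pause of 7 minutes starts a new session, and a steady one-event-per-minute stream is cut at the 10-minute mark.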

If an eventTimeExtractor is provided, it is used to read the event time from each event.

If keyExtractor is provided, a separate aggregate is created per worker and per key. Note that the window is a local, per-worker keyed aggregate, not a machine-wide keyed aggregate.
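A minimal model of this per-worker scoping, assuming a hypothetical LocalKeyedWindow class that keeps a simple count per key: each worker owns its own map, so the same key processed by two workers produces two separate aggregates that are never merged.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a worker-local keyed window. Each instance stands for one
// worker; its map lives only in that worker's memory, so nothing is shared or merged.
final class LocalKeyedWindow {
    private final Map<String, Long> countsByKey = new HashMap<>();

    void accept(String key) {
        countsByKey.merge(key, 1L, Long::sum); // one aggregate per key, per worker
    }

    long count(String key) {
        return countsByKey.getOrDefault(key, 0L);
    }
}
```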

If a downstream system fails, this session-window re-processes the events.

stream
  .aggregateWithWindow(Windows.<Event>sessionWindow()
      .withIdleTimeout(Duration.ofMinutes(5))
      .withMaximumDuration(Duration.ofMinutes(10))
      .withKeyExtractor(Event::getUserId)
      .withTimeConfigBuilder(builder -> builder
           .withEventTimeExtractor(Event::getTimestamp))
      .withAggregateDefinition(builder -> builder
          .count()
          .sum("amount", Event::getAmount)
          .min("amount", Event::getAmount)
          .max("amount", Event::getAmount)
          .distinct("id", (EventToStringExtractor<Event>) Event::getId)
      )
      .withExceptionHandler((records, context, throwable) -> {
          // handle error
      })
  )
  .emit(WindowTrigger.atWindowEnd)
  ...

emitter:
  session-window:
    idleTimeout: "5m"
    maximumDuration: "10m"
    keyExtractor: "com.example.Event::getUserId"
    timeConfig:
      eventTimeExtractor: "com.example.Event::getTimestamp"
    aggregateDefinition:
      from: "com.example.Event"
      builder:
        - type: count
        - type: sum (amount)
        - type: min (amount)
        - type: max (amount)
        - type: distinct (id)
    # exceptionHandler: "com.example.MyExceptionHandler"
  triggers:
    - trigger: org.voltdb.stream.plugin.window.api.WindowTrigger#atWindowEnd
      ...

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-window-api</artifactId>
    <version>1.8.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-window-api', version: '1.8.0'

Properties

maximumDuration

Defines the maximum duration of a session, measured from the beginning of the window. A session closes after this period whether or not events are still arriving; events that arrive afterwards form a new session.

Type: object

keyExtractor

If set, the window aggregates together only events that share the same key value, keeping a separate aggregate for each key. The key must honor the Java equals and hashCode contract.

Type: object
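Why the equals/hashCode contract matters can be shown with a plain HashMap, used here as an assumed stand-in for the window's key lookup. A record key groups equal values together, while an array key falls back to identity equality and splits identical values into separate groups. The class and record names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of the equals/hashCode requirement for keys (names are hypothetical).
final class KeyContractDemo {
    // A record derives equals/hashCode from its components, so equal values match.
    record RegionUser(String region, String userId) {}

    // Counts how many distinct groups a HashMap forms for the given keys.
    static int distinctGroups(Object[] keys) {
        Map<Object, Integer> groups = new HashMap<>();
        for (Object key : keys) {
            groups.merge(key, 1, Integer::sum);
        }
        return groups.size();
    }
}
```

Two RegionUser("eu", "u1") keys form one group; two String[] {"eu", "u1"} arrays form two, because arrays compare by identity.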

aggregateDefinition

Defines the aggregate through the provided builder. Required.

Type: object
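As a rough picture of what the definition in the example computes, the hypothetical AmountAggregate below applies count, sum, min, max and distinct to each event. It is not VoltAggregateBuilder, and it assumes amounts are long values and ids are strings.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical accumulator mirroring the example definition (count, sum/min/max of
// "amount", distinct "id"). It only shows how each event is folded into the aggregate.
final class AmountAggregate {
    long count;
    long sum;
    long min = Long.MAX_VALUE;   // identity value for min
    long max = Long.MIN_VALUE;   // identity value for max
    final Set<String> distinctIds = new HashSet<>();

    void add(String id, long amount) {
        count++;
        sum += amount;
        min = Math.min(min, amount);
        max = Math.max(max, amount);
        distinctIds.add(id);     // duplicates are ignored by the set
    }
}
```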

timeConfig

Configuration for event-time handling and late-event tolerance. Use it to control how the window determines time and how long it waits before closing when events arrive late.

Type: object

Fields of timeConfig:

timeConfig.eventTimeExtractor

If provided, the event time is sourced from the event itself. This enables event-time bucketing, watermark advancement, lateness detection and delayWindowClose. Without it, timestamps come from the worker's wall clock at the moment of processing.

Recommended whenever events can arrive out of order, may be replayed from history, or whenever correctness depends on the real-world time the event happened (billing, fraud detection, ...).

Type: object
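The difference between event time and wall-clock time can be sketched as below. EventTimeSource is a hypothetical class, and modelling the extractor as an optional Function returning an Instant is an assumption about its shape.

```java
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of where a window reads an event's time from: the event itself
// when an extractor is configured, otherwise the worker's clock at processing time.
final class EventTimeSource<E> {
    private final Optional<Function<E, Instant>> eventTimeExtractor;
    private final Clock workerClock;

    EventTimeSource(Optional<Function<E, Instant>> eventTimeExtractor, Clock workerClock) {
        this.eventTimeExtractor = eventTimeExtractor;
        this.workerClock = workerClock;
    }

    Instant timeOf(E event) {
        return eventTimeExtractor
                .map(extractor -> extractor.apply(event)) // event time
                .orElseGet(workerClock::instant);         // processing (wall-clock) time
    }
}
```

For a replayed event from last year, only the extractor-based source reports the historical time; the clock-based source reports "now".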

timeConfig.delayWindowClose

This is an optional configuration.

It postpones window closing by the configured duration so that slightly out-of-order or late events still have a chance to land in their intended window.

It is meaningful only when an eventTimeExtractor is configured. Without an extractor, timestamps are the worker's wall clock at the moment of processing and are monotonic, so this setting only adds emission latency without any correctness benefit; leave it at 0 in that case.

Keep this value relatively small, for example 1 to 10 seconds, just enough to absorb disruptions in event arrival while the upstream system is still available. Do not use this setting to compensate for upstream system downtime.

Once the window is closed, any late events are routed to the exception handler.

Type: object

Default value: 0s
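A sketch of the close-and-route behaviour, assuming a single window with a known end, a hypothetical DelayedCloseWindow class, and a list standing in for the exception handler. The window closes only once the watermark reaches windowEnd + delayWindowClose; every event is assumed to belong to this window.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of delayWindowClose for one window.
final class DelayedCloseWindow {
    private final Instant windowEnd;
    private final Duration delayWindowClose;
    private boolean closed;
    final List<Instant> accepted = new ArrayList<>();
    final List<Instant> routedToExceptionHandler = new ArrayList<>(); // stand-in for the handler

    DelayedCloseWindow(Instant windowEnd, Duration delayWindowClose) {
        this.windowEnd = windowEnd;
        this.delayWindowClose = delayWindowClose;
    }

    void onWatermark(Instant watermark) {
        if (!watermark.isBefore(windowEnd.plus(delayWindowClose))) {
            closed = true; // grace period has elapsed: emit and close
        }
    }

    void onEvent(Instant eventTime) {
        if (closed) {
            routedToExceptionHandler.add(eventTime); // too late for this window
        } else {
            accepted.add(eventTime);                 // still lands in its window
        }
    }
}
```

With a 5-second delay, an event one second older than the window end still lands while the watermark is 3 seconds past the end, but a similar event arriving after the watermark reaches the 5-second mark goes to the handler.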

timeConfig.idleTimeout

Inactivity timeout after which a window is emitted and closed.

While the pipeline is busy, quiet keys are detected as part of normal batch processing and emitted once they cross this threshold. While the pipeline is fully silent, idle cleanup is driven by the periodic pipeline heartbeat (60s).

If idleTimeout is not set, idle cleanup is disabled and the window only closes via its primary trigger (count, time span, session gap, ...).

The clock used to measure silence is controlled by idleTimeoutClock.

Type: object
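The idle check itself can be sketched as below, under the assumption that sweep() is what runs on each batch or heartbeat. IdleSweeper is hypothetical, and a null idleTimeout models the "not set" case.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of idle cleanup; a null idleTimeout disables it.
final class IdleSweeper {
    private final Duration idleTimeout;
    private final Map<String, Instant> lastSeenByKey = new HashMap<>();

    IdleSweeper(Duration idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    void touch(String key, Instant at) {
        lastSeenByKey.put(key, at); // record the latest activity for the key
    }

    // Returns, and forgets, the keys that have been quiet for at least idleTimeout.
    List<String> sweep(Instant now) {
        List<String> idle = new ArrayList<>();
        if (idleTimeout == null) {
            return idle; // idle cleanup disabled
        }
        lastSeenByKey.entrySet().removeIf(entry -> {
            boolean quiet = !now.isBefore(entry.getValue().plus(idleTimeout));
            if (quiet) {
                idle.add(entry.getKey()); // the window would be emitted and closed here
            }
            return quiet;
        });
        return idle;
    }
}
```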

timeConfig.idleTimeoutClock

Specifies the clock used to measure inactivity for idleTimeout:

- PROCESSING_TIME: silence is measured in real time. Quiet keys flush after idleTimeout of real time, regardless of event-time progression. Suitable for live streams where the idle SLA is real time.
- EVENT_TIME: silence is measured in event time (requires eventTimeExtractor). Quiet keys flush only as the watermark advances by idleTimeout. Suitable for replays and historical processing where results must be deterministic.

Type: object

Supported values: processing_time, event_time.
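The effect of the two clocks shows up during a replay, sketched here with a hypothetical IdleClockCheck helper: when historical events are processed at a different pace than real time, the same key can already be idle under PROCESSING_TIME while not yet idle under EVENT_TIME.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch: the same idle check measured against two different clocks.
final class IdleClockCheck {
    enum IdleTimeoutClock { PROCESSING_TIME, EVENT_TIME }

    static boolean isIdle(IdleTimeoutClock clock, Duration idleTimeout,
                          Instant lastSeenProcessing, Instant lastSeenEventTime,
                          Instant processingNow, Instant watermark) {
        return switch (clock) {
            // real time elapsed on the worker since the key's last event arrived
            case PROCESSING_TIME -> !processingNow.isBefore(lastSeenProcessing.plus(idleTimeout));
            // how far the watermark has moved past the key's last event time
            case EVENT_TIME -> !watermark.isBefore(lastSeenEventTime.plus(idleTimeout));
        };
    }
}
```

If 10 minutes of real time pass while the replay's watermark advances only 2 minutes, a 5-minute idleTimeout flushes the key under PROCESSING_TIME but not under EVENT_TIME.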

timeConfig.watermarkStrategy

Specifies the strategy for calculating watermarks. The watermark is monotonic and never moves backward. If a key extractor is configured, watermark progression is tracked independently per key.

The choice is meaningful only when an eventTimeExtractor is configured (timestamps come from events). Without an extractor, timestamps are the worker's wall clock at the moment of processing and arrive monotonically; MIN_EVENT_TIME pins the per-batch watermark to the first event's timestamp and MAX_EVENT_TIME to the last, so the two still differ by roughly the batch's processing duration but the lag is typically negligible for second-scale or larger windows.

Available strategies:

- MIN_EVENT_TIME: uses the minimum observed event time in the batch as the watermark. This is the most conservative option and reduces the risk of marking slightly older events as late.
- MAX_EVENT_TIME: uses the largest observed event time in the batch as the watermark. This advances the watermark faster, which can close windows sooner, but may classify older events as late earlier.

If you are unsure, use MIN_EVENT_TIME.

Type: object

Supported values: min_event_time, max_event_time.

Default value: MIN_EVENT_TIME
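Both strategies, and the monotonic guarantee, can be sketched one batch at a time. WatermarkTracker is hypothetical and works on epoch milliseconds; with a key extractor configured, one tracker per key would be kept.

```java
import java.util.List;

// Hypothetical sketch of the two watermark strategies over one batch of event times.
final class WatermarkTracker {
    enum Strategy { MIN_EVENT_TIME, MAX_EVENT_TIME }

    private final Strategy strategy;
    private long watermark = Long.MIN_VALUE;

    WatermarkTracker(Strategy strategy) {
        this.strategy = strategy;
    }

    long onBatch(List<Long> eventTimes) {
        if (eventTimes.isEmpty()) {
            return watermark; // nothing observed, watermark unchanged
        }
        long candidate = switch (strategy) {
            case MIN_EVENT_TIME -> eventTimes.stream().mapToLong(Long::longValue).min().getAsLong();
            case MAX_EVENT_TIME -> eventTimes.stream().mapToLong(Long::longValue).max().getAsLong();
        };
        watermark = Math.max(watermark, candidate); // monotonic: never moves backward
        return watermark;
    }
}
```

For the batch [1000, 3000, 2000], MIN_EVENT_TIME yields 1000 and MAX_EVENT_TIME yields 3000; a later batch containing only 500 leaves either watermark where it was.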

exceptionHandler

Custom exception handler that intercepts all errors related to this emitter.

Type: object

Available Triggers

org.voltdb.stream.api.pipeline.emitter.ExceptionTrigger.onError

When used, the emitter creates a substream of all events that were consumed.

org.voltdb.stream.plugin.window.api.WindowTrigger.atWindowEnd

Triggered after the window has been closed.

org.voltdb.stream.plugin.window.api.WindowTrigger.atAggregate

Triggered after each event has been aggregated. An emitted aggregate is a snapshot of the aggregate after the event has been applied.
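The difference between the two window triggers can be sketched for a running sum, under the assumption that a window simply folds a list of amounts. TriggerDemo and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch contrasting the two window triggers for a running sum.
final class TriggerDemo {
    // atAggregate: one snapshot after every event, including that event.
    static List<Long> atAggregate(List<Long> amounts) {
        List<Long> snapshots = new ArrayList<>();
        long sum = 0;
        for (long amount : amounts) {
            sum += amount;
            snapshots.add(sum);
        }
        return snapshots;
    }

    // atWindowEnd: a single result once the window closes.
    static List<Long> atWindowEnd(List<Long> amounts) {
        long sum = 0;
        for (long amount : amounts) {
            sum += amount;
        }
        return List.of(sum);
    }
}
```

For amounts 5, 2 and 9, atAggregate emits 5, 7 and 16, while atWindowEnd emits only 16.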

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Metrics

Window metrics

Metric enum: org.voltdb.stream.plugin.window.WindowMetric

| Prometheus name | Type | Description |
|---|---|---|
| voltsp_aggregations_unique_keys_total | gauge | Current number of unique aggregate keys held by a local window. |
| voltsp_aggregations_created_total | counter | Number of aggregate windows created. |
| voltsp_aggregations_emitted_total | counter | Number of aggregate snapshots emitted by a local window. |
| voltsp_emitted_total | counter | Number of events emitted by a local window. This can be errors, original events, late events, etc. |
| voltsp_idle_keys_total | counter | Number of idle aggregate keys removed by a local window. |
| voltsp_late_event_total | counter | Number of late events observed by a local window. |