Skip to content

Joins in VoltSP

Joins in Stream Processing

In stream processing, a join is an operation that combines two or more data streams based on a common key. While conceptually similar to joins in relational databases, streaming joins introduce several challenges:

  • Data Arrival Latency: What happens if one side of the join arrives much later than the other? The system must often wait or maintain state for both sides.
  • State Management: The system needs to keep track of all relevant keys and update the joined result as new data arrives.
  • Retention & Eviction: How long should a key be kept in memory or storage waiting for its match? Unbounded state can lead to resource exhaustion.

The VoltSP Approach

VoltSP does not provide a built-in DSL or specialized API for join operations within the streaming engine itself. Instead, joins are performed in VoltDB using stored procedures that execute actual SQL joins or utilize materialized views.

This approach follows the core VoltSP philosophy: workers remain stateless, and stateful coordination is delegated to the database.

Single Source vs. Multi-Source

In most scenarios, different event types (such as Order and Payment) arrive through a single source, for example, a shared Kafka topic. This is the primary use case we address here. However, if your data originates from multiple distinct sources, you can achieve a join by creating separate pipelines for each source. Each pipeline ingests data into the same set of VoltDB tables, where the stored procedure performs the join logic across all incoming data.

The "Join Procedure" Pattern

Instead of a built-in join engine, VoltSP leverages the power of VoltDB's stored procedures to implement a custom "Join Procedure" pattern. This pattern is something you, as the developer, create to handle the specific join logic required by your application.

A Join Procedure is a standard VoltDB stored procedure designed to handle multiple types of input events. It typically uses a flag or an event type field to determine how to process the incoming record.

Example logic in a custom Join Procedure: 1. Receive input event and its type (e.g., isEventTypeA). 2. Insert the event into the corresponding table (e.g., TABLE_A or TABLE_B). 3. Perform the join logic (either via SQL or by querying a view). 4. Return the joined result or emit it further down the pipeline.

Crucial Requirement

To ensure high performance and atomicity, both tables involved in the join MUST be partitioned on the same key (the join key). This allows the procedure to run as a single-partition transaction.

Implementation Scenarios

Join Without Aggregation

If you need to produce a joined record as soon as a match is found, use a standard SQL join within the procedure.

After inserting the incoming event into the corresponding table, execute a standard SQL join to combine the data as needed. If a match exists, the procedure returns the combined data. If not, it returns an empty result. This naturally handles cases where one side of the join has not arrived yet.

Performance Tip

Ensure both tables have proper indexes on the join key. If you have additional filters in a WHERE clause, include those columns in the index as well.

Join With Aggregation

When you need to join streams and compute aggregates (e.g., sum of payments for an order), Materialized Views are the preferred approach.

Instead of manual joins, you set up a materialized view in VoltDB that defines the join and the aggregation. Because VoltDB materialized views are updated incrementally, they are extremely fast.

In the Join Procedure: 1. Insert the incoming event into the corresponding table (this will automatically update the materialized view). 2. Query the materialized view for the updated aggregate for that specific key. 3. Return the result.

Performance Tip

The materialized view should have an index on the columns used for filtering (usually the join key). The base tables should have indexes that support the view's GROUP BY clause.

Advanced Optimizations

Since you have full control over the Join Procedure, you can implement custom logic that traditional streaming engines might struggle with:

Early Validation and Filtering

Validate and filter data before inserting it into the tables. If a record doesn't meet certain criteria, discard it immediately to avoid unnecessary writes and view/table updates. In some cases, performing a SELECT before an INSERT can be beneficial—for instance, checking if the record is already processed or if it meets specific business rules based on current state. This can be faster than blindly inserting and then handling conflicts or cleanup.

Avoiding Redundant Updates

If you are calculating something like a MAX value, and the incoming value is smaller than the current maximum, you might choose to skip the insert entirely if the historical data isn't needed for anything else. A preliminary SELECT can determine if the new data actually contributes to the aggregate, saving the cost of an INSERT and subsequent view updates.

The "Pending Buffer" Pattern (N-to-1 Joins)

For 1-to-1 or N-to-1 joins, you can replace automatic database joins with a manual buffering mechanism to eliminate physical join overhead. This is particularly useful when you need to manually calculate an aggregate using data from two different tables.

Since records can arrive in any order, you must buffer data until a complete pair (or set) is formed. This is why we use a "Pending" state. Once both sides of the join are present, you can manually combine them and update a result table.

  1. The Buffer: Use a temporary table to buffer records that arrive before their corresponding join record.
  2. Join Record Arrival: When the join record arrives, scan the temporary table for matching records, process them, and update the final aggregate.
  3. Post-Join Processing: Records arriving after the join record are processed immediately and aggregated, bypassing the buffer.

This pattern converts a complex join into simple inserts and fast scans of small buffer tables, significantly improving throughput by eliminating heavy join plans.

Performance Tip

Alternatively, you can insert the manually joined record into a VoltDB STREAM, which then incrementally updates a Materialized View configured on that stream.

State Retention and Eviction

To manage the size of your join state and ensure that only relevant, recent data is kept, use VoltDB's TTL (Time-To-Live) feature.

By applying a TTL to your base tables (and potentially views), VoltDB automatically deletes old data based on a timestamp. This handles data eviction efficiently, ensuring your storage doesn't grow indefinitely and your join operations only consider records that fall within the retention period.