Joins in VoltSP¶
Joins in Stream Processing¶
In stream processing, a join is an operation that combines two or more data streams based on a common key. While conceptually similar to joins in relational databases, streaming joins introduce several challenges:
- Data Arrival Latency: What happens if one side of the join arrives much later than the other? The system must often wait or maintain state for both sides.
- State Management: The system needs to keep track of all relevant keys and update the joined result as new data arrives.
- Retention & Eviction: How long should a key be kept in memory or storage waiting for its match? Unbounded state can lead to resource exhaustion.
The VoltSP Approach¶
VoltSP does not provide a built-in DSL or specialized API for join operations within the streaming engine itself. Instead, joins are performed in VoltDB, using stored procedures that either execute SQL joins or query materialized views.
This approach follows the core VoltSP philosophy: workers remain stateless, and stateful coordination is delegated to the database.
Single Source vs. Multi-Source¶
In most scenarios, different event types (such as Order and Payment) arrive through a
single source, for example, a shared Kafka topic. This is the primary use case we address
here. However, if your data originates from multiple distinct sources, you can achieve a
join by creating separate pipelines for each source. Each pipeline ingests data into the
same set of VoltDB tables, where the stored procedure performs the join logic across all
incoming data.
The "Join Procedure" Pattern¶
Because there is no built-in join engine, you implement the join yourself as a VoltDB stored procedure, referred to here as a "Join Procedure". You write this procedure to handle the specific join logic your application requires.
A Join Procedure is a standard VoltDB stored procedure designed to handle multiple types of input events. It typically uses a flag or an event type field to determine how to process the incoming record.
Example logic in a custom Join Procedure:
1. Receive input event and its type (e.g., isEventTypeA).
2. Insert the event into the corresponding table (e.g., TABLE_A or TABLE_B).
3. Perform the join logic (either via SQL or by querying a view).
4. Return the joined result or emit it further down the pipeline.
Crucial Requirement
To ensure high performance and atomicity, both tables involved in the join MUST be partitioned on the same key (the join key). This allows the procedure to run as a single-partition transaction.
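To illustrate why a shared partitioning key matters, the sketch below routes two event types by the same join key. The `partition_for` helper, the partition count, and the CRC-based hash are assumptions made for illustration; this is not VoltDB's actual hashing scheme, and in VoltDB the partitioning itself is declared in the schema rather than computed by your code.

```python
import zlib

NUM_PARTITIONS = 8  # hypothetical partition count, for illustration only


def partition_for(join_key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a join key to a partition with a stable hash.

    NOT VoltDB's real hashing scheme. It only illustrates that rows sharing
    a partitioning key always land on the same partition.
    """
    return zlib.crc32(join_key.encode("utf-8")) % num_partitions


order = {"type": "Order", "order_id": "o-1001"}
payment = {"type": "Payment", "order_id": "o-1001"}

# Both events are routed by order_id, so a single partition holds both
# sides of the join and one single-partition transaction can see them.
same_partition = (
    partition_for(order["order_id"]) == partition_for(payment["order_id"])
)
```

If the two tables were partitioned on different columns, the matching rows could live on different partitions and the join would no longer fit in a single-partition transaction.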
Implementation Scenarios¶
Join Without Aggregation¶
If you need to produce a joined record as soon as a match is found, use a standard SQL join within the procedure.
After inserting the incoming event into its table, the procedure runs the join for that event's key. If a match exists, the procedure returns the combined data; if not, it returns an empty result. This naturally handles the case where one side of the join has not arrived yet: the later event finds the earlier one already stored and produces the match.
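The insert-then-join flow can be sketched as follows. This is a minimal simulation, not a VoltDB procedure: real Join Procedures are VoltDB stored procedures, while this sketch uses Python with an in-memory SQLite database as a stand-in so that it runs anywhere. The table names, columns, and the `join_procedure` function are hypothetical.

```python
import sqlite3


def make_join_db() -> sqlite3.Connection:
    """Create hypothetical orders/payments tables in an in-memory database."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE orders   (order_id TEXT PRIMARY KEY, customer TEXT NOT NULL);
        CREATE TABLE payments (payment_id TEXT PRIMARY KEY,
                               order_id   TEXT NOT NULL,
                               amount     REAL NOT NULL);
        CREATE INDEX payments_by_order ON payments (order_id);
    """)
    return conn


def join_procedure(conn: sqlite3.Connection, is_order: bool, event: dict) -> list:
    """Insert the event, then return the joined rows for its key (may be empty)."""
    with conn:  # one transaction: the insert and the join see the same state
        if is_order:
            conn.execute("INSERT INTO orders VALUES (?, ?)",
                         (event["order_id"], event["customer"]))
        else:
            conn.execute("INSERT INTO payments VALUES (?, ?, ?)",
                         (event["payment_id"], event["order_id"], event["amount"]))
        return conn.execute(
            """SELECT o.order_id, o.customer, p.payment_id, p.amount
               FROM orders o JOIN payments p ON p.order_id = o.order_id
               WHERE o.order_id = ?""",
            (event["order_id"],),
        ).fetchall()


join_conn = make_join_db()
# The payment arrives first: nothing to join with yet, so the result is empty.
first = join_procedure(
    join_conn, False, {"payment_id": "p-1", "order_id": "o-1", "amount": 25.0})
# The order arrives later and finds the stored payment.
second = join_procedure(join_conn, True, {"order_id": "o-1", "customer": "alice"})
```

In this sketch each call returns every match for the key, not only the newly formed pair, so a key with several payments yields all of them on each call. Whether that is desirable depends on what the downstream pipeline expects.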
Performance Tip
Ensure both tables have proper indexes on the join key. If you have
additional filters in a WHERE clause, include those columns in the index as well.
Join With Aggregation¶
When you need to join streams and compute aggregates (e.g., sum of payments for an order), Materialized Views are the preferred approach.
Instead of writing the join by hand, you define a materialized view in VoltDB that declares both the join and the aggregation. VoltDB updates materialized views incrementally as the base tables change, so reading the current aggregate is a lookup rather than a recomputation.
In the Join Procedure:
1. Insert the incoming event into the corresponding table (this will automatically update the materialized view).
2. Query the materialized view for the updated aggregate for that specific key.
3. Return the result.
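A minimal sketch of this flow, again using Python and in-memory SQLite as a stand-in for VoltDB. SQLite has no materialized views, so an ordinary view plays that role here and is recomputed on every read; only the shape of the procedure (insert, then query the view by key) carries over. All table, view, and function names are hypothetical.

```python
import sqlite3


def make_agg_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE orders   (order_id TEXT PRIMARY KEY, total_due REAL NOT NULL);
        CREATE TABLE payments (order_id TEXT NOT NULL, amount REAL NOT NULL);
        -- Stand-in for a materialized view: SQLite recomputes it on each read.
        CREATE VIEW order_payment_totals AS
            SELECT o.order_id, o.total_due,
                   COUNT(*) AS payment_count, SUM(p.amount) AS total_paid
            FROM orders o JOIN payments p ON p.order_id = o.order_id
            GROUP BY o.order_id, o.total_due;
    """)
    return conn


def aggregate_procedure(conn: sqlite3.Connection, is_order: bool, event: dict):
    """Insert the event, then read the aggregate row for its key (or None)."""
    with conn:
        if is_order:
            conn.execute("INSERT INTO orders VALUES (?, ?)",
                         (event["order_id"], event["total_due"]))
        else:
            conn.execute("INSERT INTO payments VALUES (?, ?)",
                         (event["order_id"], event["amount"]))
        return conn.execute(
            """SELECT order_id, total_due, payment_count, total_paid
               FROM order_payment_totals WHERE order_id = ?""",
            (event["order_id"],),
        ).fetchone()


agg_conn = make_agg_db()
after_order = aggregate_procedure(
    agg_conn, True, {"order_id": "o-1", "total_due": 100.0})
after_pay_1 = aggregate_procedure(
    agg_conn, False, {"order_id": "o-1", "amount": 40.0})
after_pay_2 = aggregate_procedure(
    agg_conn, False, {"order_id": "o-1", "amount": 60.0})
```

Here the order alone produces no aggregate row because the view uses an inner join; each subsequent payment returns the running count and total for that order.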
Performance Tip
The materialized view should have an index on the columns used for
filtering (usually the join key). The base tables should have indexes that support the view's GROUP BY clause.
Advanced Optimizations¶
Since you have full control over the Join Procedure, you can implement custom logic that traditional streaming engines might struggle with:
Early Validation and Filtering¶
Validate and filter data before inserting it into the tables. If a record doesn't meet
certain criteria, discard it immediately to avoid unnecessary writes and view/table updates.
In some cases, performing a SELECT before an INSERT can be beneficial, for instance to
check whether the record has already been processed or whether it meets specific business
rules based on current state. This can be faster than blindly inserting and then handling
conflicts or cleanup.
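One way to picture the validate-then-check-then-insert order is the sketch below. It uses Python with in-memory SQLite as a stand-in for VoltDB; the `payments_checked` table, the positive-amount rule, and the `insert_if_valid` function are hypothetical examples of a business rule and a duplicate check.

```python
import sqlite3


def make_validation_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("""CREATE TABLE payments_checked (
                        payment_id TEXT PRIMARY KEY,
                        order_id   TEXT NOT NULL,
                        amount     REAL NOT NULL)""")
    return conn


def insert_if_valid(conn: sqlite3.Connection, event: dict) -> str:
    """Return 'rejected', 'duplicate', or 'inserted' for the given event."""
    # 1. Stateless validation first: no database access is needed.
    amount = event.get("amount")
    if amount is None or amount <= 0:
        return "rejected"
    with conn:
        # 2. SELECT before INSERT: skip records that were already processed.
        seen = conn.execute(
            "SELECT 1 FROM payments_checked WHERE payment_id = ?",
            (event["payment_id"],),
        ).fetchone()
        if seen is not None:
            return "duplicate"
        # 3. Only now pay the cost of the write (and any view updates).
        conn.execute("INSERT INTO payments_checked VALUES (?, ?, ?)",
                     (event["payment_id"], event["order_id"], amount))
    return "inserted"


check_conn = make_validation_db()
outcomes = [
    insert_if_valid(check_conn, {"payment_id": "p-1", "order_id": "o-1", "amount": 10.0}),
    insert_if_valid(check_conn, {"payment_id": "p-1", "order_id": "o-1", "amount": 10.0}),
    insert_if_valid(check_conn, {"payment_id": "p-2", "order_id": "o-1", "amount": -5.0}),
]
stored_rows = check_conn.execute(
    "SELECT COUNT(*) FROM payments_checked").fetchone()[0]
```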
Avoiding Redundant Updates¶
If you are calculating something like a MAX value, and the incoming value is smaller than
the current maximum, you might choose to skip the insert entirely if the historical data
isn't needed for anything else. A preliminary SELECT can determine if the new data
actually contributes to the aggregate, saving the cost of an INSERT and subsequent view
updates.
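The MAX case can be sketched like this, assuming the historical rows really are not needed elsewhere. Python with in-memory SQLite stands in for VoltDB, and the `readings` table and `record_reading` function are hypothetical.

```python
import sqlite3


def make_max_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE readings (sensor_id TEXT NOT NULL, reading REAL NOT NULL)")
    conn.execute("CREATE INDEX readings_by_sensor ON readings (sensor_id, reading)")
    return conn


def record_reading(conn: sqlite3.Connection, sensor_id: str, reading: float) -> bool:
    """Insert the reading only if it raises the current maximum for the sensor."""
    with conn:
        current_max = conn.execute(
            "SELECT MAX(reading) FROM readings WHERE sensor_id = ?",
            (sensor_id,),
        ).fetchone()[0]  # None when the sensor has no rows yet
        if current_max is not None and reading <= current_max:
            return False  # cannot change MAX, so skip the write entirely
        conn.execute("INSERT INTO readings VALUES (?, ?)", (sensor_id, reading))
        return True


max_conn = make_max_db()
written = [
    record_reading(max_conn, "s-1", 10.0),  # first value: stored
    record_reading(max_conn, "s-1", 7.0),   # below the maximum: skipped
    record_reading(max_conn, "s-1", 12.0),  # new maximum: stored
]
rows_kept = max_conn.execute("SELECT COUNT(*) FROM readings").fetchone()[0]
```

The same shortcut is not safe for aggregates such as SUM or COUNT, where every record contributes to the result.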
The "Pending Buffer" Pattern (N-to-1 Joins)¶
For 1-to-1 or N-to-1 joins, you can replace automatic database joins with a manual buffering mechanism to eliminate physical join overhead. This is particularly useful when you need to manually calculate an aggregate using data from two different tables.
Since records can arrive in any order, you must buffer data until a complete pair (or set) is formed. This is why we use a "Pending" state. Once both sides of the join are present, you can manually combine them and update a result table.
- The Buffer: Use a temporary table to buffer records that arrive before their corresponding join record.
- Join Record Arrival: When the join record arrives, scan the temporary table for matching records, process them, and update the final aggregate.
- Post-Join Processing: Records arriving after the join record are processed immediately and aggregated, bypassing the buffer.
This pattern converts a complex join into simple inserts and fast scans of small buffer tables, which can improve throughput considerably by avoiding heavy join plans.
Performance Tip
Alternatively, you can insert the manually joined record
into a VoltDB STREAM, which then incrementally updates a Materialized View
configured on that stream.
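The control flow of the Pending Buffer pattern can be sketched in plain Python, with dictionaries standing in for the buffer table, the join-record table, and the result table. The scenario (N payments per order, aggregated per customer) and all class and method names are hypothetical; in VoltDB this logic would live inside the Join Procedure and operate on tables.

```python
from collections import defaultdict


class PendingBufferJoin:
    """N-to-1 join of payments to orders, aggregated per customer."""

    def __init__(self) -> None:
        self.orders = {}                            # order_id -> customer ("1" side)
        self.pending = defaultdict(list)            # order_id -> buffered amounts
        self.paid_by_customer = defaultdict(float)  # result "table"

    def on_order(self, order_id: str, customer: str) -> float:
        """Join record arrives: drain the buffer for this key and aggregate."""
        self.orders[order_id] = customer
        for amount in self.pending.pop(order_id, []):
            self.paid_by_customer[customer] += amount
        return self.paid_by_customer[customer]

    def on_payment(self, order_id: str, amount: float):
        """Payment arrives: buffer it if the order is unknown, else aggregate."""
        customer = self.orders.get(order_id)
        if customer is None:
            self.pending[order_id].append(amount)   # wait for the join record
            return None
        self.paid_by_customer[customer] += amount   # bypass the buffer
        return self.paid_by_customer[customer]


join = PendingBufferJoin()
r1 = join.on_payment("o-1", 10.0)    # buffered, no order yet
r2 = join.on_payment("o-1", 5.0)     # buffered
r3 = join.on_order("o-1", "alice")   # drains the buffer
r4 = join.on_payment("o-1", 2.5)     # processed immediately
```

Note that no join is ever executed: each event does one keyed lookup plus either a buffer append or an aggregate update.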
State Retention and Eviction¶
To manage the size of your join state and ensure that only relevant, recent data is kept, use VoltDB's TTL (Time-To-Live) feature.
By applying a TTL to your base tables (and potentially views), VoltDB automatically deletes old data based on a timestamp. This handles data eviction efficiently, ensuring your storage doesn't grow indefinitely and your join operations only consider records that fall within the retention period.
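The effect of TTL-based eviction on join state can be simulated as below. In VoltDB the TTL is declared on the table and the deletion happens automatically; this sketch performs the equivalent delete by hand in in-memory SQLite so the behavior is visible. The `payments_ttl` table, the 300-second window, and the `evict_expired` function are assumptions for illustration.

```python
import sqlite3

TTL_SECONDS = 300  # hypothetical retention window


def make_ttl_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("""CREATE TABLE payments_ttl (
                        order_id TEXT NOT NULL,
                        amount   REAL NOT NULL,
                        event_ts REAL NOT NULL)""")
    return conn


def evict_expired(conn: sqlite3.Connection, now: float) -> int:
    """Delete rows older than the retention window; return how many were removed."""
    with conn:
        cur = conn.execute(
            "DELETE FROM payments_ttl WHERE event_ts < ?",
            (now - TTL_SECONDS,),
        )
        return cur.rowcount


ttl_conn = make_ttl_db()
ttl_conn.executemany(
    "INSERT INTO payments_ttl VALUES (?, ?, ?)",
    [("o-1", 10.0, 0.0), ("o-2", 20.0, 100.0), ("o-3", 30.0, 400.0)],
)
removed = evict_expired(ttl_conn, now=450.0)  # cutoff is 150.0
remaining = [r[0] for r in ttl_conn.execute("SELECT order_id FROM payments_ttl")]
```

After eviction, an order event for `o-1` or `o-2` would no longer find its payment, which is the intended behavior once a key has waited longer than the retention period.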