15.2. The Business Case for Streaming Data

The streaming features of VoltDB provide a robust and flexible set of capabilities for connecting a VoltDB database to external systems. They can be configured in many different ways. At the most basic, they let you automate the import and export of data from a VoltDB database. The following sections demonstrate other ways these capabilities can simplify and automate common business processes, including extract, transform, load (ETL), change data capture, streaming data validation, caching, and archiving.

15.2.1. Extract, Transform, Load (ETL)

Extract, transform, load (ETL) is a common business pattern where you extract data from a database, restructure and repurpose it, then load it into another system. For example, an order processing database might have separate tables for customer data, orders, and product information. When it comes time to ship the order, information from all three tables is needed: the customer ID and product SKU from the order, the name and address from the customer record, and the product name and description from the product table. This information is merged and passed to the shipping management system.

Rather than writing a separate application to perform these tasks, VoltDB lets you integrate them into a single stored procedure. By creating a stream with the appropriate columns for the transformed data, assigning it as an export source, and defining a target that matches the shipping management system, you can declare a single stored procedure to complete the process:

CREATE STREAM shipping 
   EXPORT TO TARGET shipmgtsystem 
   ( order_number BIGINT,
     prod_sku BIGINT,
     prod_name VARCHAR(64),
     customer_name VARCHAR(64),
     customer_address VARCHAR(128) );
CREATE PROCEDURE shiporder AS
   INSERT INTO shipping SELECT
      o.id, p.sku, p.name, c.name, c.address
      FROM orders AS o, products AS p, customers AS c
      WHERE o.id = ? AND
         o.sku = p.sku  AND o.customer_id = c.id;
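
The shipmgtsystem export target itself is configured in the deployment file. The connector type and properties depend entirely on how the shipping management system accepts data; as a minimal sketch, assuming it consumes messages from a Kafka topic (the broker and topic names here are hypothetical), the configuration might look like this:

<export>
  <configuration type="kafka" target="shipmgtsystem">
     <property name="bootstrap.servers">shipbroker1,shipbroker2</property>
     <property name="topic.key">shipping.shipments</property>
     <property name="skipinternals">true</property>
  </configuration>
</export>

Invoking the procedure with an order number (for example, exec shiporder 12345 from sqlcmd) then performs the extract, transform, and load as a single transaction.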

15.2.2. Change Data Capture

Change Data Capture is the process of recording all changes to the content of a database. Those changes can then be reused: inserted into another repository for redundancy, logged to a file, merged into another database, or whatever else the business workflow calls for.

VoltDB simplifies change data capture by allowing you to export all, or any subset, of the data changes to a table to any of the available export targets. When you declare a table as an export source with the EXPORT TO TARGET clause, you can specify which actions trigger export using the ON clause. Possible triggers are INSERT, UPDATE, UPDATE_NEW, UPDATE_OLD, and DELETE.

INSERT and DELETE are self-explanatory. UPDATE, on the other hand, generates two export records: one for the row before the update and one for the row after the update. To select only one of these records, you can use the actions UPDATE_OLD or UPDATE_NEW.

For change data capture, you can export all changes by specifying ON INSERT, UPDATE, DELETE. For example, the following schema definitions ensure that all data changes for the tables products and orders are exported to the targets offsiteprod and offsiteorder, respectively:

CREATE TABLE products EXPORT TO TARGET offsiteprod
   ON INSERT, UPDATE, DELETE
   [ ... ];
CREATE TABLE orders EXPORT TO TARGET offsiteorder
   ON INSERT, UPDATE, DELETE
   [ ... ];
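
The column lists are elided in the example above. As a sketch only, with hypothetical columns, a complete products declaration might look like this:

CREATE TABLE products EXPORT TO TARGET offsiteprod
   ON INSERT, UPDATE, DELETE
   ( sku BIGINT NOT NULL,
     name VARCHAR(64),
     description VARCHAR(1024),
     price DECIMAL );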

Note that the built-in connectors include six columns of metadata at the beginning of the export data by default. For change data capture, the most important piece of metadata is the sixth column, which is a single-byte value indicating which action triggered the export. The external target can use this information to determine what to do with the record. The possible values for the operation indicator are shown in Table 15.2, “Export Metadata”.
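
For example, a Kafka export configuration for the offsiteprod target might look like the following sketch (the broker names and topic are hypothetical). The skipinternals property is not set to true, so the export records retain the default metadata columns, including the operation indicator:

<export>
  <configuration type="kafka" target="offsiteprod">
     <property name="bootstrap.servers">lakebroker1,lakebroker2</property>
     <property name="topic.key">products.product_changes</property>
  </configuration>
</export>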

15.2.3. Streaming Data Validation

VoltDB provides the necessary speed and features to implement an intelligent data pipeline, where information passing through a high-performance stream is analyzed, validated, and then accepted, rejected, or modified as necessary before being passed on to the next stage of the pipeline. In this use case, the data in VoltDB serves as a reference for comparison with the influx of data in the pipeline. VoltDB import connectors accept the incoming data and submit it to a stored procedure. The stored procedure analyzes the data against the reference tables, then inserts the validated content into a stream, which is in turn declared as a source for an export connector that sends it along to its next target.

For example, VoltDB can be inserted into a Kafka pipeline by using:

  • A Kafka import connector as the input

  • A VoltDB stream and a Kafka export connector as the output

  • A stored procedure analyzing the input and inserting it into the stream

The following schema and configuration illustrate a simple example that checks if the data in a Kafka stream matches an existing user account with appropriate funds. The schema uses a reference table (account), a temporary table (incoming), and an export stream (outgoing). Any data matching the requirements is written to the export target; all other incoming data is dropped.

CREATE TABLE incoming
   ( trans_id BIGINT, amount BIGINT, user_id BIGINT );
CREATE STREAM outgoing EXPORT TO TARGET kafka_output 
   ( trans_id BIGINT, amount BIGINT, user_id BIGINT );

CREATE PROCEDURE validate AS
  BEGIN
    INSERT INTO incoming VALUES (?,?,?);
    INSERT INTO outgoing
       SELECT i.trans_id, i.amount, i.user_id
          FROM incoming AS i, account AS a
          WHERE i.user_id = a.user_id AND a.balance + i.amount > 0;
    TRUNCATE TABLE incoming;
  END;
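
The account reference table is assumed to already exist in the database. A minimal sketch, limited to the columns the validate procedure uses and partitioned on user_id for illustration, might be:

CREATE TABLE account
   ( user_id BIGINT NOT NULL,
     balance BIGINT NOT NULL,
     PRIMARY KEY (user_id) );
PARTITION TABLE account ON COLUMN user_id;
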
<import>
  <configuration type="kafka">
     <property name="procedure">validate</property>
     <property name="brokers">kfksrc1,kfksrc2</property>
     <property name="topics">transactions</property>
  </configuration>
</import>

<export>
  <configuration type="kafka" target="kafka_output">
     <property name="bootstrap.servers">kfkdest1,kfkdest2</property>
     <property name="topic.key">outgoing.transactions</property>
     <property name="skipinternals">true</property>
  </configuration>
</export>

15.2.4. Caching

Because of its architecture, VoltDB is excellent at handling high-volume transactions. It is not as well suited for ad hoc analytical processing of extremely large volumes of historical data. But sometimes you need both. Caching allows current, high-touch content to be accessible from a fast front-end repository while historical, less frequently accessed content is stored in slower, large back-end repositories (such as Hadoop), sometimes called data lakes.

Export, Time To Live (TTL), and automated tasks help automate the use of VoltDB as a hot cache. By declaring tables in VoltDB as export sources to a large back-end repository, any data added to VoltDB automatically gets added to the historical data lake. Once data in VoltDB is no longer "hot", it can be deleted but remains available from larger back-end servers.

In the simplest case, caching can be done by declaring the VoltDB tables with EXPORT TO TARGET and using ON INSERT, UPDATE_NEW so all data changes except deletes are exported to the data lake. You can then manually delete data from VoltDB when it becomes unnecessary in the cache.

CREATE TABLE sessions 
   EXPORT TO TARGET historical ON INSERT, UPDATE_NEW
   ( id BIGINT NOT NULL, 
     login TIMESTAMP, last_access TIMESTAMP [,...] );
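
For example, if sessions are considered cold after two hours without access, a manual cleanup could be as simple as the following sketch (the two-hour cutoff is purely illustrative):

DELETE FROM sessions
   WHERE last_access < DATEADD(HOUR, -2, NOW);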

To make this easier, VoltDB can automate the process of aging out old data. If the content is time-sensitive, you can add USING TTL to the table declaration to automatically delete records once a column exceeds a certain time limit. You specify the reference column and the time limit in the USING TTL clause. For example, if you want to automatically delete any sessions that have not been accessed for more than two hours, you can change the sessions table declaration like so:

CREATE TABLE sessions
   EXPORT TO TARGET historical ON INSERT, UPDATE_NEW
   ( id BIGINT NOT NULL, user_id BIGINT, 
     login TIMESTAMP, last_access TIMESTAMP [,...] )
   USING TTL 2 HOURS ON COLUMN last_access;

If your expiration criteria are more complex than a single column value, you can use a stored procedure to identify the rows that need deleting. To automate this process, you then define a task that executes the stored procedure on a regular basis. For example, if you want to remove sessions sooner when there is no access after the initial login, you can define a stored procedure GhostSessions to remove inactive sessions, then execute that procedure periodically with the task RemoveGhosts. Note that the actual time limit can be made adjustable by a parameter passed to the task.

CREATE PROCEDURE GhostSessions AS
   DELETE FROM sessions 
     WHERE login = last_access AND DATEADD(MINUTE,?,login) < NOW;
CREATE TASK RemoveGhosts ON SCHEDULE EVERY 2 MINUTES
   PROCEDURE GhostSessions WITH (20); -- 20 minute limit

15.2.5. Archiving

Archiving is like caching in that older data is maintained in slower, large-scale repositories. The difference is that with archiving, rather than keeping copies of the current data in both locations, data is not moved to the archive until its usefulness in VoltDB expires.

You could simply export the data when you delete it from the VoltDB database. But since export is asynchronous, there would be a short period of time when the data is neither in VoltDB nor in the archive. To avoid this situation, you can use migration rather than export, which ensures the data is not deleted from VoltDB until the export target acknowledges receipt of the migrated content.

For example, if we are archiving orders, we can include the MIGRATE TO TARGET clause in the table definition and then use the MIGRATE statement instead of DELETE to clear the records from VoltDB:

CREATE TABLE orders MIGRATE TO TARGET archive
     [ . . . ];
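
As a sketch of the MIGRATE statement itself, assuming completed orders older than 30 days are ready to archive (the order_completed column also appears in the TTL example below), you might run:

MIGRATE FROM orders
   WHERE order_completed < DATEADD(DAY, -30, NOW);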

If you are archiving records based on age, you can use MIGRATE TO TARGET with USING TTL to automatically migrate the table rows once a specific column in the table expires. Used alone, USING TTL simply deletes records; used with MIGRATE TO TARGET it initiates a migration for the expired records:

CREATE TABLE orders MIGRATE TO TARGET archive
     [ . . . ]
   USING TTL 30 DAYS ON COLUMN order_completed;