Chapter 15. Streaming Data: Import, Export, and Migration


Earlier chapters discuss features of VoltDB as a standalone component of your business application. But like most technologies, VoltDB is often used within a diverse and heterogeneous computing ecosystem where it needs to "play well" with other services. This chapter describes features of VoltDB that help integrate it with other databases, systems, and applications to simplify, automate, and speed up your business processes.

Just as VoltDB as a database aims to provide optimal transaction throughput, VoltDB as a data service aims to transfer data to and from other services efficiently and reliably. Of course, you can always write custom code to integrate VoltDB into your application environment, calling stored procedures to move data in and out of the database. However, the VoltDB feature set simplifies and automates the process of streaming data into, out of, and through VoltDB, allowing your application to focus on the important work of analyzing, processing, and modifying the data in flight through secure, reliable transactions. To make this possible, VoltDB introduces five key concepts:

  • Streams

  • Import

  • Export

  • Migration

  • Topics

Streams operate much like regular database tables. As with tables, you define them with a CREATE statement, they consist of columns, and you insert data into them using the INSERT statement. You can define views that aggregate the data as it passes through the stream. Interactions with streams within a stored procedure are transactional, just as they are for tables. The only difference is that a stream does not store any data in the database. This allows you to use all the consistency and reliability of a transactional database and the familiar syntax of SQL to manage data "in flight" without necessarily having to save it to persistent storage. Of course, since there is no storage associated with streams, they are for INSERT only. Any attempt to SELECT, UPDATE, or DELETE data from a stream results in an error.
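The behavior described above can be pictured with a small model. The following Python sketch is purely illustrative and is not VoltDB code: the Stream class, its method names, and the sample rows are all hypothetical. It shows a stream that accepts inserts, keeps a view (here, a running count per group) up to date, and rejects reads because the rows themselves are never stored.

```python
from collections import Counter


class Stream:
    """Toy model of an insert-only stream with one aggregating view (hypothetical)."""

    def __init__(self, group_by):
        self.group_by = group_by   # column the view groups on
        self.view = Counter()      # running COUNT(*) per group value

    def insert(self, row):
        # The view is updated as the row passes through; the row is not kept.
        self.view[row[self.group_by]] += 1

    def select(self):
        # Reading rows back is not possible because nothing is stored.
        raise RuntimeError("streams are INSERT only")


alerts = Stream(group_by="severity")
alerts.insert({"severity": "warn", "msg": "disk at 80%"})
alerts.insert({"severity": "warn", "msg": "disk at 85%"})
alerts.insert({"severity": "error", "msg": "disk full"})
```

After the three inserts, the view holds a count of 2 for warn and 1 for error, while the stream itself holds no rows.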

Import automates the process of pulling data from external sources and inserting it into the database workflow through the same stored procedures your applications use. The import connectors are declared as part of the database configuration and stop and start with the database. The key point is that the database manages the entire import process and ensures the durability of the data while it is within VoltDB. Alternatively, you can use one of the VoltDB data loading utilities to push data into the VoltDB database from a variety of sources.

Export automates the reverse process from import: it manages copying any data written to an export table or stream and sending it to the associated external target, whether it be a file, a service such as Kafka, or another database. The export targets are defined in the database configuration file, while the connection of a table or stream to its specific export target is done in the data definition language (DDL) CREATE statement using the EXPORT TO TARGET clause.

Topics are similar to import and export in that topics let you stream data into and out of the VoltDB database. The differences are that a single topic can perform both input and output, there can be multiple consumers and producers for a single topic, and it is the external producers and consumers that control how and when data is transferred rather than VoltDB pulling from and pushing to individual external targets. You identify the stream or table to use for output to the topic by specifying EXPORT TO TOPIC in the CREATE STREAM or CREATE TABLE statement. You then configure the topic, including the stored procedure to use for input, in the configuration file. Another difference between export and topics is that, because topics do not have a single output consumer, there is no single event that determines when the data transfer is complete. Instead, you must define a retention/expiration policy (based on time or size) for when data is no longer needed and can be deleted from the queue.

Migration is a special case where export is more fully integrated into the business workflow. When you define a table or view with the MIGRATE TO... clause instead of EXPORT TO..., data is not deleted from the VoltDB table or view until it is successfully written to the associated target or topic. You trigger a migration of data using an explicit MIGRATE statement or you can declare the schema object with USING TTL to schedule the migration based on a timestamp within the data records and an expiration time defined as the TTL value.

How you configure these features depends on your specific business goals. The bulk of this chapter describes how to declare and configure import, export and migration in detail. The next two sections provide an overview of how data streaming works and how to use these features to perform common business activities.

15.1. How Data Streaming Works in VoltDB

Import associates incoming data with a stored procedure that determines what is done with the data. Export associates a database object (a table or stream) with an external target, where the external target determines how the exported data is handled. But in both cases the handling of streamed data follows three key principles:

  • Interaction with the VoltDB database is transactional, providing the same ACID guarantees as all other transactions.

  • Interaction with the external system occurs as a separate asynchronous process, avoiding any negative impact on the latency of ongoing transactions in the VoltDB database.

  • The VoltDB server takes care of starting and stopping the import and export subsystems when the database starts and stops. The server also takes responsibility for managing streaming data "in flight" — ensuring that no data is lost once it enters the subsystem and before it reaches its final destination.

VoltDB achieves these goals by having separate export and import connectors handle the data as it passes from one system to the next, as shown in Figure 15.1, “Overview of Data Streaming”.

Figure 15.1. Overview of Data Streaming


In the case of topics, there is no specific source or target; multiple producers and consumers can write to and read from the topic. And the stored procedure that receives the incoming data can do whatever you choose with that content: it can write it to the stream as output for the same topic, it can write into other topics, it can write into other database tables, or any combination, providing the ultimate flexibility to meet your business logic needs, as shown in Figure 15.2, “Overview of Topics”.

Figure 15.2. Overview of Topics


Which streaming features you use depends on your business requirements. The key point is that orchestrating multiple disparate systems is complex and error prone, and the VoltDB streaming services free you from these complexities by ensuring that all operations start and stop automatically as part of the server process, that the data in flight is made durable across database sessions, and that all data is delivered at least once or retained until delivery is possible.
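To make "delivered at least once or retained until delivery is possible" concrete, here is a minimal Python sketch of that delivery pattern. It is a conceptual model only, not how VoltDB is implemented: deliver_at_least_once, FlakyTarget, and the retry limit are all hypothetical names and values chosen for illustration. The practical consequence it shows is that a target may see the same row more than once, so consumers that need exactly-once results typically de-duplicate on a key.

```python
def deliver_at_least_once(rows, send, max_attempts=5):
    """Resend each row until it is acknowledged; return rows that never were."""
    retained = []
    for row in rows:
        for _ in range(max_attempts):
            if send(row):          # send() returns True once the target acknowledges
                break
        else:
            retained.append(row)   # never acknowledged: keep for a later attempt
    return retained


class FlakyTarget:
    """Hypothetical target that stores every row but loses every other acknowledgement."""

    def __init__(self):
        self.received = []
        self.calls = 0

    def send(self, row):
        self.calls += 1
        self.received.append(row)
        return self.calls % 2 == 0   # acknowledgement is lost on odd-numbered calls


target = FlakyTarget()
retained = deliver_at_least_once([{"id": 1}, {"id": 2}], target.send)

# Consumer-side de-duplication on the row key.
unique_ids = {row["id"] for row in target.received}
```

In this run nothing is retained, but the target receives each row twice because the first acknowledgement is lost each time. De-duplicating on id recovers the two distinct rows.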

The following sections provide an overview of each service. Later sections describe the services and built-in connectors in more detail. You can also define your own custom import and export connectors, as described in the VoltDB Guide to Performance and Customization.

15.1.1. Understanding Import

To import data into VoltDB from an external system you have two options: you can use one of the standard VoltDB data loading utilities (such as csvloader) or you can define an import connector in the database configuration file that associates the external source with a stored procedure. The data loading utilities are standalone external applications that push data into the VoltDB database. VoltDB import connectors use a pull model. In other words, the connector periodically checks the data source to determine if new content is available. If so, the connector retrieves the data and passes it to the stored procedure where it can analyze the data, validate it, manipulate it, insert it into the database, or even pass it along to an export stream; whatever your application needs.
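The pull model can be sketched in a few lines of Python. This is an illustration of the pattern only, under stated assumptions: run_import_cycle, insert_stock, the in-memory feed, and the sample records are hypothetical stand-ins for a connector, a stored procedure, and an external source, and real connectors handle batching, formatting, and error recovery in ways not shown here.

```python
from collections import deque


def run_import_cycle(source, procedure, batch_size=100):
    """One polling cycle: pull up to batch_size pending records and
    pass the fields of each one to the procedure."""
    processed = 0
    while source and processed < batch_size:
        record = source.popleft()        # pull the next record from the source
        procedure(*record.split(","))    # CSV fields become procedure arguments
        processed += 1
    return processed


stocks = []   # stands in for the destination table


def insert_stock(symbol, price):
    """Stands in for the stored procedure that receives each record."""
    stocks.append((symbol, float(price)))


feed = deque(["VOLT,12.50", "ACME,7.25", "INIT,101.00"])
first = run_import_cycle(feed, insert_stock, batch_size=2)
second = run_import_cycle(feed, insert_stock, batch_size=2)
```

The first cycle processes two records and the second picks up the remaining one, which mirrors a connector checking periodically for new content and retrieving whatever is available.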

The creation of the import connector is done using the <configuration> tag within the <import> ... </import> element of the configuration file. The attributes of the <configuration> tag specify the type of import connector to use (Kafka, Kinesis, or custom) and, optionally, the input format (CSV by default). The <property> tags within the configuration specify the actual data source, the stored procedure to use as a destination, and any other connector-specific attributes you wish to set.

For example, to process data from a Kafka topic, the connector definition must specify the type (kafka), the addresses of one or more Kafka brokers as the source, the name of the topic (or topics), and the stored procedure to process the data. If the data does not need additional processing, you can use the default stored procedure that VoltDB generates for each table to insert the data directly into the database. The following configuration reads the Kafka topics nyse and nasdaq in CSV format and inserts records into the stocks table using the default insert procedure:

<import>
   <configuration type="kafka" format="csv">
     <property name="brokers">kafkasvr1:9092,kafkasvr2:9092</property>
     <property name="topics">nyse,nasdaq</property>
     <property name="procedure">STOCKS.insert</property>
   </configuration>
</import>

Having the import connectors defined in the configuration file lets VoltDB manage the entire import process, from starting and stopping the connectors to making sure the specified stored procedure exists, fetching the data in batches and ensuring nothing is lost in transit. You can even add, delete, or modify the connector definitions on the fly by updating the database configuration file while the database is running.
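If you maintain connector definitions programmatically, the import section can be generated rather than edited by hand. The following Python sketch uses only the standard library to build the same element structure as the Kafka example above. The helper name build_import_config is hypothetical, and the sketch produces only the import fragment; how you merge it into the full configuration file and apply it to a running database is not shown.

```python
import xml.etree.ElementTree as ET


def build_import_config(brokers, topics, procedure, fmt="csv"):
    """Build an <import> element containing one Kafka connector definition."""
    root = ET.Element("import")
    cfg = ET.SubElement(root, "configuration", {"type": "kafka", "format": fmt})
    properties = (
        ("brokers", ",".join(brokers)),
        ("topics", ",".join(topics)),
        ("procedure", procedure),
    )
    for name, value in properties:
        ET.SubElement(cfg, "property", {"name": name}).text = value
    return root


root = build_import_config(
    brokers=["kafkasvr1:9092", "kafkasvr2:9092"],
    topics=["nyse", "nasdaq"],
    procedure="STOCKS.insert",
)
xml_text = ET.tostring(root, encoding="unicode")
```

Generating the fragment from lists keeps the comma-separated broker and topic values consistent when the set of sources changes.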

VoltDB provides built-in import connectors for Kafka and Kinesis. Section 15.4, “VoltDB Import Connectors” describes these built-in connectors and the required and optional properties for each. Section 15.5, “VoltDB Import Formatters” provides additional information about the input formatters that prepare the incoming data for the stored procedure.

15.1.2. Understanding Export

To export data from VoltDB to an external system you define a database table or stream as the export source by including the EXPORT TO TARGET clause in the DDL definition and associating that data source with a logical target name. For example, to associate the stream alerts with a target called systemlog, you would declare a stream like so:

CREATE STREAM alerts 
   EXPORT TO TARGET systemlog 
   ( {column-definition} [,...] );

For tables, you can also specify when data is queued for export. By default, data inserted into export tables with the INSERT statement (or UPSERT that results in a new record being inserted) is queued to the target, similar to streams. However, you can customize the export to write on any combination of data manipulation language (DML) statements, using the ON clause. For example, to include updates in the export stream, the CREATE TABLE statement might look like this:

CREATE TABLE orders 
   EXPORT TO TARGET orderprocessing ON INSERT, UPDATE
   ( {column-definition} [,...] );

As soon as you declare a stream or table as exporting to a target, any data written to that source (or in the case of tables, the export actions you specified in the CREATE TABLE statement) is queued for the export stream. You associate the named target with a specific connector and external system in the <export> ... </export> section of the database configuration file. Note that you can define the target either before or after declaring the source, and you can add, remove, or modify the export configuration at any time before or after the database is started.

In the configuration file you define the export connector using the <configuration> element, identifying the target name and type of connector to use. Within the <configuration> element you then identify the specific external target to use and any necessary connector-specific attributes in <property> tags. For example, to write export data to files locally on the database servers, you use the file connector and specify attributes such as the file prefix, location, and roll-over frequency as properties:

<export>
   <configuration target="systemlog" type="file">
     <property name="type">csv</property>
     <property name="nonce">syslog</property>
     <property name="period">60</property>
     <!-- roll every hour (60 minutes) -->
   </configuration>
</export>

VoltDB supports built-in connectors for five types of external targets: file, HTTP (including Hadoop), JDBC, Kafka, and Elasticsearch. Each export connector supports different properties specific to that type of target. Section 15.3, “VoltDB Export Connectors” describes the built-in export connectors and the required and optional properties for each.
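Because targets are named in the schema and configured separately, it can be useful to check which declared targets do not yet have a connector. The following Python sketch is a simple offline check, not a VoltDB tool: the helper names are hypothetical, the column definitions in the sample DDL are invented placeholders, and the regular expression only recognizes the EXPORT TO TARGET form shown in this section. Since a target can be configured after its source is declared, an unconfigured target is something to review rather than necessarily an error.

```python
import re
import xml.etree.ElementTree as ET

# Sample inputs; the column lists are placeholders for illustration.
DDL = """
CREATE STREAM alerts EXPORT TO TARGET systemlog (msg VARCHAR(128));
CREATE TABLE orders EXPORT TO TARGET orderprocessing ON INSERT, UPDATE
   (id BIGINT NOT NULL);
"""

CONFIG = """
<export>
   <configuration target="systemlog" type="file">
     <property name="type">csv</property>
     <property name="nonce">syslog</property>
   </configuration>
</export>
"""


def declared_targets(ddl):
    """Target names referenced by EXPORT TO TARGET clauses in the schema."""
    return set(re.findall(r"EXPORT\s+TO\s+TARGET\s+(\w+)", ddl, flags=re.IGNORECASE))


def configured_targets(config_xml):
    """Map each configured target name to its connector type."""
    root = ET.fromstring(config_xml.strip())
    return {c.get("target"): c.get("type") for c in root.iter("configuration")}


configured = configured_targets(CONFIG)
unconfigured = declared_targets(DDL) - set(configured)
```

With these inputs, systemlog maps to the file connector and orderprocessing is reported as declared but not yet configured.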

15.1.3. Understanding Migration

Migration is a special case of export that synchronizes export with the deletion of data in database tables. You can migrate data from either regular database tables or stream views. And you can migrate the data to either an export target or to a topic.

When you migrate a record, VoltDB ensures the data is successfully transmitted to (and acknowledged by) the target before the data is deleted from the database. In the case of export to a target, VoltDB waits until it receives acknowledgement that the data has reached the external system before deleting it. For export to a topic, the data is deleted as soon as it is available to topic consumers. Either way, migration ensures that the data is always available from one of the two systems — it cannot temporarily "disappear" during the move.
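The ordering guarantee, delete only after acknowledgement, can be illustrated with a short Python model. This is a conceptual sketch and not VoltDB internals: migrate, send_to_archive, and the sample orders are hypothetical, and the target is made to fail after one row to show what happens to unacknowledged data.

```python
def migrate(table, predicate, send):
    """Send matching rows to the target and delete each row only after
    the target acknowledges it."""
    matching = [key for key, row in table.items() if predicate(row)]
    for key in matching:
        if send(table[key]):   # True means the target acknowledged the row
            del table[key]     # safe to delete: the target now holds the row
    return table


orders = {
    1: {"customer_id": 7},
    2: {"customer_id": 7},
    3: {"customer_id": 9},
}
archive = []


def send_to_archive(row):
    """Hypothetical target that becomes unavailable after accepting one row."""
    if len(archive) >= 1:
        return False
    archive.append(row)
    return True


migrate(orders, lambda row: row["customer_id"] == 7, send_to_archive)
```

Order 1 is acknowledged and removed. Order 2 is not acknowledged, so it stays in the table, and at every point each row exists in at least one of the two places.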

You define a VoltDB table as a source of migration using the MIGRATE TO... clause, the same way you define an export source with the EXPORT TO... clause. For example, the following CREATE TABLE statement defines the orders table as a source for migration to the oldorders target connector:

CREATE TABLE orders 
   MIGRATE TO TARGET oldorders
   ( {column-definition} [,...] );

Migration uses the export subsystem to perform the interaction with the external data store. So you can use any of the supported connectors to configure the target of the migration; and you do so the exact same way you do for any other export target. The difference is that rather than exporting the data when it is inserted into the table, the data is exported when you initiate migration.

You trigger migration at run time using the MIGRATE SQL statement and a WHERE clause to identify the specific rows to move. For example, to migrate all of the orders for a specific customer, you could use the following MIGRATE statement:

MIGRATE FROM orders
   WHERE customer_id = ? AND NOT MIGRATING;

Note the use of NOT MIGRATING. MIGRATING is a special function that identifies all rows that are currently being migrated; that is, where migration (and deletion) has not yet completed. Although it is not required (VoltDB skips rows that are already migrating), adding AND NOT MIGRATING to a MIGRATE statement can improve performance by reducing the number of rows evaluated by the expression.

Once the rows are migrated and the external target acknowledges receipt, the rows are deleted from the database.

To further automate the migration of data to external targets, you can use the MIGRATE TO... clause with USING TTL. USING TTL automates the deletion of records based on a TTL value and a TIMESTAMP column in the table. For example, adding the clause USING TTL 12 HOURS ON COLUMN created to a table where the created column defaults to NOW means that records will be deleted from the table 12 hours after they are inserted. By adding the MIGRATE TO TARGET clause, you can tell VoltDB to migrate the data to the specified target before removing it when its TTL expiration is reached.

CREATE TABLE sessions 
   MIGRATE TO TARGET sessionlog
   ( session_id BIGINT NOT NULL,
     created TIMESTAMP DEFAULT NOW [,...] 
   )
   USING TTL 12 HOURS ON COLUMN created;
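The selection rule behind a TTL can be written out as a simple comparison between the current time and the timestamp column. The Python sketch below illustrates only that rule, under assumptions: expired_rows is a hypothetical helper, the reference time and rows are arbitrary sample data, and the exact boundary behavior and scheduling of the real TTL mechanism are not modeled.

```python
from datetime import datetime, timedelta


def expired_rows(rows, ttl, now, column="created"):
    """Return the rows whose timestamp column is at least ttl older than now."""
    return [row for row in rows if now - row[column] >= ttl]


now = datetime(2024, 1, 1, 12, 0, 0)   # arbitrary reference time for the example
sessions = [
    {"session_id": 1, "created": now - timedelta(hours=13)},
    {"session_id": 2, "created": now - timedelta(hours=11)},
]

to_migrate = expired_rows(sessions, ttl=timedelta(hours=12), now=now)
```

Only session 1, which is 13 hours old, passes the 12 hour threshold and would be a candidate for migration; session 2 remains in the table.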

15.1.4. Understanding Topics

Topics allow you to integrate both import and export into a single stream. They also allow multiple external producers and consumers to access the topic at the same time, keeping track of where each consumer or group of consumers is in the stream of output.
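Tracking "where each consumer or group of consumers is in the stream" amounts to keeping one read position per group over a shared, ordered log. The following Python sketch models that idea only. The Topic class, its methods, and the group names are hypothetical; retention policies and the actual consumer protocol are not modeled.

```python
class Topic:
    """Toy model of a topic: one ordered log plus a read offset per consumer group."""

    def __init__(self):
        self.log = []       # records in arrival order
        self.offsets = {}   # consumer group -> index of the next record to read

    def produce(self, record):
        self.log.append(record)

    def poll(self, group, max_records=10):
        start = self.offsets.get(group, 0)
        batch = self.log[start:start + max_records]
        self.offsets[group] = start + len(batch)   # advance only this group
        return batch


topic = Topic()
for record in ("a", "b", "c"):
    topic.produce(record)

first = topic.poll("analytics", max_records=2)
second = topic.poll("analytics")
audit = topic.poll("audit")
```

The analytics group reads a and b, then c on its next poll, while the audit group starts from the beginning and sees all three records. Each group progresses independently of the others.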

There are actually two distinct and independent components to a topic that you control separately: input and output. A topic can have either or both, depending on what you declare in the schema and the configuration file. The schema associates individual streams or tables with topics, and the configuration file defines the properties of the topic, including what stored procedure to use for input. For example, you can declare an output-only topic by specifying the topic in the CREATE STREAM ... EXPORT TO TOPIC statement but specifying no stored procedure in the configuration file. In this case, any records written to the associated stream are queued for output and available to any consumers of the topic:

CREATE STREAM session EXPORT TO TOPIC sessions ... 

If, on the other hand, you specify a stored procedure in the configuration file, records written to the topic by producers invoke the specified procedure passing the message contents (and, optionally, the key) as arguments:

<topics>
   <topic name="sessions" procedure="ProcessSessions"/>
</topics>

If you include both the EXPORT TO TOPIC clause in the CREATE STREAM statement and the procedure attribute in the <topic> element of the configuration file, the topic is available for both input and output. What happens to the data as it passes through VoltDB is up to you. You can simply pass it from producers to consumers by taking the data received by the input procedure and inserting it into the associated stream. Or the stored procedure can filter, modify, or redirect the content as needed. For example, the following data definitions create a topic where the input procedure uses an existing table in the database (users) to fill out additional fields based on the matching username in the incoming records while writing the data to the stream for output:

Schema
CREATE TABLE tempuser ( username VARCHAR(128) NOT NULL);
CREATE TABLE users ( username VARCHAR(128)  NOT NULL,
    country VARCHAR(32), userrank INTEGER);
PARTITION TABLE tempuser on column username;
PARTITION TABLE users on column username;

CREATE STREAM session
  EXPORT TO TOPIC sessions
  PARTITION ON COLUMN username (
    username VARCHAR(128) NOT NULL,
    login TIMESTAMP, country VARCHAR(32), userrank INTEGER);

CREATE PROCEDURE ProcessSessions 
  PARTITION ON TABLE users COLUMN username 
  AS BEGIN
   INSERT INTO tempuser VALUES(CAST(? AS VARCHAR));
   INSERT INTO session SELECT u.username, 
     CAST(? AS TIMESTAMP), u.country, u.userrank
     FROM users AS u, tempuser AS t
     WHERE u.username=t.username;
   TRUNCATE TABLE tempuser;
  END;

Configuration
<topics>
   <topic name="sessions" procedure="ProcessSessions"/>
</topics>
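The logic of the ProcessSessions procedure can be easier to follow as plain code. The Python sketch below mirrors only the effect of the SQL above: it looks up the incoming username in a stand-in for the users table and, on a match, emits an enriched row to a stand-in for the session stream. The dictionary, the list, and the sample values are hypothetical. Because the SQL joins users with tempuser, a username with no match produces no output row, and the sketch reproduces that.

```python
users = {"alice": {"country": "CA", "userrank": 3}}   # stands in for the users table
session_stream = []                                    # stands in for the session stream


def process_sessions(username, login):
    """Emit an enriched row only when username matches a users record,
    mirroring the join in the procedure."""
    user = users.get(username)
    if user is not None:
        session_stream.append((username, login, user["country"], user["userrank"]))


process_sessions("alice", "2024-01-01 09:00:00")
process_sessions("mallory", "2024-01-01 09:05:00")   # unknown user: no output row
```

Only the record for alice reaches the output stream, carrying the country and rank filled in from the users data.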

Finally, if you want to create a topic that is not processed but simply flows through VoltDB from producers to consumers, you declare the topic as "opaque" in the configuration file, without either specifying a stored procedure for input or associating a stream with the topic for output.

<topic name="sysmsgs" opaque="true"/>

Opaque topics are useful if you want to have a single set of brokers for all your topics but only need to analyze and process some of the data feeds. Opaque topics let VoltDB handle the additional topics without requiring the stored procedure or stream definitions needed for processed topics.