15.4. VoltDB Import Connectors

Documentation

VoltDB Home » Documentation » Using VoltDB

15.4. VoltDB Import Connectors

Just as VoltDB can export data from selected streams and tables to external targets, it supports importing data into selected tables from external sources. Import works in two ways:

  • Bulk loading data using one of several standalone utilities VoltDB provides. These data loaders support multiple standard input protocols and can be run from any server, even remotely from the database itself.

  • Streaming import as part of the database server process. For data that is imported on an ongoing basis, use of the built-in import functionality ensures that import starts and stops with the database.

The following sections discuss these two approaches to data import.

15.4.1. Bulk Loading Data Using VoltDB Standalone Utilities

Often, when migrating data from one database to another or when pre-loading a set of data into VoltDB as a starting point, you just want to perform the import once and then use the data natively within VoltDB. For these one-time uses, or when you prefer to manage the import process externally, VoltDB provides separate data loader utilities.

Each data loader supports a different source format. You can load data from text files — such as comma-separated value (CSV) files — using the csvloader utility. You can load data from another JDBC-compliant database using the jdbcloader utility. Or you can load data from a streaming message service with the Kafka loader utility, kafkaloader.

All of the data loaders operate in much the same way. For each utility you specify the source for the import and either a table that the data will be loaded into or a stored procedure that will be used to load the data. So, for example, to load records from a CSV file named staff.csv into the table EMPLOYEES, the command might be the following:

$ csvloader employees --file=staff.csv

If instead you are copying the data from a JDBC-compliant database, the command might look like this:

$ jdbcloader employees \
     --jdbcurl=jdbc:postgresql://remotesvr/corphr \
     --jdbctable=employees \
     --jdbcdriver=org.postgresql.Driver

Each utility has arguments unique to the data source (such as --jdbcurl) that allow you to properly configure and connect to the source. See the description of each utility in Appendix D, VoltDB CLI Commands for details.

15.4.2. Streaming Import Using Built-in Import Features

If importing data is an ongoing business process, rather than a one-time event, then it is desirable to make it an integral part of the database system. This can be done by building a custom application to push data into VoltDB using one of its standard APIs, such as the JDBC interface. Or you can take advantage of VoltDB's built-in import infrastructure.

The built-in importers work in much the same way as the data loading utilities, where incoming data is written into one or more database tables using an existing stored procedure. The difference is that the built-in importers start automatically whenever the database starts and stop when the database stops, making import an integral part of the database process.

You configure the built-in importers in the configuration file the same way you configure export connections. Within the <import> element, you declare each import stream using separate <configuration> elements. Within the <configuration> tag you use attributes to specify the type and format of data being imported and whether the import configuration is enabled or not. Then enclosed within the <configuration> tags you use <property> elements to provide information required by the specific importer and/or formatter. For example:

<import>
  <configuration type="kafka" format="csv" enabled="true">
    <property name="brokers">kafkasvr:9092</property>
    <property name="topics">employees</property>
    <property name="procedure">EMPLOYEE.insert</property>
  </configuration>
</import>

When the database starts, the import infrastructure starts any enabled configurations. If you are importing multiple streams to separate tables through separate procedures, you must include multiple configurations, even if they come from the same source. For example, the following configuration imports data from two Kafka topics from the same Kafka servers into separate VoltDB tables.

<import>
  <configuration type="kafka" enabled="true">
    <property name="brokers">kafkasvr:9092</property>
    <property name="topics">employees</property>
    <property name="procedure">EMPLOYEE.insert</property>
  </configuration>
  <configuration type="kafka" enabled="true">
    <property name="brokers">kafkasvr:9092</property>
    <property name="topics">managers</property>
    <property name="procedure">MANAGER.insert</property>
  </configuration>
</import>

VoltDB currently provides support for two types of import:

VoltDB also provides support for two import formats: comma-separated values (csv) and tab-separated values (tsv). Comma-separated values are the default format. So if you are using CSV-formatted input, you can leave out the format attribute, as in the preceding example.

The following sections describe each of the importers and the CSV/TSV formatter in more detail.

15.4.3. The Kafka Importer

The Kafka importer connects to the specified Kafka messaging service and imports one or more Kafka topics and writes the records into the VoltDB database. The data is decoded according to the specified format — comma-separated values (CSV) by default — and is inserted into the VoltDB database using the specified stored procedure.

The Kafka importer supports Kafka version 0.10 or later. You must specify at least the following properties for each configuration:

  • brokers — Identifies one or more Kafka brokers. That is, servers hosting the Kafka service and desired topics. Specify a single server or a comma-separated list of brokers.

  • topics — Identifies the Kafka topics that will be imported. The property value can be a single topic name or a comma-separated list of topics.

  • procedure — Identifies the stored procedure that is invoked to insert the records into the VoltDB database.

When import starts, the importer first checks to make sure the specified stored procedure exists in the database schema. If not (for example, when you first create a database and before a schema is loaded), the importer issues periodic warnings to the console.

Once the specified stored procedure is declared, the importer looks for the specified Kafka brokers and topics. If the specified brokers cannot be found or the specified topics do not exist on the brokers, the importer reports an error and stops. You will need to restart import once this error condition is corrected. You can restart import using any of the following methods:

  • Stop and restart the database

  • Pause and resume the database using the voltadmin pause and voltadmin resume commands

  • Update the configuration using the voltadmin update command or the web-based VoltDB Management Center

If the brokers are found and the topics exist, the importer starts fetching data from the Kafka topics and submitting it to the stored procedure to insert into the database. In the simplest case, you can use the default insert procedure for a table to insert records into a single table. For more complex data you can write your own import stored procedure to interpret the data and insert it into the appropriate table(s).

Table 15.7, “Kafka Import Properties” lists the allowable properties for the Kafka importer. You can also specify properties associated with the formatter, as described in Table 15.9, “CSV and TSV Formatter Properties”.

Table 15.7. Kafka Import Properties

PropertyAllowable ValuesDescription
brokers*stringA comma-separated list of Kafka brokers.
procedure*stringThe stored procedure to invoke to insert the incoming data into the database.
topics*stringA comma-separated list of Kafka topics.
commit.policyinteger

Because the importer performs two distinct tasks — retrieving records from Kafka and then inserting them into VoltDB — Kafka's automated tracking of the current offset may not match what records are successfully inserted into the database. Therefore, by default, the importer uses a manual commit policy to ensure the Kafka offset matches the completed inserts.

Use of the default commit policy is recommended. However, you can, if you choose, use Kafka's automated commit policy by specifying a commit interval, in milliseconds, using this property.

groupidstring

A user-defined name for the group that the client belongs to. Kafka maintains a single pointer for the current position within the stream for all clients in the same group.

The default group ID is "voltdb". In the rare case where you have two or more databases importing data from the same Kafka brokers and topics, be sure to set this property to give each database a unique group ID and avoid the databases interfering with each other.

fetch.max.bytes
heartbeat.interval.ms
max.partition.fetch.bytes
max.poll.interval.ms
max.poll.records
request.timeout.ms
session.timeout.ms

stringThese Kafka consumer properties are supported as import properties. See the Kafka 0.11 documentation for details.

*Required


15.4.4. The Kinesis Importer

The Kinesis importer connects to the specified Amazon Kinesis stream and writes the records into the VoltDB database. Kinesis streams let you aggregate data from multiple sources, such as click streams and media feeds, which is then pushed as streaming data to the application. The VoltDB Kinesis importer acts as a target application for the Kinesis Stream. The data is decoded according to the specified format — comma-separated values (CSV) by default — and is inserted into the VoltDB database using the specified stored procedure.

When import starts, the importer first checks to make sure the specified stored procedure exists in the database schema. If not (for example, when you first create a database and before a schema is loaded), the importer issues periodic warnings to the console.

Once the specified stored procedure is declared, the importer looks for the specified Kinesis stream. If the stream cannot be found or accessed (for example, if the keys don't match), the importer reports an error and stops. You will need to restart import once this error condition is corrected. You can restart import using any of the following methods:

  • Stop and restart the database

  • Pause and resume the database using the voltadmin pause and voltadmin resume commands

  • Update the configuration using the voltadmin update command or the web-based VoltDB Management Center

If the stream is found and can be accessed, the importer starts fetching data and submitting it to the stored procedure to insert into the database. In the simplest case, you can use the default insert procedure for a table to insert records into a single table. For more complex data you can write your own import stored procedure to interpret the data and insert it into the appropriate table(s).

Table 15.8, “Kinesis Import Properties” lists the allowable properties for the Kinesis importer. You can also specify properties associated with the formatter, as described in Table 15.9, “CSV and TSV Formatter Properties”.

Table 15.8. Kinesis Import Properties

PropertyAllowable ValuesDescription
app.name*stringA user-defined name that is used by Kinesis to track the application's current position in the stream.
procedure*stringThe stored procedure to invoke to insert the incoming data into the database.
region*stringThe Amazon region where the Kinesis stream service is running.
stream.name*stringThe name of the Kinesis stream.
access.key*stringThe Amazon access key for permitting access to the stream.
secret.key*stringThe Amazon secret key for permitting access to the stream.
​max.read.batch.sizeintegerThe maximum number of records to read in a single batch. The default batch size is size 10,000 records.

*Required