The Kafka importer connects to the specified Kafka messaging service, imports one or more Kafka topics, and writes the records into the VoltDB database. The data is decoded according to the specified format (comma-separated values, or CSV, by default) and is inserted into the VoltDB database using the specified stored procedure.
The Kafka importer supports Kafka version 0.8 as well as version 0.10 and later. You can specify the version of Kafka to use with the version attribute of the <configuration> tag. The default version is "10" (Kafka 0.10.2), which supports Kafka version 0.10.2 and later, including version 1.0.0. To use the earlier version 0.8.2 of Kafka, specify version="8". For example:
<configuration type="kafka" version="8" enabled="true">
For either version, you must specify at least the following properties for each configuration:
brokers — Identifies one or more Kafka brokers, that is, the servers hosting the Kafka service and the desired topics. Specify a single server or a comma-separated list of brokers.
topics — Identifies the Kafka topics that will be imported. The property value can be a single topic name or a comma-separated list of topics.
procedure — Identifies the stored procedure that is invoked to insert the records into the VoltDB database.
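As an illustration of the three required properties, the following is a minimal sketch of an import configuration as it might appear in the database configuration file. The broker host names, the topic names, and the procedure name ImportOrders are hypothetical placeholders, not values defined by this section.

```xml
<import>
  <configuration type="kafka" enabled="true">
    <!-- Hypothetical brokers: a comma-separated list of host:port pairs -->
    <property name="brokers">kafka1.example.com:9092,kafka2.example.com:9092</property>
    <!-- Hypothetical topics: a single name or a comma-separated list -->
    <property name="topics">orders,returns</property>
    <!-- Hypothetical stored procedure that inserts each record -->
    <property name="procedure">ImportOrders</property>
  </configuration>
</import>
```

Because no version attribute is given, this sketch relies on the default version="10".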
When import starts, the importer first checks to make sure the specified stored procedure exists in the database schema. If not (for example, when you first create a database and before a schema is loaded), the importer issues periodic warnings to the console.
Once the specified stored procedure is declared, the importer looks for the specified Kafka brokers and topics. If the specified brokers cannot be found or the specified topics do not exist on the brokers, the importer reports an error and stops. You will need to restart import once this error condition is corrected. You can restart import using any of the following methods:
Stop and restart the database
Pause and resume the database using the voltadmin pause and voltadmin resume commands
Update the configuration using the voltadmin update command or the web-based VoltDB Management Center
If the brokers are found and the topics exist, the importer starts fetching data from the Kafka topics and submitting it to the stored procedure to insert into the database. In the simplest case, you can use the default insert procedure for a table to insert records into a single table. For more complex data you can write your own import stored procedure to interpret the data and insert it into the appropriate table(s).
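For the simplest case described above, the sketch below points the importer at a table's default insert procedure. It assumes a table named EMPLOYEE already exists in the schema and that each Kafka record carries one CSV value per column, in column order. The broker address and topic name are hypothetical.

```xml
<import>
  <configuration type="kafka" enabled="true">
    <property name="brokers">kafkasvr.example.com:9092</property>
    <property name="topics">employees</property>
    <!-- Default insert procedure for the assumed EMPLOYEE table -->
    <property name="procedure">EMPLOYEE.insert</property>
  </configuration>
</import>
```

If the records need to be reinterpreted or split across several tables, the procedure property would instead name a custom stored procedure.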
Table 15.7, “Kafka Import Properties” lists the allowable properties for the Kafka importer. You can also specify properties associated with the formatter, as described in Table 15.9, “CSV and TSV Formatter Properties”.
Table 15.7. Kafka Import Properties
Property | Allowable Values | Description |
---|---|---|
brokers* | string | A comma-separated list of Kafka brokers. |
procedure* | string | The stored procedure to invoke to insert the incoming data into the database. |
topics* | string | A comma-separated list of Kafka topics. |
commit.policy | integer | Because the importer performs two distinct tasks — retrieving records from Kafka and then inserting them into VoltDB — Kafka's automated tracking of the current offset may not match what records are successfully inserted into the database. Therefore, by default, the importer uses a manual commit policy to ensure the Kafka offset matches the completed inserts. Use of the default commit policy is recommended. However, you can, if you choose, use Kafka's automated commit policy by specifying a commit interval, in milliseconds, using this property. |
groupid | string | A user-defined name for the group that the client belongs to. Kafka maintains a single pointer for the current position within the stream for all clients in the same group. The default group ID is "voltdb". In the rare case where you have two or more databases importing data from the same Kafka brokers and topics, be sure to set this property to give each database a unique group ID and avoid the databases interfering with each other. |
fetch.message.max.bytes | string | These Kafka version 0.8-specific consumer properties are supported as import properties when using version="8". See the Kafka 0.8 documentation for details. |
fetch.max.bytes | string | These Kafka version 0.10-specific consumer properties are supported as import properties when using the default version="10". See the Kafka 0.11 documentation for details. |
*Required |
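Optional properties from the table are set in the same way as the required ones. The following sketch assumes two databases import the same topic, so this database is given its own group ID. The broker address, the topic, the table name, and the group ID value voltdb-reporting are all hypothetical.

```xml
<import>
  <configuration type="kafka" enabled="true">
    <property name="brokers">kafka1.example.com:9092</property>
    <property name="topics">orders</property>
    <property name="procedure">ORDERS.insert</property>
    <!-- Hypothetical unique group ID, replacing the default "voltdb",
         so this database tracks its own position in the topic -->
    <property name="groupid">voltdb-reporting</property>
  </configuration>
</import>
```

The commit.policy property is deliberately left out so that the recommended default manual commit policy applies. To use Kafka's automated commit instead, add it as another property whose value is an integer number of milliseconds.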