kafkaloader — Imports data from a Kafka message queue into the specified database table.
kafkaloader table-name [arguments]
The kafkaloader utility loads data from a Kafka message queue and inserts each message as a separate record into the specified database table. Apache Kafka is a distributed messaging service that lets you set up message queues which are written to and read from by "producers" and "consumers", respectively. In the Apache Kafka model, the kafkaloader acts as a "consumer".
When you start the kafkaloader, you must specify at least three arguments:
The database table
The Kafka server to read messages from, specified using the --zookeeper flag
The Kafka "topic" where the messages are stored, specified using the --topic flag
$ kafkaloader --zookeeper=quesvr:2181 --topic=voltdb_customer customer
Note that Kafka does not impose any specific format on the messages it manages. The format of the messages are application specific. In the case of kafkaloader, VoltDB assumes the messages are encoded as standard comma-separated value (CSV) strings, with the values representing the columns of the table in the order listed in the schema definition. Each Kafka message contains a single row to be inserted into the database table.
It is also important to note that, unlike the csvloader which reads a static file, the kafkaloader is reading from a queue where messages can be written at any time, on an ongoing basis. Therefore, the kafkaloader process does not stop when it reads the last message on the queue; instead it continues to monitor the queue and process any new messages it receives. The kafkaloader process will continue to read from the queue until one of the following events occur:
The connection to all of the VoltDB servers is broken and so kafkaloader can no longer access the VoltDB database.
The maximum number of errors (specified by --maxerrors) is reached.
The user explicitly stops the process.
The kafkaloader will not terminate if it loses its connection to the Kafka zookeeper. Therefore, it is important to monitor the Kafka service and restart the kafkaloader if and when the Kafka service is interrupted.
Finally, kafkaloader acks, or acknowledges, receipt of the messages from Kafka as soon as they are read from the queue. The messages are then batched for insert into the VoltDB database. This means that the queue messages are acked regardless of whether they are successfully inserted into the database or not. It is also possible messages may be lost if the loader process stops between when the messages are read and the insert transaction is sent to the VoltDB database.
Specifies the number of rows to submit in a batch. By default, rows of input are sent in batches to maximize
overall throughput. You can specify how many rows are sent in each batch using the
The default batch size is 200.
Note that --batch and --flush work together. Whichever limit is reached first triggers an insert to the database.
Specifies a Kafka configuration file that lets you set Kafka consumer properties, such as group.id. The file should contain the names of the properties you want to set, one per line, followed by an equals sign and the desired value. For example:
Specifies the maximum number of seconds before pending data is written to the database. The default flush period is 10 seconds.
If data is inserted into the kafka queue intermittently, there could be a long delay between when data is read
from the queue and when enough records have been read to meet the
--batch limit. The flush value
avoids unnecessary delays in this situation by periodically writing all pending data. If the flush limit is reached,
all pending records are written to the database, even if the
--batch limit has not been
Specifies a configuration file identifying properties for a custom formatter. The file must set the property
formatter to the class for the custom implementation of the Formatter interface. (Note, this is
different than the attribute set when declaring a formatter for a built-in import connector. For the kaflaloader
utility you specify the Formatter class, not the Formatter Factory.) You can also declare additional custom properties
used by the formatter itself. For example:
Before running kafkaloader with a custom formatter, you must define two environment variables: ZK_LIB pointing to the location of the Apache Zookeeper libraries and FORMATTER_LIB pointing to the location of your custom formatter JAR file. See the chapter on "Custom Importers,Exporters, and Formatters" in the VoltDB Guide to Performance and Customization for more information about using custom formatters.
Specifies the target number of input errors before kafkaloader stops processing input. Once kafkaloader encounters the specified number of errors while trying to insert rows, it will stop reading input and end the process.
The default maximum error count is 100. Since kafka import can be an persistent process, you can avoid having input errors cancel ongoing import by setting the maximum error count to zero, which means that the loader will continue to run no matter how many input errors are generated.
Specifies the password to use when connecting to the database. You must specify a username and password if security is enabled for the database. If you specify a username with the --user argument but not the --password argument, VoltDB prompts for the password. This is useful when writing shell scripts because it avoids having to hardcode passwords as plain text in the script.
Specifies the network port to use when connecting to the database. If you do not specify a port, kafkaloader uses the default client port 21212.
Specifies a stored procedure to use for loading each record from the data file. The named procedure must exist in the database schema and must accept the fields of the data record as input parameters. By default, kafkaloader uses a custom procedure to batch multiple rows into a single insert operation. If you explicitly name a procedure, batching does not occur.
Specifies the network address of one or more nodes of a database cluster. By default, kafkaloader attempts to insert the data into a database on the local system (localhost). To load data into a remote database, use the --servers argument to specify the database nodes the loader should connect to.
Specifies the use of SSL encryption when communicating with the server. Only necessary if the cluster is configured to use SSL encryption for the external ports. See Section D, “Using CLI Commands with SSL” for more information.
Specifies that existing records with a matching primary key are updated, rather than being rejected. By default, kafkaloader attempts to create new records. The --update flag lets you load updates to existing records — and create new records where the primary key does not already exist. To use --update, the table must have a primary key.
Specifies the username to use when connecting to the database. You must specify a username and password if security is enabled for the database.
Specifies the network address of the Kafka Zookeeper instance to connect to. The Kafka service must be running Kafka 0.8.
The following example starts the kafkaloader to read messages from the voltdb_customer topic on the Kafka server quesvr:2181, inserting the resulting records into the CUSTOMER table in the VoltDB cluster that includes the servers dbsvr1, dbsvr2, and dbsvr3. The process will continue, regardless of errors, until connection to the VoltDB database is lost or the user explicitly ends the process.
$ kafkaloader --maxerrors=0 customer \ --zookeeper=quesvr:2181 --topic=voltdb_customer \ --servers=dbsvr1,dbsvr2,dbsvr3