kafkaloader

kafkaloader — Imports data from a Kafka message queue into the specified database table.

Synopsis

kafkaloader table-name [arguments]

Description

The kafkaloader utility loads data from a Kafka message queue and inserts each message as a separate record into the specified database table. Apache Kafka is a distributed messaging service that lets you set up message queues which are written to and read from by "producers" and "consumers", respectively. In the Apache Kafka model, the kafkaloader acts as a "consumer".

When you start the kafkaloader, you must specify at least three arguments:

  • The database table

  • The Kafka server to read messages from, specified using the --brokers flag

  • The Kafka "topic" where the messages are stored, specified using the --topic flag

For example:

$ kafkaloader --brokers=quesvr:9092 --topic=voltdb_customer customer

Note that Kafka does not impose any specific format on the messages it manages. The format of the messages is application specific. In the case of kafkaloader, VoltDB assumes the messages are encoded as standard comma-separated value (CSV) strings, with the values representing the columns of the table in the order listed in the schema definition. Each Kafka message contains a single row to be inserted into the database table.
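
As an illustration of the message format described above, the following shell sketch builds one CSV message for a hypothetical CUSTOMER table with three columns (id, first_name, last_name). The column layout, the sample values, and the csv_row helper are assumptions made for this example and are not part of VoltDB.

```shell
# Hypothetical helper: format one row as a CSV message for kafkaloader.
# Assumes a table declared as (id INTEGER, first_name VARCHAR, last_name VARCHAR);
# the values must appear in the same order as the columns in the schema.
# Note: this simple version does not escape quotes embedded in the strings.
csv_row() {
    # $1 = id (numeric, unquoted); $2 and $3 = strings (double-quoted)
    printf '%s,"%s","%s"\n' "$1" "$2" "$3"
}

csv_row 1001 Jane Doe
```

Each line produced this way corresponds to one Kafka message, and therefore to one row in the table. Lines like this could be published to the topic with any Kafka producer, for example by piping them to Kafka's console producer.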

It is also important to note that, unlike the csvloader which reads a static file, the kafkaloader is reading from a queue where messages can be written at any time, on an ongoing basis. Therefore, the kafkaloader process does not stop when it reads the last message on the queue; instead it continues to monitor the queue and process any new messages it receives. The kafkaloader process will continue to read from the queue until one of the following events occurs:

  • The maximum number of errors (specified by --maxerrors) is reached.

  • The user explicitly stops the process.

  • The connection to all of the VoltDB servers is broken (that is, kafkaloader can no longer access the VoltDB database) and --stopondisconnect is specified.

The kafkaloader will not terminate if it loses its connection to the Kafka zookeeper. Therefore, it is important to monitor the Kafka service and restart the kafkaloader if and when the Kafka service is interrupted. Similarly, the kafkaloader will not stop if it loses connection to the VoltDB database, unless you include the --stopondisconnect argument on the command line.

Arguments

Note

The arguments --servers and --port are deprecated in favor of the new, more flexible argument --host. Also, the argument --zookeeper is deprecated in favor of the new argument --brokers. The deprecated arguments continue to work but may be removed in a future major release.

--batch={integer}

Specifies the number of rows to submit in a batch. By default, rows of input are sent in batches to maximize overall throughput. You can specify how many rows are sent in each batch using the --batch flag. The default batch size is 200.

Note that --batch and --flush work together. Whichever limit is reached first triggers an insert to the database.

-b, --brokers={kafka-broker[:port]}[,...]

Specifies one or more Kafka brokers to connect to. Specify multiple brokers as a comma-separated list. The Kafka service must be running Kafka 0.10.2 or later (including 1.0.0).

-c, --config={file}

Specifies a Kafka configuration file that lets you set Kafka consumer properties, such as group.id. The file should contain the names of the properties you want to set, one per line, followed by an equals sign and the desired value. For example:

group.id=mydb
client.id=myappname

--commitpolicy={interval}

Because the loader performs two distinct tasks (retrieving records from Kafka and then inserting them into VoltDB), Kafka's automated tracking of the current offset may not match which records are successfully inserted into the database. Therefore, by default, the loader uses a manual commit policy to ensure the Kafka offset matches the completed inserts.

Use of the default commit policy is recommended. However, you can, if you choose, use Kafka's automated commit policy by specifying a commit interval, in milliseconds, using this argument.

--credentials={properties-file}

Specifies a file that lists the username and password of the account to use when connecting to a database with security enabled. This is useful when writing shell scripts because it avoids having to hardcode the password as plain text in the script. The credentials file is interpreted as a Java properties file defining the properties username and password. For example:

username: johndoe
password: 4tUn8

Because it is a Java properties file, you must escape certain special characters in the username or password, including the colon or equals sign.
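
As a minimal sketch of how such a file might be prepared from a shell script, the following creates a credentials file that only the current user can read. The temporary directory, the file name, and the sample password (extended here with a colon and an equals sign to show the escaping) are placeholders chosen for this example.

```shell
# Sketch: create a credentials file readable only by the current user.
# The directory, file name, account, and password are placeholders.
cred_dir=$(mktemp -d)
cred_file="$cred_dir/voltdb.credentials"

(
    umask 077   # files created in this subshell get owner-only permissions
    cat > "$cred_file" <<'EOF'
username: johndoe
password: 4tUn8\:x\=9
EOF
)

# The loader would then be started with, for example:
#   kafkaloader --credentials="$cred_file" \
#      --brokers=quebkr:9092 --topic=voltdb_customer customer
```

The quoted heredoc delimiter ('EOF') keeps the backslashes literal, so the colon and equals sign in the password reach the file in their escaped form.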

-f, --flush={integer}

Specifies the maximum number of seconds before pending data is written to the database. The default flush period is 10 seconds.

If data is inserted into the Kafka queue intermittently, there could be a long delay between when data is read from the queue and when enough records have been read to meet the --batch limit. The flush value avoids unnecessary delays in this situation by periodically writing all pending data. If the flush limit is reached, all pending records are written to the database, even if the --batch limit has not been satisfied.
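
The interaction between --batch and --flush can be pictured with a small shell sketch. This only models the documented rule that whichever limit is reached first triggers a write; it is not how kafkaloader is implemented, and the should_write function and its parameters are hypothetical.

```shell
# Illustrative only: decide whether pending rows should be written now.
# Models the rule "whichever of --batch or --flush is reached first".
# Defaults mirror the documented ones: batch=200 rows, flush=10 seconds.
should_write() {
    pending_rows=$1      # rows read from Kafka but not yet inserted
    seconds_waiting=$2   # seconds since the last write
    batch=${3:-200}
    flush=${4:-10}
    [ "$pending_rows" -ge "$batch" ] || [ "$seconds_waiting" -ge "$flush" ]
}

should_write 200 1 && echo "write: batch limit reached"
should_write 15 10 && echo "write: flush limit reached"
should_write 15 3  || echo "wait: neither limit reached"
```

Under this model, a busy topic is governed mostly by the batch size, while a quiet topic is governed by the flush period.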

--formatter={file}

Specifies a configuration file identifying properties for a custom formatter. The file must set the property formatter to the class for the custom implementation of the Formatter interface. (Note, this is different from the attribute set when declaring a formatter for a built-in import connector. For the kafkaloader utility you specify the Formatter class, not the Formatter Factory.) You can also declare additional custom properties used by the formatter itself. For example:

formatter=myformatter.MyFormatter
column_width=12

Before running kafkaloader with a custom formatter, you must define two environment variables: ZK_LIB pointing to the location of the Apache Zookeeper libraries and FORMATTER_LIB pointing to the location of your custom formatter JAR file. See the chapter on "Custom Importers, Exporters, and Formatters" in the VoltDB Guide to Performance and Customization for more information about using custom formatters.
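
The setup described above might look like the following in a shell script. This is a sketch under stated assumptions: both paths are hypothetical, and whether FORMATTER_LIB should name the JAR file itself or the directory containing it is an assumption here, so check it against your installation.

```shell
# Sketch: prepare the environment for a custom formatter.
# All paths are placeholders for this example.
work_dir=$(mktemp -d)
formatter_cfg="$work_dir/formatter.properties"

# Formatter configuration: the class name plus any custom properties.
cat > "$formatter_cfg" <<'EOF'
formatter=myformatter.MyFormatter
column_width=12
EOF

# Locations of the ZooKeeper libraries and the custom formatter JAR
# (hypothetical paths; adjust to your installation).
export ZK_LIB=/opt/zookeeper/lib
export FORMATTER_LIB=/opt/myformatter/myformatter.jar

# With both variables exported, the loader would be started as, for example:
#   kafkaloader --formatter="$formatter_cfg" \
#      --brokers=quebkr:9092 --topic=voltdb_customer customer
```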

-H, --host={server[:port]}[,...]

Specifies one or more nodes of the database cluster where the records are to be inserted. You can specify servers as a network address or hostname, plus an optional port number. When specifying an IPv6 address, enclose the address (exclusive of the optional colon and port number) in square brackets. By default, kafkaloader attempts to connect to the default client port on the local system (localhost). To load data into a remote database, use the --host argument to specify one or more VoltDB servers the loader should connect to. Once kafkaloader connects to at least one cluster node, it will automatically connect to the other servers in the cluster.
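
The bracket rule for IPv6 addresses is easy to get wrong when building the argument in a script. The following sketch shows one way to format individual server entries; the host_entry helper, the server names, and the use of 21212 as the port value are assumptions made for illustration.

```shell
# Hypothetical helper: format one server entry for the --host argument.
# IPv6 addresses (anything containing a colon) are wrapped in square
# brackets; the optional port is appended after the closing bracket.
host_entry() {
    addr=$1
    port=${2:-}
    case $addr in
        *:*) addr="[$addr]" ;;
    esac
    if [ -n "$port" ]; then
        printf '%s:%s\n' "$addr" "$port"
    else
        printf '%s\n' "$addr"
    fi
}

host_entry dbsvr1
host_entry dbsvr2 21212
host_entry 2001:db8::10 21212
```

Joined with commas, these entries form a single argument such as --host='dbsvr1,dbsvr2:21212,[2001:db8::10]:21212'. Quoting the value keeps the shell from treating the square brackets as a filename pattern.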

-m, --maxerrors={integer}

Specifies the target number of input errors before kafkaloader stops processing input. Once kafkaloader encounters the specified number of errors while trying to insert rows, it will stop reading input and end the process.

The default maximum error count is 100. Since Kafka import can be a persistent process, you can avoid having input errors cancel ongoing import by setting the maximum error count to zero, which means that the loader will continue to run no matter how many input errors are generated.

--maxpollinterval={integer}

Specifies the maximum time (in milliseconds) allowed between polls of the Kafka brokers before Kafka assumes the kafkaloader client has failed and drops it from the client group. The default poll interval is 300 seconds (5 minutes), or 300000 milliseconds.

--maxpollrecords={integer}

Specifies the maximum number of records fetched in each batch from the Kafka brokers. The default maximum is 500 records.

--maxrequesttimeout={integer}

Specifies the maximum length of time (in milliseconds) VoltDB waits for a response from the Kafka brokers before retrying the request or timing out the session. The default timeout is 305 seconds (just over 5 minutes), or 305000 milliseconds.

--maxsessiontimeout={integer}

Specifies the maximum interval between heartbeats from the consumer (kafkaloader) and the Kafka brokers before Kafka drops the kafkaloader from the client group identified by group.id. The default timeout is 20 seconds.

-n, --consumercount={integer}

Specifies the number of concurrent Kafka consumers kafkaloader uses to pull data from the brokers. The default is one consumer.

--password={text}

Specifies the password to use when connecting to the database. You must specify a username and password if security is enabled for the database. If you specify a username with the --user argument but not the --password argument, VoltDB prompts for the password. This is useful when writing shell scripts because it avoids having to hardcode passwords as plain text in the script.

-p, --procedure={procedure-name}

Specifies a stored procedure to use for loading each record read from the Kafka topic. The named procedure must exist in the database schema and must accept the fields of the data record as input parameters. By default, kafkaloader uses a custom procedure to batch multiple rows into a single insert operation. If you explicitly name a procedure, batching does not occur.

--ssl={ssl-config-file}

Specifies the use of TLS encryption when communicating with the server. Only necessary if the cluster is configured to use TLS encryption for the external ports. See the section called “Using CLI Commands with TLS/SSL” for more information.

--stopondisconnect

Specifies that if connections to all of the VoltDB servers are broken, the kafkaloader process will stop. The kafkaloader connects to servers automatically as the topology of the cluster changes. Normally, if all connections are broken, kafkaloader will periodically attempt to reconnect until the servers come back online. However, you can use this argument to have the loader process stop when the VoltDB cluster becomes unavailable.

-t, --topic={kafka-topic}

Specifies the Kafka topic to read messages from.

--update

Specifies that existing records with a matching primary key are updated, rather than being rejected. By default, kafkaloader attempts to create new records. The --update flag lets you load updates to existing records — and create new records where the primary key does not already exist. To use --update, the table must have a primary key.

--user={text}

Specifies the username to use when connecting to the database. You must specify a username and password if security is enabled for the database.

Examples

The following example starts the kafkaloader to read messages from the voltdb_customer topic on the Kafka broker quebkr:9092, inserting the resulting records into the CUSTOMER table in the VoltDB cluster that includes the servers dbsvr1, dbsvr2, and dbsvr3. The process will continue, regardless of errors, until the user explicitly ends the process.

$ kafkaloader --maxerrors=0 customer \
   --brokers=quebkr:9092 --topic=voltdb_customer \
   --host=dbsvr1,dbsvr2,dbsvr3