public class VoltDBKafkaPartitioner
extends org.apache.kafka.clients.producer.internals.DefaultPartitioner
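import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.voltdb.client.topics.VoltDBKafkaPartitioner;

Properties props = new Properties();
// Kafka bootstrap address
props.put("bootstrap.servers", "localhost:9092");
// VoltDB cluster address, used to calculate the partition id from the key
props.put("bootstrap.servers.voltdb", "localhost:21212");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Route records through the VoltDB partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VoltDBKafkaPartitioner.class.getName());
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
    producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}
// Flush pending records and release the producer's resources
producer.close();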
The bootstrap.servers.voltdb property is required to calculate the partition id from the key.
Kafka client properties used by the partitioner:
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG - used if bootstrap.servers.voltdb is not set
- CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
- SaslConfigs.SASL_MECHANISM - must be PLAIN
- SaslConfigs.SASL_JAAS_CONFIG
- SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
- SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
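As a rough illustration of the security-related properties listed above, the sketch below builds a property set using the plain string keys behind those Kafka constants. The host names, credentials, truststore path, and the SASL_SSL protocol value are placeholders chosen for the example, and the JAAS line follows the usual Kafka PlainLoginModule form; whether a given VoltDB deployment expects exactly these values is an assumption.

```java
import java.util.Properties;

final class SecurePartitionerProps {

    // Builds producer properties for SASL/PLAIN over TLS.
    // Every value below is a placeholder (assumption), not a documented default.
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("bootstrap.servers.voltdb", "localhost:21212");
        props.put("partitioner.class", "org.voltdb.client.topics.VoltDBKafkaPartitioner");
        // CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
        props.put("security.protocol", "SASL_SSL");
        // SaslConfigs.SASL_MECHANISM: the partitioner requires PLAIN
        props.put("sasl.mechanism", "PLAIN");
        // SaslConfigs.SASL_JAAS_CONFIG: standard Kafka PLAIN login module syntax
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"user\" password=\"secret\";");
        // SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG and SSL_TRUSTSTORE_PASSWORD_CONFIG
        props.put("ssl.truststore.location", "/path/to/truststore.jks");
        props.put("ssl.truststore.password", "changeit");
        return props;
    }

    public static void main(String[] args) {
        // Print the keys only, so placeholder secrets are not echoed.
        build().keySet().forEach(System.out::println);
    }
}
```

The resulting Properties object could be passed to a KafkaProducer constructor in the same way as in the class-level example.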
Modifier and Type | Field and Description |
---|---|
static java.lang.String | BOOTSTRAP_SERVERS_VOLTDB: Configuration for a VoltDB client to connect to VoltDB cluster: comma separated list of the form server[:port] |
protected org.voltdb.client.ClientImpl | m_client |
Constructor and Description |
---|
VoltDBKafkaPartitioner() |
Modifier and Type | Method and Description |
---|---|
void | close() |
void | configure(java.util.Map<java.lang.String,?> original) |
protected ClientConfig | createClientConfig(org.voltdb.client.topics.VoltDBKafkaPartitioner.PartitionConfig configs): Create ClientConfig for client connection to VoltDB cluster |
protected void | loadTopics() |
int | partition(java.lang.String topic, java.lang.Object key, byte[] keyBytes, java.lang.Object value, byte[] valueBytes, org.apache.kafka.common.Cluster cluster): Use DefaultPartitioner for opaque topics, otherwise use VoltDB hash mechanism for partition calculation. |
public static final java.lang.String BOOTSTRAP_SERVERS_VOLTDB
Configuration for a VoltDB client to connect to VoltDB cluster: comma separated list of the form server[:port]
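To make the server[:port] list format concrete, here is a minimal parsing sketch. It is not the partitioner's own parsing code: the class name VoltServerList is hypothetical, and filling in 21212 when the port is omitted is an assumption based on VoltDB's usual client port. IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.List;

final class VoltServerList {

    // Assumed fallback port when an entry has no ":port" suffix.
    static final int DEFAULT_PORT = 21212;

    // Splits a comma separated list of server[:port] entries into
    // normalized "host:port" strings, skipping empty entries.
    static List<String> parse(String servers) {
        List<String> result = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String s = entry.trim();
            if (s.isEmpty()) {
                continue;
            }
            result.add(s.contains(":") ? s : s + ":" + DEFAULT_PORT);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [localhost:21212, voltdb2:21212]
        System.out.println(parse("localhost:21212, voltdb2"));
    }
}
```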
protected org.voltdb.client.ClientImpl m_client
public void configure(java.util.Map<java.lang.String,?> original)
Specified by: configure in interface org.apache.kafka.common.Configurable
Overrides: configure in class org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(java.lang.String topic, java.lang.Object key, byte[] keyBytes, java.lang.Object value, byte[] valueBytes, org.apache.kafka.common.Cluster cluster)
Specified by: partition in interface org.apache.kafka.clients.producer.Partitioner
Overrides: partition in class org.apache.kafka.clients.producer.internals.DefaultPartitioner
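The routing rule in the method summary (DefaultPartitioner for opaque topics, VoltDB hashing otherwise) can be pictured with the conceptual sketch below. It is not the actual implementation: the class PartitionDispatchSketch, the set of opaque topic names, and both partitioning functions are hypothetical stand-ins, since this page documents neither how opaque topics are detected nor the hash itself.

```java
import java.util.Set;
import java.util.function.ToIntFunction;

final class PartitionDispatchSketch {

    // Hypothetical: topics this sketch treats as opaque.
    private final Set<String> opaqueTopics;
    // Stand-in for the inherited DefaultPartitioner behavior.
    private final ToIntFunction<byte[]> defaultPartitioner;
    // Stand-in for the VoltDB hash mechanism applied to the record key.
    private final ToIntFunction<Object> voltHash;

    PartitionDispatchSketch(Set<String> opaqueTopics,
                            ToIntFunction<byte[]> defaultPartitioner,
                            ToIntFunction<Object> voltHash) {
        this.opaqueTopics = opaqueTopics;
        this.defaultPartitioner = defaultPartitioner;
        this.voltHash = voltHash;
    }

    // Opaque topics go to the default path; all others use the key hash.
    int partition(String topic, Object key, byte[] keyBytes) {
        if (opaqueTopics.contains(topic)) {
            return defaultPartitioner.applyAsInt(keyBytes);
        }
        return voltHash.applyAsInt(key);
    }

    public static void main(String[] args) {
        // Dummy functions: 0 marks the default path, 1 marks the hash path.
        PartitionDispatchSketch sketch = new PartitionDispatchSketch(
                Set.of("raw-events"), bytes -> 0, key -> 1);
        System.out.println(sketch.partition("raw-events", "k", new byte[0])); // prints 0
        System.out.println(sketch.partition("my-topic", "k", new byte[0]));   // prints 1
    }
}
```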
public void close()
Specified by: close in interface java.io.Closeable
Specified by: close in interface java.lang.AutoCloseable
Specified by: close in interface org.apache.kafka.clients.producer.Partitioner
Overrides: close in class org.apache.kafka.clients.producer.internals.DefaultPartitioner
protected void loadTopics()
protected ClientConfig createClientConfig(org.voltdb.client.topics.VoltDBKafkaPartitioner.PartitionConfig configs)
Create ClientConfig for client connection to VoltDB cluster
Parameters: configs - Configuration properties from KafkaProducer