Package org.voltdb.stream.api.kafka
Class KafkaStreamSourceConfigurator<K,V>
java.lang.Object
org.voltdb.stream.api.kafka.KafkaStreamSourceConfigurator<K,V>
- All Implemented Interfaces:
OperatorConfigurator, VoltStreamSourceConfigurator<KafkaRequest<K,V>>
public class KafkaStreamSourceConfigurator<K,V>
extends Object
implements VoltStreamSourceConfigurator<KafkaRequest<K,V>>
This configurator can be autoconfigured using Helm values under streaming.pipeline.configuration.
By default, the source configuration is looked up at the `kafka.source` config path, for example:
    kafka:
      source:
        groupId: groupA
        topicNames:
          - topicA
          - topicB
        bootstrapServers:
          - serverA
          - serverB
        # optional
        schemaRegistry: url
        startingOffset: EARLIEST [LATEST | NONE]
        pollTimeout: PT0.25S
        maxCommitTimeout: PT10S
        maxCommitRetries: 3
        properties:
          key1: value1
          key2: value2
For more information about Java's Duration string format, see Duration String Format.
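The `pollTimeout` and `maxCommitTimeout` values in the example are ISO-8601 duration strings. As a minimal sketch, assuming these values are parsed the way `java.time.Duration.parse` parses them, the following shows what the strings mean (the class and helper names are illustrative only and are not part of this API):

```java
import java.time.Duration;

public class DurationFormatExample {

    // Parses an ISO-8601 duration string such as "PT0.25S" and returns it in milliseconds.
    static long toMillis(String isoDuration) {
        return Duration.parse(isoDuration).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toMillis("PT0.25S"));    // prints 250
        System.out.println(toMillis("PT10S"));      // prints 10000
        // Going the other way: a Duration renders itself in the same format.
        System.out.println(Duration.ofMillis(250)); // prints PT0.25S
    }
}
```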
-
Method Summary
Modifier and Type, Method, Description
- configure: Configures the configurator using a custom config path.
- int getMaxCommitRetries()
- withBootstrapServers(String bootstrapServers)
- withExceptionHandler(KafkaSourceExceptionHandler exceptionHandler)
- withGroupId(String groupId)
- <KEY> KafkaStreamSourceConfigurator<KEY,V> withKeyDeserializer(Class<? extends org.apache.kafka.common.serialization.Deserializer<KEY>> deserializerClass)
- withMaxCommitRetries(int retries): KafkaSource will try to commit an offset and will retry this operation, blocking its worker thread, but not more than this retry value.
- withMaxCommitTimeout(Duration timeout): KafkaSource will try to commit an offset and will retry this operation, blocking its worker thread, but not longer than this timeout value.
- withPollTimeout(Duration pollTimeout): Defines the timeout for the consumer's poll call; the default is 10 seconds.
- withProperty(String key, boolean value)
- withProperty(String key, int value)
- withProperty(String key, long value)
- withProperty(String key, String value)
- withSchemaRegistryUrl(String schemaUrl)
- withSSL(KafkaStreamSslConfiguration sslConfigurator)
- withStartingOffset(KafkaStartingOffset startingOffsets)
- withTopicNames(String topicNames): Defines the topic names used by this Kafka source.
- <VALUE> KafkaStreamSourceConfigurator<K,VALUE> withValueDeserializer(Class<? extends org.apache.kafka.common.serialization.Deserializer<?>> deserializerClass, Class<VALUE> deserializedType)
- <VALUE> KafkaStreamSourceConfigurator<K,VALUE> withValueDeserializer(Class<? extends org.apache.kafka.common.serialization.Deserializer<VALUE>> deserializerClass)
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.voltdb.stream.api.extension.OperatorConfigurator
configure, getConfiguration
-
Method Details
-
aConsumer
-
withGroupId
-
withTopicNames
Defines the topic names used by this Kafka source.
- Parameters:
topicNames - comma-separated topic names
- Returns:
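Because `topicNames` is a single comma-separated string, callers holding a collection of topics need to join it first. A minimal sketch using only the JDK follows; the class and helper names are illustrative, and whether whitespace around the commas is tolerated is not stated here, so the sketch emits none:

```java
import java.util.List;

public class TopicNamesExample {

    // Joins individual topic names into one comma-separated string with no spaces.
    static String toCommaSeparated(List<String> topics) {
        return String.join(",", topics);
    }

    public static void main(String[] args) {
        String topicNames = toCommaSeparated(List.of("topicA", "topicB"));
        System.out.println(topicNames); // prints topicA,topicB
    }
}
```

The resulting string is the kind of value that would be passed to `withTopicNames`.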
-
withSchemaRegistryUrl
-
withPollTimeout
Defines the timeout for the consumer's poll call; the default is 10 seconds.
- Parameters:
pollTimeout - value
- Returns:
configurator
-
withMaxCommitRetries
KafkaSource will try to commit an offset and will retry this operation, blocking its worker thread, but not more than this retry value. Note: if the commit ultimately fails, this Kafka source halts because it cannot make progress.
- Parameters:
retries - how many times this source will try to commit an offset
- Returns:
configurator
-
withMaxCommitTimeout
KafkaSource will try to commit an offset and will retry this operation, blocking its worker thread, but not longer than this timeout value. Note: if the commit ultimately fails, this Kafka source halts because it cannot make progress.
- Parameters:
timeout - how long this source will wait before trying to commit an offset again
- Returns:
configurator
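The interaction between the retry limit and the timeout can be pictured as a bounded, blocking retry loop. The sketch below is one plausible reading only and is not the KafkaSource implementation: it assumes the timeout acts as an overall time budget for the loop, and `commitWithLimits` and the commit lambda are hypothetical names introduced for illustration.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class BoundedCommitRetrySketch {

    /**
     * Calls commitAttempt on the current thread until it succeeds, the retry
     * budget is used up, or the time budget has elapsed. Returns true on success.
     * Assumption: maxTimeout bounds the whole loop, not a single attempt.
     */
    static boolean commitWithLimits(BooleanSupplier commitAttempt, int maxRetries, Duration maxTimeout) {
        long deadline = System.nanoTime() + maxTimeout.toNanos();
        // One initial attempt plus at most maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (commitAttempt.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                break; // time budget exhausted, stop retrying
            }
        }
        return false; // the caller would treat this as a failed commit
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical commit that fails twice and then succeeds.
        boolean ok = commitWithLimits(() -> ++calls[0] >= 3, 3, Duration.ofSeconds(10));
        System.out.println(ok + " after " + calls[0] + " attempts"); // prints true after 3 attempts
    }
}
```

In this reading, a `false` result corresponds to the case described above where the source halts because it cannot make progress.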
-
withProperty
-
withProperty
-
withProperty
-
withProperty
-
withBootstrapServers
-
withSSL
-
withKeyDeserializer
public <KEY> KafkaStreamSourceConfigurator<KEY,V> withKeyDeserializer(Class<? extends org.apache.kafka.common.serialization.Deserializer<KEY>> deserializerClass)
-
withValueDeserializer
public <VALUE> KafkaStreamSourceConfigurator<K,VALUE> withValueDeserializer(Class<? extends org.apache.kafka.common.serialization.Deserializer<VALUE>> deserializerClass)
-
withValueDeserializer
public <VALUE> KafkaStreamSourceConfigurator<K,VALUE> withValueDeserializer(Class<? extends org.apache.kafka.common.serialization.Deserializer<?>> deserializerClass, Class<VALUE> deserializedType)
-
withStartingOffset
-
withExceptionHandler
public KafkaStreamSourceConfigurator<K,V> withExceptionHandler(KafkaSourceExceptionHandler exceptionHandler)
-
getGroupId
-
getMaxCommitRetries
public int getMaxCommitRetries()
-
getMaxCommitTimeout
-
getExceptionHandler
-
configure
Configures the configurator using a custom config path. For example, given a YAML file:
    my:
      custom:
        source:
          kafka:
            broker: A,B,C
            topic: T
          other:
            key: value
and the path "my.custom.source.kafka", this will return a map of keys and values. The default path is "kafka.source". Note: for more information, see the Helm configuration under streaming.pipeline.configuration.kafka.source.
- Parameters:
path - to a config
- Returns:
kafka configurator
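To illustrate how a dotted path selects a sub-section of a nested configuration, here is a minimal sketch that walks a nested `Map` one segment at a time. It is not the library's lookup code; `resolve` and `sampleConfig` are hypothetical helpers, and the in-memory map stands in for the parsed YAML from the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConfigPathSketch {

    // Walks a nested map along a dot-separated path and returns the map found there,
    // or an empty map if any segment is missing or is not itself a map.
    @SuppressWarnings("unchecked")
    static Map<String, Object> resolve(Map<String, Object> root, String path) {
        Map<String, Object> current = root;
        for (String segment : path.split("\\.")) {
            Object next = current.get(segment);
            if (!(next instanceof Map)) {
                return Map.of();
            }
            current = (Map<String, Object>) next;
        }
        return current;
    }

    // In-memory stand-in for the "my.custom.source.kafka" part of the YAML example.
    static Map<String, Object> sampleConfig() {
        Map<String, Object> kafka = new LinkedHashMap<>();
        kafka.put("broker", "A,B,C");
        kafka.put("topic", "T");
        Map<String, Object> source = Map.of("kafka", kafka);
        Map<String, Object> custom = Map.of("source", source);
        Map<String, Object> my = Map.of("custom", custom);
        return Map.of("my", my);
    }

    public static void main(String[] args) {
        System.out.println(resolve(sampleConfig(), "my.custom.source.kafka")); // prints {broker=A,B,C, topic=T}
        System.out.println(resolve(sampleConfig(), "my.missing"));             // prints {}
    }
}
```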
-