Pulsar¶
The pulsar sink sends data to an Apache Pulsar topic. It supports a variety of configurations such as specifying
the topic name, service URL, authentication, SSL configuration, and more.
This sink is ideal for integrating with Pulsar for real-time data streaming.
PulsarSinkConfigBuilder.<String>builder()
.withTopicName("persistent://public/default/my-topic")
.withServiceUrl("pulsar://pulsar.example.com:6650")
.withProducerName("my-producer")
.addPropertiesEntry("key1", "value1")
.withSslBuilder(builder -> builder
.withTrustStoreFile("/path/to/truststore.jks")
.withTrustStorePassword("truststore-password")
.withKeyStoreFile("/path/to/keystore.jks")
.withKeyStorePassword("keystore-password")
.withKeyPassword("key-password"))
sink:
pulsar:
topicName: "persistent://public/default/my-topic"
serviceUrl: "pulsar://pulsar.example.com:6650"
producerName: "my-producer"
properties:
key1: value1
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-pulsar-api</artifactId>
<version>1.7.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-pulsar-api', version: '1.7.0'
Properties¶
serviceUrl¶
The service URL for connecting to the Pulsar cluster. For example: pulsar://localhost:6650 or pulsar+ssl://pulsar.example.com:6651 Required.
Type: string
topicName¶
Sets the Pulsar topic name where data will be sent. Required.
Type: string
producerName¶
Optional producer name for identification purposes.
Type: string
properties¶
Adds or overrides specific Pulsar client properties, allowing customization for advanced configurations.
Type: object
ssl¶
SSL configuration.
Type: object
Fields of ssl:
ssl.trustStoreFile¶
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.trustStorePassword¶
Truststore password.
Type: string
ssl.keyStoreFile¶
Keystore file; supported formats include JKS, PKCS#12, or PEM
Type: string
ssl.keyStorePassword¶
Keystore password.
Type: string
ssl.keyPassword¶
Private key password. Optional — if not set, the key store password will be used.
Type: string
ssl.insecure¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
ssl.hostnameVerifier¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
authToken¶
Authentication token for Pulsar authentication.
Type: string
exceptionHandler¶
Custom exception handler enabling interception of all errors related to this sink.
Type: object
keyExtractor¶
A function that extracts the message key from the given output object.
The extracted key is used by Pulsar for message ordering within a topic.
Type: object
valueExtractor¶
A function that extracts the message value (payload) from the output object.
This is the core content of the Pulsar message.
Type: object
propertiesExtractor¶
A function that extracts message properties from the output object.
Properties can carry metadata or additional context alongside the message.
Type: object
batchingMaxMessages¶
Maximum number of messages to batch before sending.
Batching improves throughput by sending multiple messages in a single request.
Type: number
Default value: 1000
batchingMaxPublishDelay¶
Maximum time to wait before sending a batch of messages.
Even if the batch is not full, messages will be sent after this delay.
Type: object
Default value: 10ms
compressionType¶
Compression type for messages sent to Pulsar.
Compression reduces network bandwidth and storage requirements.
Type: object
Supported values: none, lz4, zlib, zstd, snappy.
Default value: NONE
sendTimeout¶
Timeout for sending messages to Pulsar.
If a message cannot be sent within this time, it will fail.
Type: object
Default value: 30s
messageRoutingMode¶
Message routing mode for partitioned topics.
Determines how messages are distributed across partitions.
Type: object
Supported values: single_partition, round_robin_partition, custom_partition.
Default value: ROUND_ROBIN_PARTITION
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.
Usage Examples¶
PulsarSinkConfigBuilder.accepting(String.class)
.withTopicName("persistent://public/default/my-topic")
.withServiceUrl("pulsar://pulsar.example.com:6650")
.withProducerName("my-producer")
.withProperty("key1", "value1")
.withSSL(
PulsarSslConfiguration.builder()
.withTruststore("/path/to/truststore.jks", "truststore-password")
.withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
.build()
);
This configuration is designed for producing real-time data to Pulsar topics, with support for secure communication.