Skip to content

Pulsar

The pulsar sink sends data to an Apache Pulsar topic. It supports a variety of configurations such as specifying the topic name, service URL, authentication, SSL configuration, and more. This sink is ideal for integrating with Pulsar for real-time data streaming.

PulsarSinkConfigBuilder.<String>builder()
    .withTopicName("persistent://public/default/my-topic")
    .withServiceUrl("pulsar://pulsar.example.com:6650")
    .withProducerName("my-producer")
    .addPropertiesEntry("key1", "value1")
    .withSslBuilder(builder -> builder
        .withTrustStoreFile("/path/to/truststore.jks")
        .withTrustStorePassword("truststore-password")
        .withKeyStoreFile("/path/to/keystore.jks")
        .withKeyStorePassword("keystore-password")
        .withKeyPassword("key-password"))
sink:
  pulsar:
    topicName: "persistent://public/default/my-topic"
    serviceUrl: "pulsar://pulsar.example.com:6650"
    producerName: "my-producer"
    properties:
      key1: value1

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-pulsar-api</artifactId>
    <version>1.7.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-pulsar-api', version: '1.7.0'

Properties

serviceUrl

The service URL for connecting to the Pulsar cluster. For example: pulsar://localhost:6650 or pulsar+ssl://pulsar.example.com:6651 Required.

Type: string

topicName

Sets the Pulsar topic name where data will be sent. Required.

Type: string

producerName

Optional producer name for identification purposes. Type: string

properties

Adds or overrides specific Pulsar client properties, allowing customization for advanced configurations. Type: object

ssl

SSL configuration. Type: object

Fields of ssl:

ssl.trustStoreFile

Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM. Type: string

ssl.trustStorePassword

Truststore password. Type: string

ssl.keyStoreFile

Keystore file; supported formats include JKS, PKCS#12, or PEM Type: string

ssl.keyStorePassword

Keystore password. Type: string

ssl.keyPassword

Private key password. Optional — if not set, the key store password will be used. Type: string

ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.

Type: boolean

ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.

Type: object

authToken

Authentication token for Pulsar authentication. Type: string

exceptionHandler

Custom exception handler enabling interception of all errors related to this sink. Type: object

keyExtractor

A function that extracts the message key from the given output object. The extracted key is used by Pulsar for message ordering within a topic. Type: object

valueExtractor

A function that extracts the message value (payload) from the output object. This is the core content of the Pulsar message. Type: object

propertiesExtractor

A function that extracts message properties from the output object. Properties can carry metadata or additional context alongside the message. Type: object

batchingMaxMessages

Maximum number of messages to batch before sending. Batching improves throughput by sending multiple messages in a single request. Type: number

Default value: 1000

batchingMaxPublishDelay

Maximum time to wait before sending a batch of messages. Even if the batch is not full, messages will be sent after this delay. Type: object

Default value: 10ms

compressionType

Compression type for messages sent to Pulsar. Compression reduces network bandwidth and storage requirements. Type: object

Supported values: none, lz4, zlib, zstd, snappy.

Default value: NONE

sendTimeout

Timeout for sending messages to Pulsar. If a message cannot be sent within this time, it will fail. Type: object

Default value: 30s

messageRoutingMode

Message routing mode for partitioned topics. Determines how messages are distributed across partitions. Type: object

Supported values: single_partition, round_robin_partition, custom_partition.

Default value: ROUND_ROBIN_PARTITION

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

PulsarSinkConfigBuilder.accepting(String.class)
   .withTopicName("persistent://public/default/my-topic")
   .withServiceUrl("pulsar://pulsar.example.com:6650")
   .withProducerName("my-producer")
   .withProperty("key1", "value1")
   .withSSL(
       PulsarSslConfiguration.builder()
           .withTruststore("/path/to/truststore.jks", "truststore-password")
           .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
           .build()
       );

This configuration is designed for producing real-time data to Pulsar topics, with support for secure communication.