
Pulsar

The Pulsar source consumes data from Apache Pulsar topics. You can configure consumer properties such as the service URL, topic names, subscription name, and subscription type, among others. Use this source to bring real-time data from Pulsar into your streaming pipeline.

PulsarSourceConfigBuilder.<byte[]>builder()
    .withServiceUrl("pulsar://localhost:6650")
    .withTopicNames("persistent://public/default/topic1", "persistent://public/default/topic2")
    .withSubscriptionName("my-subscription")
    .withSubscriptionType(PulsarSubscriptionType.SHARED)
    .withReceiveTimeout(Duration.ofMillis(250));

source:
  pulsar:
    serviceUrl: "pulsar://localhost:6650"
    topicNames:
      - "persistent://public/default/topic1"
      - "persistent://public/default/topic2"
    subscriptionName: "my-subscription"
    subscriptionType: "SHARED"
    receiveTimeout: "PT0.25S"
    properties:
      key1: value1

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-pulsar-api</artifactId>
    <version>1.7.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-pulsar-api', version: '1.7.0'

Properties

serviceUrl

The service URL for connecting to the Pulsar cluster, for example pulsar://localhost:6650 or pulsar+ssl://pulsar.example.com:6651. Required.

Type: string
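The scheme of serviceUrl decides whether the connection uses TLS: pulsar:// is plaintext (conventionally port 6650) and pulsar+ssl:// is TLS (conventionally port 6651). As a sketch, the hypothetical helper below, which is not part of the plugin API, checks a URL before it is handed to withServiceUrl and rejects any other scheme.

```java
import java.net.URI;

// Hypothetical helper, not part of the plugin API: reports whether a Pulsar
// service URL selects a TLS connection, based only on its scheme.
final class ServiceUrlCheck {

    static boolean usesTls(String serviceUrl) {
        String scheme = URI.create(serviceUrl).getScheme();
        if ("pulsar+ssl".equals(scheme)) {
            return true;  // TLS connection
        }
        if ("pulsar".equals(scheme)) {
            return false; // plaintext connection
        }
        throw new IllegalArgumentException("Unsupported scheme: " + scheme);
    }

    public static void main(String[] args) {
        System.out.println(usesTls("pulsar://localhost:6650"));              // false
        System.out.println(usesTls("pulsar+ssl://pulsar.example.com:6651")); // true
    }
}
```

A check like this is most useful together with the ssl settings: configuring a truststore while the URL still uses pulsar:// is an easy mistake to make.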

topicNames

Set of topic names to consume from. Topic names should follow the Pulsar naming convention persistent://tenant/namespace/topic. Required.

Type: array
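The naming convention above has four parts: a domain, a tenant, a namespace, and the topic itself. The sketch below splits a fully qualified name into those parts so a pipeline can validate its topic list early. The PulsarTopicName record is hypothetical, and it also accepts the non-persistent domain, which Pulsar supports alongside persistent.

```java
// Hypothetical helper, not part of the plugin API: splits a fully qualified
// topic name of the form domain://tenant/namespace/topic into its parts.
record PulsarTopicName(String domain, String tenant, String namespace, String topic) {

    static PulsarTopicName parse(String fullName) {
        int sep = fullName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("Missing domain prefix: " + fullName);
        }
        String domain = fullName.substring(0, sep);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("Unknown domain: " + domain);
        }
        // Split the remainder into at most three parts: tenant, namespace, topic.
        String[] parts = fullName.substring(sep + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + fullName);
        }
        return new PulsarTopicName(domain, parts[0], parts[1], parts[2]);
    }
}
```

For example, PulsarTopicName.parse("persistent://public/default/topic1") yields tenant public, namespace default, and topic topic1.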

subscriptionName

The subscription name for this consumer. Multiple consumers with the same subscription name will share messages according to the subscription type. Required.

Type: string

subscriptionType

The subscription type determines how messages are distributed:

- EXCLUSIVE: only one consumer can be active at a time.
- SHARED: messages are distributed across multiple consumers.
- FAILOVER: one consumer receives messages until it fails, then another takes over.
- KEY_SHARED: messages with the same key go to the same consumer.

Required.

Type: object

Supported values: exclusive, shared, failover, key_shared.

Default value: SHARED
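A toy model makes the difference between SHARED and KEY_SHARED concrete. The sketch below is not Pulsar's dispatcher: it assumes plain round-robin for SHARED and modulo hashing of the message key for KEY_SHARED, whereas the broker assigns hash ranges to consumers. EXCLUSIVE and FAILOVER are not modeled, because both deliver to a single active consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Pulsar's dispatcher: each inner list holds the
// message keys that one consumer of the subscription would receive.
final class DispatchModel {

    // SHARED: deliver messages round-robin, regardless of key.
    static List<List<String>> shared(List<String> keys, int consumers) {
        List<List<String>> out = newBuckets(consumers);
        for (int i = 0; i < keys.size(); i++) {
            out.get(i % consumers).add(keys.get(i));
        }
        return out;
    }

    // KEY_SHARED: route each message by a hash of its key, so equal keys
    // always land on the same consumer (modulo hashing is an assumption).
    static List<List<String>> keyShared(List<String> keys, int consumers) {
        List<List<String>> out = newBuckets(consumers);
        for (String key : keys) {
            out.get(Math.floorMod(key.hashCode(), consumers)).add(key);
        }
        return out;
    }

    private static List<List<String>> newBuckets(int n) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            buckets.add(new ArrayList<>());
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "a", "c", "a", "b");
        System.out.println("SHARED:     " + shared(keys, 2));
        System.out.println("KEY_SHARED: " + keyShared(keys, 2));
    }
}
```

Under SHARED, messages with key a are spread across both consumers; under KEY_SHARED, every message with key a reaches the same consumer, which preserves per-key ordering.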

consumerName

Optional consumer name for identification purposes.

Type: string

receiveTimeout

The maximum time to wait for a message.

Type: object

Default value: 1s
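The two examples at the top of this page express the same 250 ms timeout in two notations: Duration.ofMillis(250) in Java and "PT0.25S" in YAML, which is ISO-8601 duration syntax. The sketch below uses only java.time.Duration to check that the two agree. Whether the YAML loader also accepts shorthand such as 1s (the notation used for the default value) is not stated on this page, so the ISO-8601 form is the safer assumption.

```java
import java.time.Duration;

// Illustration only: compares the ISO-8601 text used in the YAML example
// with a millisecond value like the one passed to Duration.ofMillis(...).
final class TimeoutFormats {

    static boolean sameTimeout(String isoDuration, long millis) {
        return Duration.parse(isoDuration).equals(Duration.ofMillis(millis));
    }

    public static void main(String[] args) {
        System.out.println(sameTimeout("PT0.25S", 250)); // true
        System.out.println(sameTimeout("PT1S", 1000));   // true
    }
}
```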

properties

Adds or overrides specific Pulsar client properties, allowing customization for advanced configurations.

Type: object
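"Adds or overrides" describes a layered merge: an entry with the same key as a default replaces it, and any other entry is added. The sketch below models that rule with a HashMap. It is an illustration, not the plugin's implementation; key1 and value1 come from the YAML example, while the default values and the other keys are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model, not the plugin's implementation: entries in the
// properties block are layered over a set of default client properties.
final class PropertyOverrides {

    // Returns a new map: defaults first, then overrides, so an override
    // replaces a default with the same key and new keys are added.
    static Map<String, String> merge(Map<String, String> defaults, Map<String, String> overrides) {
        Map<String, String> merged = new HashMap<>(defaults);
        merged.putAll(overrides);
        return merged;
    }
}
```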

ssl

SSL configuration.

Type: object

Fields of ssl:

ssl.trustStoreFile

Truststore file or trusted CA certificate. Supported formats include JKS, PKCS#12, and PEM.

Type: string

ssl.trustStorePassword

Truststore password.

Type: string

ssl.keyStoreFile

Keystore file. Supported formats include JKS, PKCS#12, and PEM.

Type: string

ssl.keyStorePassword

Keystore password.

Type: string

ssl.keyPassword

Private key password. Optional. If not set, the keystore password is used.

Type: string

ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.

Type: boolean

ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If it is not specified and ssl.insecure is true, hostname verification is disabled.

Type: object
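Two of the ssl rules above can be expressed as small helpers. In the sketch below, only the key password fallback is taken from this page; inferring the store format from the file extension is an assumption, and neither helper belongs to the plugin API.

```java
import java.util.Locale;

// Hypothetical helpers, not part of the plugin API.
final class SslSettingsSketch {

    // Assumption: the store format is inferred from the file extension.
    static String storeType(String path) {
        String lower = path.toLowerCase(Locale.ROOT);
        if (lower.endsWith(".jks")) {
            return "JKS";
        }
        if (lower.endsWith(".p12") || lower.endsWith(".pfx")) {
            return "PKCS12";
        }
        if (lower.endsWith(".pem")) {
            return "PEM";
        }
        throw new IllegalArgumentException("Cannot infer store type from: " + path);
    }

    // Documented rule: when ssl.keyPassword is not set, ssl.keyStorePassword is used.
    static String effectiveKeyPassword(String keyPassword, String keyStorePassword) {
        return keyPassword != null ? keyPassword : keyStorePassword;
    }
}
```

For the truststore in the usage example, storeType("/path/to/truststore.jks") returns "JKS".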

authToken

Authentication token for Pulsar authentication.

Type: string
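Rather than embedding a token in the pipeline definition, as the usage example at the end of this page does for brevity, the token can be read from a file at startup. TokenLoader and its file handling are hypothetical; only withAuthToken comes from this page.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of the plugin API: reads an authentication
// token from a file so it does not have to be hard-coded.
final class TokenLoader {

    static String readToken(Path tokenFile) throws IOException {
        // Token files often end with a newline; strip surrounding whitespace.
        String token = Files.readString(tokenFile).strip();
        if (token.isEmpty()) {
            throw new IOException("Token file is empty: " + tokenFile);
        }
        return token;
    }
}
```

The result could then be passed as .withAuthToken(TokenLoader.readToken(Path.of("/path/to/token"))), where the path is a placeholder.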

exceptionHandler

Custom exception handler that intercepts all errors related to this source.

Type: object

subscriptionInitialPosition

The initial position for the subscription when it is first created. EARLIEST starts from the beginning of the topic; LATEST receives only messages published after the subscription is created.

Type: object

Supported values: earliest, latest.

Default value: EARLIEST
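The effect of the two initial positions on a topic that already holds messages can be modeled as choosing where the new subscription's cursor starts. This is a simplified model with hypothetical names: EARLIEST can only start at the oldest message the broker still retains, and the setting has no effect when an existing subscription reconnects, since it then resumes from its stored position.

```java
import java.util.List;

// Simplified model, not Pulsar's cursor implementation: given the messages
// already in a topic when a new subscription is created, returns which of
// them the subscription will receive.
final class InitialPositionModel {

    enum Position { EARLIEST, LATEST }

    // Index of the first existing message delivered to the new subscription.
    static int startIndex(List<String> existingMessages, Position position) {
        return position == Position.EARLIEST ? 0 : existingMessages.size();
    }

    // EARLIEST delivers the whole stored backlog; LATEST delivers none of it.
    static List<String> backlogDelivered(List<String> existingMessages, Position position) {
        return existingMessages.subList(startIndex(existingMessages, position), existingMessages.size());
    }
}
```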

deadLetterTopic

Dead letter topic name for messages that fail processing. Messages that exceed the maximum redelivery count (maxRedeliveryCount) are sent to this topic.

Type: string

maxRedeliveryCount

Maximum number of times a message will be redelivered before being sent to the dead letter topic.

Type: number

Default value: 0
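The interaction between maxRedeliveryCount and deadLetterTopic can be sketched as a loop over delivery attempts. The model below follows the description above, counting the first delivery plus up to maxRedeliveryCount redeliveries; the exact point at which the Pulsar client hands a message to the dead letter topic may differ slightly, and RedeliveryModel is a hypothetical name.

```java
import java.util.function.Predicate;

// Simplified model of the documented behavior: a message is delivered once and
// redelivered at most maxRedeliveryCount times; if every attempt fails, it is
// routed to the dead letter topic.
final class RedeliveryModel {

    // Returns the 1-based number of the successful attempt, or -1 when the
    // message ends up on the dead letter topic.
    static int process(int maxRedeliveryCount, Predicate<Integer> attemptSucceeds) {
        int attempts = 1 + maxRedeliveryCount; // first delivery plus redeliveries
        for (int attempt = 1; attempt <= attempts; attempt++) {
            if (attemptSucceeds.test(attempt)) {
                return attempt;
            }
        }
        return -1; // routed to deadLetterTopic
    }
}
```

With maxRedeliveryCount set to 3, a message that always fails is attempted four times in this model before it is routed to the dead letter topic.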

negativeAckRedeliveryDelay

Delay before redelivering negatively acknowledged messages. This gives the system time to recover from transient failures.

Type: object

Default value: 1m
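The timing of a negative-acknowledgment redelivery reduces to adding the delay to the moment the message was negatively acknowledged. The sketch below assumes that simple rule, with hypothetical names; the real client tracks negative acknowledgments on a timer, so actual redelivery may happen somewhat later than the computed instant.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model, not the Pulsar client's implementation: a negatively
// acknowledged message becomes eligible for redelivery once the delay elapses.
final class NackDelayModel {

    static final Duration DEFAULT_DELAY = Duration.ofMinutes(1); // documented default: 1m

    static Instant redeliverAt(Instant nackedAt, Duration delay) {
        return nackedAt.plus(delay);
    }

    // True once 'now' has reached the redelivery instant.
    static boolean isDue(Instant nackedAt, Duration delay, Instant now) {
        return !now.isBefore(redeliverAt(nackedAt, delay));
    }
}
```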

ackTimeout

Timeout for acknowledging messages. If a message is not acknowledged within this time, it will be redelivered.

Type: object

Default value: 0s

batchReceiveMaxMessages

Maximum number of messages to receive in a single batch. Batch receiving improves throughput by processing multiple messages at once.

Type: number

Default value: 100

batchReceiveTimeout

Maximum time to wait for a full batch before processing a partial batch. This prevents indefinite waiting when the message rate is low.

Type: object

Default value: 100ms
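batchReceiveMaxMessages and batchReceiveTimeout together decide when a batch is handed over: as soon as it is full, or when the timeout expires with whatever has arrived. The sketch below models that rule on a java.util.concurrent.BlockingQueue, which stands in for the consumer; BatchCollector is a hypothetical name, not the plugin's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of batch receiving: collect up to maxMessages, but hand
// over a partial batch once the timeout expires.
final class BatchCollector {

    static <T> List<T> receiveBatch(BlockingQueue<T> queue, int maxMessages, long timeoutMillis)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(maxMessages);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (batch.size() < maxMessages) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // timeout reached: hand over whatever arrived
            }
            T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (next == null) {
                break; // no further message arrived before the deadline
            }
            batch.add(next);
        }
        return batch;
    }
}
```

With the defaults (100 messages, 100 ms), a backlog of 250 messages would be handed over as batches of 100, 100, and 50 under this model, the last one after the timeout.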

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

PulsarSourceConfigBuilder.<byte[]>builder()
    // pulsar+ssl:// selects a TLS connection for the ssl settings below
    .withServiceUrl("pulsar+ssl://pulsar.example.com:6651")
    .withTopicNames("persistent://public/default/my-topic")
    .withSubscriptionName("my-subscription")
    // EXCLUSIVE: only one consumer can be active at a time
    .withSubscriptionType(PulsarSubscriptionType.EXCLUSIVE)
    .withReceiveTimeout(Duration.ofMillis(500))
    .withConsumerName("my-consumer")
    // Trust the broker certificate through a JKS truststore
    .withSslBuilder(builder -> builder
        .withTrustStoreFile("/path/to/truststore.jks")
        .withTrustStorePassword("truststore-password"))
    .withAuthToken("eyJhbGciOiJIUzI1NiJ9...")
    // Log every error raised by this source to standard error
    .withExceptionHandler((context, ex) ->
        System.err.println("Error while consuming data: " + ex.getMessage())
    );

This configuration consumes a single topic through an exclusive subscription, configures TLS with a custom truststore, authenticates with a token, and logs consumer errors through a custom exception handler.