Pulsar
The Pulsar source consumes data from Apache Pulsar topics. It lets you configure consumer properties,
such as topic names, service URL, subscription name, subscription type, and more.
This source is suitable for integrating real-time data from Pulsar into your streaming pipeline.
```java
import java.time.Duration;

// Consume two topics through a shared subscription
PulsarSourceConfigBuilder.<byte[]>builder()
    .withServiceUrl("pulsar://localhost:6650")
    .withTopicNames("persistent://public/default/topic1", "persistent://public/default/topic2")
    .withSubscriptionName("my-subscription")
    .withSubscriptionType(PulsarSubscriptionType.SHARED)
    .withReceiveTimeout(Duration.ofMillis(250));
```
```yaml
source:
  pulsar:
    serviceUrl: "pulsar://localhost:6650"
    topicNames:
      - "persistent://public/default/topic1"
      - "persistent://public/default/topic2"
    subscriptionName: "my-subscription"
    subscriptionType: "SHARED"
    receiveTimeout: "PT0.25S"
    properties:
      key1: value1
```
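The Java example passes Duration.ofMillis(250), while the YAML example writes "PT0.25S", which reads as an ISO-8601 duration. As a quick check of that correspondence, the JDK's own Duration.parse turns the YAML string into the same 250 ms value. This is a minimal sketch of the JDK parser only; that the plugin parses YAML durations the same way is an assumption, and the class name is hypothetical.

```java
import java.time.Duration;

class DurationFormatCheck {
    // Converts an ISO-8601 duration string (the form used by "PT0.25S")
    // into milliseconds with the standard JDK parser.
    static long isoToMillis(String iso) {
        return Duration.parse(iso).toMillis();
    }

    public static void main(String[] args) {
        Duration fromYaml = Duration.parse("PT0.25S");
        Duration fromJava = Duration.ofMillis(250);
        System.out.println(fromYaml.equals(fromJava)); // prints "true"
        System.out.println(isoToMillis("PT1S"));       // prints "1000"
    }
}
```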
Java dependency management
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
Maven:
```xml
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-pulsar-api</artifactId>
    <version>1.7.0</version>
</dependency>
```
Gradle:
```groovy
implementation group: 'org.voltdb', name: 'volt-stream-plugin-pulsar-api', version: '1.7.0'
```
Properties
serviceUrl
The service URL for connecting to the Pulsar cluster, for example pulsar://localhost:6650 or pulsar+ssl://pulsar.example.com:6651. Required.
Type: string
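To check which of the two URL forms a given value uses, java.net.URI can extract the scheme, host, and port. The ServiceUrlCheck class and its usesTls helper are hypothetical illustrations, not part of the plugin, and the host and port lookups assume a single-host URL.

```java
import java.net.URI;

class ServiceUrlCheck {
    // Returns true for the TLS scheme shown above (pulsar+ssl://),
    // false for plain pulsar://, and rejects anything else.
    static boolean usesTls(String serviceUrl) {
        String scheme = URI.create(serviceUrl).getScheme();
        if ("pulsar+ssl".equals(scheme)) {
            return true;
        }
        if ("pulsar".equals(scheme)) {
            return false;
        }
        throw new IllegalArgumentException("Unexpected scheme: " + scheme);
    }

    public static void main(String[] args) {
        URI uri = URI.create("pulsar+ssl://pulsar.example.com:6651");
        System.out.println(uri.getHost() + " " + uri.getPort()); // prints "pulsar.example.com 6651"
        System.out.println(usesTls("pulsar://localhost:6650"));  // prints "false"
    }
}
```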
topicNames
Set of topic names to consume from. Topic names should follow the Pulsar naming convention persistent://tenant/namespace/topic. Required.
Type: array
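The naming convention can be made concrete with a small parser that splits a fully qualified name into its domain, tenant, namespace, and topic. This is a hypothetical helper for illustration (it uses a record, so it needs Java 16 or later). Pulsar also defines a non-persistent:// domain, which the sketch accepts, and it expands short names such as my-topic to persistent://public/default/my-topic; the sketch deliberately rejects short names to match the convention described above.

```java
import java.util.List;

class TopicNameCheck {
    // Hypothetical value type holding the parts of a fully qualified topic name.
    record TopicName(String domain, String tenant, String namespace, String topic) {}

    // Splits "persistent://tenant/namespace/topic" into its parts.
    // Only the fully qualified form is accepted here.
    static TopicName parse(String name) {
        int sep = name.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("Missing domain prefix: " + name);
        }
        String domain = name.substring(0, sep);
        String[] parts = name.substring(sep + 3).split("/", -1);
        if (!List.of("persistent", "non-persistent").contains(domain)
                || parts.length != 3
                || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Not a fully qualified topic name: " + name);
        }
        return new TopicName(domain, parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        TopicName t = parse("persistent://public/default/topic1");
        System.out.println(t.tenant() + "/" + t.namespace() + "/" + t.topic()); // prints "public/default/topic1"
    }
}
```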
subscriptionName
The subscription name for this consumer. Multiple consumers with the same subscription name will share messages according to the subscription type. Required.
Type: string
subscriptionType
The subscription type determines how messages are distributed:
- EXCLUSIVE: Only one consumer can be active at a time.
- SHARED: Messages are distributed across multiple consumers.
- FAILOVER: One consumer receives messages until it fails, then another takes over.
- KEY_SHARED: Messages with the same key go to the same consumer.
Required.
Type: object
Supported values: exclusive, shared, failover, key_shared.
Default value: SHARED
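The four distribution rules can be compared side by side with a toy model that assigns each message to a consumer index. This is a deliberate simplification, not Pulsar's dispatcher: SHARED is modeled as plain round-robin and KEY_SHARED as a hash of the key modulo the consumer count, while Pulsar's real dispatch also accounts for consumer permits and key hash ranges. For EXCLUSIVE and FAILOVER the model sends everything to the first consumer of a single non-partitioned topic; in Pulsar a second consumer on an exclusive subscription is rejected outright, and a failover subscription switches to a standby consumer only when the active one disconnects. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SubscriptionModel {
    enum Type { EXCLUSIVE, FAILOVER, SHARED, KEY_SHARED }

    // Returns, for each message key, the index of the consumer that receives it.
    // Simplified model: SHARED uses round-robin, KEY_SHARED hashes the key,
    // and EXCLUSIVE/FAILOVER deliver everything to the first (active) consumer.
    static List<Integer> assign(Type type, List<String> keys, int consumers) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            int target = switch (type) {
                case EXCLUSIVE, FAILOVER -> 0;
                case SHARED -> i % consumers;
                case KEY_SHARED -> Math.floorMod(keys.get(i).hashCode(), consumers);
            };
            result.add(target);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "a", "c");
        System.out.println(assign(Type.SHARED, keys, 2));    // prints "[0, 1, 0, 1]"
        System.out.println(assign(Type.EXCLUSIVE, keys, 2)); // prints "[0, 0, 0, 0]"
    }
}
```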
consumerName
Optional consumer name for identification purposes.
Type: string
receiveTimeout
The maximum time to wait for a message.
Type: object
Default value: 1s
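Receiving with a timeout means the consumer either gets a message or gives up after receiveTimeout and returns control to the caller. The sketch below models that with a java.util.concurrent.BlockingQueue standing in for the Pulsar consumer; the Pulsar Java client's Consumer.receive(int, TimeUnit) similarly returns null when no message arrives in time. Whether the source uses that call internally is an assumption, and the class name is hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReceiveTimeoutModel {
    // Waits at most receiveTimeout for one message; returns null on timeout,
    // so the caller can do other work (or check for shutdown) and poll again.
    static String receive(BlockingQueue<String> incoming, Duration receiveTimeout) {
        try {
            return incoming.poll(receiveTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
        System.out.println(receive(incoming, Duration.ofMillis(50))); // prints "null"
        incoming.add("hello");
        System.out.println(receive(incoming, Duration.ofSeconds(1))); // prints "hello"
    }
}
```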
properties
Adds or overrides specific Pulsar client properties, allowing customization for advanced configurations.
Type: object
ssl
SSL configuration.
Type: object
Fields of ssl:
ssl.trustStoreFile
Truststore file or trusted CA certificate. Supported formats include JKS, PKCS#12, and PEM.
Type: string
ssl.trustStorePassword
Truststore password.
Type: string
ssl.keyStoreFile
Keystore file. Supported formats include JKS, PKCS#12, and PEM.
Type: string
ssl.keyStorePassword
Keystore password.
Type: string
ssl.keyPassword
Private key password. Optional; if it is not set, the keystore password is used.
Type: string
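The keyPassword fallback rule can be expressed in a couple of lines. The helpers below are hypothetical: they show the documented fallback and how the resulting password would feed the JDK's KeyManagerFactory, but the description above does not say that the plugin builds its SSL context this way.

```java
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;

class KeyPasswordFallback {
    // Mirrors the documented rule: use ssl.keyPassword when it is set,
    // otherwise fall back to ssl.keyStorePassword.
    static char[] effectiveKeyPassword(String keyPassword, String keyStorePassword) {
        String chosen = (keyPassword != null) ? keyPassword : keyStorePassword;
        return chosen.toCharArray();
    }

    // Initializes key managers from an already loaded keystore using the
    // effective private key password.
    static KeyManagerFactory keyManagers(KeyStore keyStore, String keyPassword,
                                         String keyStorePassword) throws Exception {
        KeyManagerFactory kmf =
                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, effectiveKeyPassword(keyPassword, keyStorePassword));
        return kmf;
    }

    public static void main(String[] args) throws Exception {
        // An empty in-memory PKCS#12 store stands in for ssl.keyStoreFile here.
        KeyStore empty = KeyStore.getInstance("PKCS12");
        empty.load(null, null);
        KeyManagerFactory kmf = keyManagers(empty, null, "keystore-password");
        System.out.println(kmf.getKeyManagers().length);
    }
}
```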
ssl.insecure
If set to true, disables SSL certificate and hostname validation. Intended for debugging only. Does not work with mTLS.
Type: boolean
ssl.hostnameVerifier
Custom hostname verifier for SSL connections. If it is not specified and ssl.insecure is true, hostname verification is disabled.
Type: object
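The Type: object entry does not say which interface ssl.hostnameVerifier expects. Assuming it is the JDK's javax.net.ssl.HostnameVerifier, a custom verifier could look like the following allow-list sketch (the class name is hypothetical). It only compares the requested host name against a fixed set; unlike default verification, it does not check the name against the server certificate, so it weakens security and is shown purely to illustrate the interface.

```java
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;

class AllowListHostnameVerifier implements HostnameVerifier {
    private final Set<String> allowedHosts;

    AllowListHostnameVerifier(Set<String> allowedHosts) {
        // Normalize once so lookups are case-insensitive, as DNS names are.
        this.allowedHosts = allowedHosts.stream()
                .map(h -> h.toLowerCase(Locale.ROOT))
                .collect(Collectors.toUnmodifiableSet());
    }

    // Accepts the connection only when the requested host is on the allow list.
    // The session (and its certificate) is not inspected in this sketch.
    @Override
    public boolean verify(String hostname, SSLSession session) {
        return hostname != null && allowedHosts.contains(hostname.toLowerCase(Locale.ROOT));
    }
}
```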
authToken
Token used to authenticate with the Pulsar cluster.
Type: string
exceptionHandler
Custom exception handler that intercepts all errors related to this source.
Type: object
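The usage example on this page passes a two-argument lambda, (context, ex) -> ..., as the handler. The handler's interface type itself is not documented here, so this sketch uses java.util.function.BiConsumer<Object, Throwable> as a stand-in with the same shape; the CountingExceptionHandler name is hypothetical. Besides logging, it counts errors per exception class, which can help spot recurring failures.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;

class CountingExceptionHandler implements BiConsumer<Object, Throwable> {
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Records one occurrence per exception class and logs the message.
    // Thread-safe counters allow the handler to be called from several threads.
    @Override
    public void accept(Object context, Throwable ex) {
        counts.computeIfAbsent(ex.getClass().getSimpleName(), k -> new LongAdder()).increment();
        System.err.println("Error while consuming data: " + ex.getMessage());
    }

    // Number of errors seen so far for the given exception class name.
    long count(String exceptionType) {
        LongAdder adder = counts.get(exceptionType);
        return adder == null ? 0 : adder.sum();
    }
}
```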
subscriptionInitialPosition
The initial position of the subscription when it is first created.
EARLIEST starts from the beginning of the topic; LATEST receives only messages published after the subscription is created.
Type: object
Supported values: earliest, latest.
Default value: EARLIEST
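The effect of the two positions on a brand-new subscription can be modeled as a list operation: EARLIEST sees the messages already in the topic plus new ones, LATEST sees only new ones. This is a simplified, hypothetical model. It ignores retention (EARLIEST can only reach messages the broker still retains), and it applies only on first creation, since an existing subscription resumes from its own acknowledged position.

```java
import java.util.ArrayList;
import java.util.List;

class InitialPositionModel {
    enum InitialPosition { EARLIEST, LATEST }

    // Returns the messages a brand-new subscription would see, given the
    // messages already in the topic and those published after it is created.
    static List<String> firstDelivered(InitialPosition position,
                                       List<String> existing, List<String> published) {
        List<String> seen = new ArrayList<>();
        if (position == InitialPosition.EARLIEST) {
            seen.addAll(existing);
        }
        seen.addAll(published);
        return seen;
    }

    public static void main(String[] args) {
        List<String> existing = List.of("m1", "m2");
        List<String> published = List.of("m3");
        System.out.println(firstDelivered(InitialPosition.EARLIEST, existing, published)); // prints "[m1, m2, m3]"
        System.out.println(firstDelivered(InitialPosition.LATEST, existing, published));   // prints "[m3]"
    }
}
```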
deadLetterTopic
Dead letter topic name for messages that fail processing.
Messages that exceed maxRedeliveryCount are sent to this topic.
Type: string
maxRedeliveryCount
Maximum number of times a message will be redelivered before being sent to the dead letter topic.
Type: number
Default value: 0
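The interplay of maxRedeliveryCount and deadLetterTopic can be traced with a small counter model, reading the description literally: a message is delivered once and redelivered at most maxRedeliveryCount times, and the next failure routes it to the dead letter topic. The model is hypothetical, assumes a positive maxRedeliveryCount with a configured dead letter topic, and does not describe how the source treats the default value of 0.

```java
class DeadLetterModel {
    // Outcome of processing one message under a simplified redelivery policy.
    record Outcome(int deliveries, boolean deadLettered) {}

    // Delivers the message once, then redelivers after each failure until it
    // either succeeds or has been redelivered maxRedeliveryCount times; the
    // next failure routes it to the dead letter topic.
    static Outcome process(int maxRedeliveryCount, int failuresBeforeSuccess) {
        int deliveries = 0;
        int redeliveries = 0;
        while (true) {
            deliveries++;
            boolean failed = deliveries <= failuresBeforeSuccess;
            if (!failed) {
                return new Outcome(deliveries, false);
            }
            if (redeliveries == maxRedeliveryCount) {
                return new Outcome(deliveries, true);
            }
            redeliveries++;
        }
    }

    public static void main(String[] args) {
        System.out.println(process(3, Integer.MAX_VALUE)); // prints "Outcome[deliveries=4, deadLettered=true]"
        System.out.println(process(3, 2));                 // prints "Outcome[deliveries=3, deadLettered=false]"
    }
}
```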
negativeAckRedeliveryDelay
Delay before redelivering negatively acknowledged messages.
This gives the system time to recover from transient failures.
Type: object
Default value: 1m
ackTimeout
Timeout for acknowledging messages.
If a message is not acknowledged within this time, it is redelivered. In the Pulsar client, a value of 0 disables the acknowledgment timeout.
Type: object
Default value: 0s
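Both settings determine when a message comes back: after a negative acknowledgment it becomes eligible for redelivery once negativeAckRedeliveryDelay has passed, and an unacknowledged message is redelivered once ackTimeout expires, unless the timeout is 0. The sketch computes those earliest instants; it is a simplified model with hypothetical names, and actual redelivery can happen somewhat later depending on the client's internal timers.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

class RedeliveryTiming {
    // Earliest time a negatively acknowledged message becomes eligible for redelivery.
    static Instant afterNegativeAck(Instant nackedAt, Duration negativeAckRedeliveryDelay) {
        return nackedAt.plus(negativeAckRedeliveryDelay);
    }

    // Time at which an unacknowledged message would be redelivered because of
    // ackTimeout; empty when the timeout is zero (disabled, as in the Pulsar client).
    static Optional<Instant> afterAckTimeout(Instant deliveredAt, Duration ackTimeout) {
        if (ackTimeout.isZero()) {
            return Optional.empty();
        }
        return Optional.of(deliveredAt.plus(ackTimeout));
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(afterNegativeAck(t0, Duration.ofMinutes(1)));    // prints "2024-01-01T00:01:00Z"
        System.out.println(afterAckTimeout(t0, Duration.ZERO).isPresent()); // prints "false"
    }
}
```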
batchReceiveMaxMessages
Maximum number of messages to receive in a single batch.
Batch receiving improves throughput by processing multiple messages at once.
Type: number
Default value: 100
batchReceiveTimeout
Maximum time to wait for a full batch before processing a partial batch.
This prevents indefinite waiting when the message rate is low.
Type: object
Default value: 100ms
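Together, the two batch settings mean a batch closes as soon as it holds batchReceiveMaxMessages messages or batchReceiveTimeout has elapsed, whichever happens first. The sketch models this with a BlockingQueue standing in for the consumer; the class name is hypothetical, and the Pulsar client's own batch receive policy is not reproduced here.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchReceiveModel {
    // Collects up to maxMessages, returning early with a partial batch once
    // batchTimeout has elapsed since the call started.
    static List<String> receiveBatch(BlockingQueue<String> incoming, int maxMessages,
                                     Duration batchTimeout) {
        List<String> batch = new ArrayList<>(maxMessages);
        long deadline = System.nanoTime() + batchTimeout.toNanos();
        try {
            while (batch.size() < maxMessages) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break;
                }
                String message = incoming.poll(remaining, TimeUnit.NANOSECONDS);
                if (message == null) {
                    break; // timed out with a partial (possibly empty) batch
                }
                batch.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> incoming = new LinkedBlockingQueue<>(List.of("m1", "m2", "m3"));
        System.out.println(receiveBatch(incoming, 2, Duration.ofMillis(100))); // prints "[m1, m2]"
        System.out.println(receiveBatch(incoming, 2, Duration.ofMillis(100))); // prints "[m3]"
    }
}
```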
JSON Schema
You can validate or explore the configuration using its JSON Schema.
Usage Examples
```java
import java.time.Duration;

PulsarSourceConfigBuilder.<byte[]>builder()
    .withServiceUrl("pulsar://pulsar.example.com:6650")
    .withTopicNames("persistent://public/default/my-topic")
    .withSubscriptionName("my-subscription")
    .withSubscriptionType(PulsarSubscriptionType.EXCLUSIVE)
    .withReceiveTimeout(Duration.ofMillis(500))
    .withConsumerName("my-consumer")
    // Trust the broker certificate from a custom truststore
    .withSslBuilder(builder -> builder
        .withTrustStoreFile("/path/to/truststore.jks")
        .withTrustStorePassword("truststore-password"))
    .withAuthToken("eyJhbGciOiJIUzI1NiJ9...")
    // Log every error raised by this source
    .withExceptionHandler((context, ex) ->
        System.err.println("Error while consuming data: " + ex.getMessage())
    );
```
This example consumes a single topic through an exclusive subscription, sets a consumer name, configures a truststore and an authentication token, and logs consumption errors to standard error.