Voltdb-bulk-procedure

The voltdb-bulk-procedure operator differs from a typical sink in that it enables two-way communication with VoltDB. It sends requests to a VoltDB procedure and forwards the responses to be processed by subsequent components of the stream.

The voltdb-bulk-procedure operator processes requests in batches, enabling high-throughput procedure calls with at-least-once delivery guarantees.

The voltdb-bulk-procedure operator requires a VoltDBResource to be configured, either in YAML or in Java; see the usage examples.

Partitioning

Requests that hash to the same partition are batched and sent directly to the site responsible for that partition. The VoltDB site thread reads the requests one by one and executes the procedure invocation for each.
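The grouping step described above can be sketched in plain Java. This is only an illustration: PartitionBatcher, partitionFor, and batchByPartition are hypothetical names, and the hashCode-based function is a stand-in assumption, not the hashing that VoltDB actually uses to map keys to partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only; not part of the VoltSP API.
class PartitionBatcher {

    // Hypothetical stand-in for partition hashing (assumption, not VoltDB's algorithm).
    static int partitionFor(Object partitionKey, int partitionCount) {
        return Math.floorMod(partitionKey.hashCode(), partitionCount);
    }

    // Groups partition keys so that each batch targets exactly one partition.
    static Map<Integer, List<String>> batchByPartition(List<String> keys, int partitionCount) {
        Map<Integer, List<String>> batches = new TreeMap<>();
        for (String key : keys) {
            int partition = partitionFor(key, partitionCount);
            batches.computeIfAbsent(partition, p -> new ArrayList<>()).add(key);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("alice", "bob", "alice", "carol");
        // Requests sharing a key always land in the same batch.
        batchByPartition(keys, 4).forEach((partition, batch) ->
                System.out.println("partition " + partition + " -> " + batch));
    }
}
```

Because every request with the same partition key maps to the same batch, a whole batch can be handed to a single site without further routing.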

After processing, responses (or errors) are serialized and inserted into the VoltSP stream. The worker thread picks them up as regular events. Even if a VoltSP worker is unavailable, the event flow is preserved, and batches continue regardless of procedure execution delays.

VoltSP can finish processing the current batch and start consuming a new batch of data while VoltDB is still executing the procedure; the two phases are decoupled. Even if the called procedure is slow, VoltSP processing is not disrupted.

Handling duplicates

If a failure occurs while sending data to VoltDB, the batch may be retried, which can cause duplicates. Requests are saved with a batchId and a unique index to ensure each row's uniqueness. The target procedure must deduplicate incoming requests if the business logic relies on the absence of duplicates. This is similar to an application sending the same request twice via a VoltDB client.
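One way to picture the deduplication is an idempotency check keyed on the (batchId, index) pair. The following is a minimal in-memory sketch under stated assumptions: RequestDeduplicator and markProcessed are hypothetical names, and a real procedure would more likely keep the seen keys in a VoltDB table than in a Java set. How the batchId and index are exposed to the procedure is not specified here.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only; not part of the VoltSP or VoltDB API.
class RequestDeduplicator {

    // Keys of requests that were already applied (a table in a real procedure).
    private final Set<String> seen = new HashSet<>();

    // Returns true the first time a (batchId, index) pair is seen,
    // and false when the same pair arrives again, e.g. after a batch retry.
    boolean markProcessed(long batchId, int index) {
        return seen.add(batchId + ":" + index);
    }

    public static void main(String[] args) {
        RequestDeduplicator dedup = new RequestDeduplicator();
        long[][] incoming = {{7, 0}, {7, 1}, {7, 0}}; // last entry simulates a retried row
        for (long[] request : incoming) {
            boolean first = dedup.markProcessed(request[0], (int) request[1]);
            System.out.println("batch " + request[0] + " index " + request[1]
                    + (first ? " applied" : " skipped as duplicate"));
        }
    }
}
```

The business logic runs only when markProcessed returns true, so a retried batch leaves the end state unchanged.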

Reading responses

For each procedure (e.g. MyProcedure.java), VoltSP automatically creates an associated VoltDB stream named myprocedure_sq. Since VoltDB streams require a topic defined in deployment.xml, you must add it manually. For example:

<?xml version="1.0"?>
<deployment>
    ...
    <topics enabled="true">
        <topic name="myprocedure_topic" />
    </topics>
</deployment>

Once the topic is configured, procedure responses are fetched, decoded, and made available in the child stream.

Handling Avro encoding

VoltDB procedures can accept and return Avro payloads. The first parameter must be the partition key; subsequent parameters may be Avro GenericRecord instances or POJOs.

To enable Avro serialization, both VoltSP and VoltDB must be able to retrieve schemas from a remote schema registry.

.consumeFromSource(...)
.terminateWithEmitter(VoltBulkProcedureEmitterConfigBuilder.builder()
    .withProcedureName(value)
    .withVoltClientResource(value)
    .withBatchSize(value)
    .withFlushInterval(value)
    .withExceptionHandler(value)
)

sink:
  voltdb-bulk-procedure:
    procedureName: value
    voltClientResource: value
    batchSize: value
    flushInterval: value
    exceptionHandler: value

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-volt-api</artifactId>
    <version>1.6.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-volt-api', version: '1.6.0'

Properties

procedureName

The name of the stored procedure in VoltDB to be invoked for bulk operations. Required.

Type: string

voltClientResource

Client resource reference to be used when connecting to the VoltDB cluster. Required.

Type: object

batchSize

The maximum number of records to include in a single batch before inserting data into VoltDB. Higher values can improve throughput but will increase memory usage.

Type: number

Default value: 100000

flushInterval

The time interval after which a batch is flushed to VoltDB, even if the desired batch size has not been reached.

Type: object

Default value: 1s
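The interplay between batchSize and flushInterval can be illustrated with a small buffer that flushes on whichever trigger fires first. This is a sketch, not the operator's implementation: SizeOrTimeBuffer and its methods are hypothetical, and measuring the interval from the first buffered record is an assumption made for the example.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not the operator's actual batching code.
class SizeOrTimeBuffer<T> {
    private final int batchSize;
    private final long flushIntervalNanos;
    private final List<T> pending = new ArrayList<>();
    private long firstAddedAtNanos;

    SizeOrTimeBuffer(int batchSize, Duration flushInterval) {
        this.batchSize = batchSize;
        this.flushIntervalNanos = flushInterval.toNanos();
    }

    // Buffers an item and returns a batch if a trigger fired, else an empty list.
    List<T> add(T item, long nowNanos) {
        if (pending.isEmpty()) {
            firstAddedAtNanos = nowNanos; // assumption: interval starts at the first buffered item
        }
        pending.add(item);
        return flushIfDue(nowNanos);
    }

    // Flushes when the batch is full or the oldest buffered item is older than the interval.
    List<T> flushIfDue(long nowNanos) {
        boolean full = pending.size() >= batchSize;
        boolean stale = !pending.isEmpty()
                && nowNanos - firstAddedAtNanos >= flushIntervalNanos;
        if (!full && !stale) {
            return List.of();
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    public static void main(String[] args) {
        SizeOrTimeBuffer<String> buffer = new SizeOrTimeBuffer<>(3, Duration.ofSeconds(1));
        buffer.add("a", 0L);
        buffer.add("b", 0L);
        System.out.println("size trigger: " + buffer.add("c", 0L));
        buffer.add("d", 0L);
        System.out.println("time trigger: " + buffer.flushIfDue(Duration.ofSeconds(1).toNanos()));
    }
}
```

A large batchSize with a short flushInterval favors latency on quiet streams, while a long flushInterval lets batches fill up and favors throughput.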

exceptionHandler

A custom exception handler to process errors that occur during the execution of bulk operations.

Type: object

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

// if resource is configured in yaml, see resource documentation
voltStreamBuilder.configureResource("primary-cluster", VoltDBResourceConfigBuilder.class);
// or
voltStreamBuilder.configureResource("primary-cluster",
         VoltDBResourceConfigBuilder.class,
         new Consumer<VoltDBResourceConfigBuilder>() {
             @Override
             public void consume(VoltDBResourceConfigBuilder configurator) {
                 configurator
                   .addToServers("localhost", 12122)
                   .withClientBuilder(vcb -> builder -> {
                       builder.withMaxOutstandingTransactions(42000);
                       builder.withMaxTransactionsPerSecond(23);
                       builder.withRequestTimeout(Duration.ofSeconds(5));
                       builder.withAuthBuilder(authBuilder -> authBuilder
                          .withUsername("admin")
                          .withPassword("admin123"));
                       builder.withSslBuilder(sslBuilder -> sslBuilder
                          .withTrustStoreFile("c:/Users32/trust.me")
                          .withTrustStorePassword("got2have"));
                       builder.withRetryBuilder(retryBuilder -> retryBuilder
                          .withRetries(4)
                          .withBackoffDelay(Duration.ofSeconds(2))
                          .withMaxBackoffDelay(Duration.ofSeconds(11)));
                   });
             }
         });

voltStreamBuilder.terminateWithSink(VoltBulkProcedureEmitterConfigBuilder.builder()
    .withClientReferenceName("primary-cluster")
    .withProcedureName("runMe")
    .withBatchSize(100)
    .withFlushInterval(10, TimeUnit.SECONDS)
    .withExceptionHandler(exceptionHandler)
);

resources:
- name: primary-cluster
  voltdb-client:
    servers: localhost:12122
    client:
        maxOutstandingTransactions: 42000
        maxTransactionsPerSecond: 23
        requestTimeout: PT5S
        auth:
            username: admin
            password: admin123
        ssl:
            trustStoreFile: c:/Users32/trust.me
            trustStorePassword: got2have
        retry:
            retries: 4
            backoffDelay: 2s
            maxBackoffDelay: 11s
sink:
    voltdb-bulk-procedure:
      clientReferenceName: primary-cluster
      procedureName: runMe
      batchSize: 100
      flushInterval: 10s