Skip to content

Voltdb-procedure

The voltdb-procedure sink passes data to a VoltDB stored procedure. It employs at-least-once delivery guarantees.

For each event it will asynchronously invoke stored procedure passing event data as arguments. When using the Java API the argument to the sink is an array of Object-s that correspond to stored procedure arguments. E.g. to invoke a stored procedure that has public long run(long id, int anotherId, String someString) method the Object[] should contain Long, Integer and String or compatible types as defined in the VoltDB documentation.

In case of downstream system error this sink will retry procedure execution with exponential backoff. The details of this behaviour are configurable using retry configuration that is nested within client configuration.

The voltdb-procedure operator requires VoltDBResource to be configured either in yaml or in java, see examples.

.consumeFromSource(...)
.terminateWithSink(ProcedureVoltSinkConfigBuilder.builder()
    .withProcedureName(value)
    .withVoltClientResourceName(value)
    .withExceptionHandler(value)
)
sink:
  voltdb-procedure:
    procedureName: value
    voltClientResourceName: value
    exceptionHandler: value

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-volt-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-volt-api', version: '1.0-20250910-124207-release-1.5.3'

Properties

procedureName

The name of the VoltDB stored procedure to invoke for processing data. Required.

Type: string

voltClientResourceName

The name of the volt client resource reference Required.

Type: string

exceptionHandler

A custom exception handler to process errors that occur during procedure execution. Type: object

Usage Examples

// if resource is configured in yaml
voltStreamBuilder.configureResource("primary-cluster", VoltDBResourceConfigBuilder.class);
// or
voltStreamBuilder.configureResource("primary-cluster",
         VoltDBResourceConfigBuilder.class,
         new Consumer<VoltDBResourceConfigBuilder>() {
             @Override
             public void consume(VoltDBResourceConfigBuilder configurator) {
                 configurator
                   .addToServers("localhost", 12122)
                   .withClientBuilder(vcb -> builder -> {
                       builder.withMaxOutstandingTransactions(42000);
                       builder.withMaxTransactionsPerSecond(23);
                       builder.withRequestTimeout(Duration.ofSeconds(5));
                       builder.withAuthBuilder(authBuilder -> authBuilder
                          .withUsername("admin")
                          .withPassword("admin123"));
                       builder.withSslBuilder(sslBuilder -> sslBuilder
                          .withTrustStoreFile("c:/Users32/trust.me")
                          .withTrustStorePassword("got2have"));
                       builder.withRetryBuilder(retryBuilder -> retryBuilder
                          .withRetries(4)
                          .withBackoffDelay(Duration.ofSeconds(2))
                          .withMaxBackoffDelay(Duration.ofSeconds(11)));
                   })
             }
         });

voltStreamBuilder.terminateWithSink(ProcedureVoltSinkConfigBuilder.builder()
   .withClientReferenceName("primary-cluster")
   .withProcedureName("runMe")
   .withExceptionHandler(exceptionHandler)
)
resources:
- name: primary-cluster
  voltdb-client:
    servers: localhost:12122
    client:
      maxTransactionsPerSecond: 3000
      maxOutstandingTransactions: 3000
      requestTimeout: PT10S
      auth:
        user: Admin
        password: 2r2Ffafw3V
      trustStore:
        file: file.pem
        password: got2have

sink:
   voltdb-procedure:
     clientReferenceName: primary-cluster
     procedureName: runMe
     name: "my_table"
     batchSize: 100000
     flushInterval: 5000
     operationType: "INSERT"