Voltdb-procedure¶
The voltdb-procedure sink passes data to a VoltDB stored procedure. It provides at-least-once delivery guarantees.
For each event, the sink asynchronously invokes the stored procedure, passing the event data as arguments. When using the Java API, the argument to the sink is an array of Objects (Object[]) whose elements correspond to the stored procedure's arguments.
For example, to invoke a stored procedure whose method is declared as public long run(long id, int anotherId, String someString), the Object[] should contain a Long, an Integer, and a String, in that order, or compatible types as defined in the VoltDB documentation.
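As a rough illustration of the argument mapping described above, the sketch below packs one event into an Object[] for the run(long id, int anotherId, String someString) signature. The class name RunMeArguments and the helper toProcedureArguments are hypothetical names introduced only for this example; they are not part of the plugin API.

```java
import java.util.Arrays;

// Hypothetical helper class, not part of the plugin API.
final class RunMeArguments {

    // Packs one event into the Object[] expected by
    // public long run(long id, int anotherId, String someString).
    static Object[] toProcedureArguments(long id, int anotherId, String someString) {
        // Autoboxing turns the long into a Long and the int into an Integer,
        // so the element types line up with the procedure parameters in order.
        return new Object[] { id, anotherId, someString };
    }

    public static void main(String[] args) {
        Object[] arguments = toProcedureArguments(42L, 7, "hello");
        System.out.println(Arrays.toString(arguments));
    }
}
```

The order of the elements matters: the first element is bound to id, the second to anotherId, and the third to someString.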
If the downstream system returns an error, this sink retries the procedure execution with exponential backoff. This behaviour is configurable through the retry configuration nested within the client configuration.
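To give a feel for what exponential backoff means in practice, here is a minimal sketch that computes a retry schedule. It assumes the delay doubles after each failed attempt and is capped at a maximum; the sink's actual formula is not documented on this page, so treat the doubling factor as an assumption. BackoffSchedule and delays are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not the plugin's implementation.
final class BackoffSchedule {

    // Assumption: the delay doubles after every failed attempt and never exceeds maxDelay.
    static List<Duration> delays(int retries, Duration initialDelay, Duration maxDelay) {
        List<Duration> result = new ArrayList<>();
        Duration next = initialDelay;
        for (int attempt = 0; attempt < retries; attempt++) {
            // Clamp the delay for this attempt to the configured maximum.
            result.add(next.compareTo(maxDelay) > 0 ? maxDelay : next);
            // Keep doubling only while still below the cap.
            if (next.compareTo(maxDelay) < 0) {
                next = next.multipliedBy(2);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 4 retries, 2 s initial delay, 11 s maximum delay.
        System.out.println(delays(4, Duration.ofSeconds(2), Duration.ofSeconds(11)));
    }
}
```

Under these assumptions, 4 retries with a 2 second initial delay and an 11 second maximum would wait 2, 4, 8, and 11 seconds.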
The voltdb-procedure operator requires a VoltDBResource to be configured either in YAML or in Java; see the usage examples.
.consumeFromSource(...)
.terminateWithSink(ProcedureVoltSinkConfigBuilder.builder()
    .withProcedureName(value)
    .withVoltClientResourceName(value)
    .withExceptionHandler(value)
)
sink:
  voltdb-procedure:
    procedureName: value
    voltClientResourceName: value
    exceptionHandler: value
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-volt-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-volt-api', version: '1.0-20250910-124207-release-1.5.3'
Properties¶
procedureName¶
The name of the VoltDB stored procedure to invoke for processing data. Required.
Type: string
voltClientResourceName¶
The name of the VoltDB client resource to reference. Required.
Type: string
exceptionHandler¶
A custom exception handler to process errors that occur during procedure execution.
Type: object
Usage Examples¶
// if the resource is configured in YAML
voltStreamBuilder.configureResource("primary-cluster", VoltDBResourceConfigBuilder.class);

// or configure the resource in Java
voltStreamBuilder.configureResource("primary-cluster",
    VoltDBResourceConfigBuilder.class,
    new Consumer<VoltDBResourceConfigBuilder>() {
        @Override
        public void consume(VoltDBResourceConfigBuilder configurator) {
            configurator
                .addToServers("localhost", 12122)
                .withClientBuilder(vcb -> builder -> {
                    // throughput limits and request timeout
                    builder.withMaxOutstandingTransactions(42000);
                    builder.withMaxTransactionsPerSecond(23);
                    builder.withRequestTimeout(Duration.ofSeconds(5));
                    // credentials
                    builder.withAuthBuilder(authBuilder -> authBuilder
                        .withUsername("admin")
                        .withPassword("admin123"));
                    // TLS trust store
                    builder.withSslBuilder(sslBuilder -> sslBuilder
                        .withTrustStoreFile("c:/Users32/trust.me")
                        .withTrustStorePassword("got2have"));
                    // retry with exponential backoff
                    builder.withRetryBuilder(retryBuilder -> retryBuilder
                        .withRetries(4)
                        .withBackoffDelay(Duration.ofSeconds(2))
                        .withMaxBackoffDelay(Duration.ofSeconds(11)));
                });
        }
    });
voltStreamBuilder.terminateWithSink(ProcedureVoltSinkConfigBuilder.builder()
    .withVoltClientResourceName("primary-cluster")
    .withProcedureName("runMe")
    .withExceptionHandler(exceptionHandler)
);
resources:
  - name: primary-cluster
    voltdb-client:
      servers: localhost:12122
      client:
        maxTransactionsPerSecond: 3000
        maxOutstandingTransactions: 3000
        requestTimeout: PT10S
        auth:
          user: Admin
          password: 2r2Ffafw3V
        trustStore:
          file: file.pem
          password: got2have
sink:
  voltdb-procedure:
    voltClientResourceName: primary-cluster
    procedureName: runMe
    name: "my_table"
    batchSize: 100000
    flushInterval: 5000
    operationType: "INSERT"
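The requestTimeout value PT10S in the YAML example is written in ISO-8601 duration notation. The sketch below shows how such a string maps to a java.time.Duration, assuming the plugin interprets it the same way Duration.parse does; that is an assumption, since this page does not state how the value is parsed. TimeoutFormat and parseTimeout are hypothetical names.

```java
import java.time.Duration;

// Hypothetical illustration of the ISO-8601 duration format used for requestTimeout.
final class TimeoutFormat {

    // Duration.parse accepts ISO-8601 strings such as PT10S (10 seconds) or PT1M30S (90 seconds).
    static Duration parseTimeout(String text) {
        return Duration.parse(text);
    }

    public static void main(String[] args) {
        System.out.println(parseTimeout("PT10S").toMillis());     // 10000
        System.out.println(parseTimeout("PT1M30S").getSeconds()); // 90
    }
}
```

In Java configuration the equivalent of PT10S would be Duration.ofSeconds(10), matching the withRequestTimeout call in the Java usage example.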