Voltdb-procedure
The voltdb-procedure emitter invokes a VoltDB stored procedure. It provides at-least-once delivery guarantees.
For each event, it synchronously or asynchronously invokes the stored procedure, passing event data as arguments. When using
the Java API, the argument to the sink is a VoltRequest object that contains:
- parameters: an array of Objects that corresponds to the stored procedure arguments. For example, to invoke a
stored procedure declared as public long run(long id, int anotherId, String someString), the Object[] should
contain a Long, an Integer and a String, or compatible types as defined in the VoltDB
documentation.
- procedureName: the name of the stored procedure to run for this specific event.
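The mapping between a procedure signature and its parameter array can be sketched as follows. This is a minimal illustration, not the plugin's API: ProcedureCall is a hypothetical stand-in for VoltRequest (whose actual constructor is not shown here), and the procedure name "MyProcedure" is made up for the example.

```java
import java.util.Arrays;

// Hypothetical stand-in for the request type described above; the real
// VoltRequest class and the way it is constructed may differ.
record ProcedureCall(String procedureName, Object[] parameters) {}

class ProcedureCallSketch {
    // Builds the arguments for a procedure declared as:
    //   public long run(long id, int anotherId, String someString)
    static ProcedureCall toCall(long id, int anotherId, String someString) {
        // Autoboxing turns the primitives into Long and Integer,
        // and the order matches the order of the run(...) parameters.
        Object[] parameters = { id, anotherId, someString };
        // "MyProcedure" is an assumed name used only for illustration.
        return new ProcedureCall("MyProcedure", parameters);
    }

    public static void main(String[] args) {
        ProcedureCall call = toCall(42L, 7, "hello");
        System.out.println(call.procedureName() + " " + Arrays.toString(call.parameters()));
    }
}
```

Keeping the array order identical to the run(...) signature matters, since the arguments are matched by position rather than by name.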
In case of a downstream system error, this processor retries the procedure execution with exponential backoff. This behaviour is configurable through the retry configuration that is nested within the client configuration.
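To give a feel for how retry settings interact, here is a rough sketch of a capped exponential backoff schedule. It assumes the delay doubles after every failed attempt and never exceeds the maximum; the multiplier and any jitter actually used by the plugin are not stated here, so treat BackoffSketch and its delays method as illustrative names only.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class BackoffSketch {
    // Assumption: the delay doubles after each failed attempt and is capped
    // at maxDelay. The plugin's real schedule may use another factor or jitter.
    static List<Duration> delays(int retries, Duration initialDelay, Duration maxDelay) {
        List<Duration> result = new ArrayList<>();
        // Never start above the cap.
        Duration delay = initialDelay.compareTo(maxDelay) > 0 ? maxDelay : initialDelay;
        for (int attempt = 0; attempt < retries; attempt++) {
            result.add(delay);
            Duration doubled = delay.multipliedBy(2);
            // Clamp before the next round so the value cannot grow unbounded.
            delay = doubled.compareTo(maxDelay) > 0 ? maxDelay : doubled;
        }
        return result;
    }

    public static void main(String[] args) {
        // Example values: 4 retries, 2 s initial delay, 11 s cap.
        System.out.println(delays(4, Duration.ofSeconds(2), Duration.ofSeconds(11)));
    }
}
```

Under these assumptions, 4 retries with a 2 second initial delay and an 11 second cap wait 2, 4, 8 and 11 seconds, so the cap only takes effect on the last attempt.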
The voltdb-procedure operator requires a VoltDBResource to be configured either in YAML or in Java; see the examples below.
// if the resource is configured in YAML
voltStreamBuilder.configureResource("primary-cluster", VoltDBResourceConfigBuilder.class);

// or, to configure the resource in Java
voltStreamBuilder.configureResource("primary-cluster",
        VoltDBResourceConfigBuilder.class,
        new Consumer<VoltDBResourceConfigBuilder>() {
            @Override
            public void consume(VoltDBResourceConfigBuilder configurator) {
                configurator
                        .addToServers("localhost", 12122)
                        .withClientBuilder(vcb -> builder -> {
                            builder.withMaxOutstandingTransactions(42000);
                            builder.withMaxTransactionsPerSecond(23);
                            builder.withRequestTimeout(Duration.ofSeconds(5));
                            builder.withAuthBuilder(authBuilder -> authBuilder
                                    .withUsername("admin")
                                    .withPassword("admin123"));
                            builder.withSslBuilder(sslBuilder -> sslBuilder
                                    .withTrustStoreFile("c:/Users32/trust.me")
                                    .withTrustStorePassword("got2have"));
                            // retry settings are nested within the client configuration
                            builder.withRetryBuilder(retryBuilder -> retryBuilder
                                    .withRetries(4)
                                    .withBackoffDelay(Duration.ofSeconds(2))
                                    .withMaxBackoffDelay(Duration.ofSeconds(11)));
                        });
            }
        });
voltStreamBuilder
        .consumeFromSource(...)
        .terminateWithEmitter(VoltProcedureEmitterConfig.<Event, EnhancedEvent>builder()
                .withClientReferenceName("primary-cluster")
                .withExceptionHandler(exceptionHandler)
                .withRequestMapper(event -> new Object[] { event.id() })
                .withComputeFunction((event, result) -> {
                    if (result != null && result.advanceRow()) {
                        return new EnhancedEvent(event.id(), result.getString("name"));
                    }
                    return null;
                })
        )
        .emit(VoltProcedureTrigger.onEachResponse)
        ...

public record EnhancedEvent(int id, String name) {}
resources:
  - name: primary-cluster
    voltdb-client:
      servers: localhost:12122
      client:
        maxOutstandingTransactions: 42000
        maxTransactionsPerSecond: 23
        requestTimeout: PT5S
        auth:
          username: admin
          password: admin123
        ssl:
          trustStoreFile: c:/Users32/trust.me
          trustStorePassword: got2have
        retry:
          retries: 4
          backoffDelay: 2s
          maxBackoffDelay: 11s
emitter:
  voltdb-procedure:
    voltClientResource: primary-cluster
    requestMapper: "com.example.Event::toRequest"
    computeFunction: "com.example.Event::toResult"
    triggers:
      - trigger: onEachResponse
...
Java dependency management
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-volt-api</artifactId>
    <version>1.7.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-volt-api', version: '1.7.0'
Properties
requestMapper
Given the input event, this mapper returns the request to send to VoltDB.
Type: object
computeFunction
The function that computes the result event for the next operator in the pipeline. Required.
Type: object
asyncCall
Whether to call VoltDB asynchronously (true) or synchronously (false). By default, procedures are executed asynchronously.
Type: boolean
Default value: true
voltClientResource
Client resource reference to be used when connecting to the VoltDB cluster. Required.
Type: object
exceptionHandler
A custom exception handler enabling interception of all errors related to this processor.
Type: object
JSON Schema
You can validate or explore the configuration using its JSON Schema.