Voltdb-bulk-procedure¶
The voltdb-bulk-procedure operator differs from a typical sink in that it enables two-way communication with VoltDB.
It sends requests to a VoltDB procedure and forwards the responses to be processed by subsequent components of the stream.
The voltdb-bulk-procedure operator processes requests in batches, enabling high-throughput procedure calls with at-least-once delivery guarantees.
The voltdb-bulk-procedure operator requires a VoltDBResource to be configured in either YAML or Java; see the usage examples.
Partitioning¶
Requests that hash to the same partition are batched and sent directly to the site responsible for that partition. The VoltDB site thread reads the requests one by one and executes the procedure invocation for each.
After processing, responses (or errors) are serialized and inserted into the VoltSP stream. The worker thread picks them up as regular events. Even if a VoltSP worker is unavailable, the event flow is preserved, and batches continue regardless of procedure execution delays.
VoltSP can finish processing the current batch and start consuming a new batch of data while VoltDB is still executing the procedure; these phases are decoupled. Even if the called procedure is slow, VoltSP processing is not disrupted.
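The grouping step described above can be pictured with a minimal sketch. It is not VoltSP's implementation: `PartitionBatcherSketch`, `partitionOf`, and `groupByPartition` are hypothetical names, and `String.hashCode()` is only a stand-in for VoltDB's real partition hashing, which this page does not describe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: groups partition keys into per-partition batches. */
class PartitionBatcherSketch {

    /** Stand-in hash (assumption); VoltDB uses its own partitioning function. */
    static int partitionOf(String partitionKey, int partitionCount) {
        return Math.floorMod(partitionKey.hashCode(), partitionCount);
    }

    /** Returns one batch per partition; each batch holds only keys for that partition. */
    static Map<Integer, List<String>> groupByPartition(List<String> partitionKeys, int partitionCount) {
        Map<Integer, List<String>> batches = new HashMap<>();
        for (String key : partitionKeys) {
            int partition = partitionOf(key, partitionCount);
            batches.computeIfAbsent(partition, p -> new ArrayList<>()).add(key);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("alice", "bob", "alice", "carol");
        // Each printed batch would be sent to the site that owns the partition.
        groupByPartition(keys, 4).forEach((partition, batch) ->
                System.out.println("partition " + partition + " -> " + batch));
    }
}
```

Because every request in a batch targets the same partition, a single site thread can work through the batch sequentially without coordinating with other partitions.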
Handling duplicates¶
If sending data to VoltDB fails, the batch may be retried, which can cause duplicates.
Requests are saved with a batchId and a unique index to ensure each row’s uniqueness. The target procedure
must deduplicate incoming requests if the business logic relies on the absence of duplicates. This is similar to an application
sending the same request twice via a VoltDB client.
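One way to picture the deduplication is the sketch below. It is plain Java rather than a VoltDB procedure, and it assumes the procedure can see each request's batchId and index; how those values reach the procedure is not described here. `DedupSketch` and `applyOnce` are hypothetical names, and in a real procedure the set of seen keys would more likely live in a table than in memory.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative only: applies each (batchId, index) pair at most once. */
class DedupSketch {

    // Keys already applied; an in-memory stand-in (assumption) for durable state.
    private final Set<String> seen = new HashSet<>();
    private long total = 0;

    /** Returns true if the request was applied, false if it was a retried duplicate. */
    boolean applyOnce(long batchId, int index, long amount) {
        String key = batchId + ":" + index;
        if (!seen.add(key)) {
            return false; // same row delivered again after a retry
        }
        total += amount;
        return true;
    }

    long total() {
        return total;
    }

    public static void main(String[] args) {
        DedupSketch sketch = new DedupSketch();
        System.out.println(sketch.applyOnce(7L, 0, 10)); // prints true
        System.out.println(sketch.applyOnce(7L, 1, 5));  // prints true
        System.out.println(sketch.applyOnce(7L, 0, 10)); // prints false (retried row)
        System.out.println(sketch.total());              // prints 15
    }
}
```

With this check in place, a retried batch leaves the result unchanged, which is what at-least-once delivery requires from the receiving side.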
Reading responses¶
For each procedure (e.g. MyProcedure.java), VoltSP automatically creates an associated VoltDB stream
named myprocedure_sq. Since VoltDB streams require a topic defined in deployment.xml, you must add it manually.
For example:
<?xml version="1.0"?>
<deployment>
    ...
    <topics enabled="true">
        <topic name="myprocedure_topic" />
    </topics>
</deployment>
Once the topic is configured, procedure responses are fetched, decoded, and made available in the child stream.
Handling Avro encoding¶
VoltDB procedures can accept and return Avro payloads. The first parameter must be the partition key; subsequent parameters may be Avro GenericRecord instances or POJOs.
To enable Avro serialization, both VoltSP and VoltDB must be able to retrieve schemas from a remote schema registry.
.consumeFromSource(...)
.terminateWithEmitter(VoltBulkProcedureEmitterConfigBuilder.builder()
    .withProcedureName(value)
    .withVoltClientResource(value)
    .withBatchSize(value)
    .withFlushInterval(value)
    .withExceptionHandler(value)
)
sink:
  voltdb-bulk-procedure:
    procedureName: value
    voltClientResource: value
    batchSize: value
    flushInterval: value
    exceptionHandler: value
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-volt-api</artifactId>
    <version>1.6.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-volt-api', version: '1.6.0'
Properties¶
procedureName¶
The name of the stored procedure in VoltDB to be invoked for bulk operations. Required.
Type: string
voltClientResource¶
Client resource reference to be used when connecting to the VoltDB cluster. Required.
Type: object
batchSize¶
The maximum number of records to include in a single batch before inserting data into VoltDB. Higher values can improve throughput but will increase memory usage.
Type: number
Default value: 100000
flushInterval¶
The time interval after which the batch is flushed to VoltDB, even if the desired batch size has not been reached.
Type: object
Default value: 1s
exceptionHandler¶
A custom exception handler to process errors that occur during the execution of bulk operations.
Type: object
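How batchSize and flushInterval interact can be pictured with a minimal sketch: a batch is flushed when it is full or when the interval has elapsed, whichever comes first. This is not VoltSP's code; `FlushPolicySketch` and its methods are hypothetical, and the current time is passed in explicitly so the behavior is easy to follow.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: flushes on batch size or elapsed interval, whichever comes first. */
class FlushPolicySketch {

    private final int batchSize;
    private final long flushIntervalNanos;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushNanos;

    FlushPolicySketch(int batchSize, Duration flushInterval, long nowNanos) {
        this.batchSize = batchSize;
        this.flushIntervalNanos = flushInterval.toNanos();
        this.lastFlushNanos = nowNanos;
    }

    /** Buffers a record and returns the batch to send if a flush is due, else an empty list. */
    List<String> add(String record, long nowNanos) {
        buffer.add(record);
        return flushIfDue(nowNanos);
    }

    /** Called on add and on a periodic tick; empty buffers are never flushed. */
    List<String> flushIfDue(long nowNanos) {
        boolean full = buffer.size() >= batchSize;
        boolean expired = !buffer.isEmpty() && nowNanos - lastFlushNanos >= flushIntervalNanos;
        if (!full && !expired) {
            return List.of();
        }
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        lastFlushNanos = nowNanos;
        return batch;
    }

    public static void main(String[] args) {
        FlushPolicySketch policy = new FlushPolicySketch(2, Duration.ofSeconds(1), 0L);
        System.out.println(policy.add("a", 0L));                   // not full, not expired
        System.out.println(policy.add("b", 10L));                  // full: flushes both records
        System.out.println(policy.add("c", 20L));                  // buffered again
        System.out.println(policy.flushIfDue(10L + 1_000_000_000L)); // interval elapsed: flushes "c"
    }
}
```

A larger batchSize amortizes per-call overhead but holds more records in memory, while a shorter flushInterval bounds how long a partially filled batch waits.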
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.
Usage Examples¶
// if resource is configured in yaml, see resource documentation
voltStreamBuilder.configureResource("primary-cluster", VoltDBResourceConfigBuilder.class);

// or
voltStreamBuilder.configureResource("primary-cluster",
    VoltDBResourceConfigBuilder.class,
    new Consumer<VoltDBResourceConfigBuilder>() {
        @Override
        public void consume(VoltDBResourceConfigBuilder configurator) {
            configurator
                .addToServers("localhost", 12122)
                .withClientBuilder(vcb -> builder -> {
                    builder.withMaxOutstandingTransactions(42000);
                    builder.withMaxTransactionsPerSecond(23);
                    builder.withRequestTimeout(Duration.ofSeconds(5));
                    builder.withAuthBuilder(authBuilder -> authBuilder
                        .withUsername("admin")
                        .withPassword("admin123"));
                    builder.withSslBuilder(sslBuilder -> sslBuilder
                        .withTrustStoreFile("c:/Users32/trust.me")
                        .withTrustStorePassword("got2have"));
                    builder.withRetryBuilder(retryBuilder -> retryBuilder
                        .withRetries(4)
                        .withBackoffDelay(Duration.ofSeconds(2))
                        .withMaxBackoffDelay(Duration.ofSeconds(11)));
                });
        }
    });

voltStreamBuilder.terminateWithSink(VoltBulkProcedureEmitterConfigBuilder.builder()
    .withClientReferenceName("primary-cluster")
    .withProcedureName("runMe")
    .withBatchSize(100)
    .withFlushInterval(10, TimeUnit.SECONDS)
    .withExceptionHandler(exceptionHandler)
);
resources:
  - name: primary-cluster
    voltdb-client:
      servers: localhost:12122
      client:
        maxOutstandingTransactions: 42000
        maxTransactionsPerSecond: 23
        requestTimeout: PT5S
        auth:
          username: admin
          password: admin123
        ssl:
          trustStoreFile: c:/Users32/trust.me
          trustStorePassword: got2have
        retry:
          retries: 4
          backoffDelay: 2s
          maxBackoffDelay: 11s
sink:
  voltdb-bulk-procedure:
    clientReferenceName: primary-cluster
    procedureName: runMe
    batchSize: 100
    flushInterval: 10s