Skip to content

Voltdb-procedure

The voltdb-procedure sink passes data to a VoltDB stored procedure. It employs at-least-once delivery guarantees.

For each event it will asynchronously invoke a stored procedure passing event data as arguments. When using the Java API the argument to the sink is a VoltRequest object that contains: - parameters: an array of Object-s that correspond to stored procedure arguments. For example, to invoke a stored procedure that has public long run(long id, int anotherId, String someString) the Object[] should contain Long, Integer and String or compatible types as defined in the VoltDB documentation. - procedureName: name of the stored procedure to run for this specific event.

In case of downstream system error this sink will retry procedure execution with exponential backoff. The details of this behaviour are configurable using retry configuration that is nested within client configuration.

The voltdb-procedure operator requires VoltDBResource to be configured either in yaml or in java, see examples.

.consumeFromSource(...)
.terminateWithSink(VoltProcedureSinkConfigBuilder.builder()
    .withVoltClientResource(value)
    .withRequestMapper(value)
    .withExceptionHandler(value)
)
sink:
  voltdb-procedure:
    voltClientResource: value
    requestMapper: value
    exceptionHandler: value

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-volt-api</artifactId>
    <version>1.8.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-volt-api', version: '1.8.0'

Properties

voltClientResource

Client resource reference to be used when connecting to VoltDb cluster Required.

Type: object

requestMapper

Given the input event this mapper returns request to VoltDB. If the event is already an instance of VoltProcedureRequest, or an instance of Map with same structure as VoltProcedureRequest, then this mapper is not necessary.

Type: object

exceptionHandler

A custom exception handler to process errors that occur during procedure execution. Type: object

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

// if resource is configured in yaml
voltStreamBuilder.configureResource("primary-cluster", VoltDBResourceConfigBuilder.class);
// or
voltStreamBuilder.configureResource("primary-cluster",
         VoltDBResourceConfigBuilder.class,
         new Consumer<VoltDBResourceConfigBuilder>() {
             @Override
             public void consume(VoltDBResourceConfigBuilder configurator) {
                 configurator
                   .addToServers("localhost", 12122)
                   .withClientBuilder(vcb -> builder -> {
                       builder.withMaxOutstandingTransactions(42000);
                       builder.withMaxTransactionsPerSecond(23);
                       builder.withRequestTimeout(Duration.ofSeconds(5));
                       builder.withAuthBuilder(authBuilder -> authBuilder
                          .withUsername("admin")
                          .withPassword("admin123"));
                       builder.withSslBuilder(sslBuilder -> sslBuilder
                          .withTrustStoreFile("c:/Users32/trust.me")
                          .withTrustStorePassword("got2have"));
                       builder.withRetryBuilder(retryBuilder -> retryBuilder
                          .withRetries(4)
                          .withBackoffDelay(Duration.ofSeconds(2))
                          .withMaxBackoffDelay(Duration.ofSeconds(11)));
                   })
             }
         });

voltStreamBuilder
    .consumeFromSource(...)
    .terminateWithSink(VoltProcedureSinkConfigBuilder.<Event>builder()
       .withVoltClientResourceName("primary-cluster")
       .withExceptionHandler(exceptionHandler)
       .withRequestMapper(event -> new Object[] { event.id() })
       //or .withRequestMapper(org.acme.EventToRequest::map)
    )
resources:
- name: primary-cluster
  voltdb-client:
    servers: localhost:12122
    client:
      maxTransactionsPerSecond: 3000
      maxOutstandingTransactions: 3000
      requestTimeout: PT10S
      auth:
        user: Admin
        password: ${ voltdb.password }
      trustStore:
        file: file.pem
        password: got2have

sink:
   voltdb-procedure:
     clientReferenceName: primary-cluster
     requestMapper: org.acme.EventToRequest::map

Metrics

Volt metrics

Metric enum: org.voltdb.stream.plugin.volt.metrics.VoltMetric

Prometheus name Type Description
voltsp_volt_affinity_reads_total counter Number of VoltDB affinity reads.
voltsp_volt_affinity_writes_total counter Number of VoltDB affinity writes.
voltsp_volt_procedure_bytes_read_bytes counter Number of bytes read by the VoltDB client.
voltsp_volt_procedure_bytes_written_bytes counter Number of bytes written by the VoltDB client.
voltsp_volt_procedure_invocation_aborts_total counter Number of VoltDB procedure invocation aborts.
voltsp_volt_procedure_invocation_completed_total counter Number of completed VoltDB procedure invocations.
voltsp_volt_procedure_invocation_errors_total counter Number of VoltDB procedure invocation errors.
voltsp_volt_procedure_invocation_retry_total counter Number of VoltDB procedure invocation retries.
voltsp_volt_procedure_invocation_timeouts_total counter Number of VoltDB procedure invocation timeouts.
voltsp_volt_roundrobin_reads_total counter Number of VoltDB round-robin reads.
voltsp_volt_roundrobin_writes_total counter Number of VoltDB round-robin writes.
voltsp_volt_procedure_invocation_latency_seconds histogram VoltDB procedure invocation latency.
voltsp_volt_sink_info info Metadata describing a VoltDB sink.

Volt tags

Tag enum: org.voltdb.stream.plugin.volt.metrics.VoltTag

Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.

Tag Description
connection_id VoltDB client connection id.
partition_id VoltDB partition id.
procedure_name VoltDB procedure name.
table_name VoltDB table name.
volt_batch_size Configured VoltDB batch size.