
Voltdb-cache

The voltdb-cache processor caches VoltDB server responses, keyed by the input parameters of each request. Because repeated lookups are served from the cache, it can dramatically reduce the number of network calls to VoltDB.

Note: the input object must properly implement the equals() and hashCode() contract defined by Java, because it is used as a key in the underlying cache. The two methods must be overridden together so that objects behave correctly in collections such as HashMap and HashSet. The contract states that objects considered equal by equals() must return the same hash code, and that equals() must be reflexive, symmetric, transitive and consistent, and must return false when compared with null.
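A minimal sketch of a key type that satisfies the contract, in plain Java with hypothetical names (EventKey, ManualEventKey) that are not part of the VoltSP API. A record derives equals() and hashCode() from its components; the hand-written class shows the equivalent explicit overrides. In both cases a new but equal instance finds the entry stored under the original key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CacheKeyExample {

    // Hypothetical key type: a record derives equals() and hashCode()
    // from all of its components, so the contract holds automatically.
    record EventKey(long id, String region) {}

    // Hypothetical hand-written equivalent for a regular class.
    static final class ManualEventKey {
        private final long id;
        private final String region;

        ManualEventKey(long id, String region) {
            this.id = id;
            this.region = region;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;                              // reflexive
            if (!(o instanceof ManualEventKey other)) return false;  // false for null and other types
            return id == other.id && Objects.equals(region, other.region);
        }

        @Override
        public int hashCode() {
            // Uses exactly the fields compared in equals(), so equal keys hash equally.
            return Objects.hash(id, region);
        }
    }

    public static void main(String[] args) {
        Map<Object, String> cache = new HashMap<>();
        cache.put(new EventKey(42L, "eu"), "cached-value");
        // A distinct but equal instance finds the same entry.
        System.out.println(cache.get(new EventKey(42L, "eu")));              // cached-value
        cache.put(new ManualEventKey(7L, "us"), "other-value");
        System.out.println(cache.get(new ManualEventKey(7L, "us")));         // other-value
    }
}
```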

A "double miss" occurs when a lookup request fails to find a value in both the cache and the underlying database. In this scenario, the requested key is neither cached nor persisted and therefore cannot be resolved by the system. Such requests are not processed further by the pipeline but may be routed to a dead-letter queue (DLQ) for handling.
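The double-miss path can be sketched in plain Java as follows. The in-memory map, the database lookup function and the list standing in for the DLQ are hypothetical placeholders; the sketch illustrates the decision logic only, not how the processor is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class DoubleMissExample {

    // Hypothetical stand-ins: an in-memory cache, a database lookup and a DLQ.
    private final Map<Long, String> cache = new HashMap<>();
    private final Function<Long, Optional<String>> database;
    final List<Long> deadLetterQueue = new ArrayList<>();

    DoubleMissExample(Function<Long, Optional<String>> database) {
        this.database = database;
    }

    /** Returns the value for the key, or empty on a double miss (the key is routed to the DLQ). */
    Optional<String> lookup(long key) {
        String cached = cache.get(key);
        if (cached != null) {
            return Optional.of(cached);                  // cache hit
        }
        Optional<String> fromDb = database.apply(key);   // cache miss: ask the database
        if (fromDb.isPresent()) {
            cache.put(key, fromDb.get());                // populate the cache for next time
            return fromDb;
        }
        deadLetterQueue.add(key);                        // double miss: not processed further
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<Long, String> table = Map.of(1L, "alpha");
        DoubleMissExample processor =
                new DoubleMissExample(id -> Optional.ofNullable(table.get(id)));
        System.out.println(processor.lookup(1L));        // Optional[alpha]
        System.out.println(processor.lookup(2L));        // Optional.empty
        System.out.println(processor.deadLetterQueue);   // [2]
    }
}
```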

Users can rely on the standard VoltSP configuration options to customise this processor.

As with the VoltDB-flavored sinks, this resource works with arrays of Objects that correspond to stored procedure arguments. For example, to invoke a stored procedure whose method is public long run(long id, int anotherId, String someString), the Object[] should contain a Long, an Integer and a String, or compatible types as defined in the VoltDB documentation.
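A minimal sketch of such a mapping, assuming a hypothetical OrderEvent input and modelling the mapper as a java.util.function.Function from the input to Object[] (the shape used with withParamsMapper in the usage examples; the exact interface type is an assumption). Autoboxing turns the long and int fields into Long and Integer, and the element order must match the procedure signature.

```java
import java.util.Arrays;
import java.util.function.Function;

public class ParamsMapperExample {

    // Hypothetical input event carrying the values for the procedure arguments.
    record OrderEvent(long id, int anotherId, String someString) {}

    // Maps an event to the arguments of run(long id, int anotherId, String someString).
    // The primitives are autoboxed to Long and Integer; order follows the signature.
    static final Function<OrderEvent, Object[]> PARAMS_MAPPER =
            e -> new Object[]{e.id(), e.anotherId(), e.someString()};

    public static void main(String[] args) {
        Object[] params = PARAMS_MAPPER.apply(new OrderEvent(10L, 3, "abc"));
        System.out.println(Arrays.toString(params));            // [10, 3, abc]
        for (Object p : params) {
            System.out.println(p.getClass().getSimpleName());   // Long, Integer, String
        }
    }
}
```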

In case of a system error, this resource retries procedure execution with exponential backoff. The details of this behaviour are configurable using the retry configuration.

The voltdb-cache operator requires a VoltDBResource to be configured, either in YAML or in Java; see the usage examples.

.processWith(VoltProcedureCacheProcessorConfigBuilder.builder()
    .withVoltClientResource(value)
    .withProcedureName(value)
    .withSql(value)
    .withParamsMapper(value)
    .withKeyMapper(value)
    .withResponseMapper(value)
    .withComputeFunction(value)
    .withMaximumSize(value)
    .withExpireAfterWrite(value)
    .withRetry(builder -> builder
        .withRetries(value)
        .withBackoffDelay(value)
        .withMaxBackoffDelay(value)
    )
    .withExceptionHandler(value)
)
processor:
  voltdb-cache:
    voltClientResource: value
    procedureName: value
    sql: value
    paramsMapper: value
    keyMapper: value
    responseMapper: value
    computeFunction: value
    maximumSize: value
    expireAfterWrite: value
    retry:
      retries: value
      backoffDelay: value
      maxBackoffDelay: value
    exceptionHandler: value

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-volt-api</artifactId>
    <version>1.7.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-volt-api', version: '1.7.0'

Properties

voltClientResource

Reference to the client resource used to connect to the VoltDB cluster. Required.

Type: object

procedureName

Name of the stored procedure to call. Type: string

Default value: @AdHoc

sql

SQL query executed by the @AdHoc procedure. Type: string

paramsMapper

Given the input event, this mapper returns the parameters for the named stored procedure or SQL query. Required.

Type: object

keyMapper

A function that determines how the cache key is computed from the input. The returned Object must properly implement the equals() and hashCode() contract to ensure correct cache behavior. If no custom key function is provided, the cache key is generated automatically from the output of paramsMapper and, if applicable, the SQL query.

Type: object
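A Java array inherits identity-based equals() and hashCode() from Object, so a raw Object[] would not work as a cache key on its own: two arrays with the same contents are different keys. The sketch below shows one way to build a value-based key from the SQL text and the parameters. The keyFor helper is hypothetical; how the processor derives its default key internally is not documented here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DefaultKeyExample {

    // Hypothetical helper: wraps the SQL text and the parameters in lists, which
    // compare element by element. Arrays.asList also tolerates a null SQL string.
    // The inner list is backed by the array, so params must not be modified afterwards.
    static List<Object> keyFor(String sql, Object[] params) {
        return Arrays.<Object>asList(sql, Arrays.asList(params));
    }

    public static void main(String[] args) {
        Object[] a = {42L, "eu"};
        Object[] b = {42L, "eu"};
        System.out.println(a.equals(b));           // false: arrays use identity equality
        System.out.println(Arrays.equals(a, b));   // true: same contents

        Map<List<Object>, String> cache = new HashMap<>();
        String sql = "SELECT * FROM events WHERE id=?";
        cache.put(keyFor(sql, a), "cached");
        System.out.println(cache.get(keyFor(sql, b)));   // cached
    }
}
```

A custom keyMapper should likewise return a value-equal object (a record, a List, a String) rather than a raw array.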

responseMapper

The function maps the VoltDB response to a cacheable object. The object is cached under the key derived from the Object[] returned by paramsMapper (or under the keyMapper key, if one is provided). This function must accept an empty VoltTable result for cases where the given parameters do not map to a valid response; in such cases it can return a fallback value, which will be cached. The function must return a value, but only non-null values are cached.

Required.

Type: object
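The caching rule for responseMapper results can be modelled with HashMap.computeIfAbsent, which records no mapping when the mapping function returns null. In this sketch a null row stands in for an empty VoltTable, and the mapper returns a fallback for non-negative ids and null for negative ones; these rules, and the class and method names, are hypothetical and only illustrate which results end up in the cache.

```java
import java.util.HashMap;
import java.util.Map;

public class ResponseMapperExample {

    private final Map<Integer, String> table;              // hypothetical stand-in for the database
    private final Map<Integer, String> cache = new HashMap<>();
    int databaseCalls = 0;

    ResponseMapperExample(Map<Integer, String> table) {
        this.table = table;
    }

    // Hypothetical response mapper: a null row plays the role of an empty VoltTable.
    // Non-negative ids get the fallback "unknown"; negative ids get null.
    static String responseMapper(int id, String row) {
        if (row != null) {
            return row;
        }
        return id >= 0 ? "unknown" : null;
    }

    String lookup(int id) {
        // computeIfAbsent stores nothing when the function returns null,
        // so a null result is recomputed (and re-queried) on the next request.
        return cache.computeIfAbsent(id, k -> {
            databaseCalls++;
            return responseMapper(k, table.get(k));
        });
    }

    boolean isCached(int id) {
        return cache.containsKey(id);
    }

    public static void main(String[] args) {
        ResponseMapperExample example = new ResponseMapperExample(Map.of(1, "alpha"));
        System.out.println(example.lookup(1));    // alpha   (row found, cached)
        System.out.println(example.lookup(2));    // unknown (fallback, cached)
        System.out.println(example.lookup(-1));   // null    (not cached)
        System.out.println(example.isCached(2) + " " + example.isCached(-1)); // true false
    }
}
```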

computeFunction

The function that computes the result event passed to the next operator in the pipeline. Required.

Type: object
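To show how the mappers fit together, here is a hypothetical single-threaded sketch of the per-event flow: paramsMapper builds the arguments, a miss triggers the simulated database call and responseMapper, non-null results are cached, and computeFunction produces the event for the next operator. The database function, the key built with Arrays.asList and the CacheFlowSketch class are assumptions, not the processor's implementation; the types mirror the Integer/String/EnhancedEvent usage example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class CacheFlowSketch<I, V, O> {

    private final Function<I, Object[]> paramsMapper;
    private final Function<Object[], String> database;     // hypothetical stand-in for the VoltDB call
    private final BiFunction<String, I, V> responseMapper;
    private final BiFunction<I, V, O> computeFunction;
    private final Map<List<Object>, V> cache = new HashMap<>();
    int databaseCalls = 0;

    CacheFlowSketch(Function<I, Object[]> paramsMapper,
                    Function<Object[], String> database,
                    BiFunction<String, I, V> responseMapper,
                    BiFunction<I, V, O> computeFunction) {
        this.paramsMapper = paramsMapper;
        this.database = database;
        this.responseMapper = responseMapper;
        this.computeFunction = computeFunction;
    }

    O process(I input) {
        Object[] params = paramsMapper.apply(input);
        List<Object> key = Arrays.asList(params);             // value-based key built from the params
        V value = cache.get(key);
        if (value == null) {                                   // miss: query, then map the response
            databaseCalls++;
            value = responseMapper.apply(database.apply(params), input);
            if (value != null) {
                cache.put(key, value);                         // only non-null values are cached
            }
        }
        return computeFunction.apply(input, value);            // event for the next operator
    }

    record EnhancedEvent(int id, String name) {}

    public static void main(String[] args) {
        Map<Integer, String> table = Map.of(1, "alpha");
        CacheFlowSketch<Integer, String, EnhancedEvent> flow =
                new CacheFlowSketch<Integer, String, EnhancedEvent>(
                        i -> new Object[]{i},
                        params -> table.get((Integer) params[0]),
                        (row, i) -> row != null ? row : "default-" + i,
                        (i, cached) -> new EnhancedEvent(i, cached));
        System.out.println(flow.process(1));       // EnhancedEvent[id=1, name=alpha]
        System.out.println(flow.process(1));       // served from the cache
        System.out.println(flow.databaseCalls);    // 1
    }
}
```

The second call for id 1 does not reach the simulated database, which is the reduction in network calls the cache is meant to provide.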

maximumSize

The maximum size of the cache. Type: number

Default value: 100000

expireAfterWrite

Each entry is automatically removed from the cache once this duration has elapsed since the entry's creation or the most recent replacement of its value.

Type: object

Default value: PT10M
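The combined effect of maximumSize and expireAfterWrite can be illustrated with a small single-threaded cache built on LinkedHashMap. Least-recently-used eviction and the injectable nanosecond clock are assumptions made for the sketch; the eviction policy and cache library the processor actually uses are not specified here.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical illustration of maximumSize + expireAfterWrite semantics; not thread-safe.
public class BoundedExpiringCache<K, V> {

    // A cached value plus the time (in nanoseconds) it was written or last replaced.
    private record Timed<T>(T value, long writtenAtNanos) {}

    private final long ttlNanos;
    private final LongSupplier clock;
    private final LinkedHashMap<K, Timed<V>> map;

    BoundedExpiringCache(int maximumSize, Duration expireAfterWrite, LongSupplier clock) {
        this.ttlNanos = expireAfterWrite.toNanos();
        this.clock = clock;
        // accessOrder = true keeps the least recently used entry first;
        // removeEldestEntry drops it once the size limit is exceeded (assumed LRU policy).
        this.map = new LinkedHashMap<K, Timed<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
                return size() > maximumSize;
            }
        };
    }

    void put(K key, V value) {
        // Creating or replacing a value restarts its expiry timer.
        map.put(key, new Timed<>(value, clock.getAsLong()));
    }

    V get(K key) {
        Timed<V> entry = map.get(key);
        if (entry == null) {
            return null;                                   // never cached, or evicted by size
        }
        if (clock.getAsLong() - entry.writtenAtNanos() >= ttlNanos) {
            map.remove(key);                               // expired: treat as a miss
            return null;
        }
        return entry.value();
    }

    int entryCount() {
        return map.size();
    }

    public static void main(String[] args) {
        long[] now = {0L};                                 // fake clock for the demo
        BoundedExpiringCache<String, String> cache =
                new BoundedExpiringCache<>(2, Duration.ofMinutes(10), () -> now[0]);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.put("c", "3");                               // exceeds maximumSize 2: evicts "a"
        System.out.println(cache.get("a"));                // null
        now[0] += Duration.ofMinutes(10).toNanos();        // advance by expireAfterWrite
        System.out.println(cache.get("b"));                // null: expired
    }
}
```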

retry

Configuration for retrying failed operations, including the number of retries and backoff delays.

Type: object

Fields of retry:

retry.retries

Number of retry attempts after a request failure. Type: number

Default value: 3

retry.backoffDelay

Initial delay before the first retry attempt. Type: object

Default value: PT0.2S

retry.maxBackoffDelay

Maximum delay between consecutive retry attempts. Type: object

Default value: PT3S
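Assuming the delay doubles after each failed attempt and is capped at maxBackoffDelay (the growth factor is not stated in this document), the retry schedule for a given configuration can be computed as follows. BackoffSchedule is a hypothetical helper, not part of the VoltSP API.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {

    /**
     * Delays before each retry, assuming the delay doubles after every attempt
     * and never exceeds maxBackoffDelay (the doubling factor is an assumption).
     */
    static List<Duration> delays(int retries, Duration backoffDelay, Duration maxBackoffDelay) {
        List<Duration> result = new ArrayList<>();
        Duration next = backoffDelay;
        for (int attempt = 0; attempt < retries; attempt++) {
            // Never wait longer than the configured cap.
            Duration capped = next.compareTo(maxBackoffDelay) > 0 ? maxBackoffDelay : next;
            result.add(capped);
            // Double for the next attempt; stop growing once the cap is reached.
            next = capped.equals(maxBackoffDelay) ? maxBackoffDelay : next.multipliedBy(2);
        }
        return result;
    }

    public static void main(String[] args) {
        // Documented defaults: retries=3, backoffDelay=PT0.2S, maxBackoffDelay=PT3S.
        System.out.println(delays(3, Duration.ofMillis(200), Duration.ofSeconds(3)));
        // [PT0.2S, PT0.4S, PT0.8S]
        // Values from the Java usage example: 4 retries, 2 s initial delay, 11 s cap.
        System.out.println(delays(4, Duration.ofSeconds(2), Duration.ofSeconds(11)));
        // [PT2S, PT4S, PT8S, PT11S]
    }
}
```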

exceptionHandler

A custom exception handler enabling interception of all errors related to this processor. Type: object

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

// if resource is configured in yaml
voltStreamBuilder.configureResource("primary-cluster", VoltDBResourceConfigBuilder.class);
// or configure it in java
voltStreamBuilder.configureResource("primary-cluster",
         VoltDBResourceConfigBuilder.class,
         configurator -> configurator
                   .addToServers("localhost", 12122)
                   .withClientBuilder(builder -> {
                       builder.withMaxOutstandingTransactions(42000);
                       builder.withMaxTransactionsPerSecond(23);
                       builder.withRequestTimeout(Duration.ofSeconds(5));
                       builder.withAuthBuilder(authBuilder -> authBuilder
                          .withUsername("admin")
                          .withPassword("admin123"));
                       builder.withSslBuilder(sslBuilder -> sslBuilder
                          .withTrustStoreFile("c:/Users32/trust.me")
                          .withTrustStorePassword("got2have"));
                       builder.withRetryBuilder(retryBuilder -> retryBuilder
                          .withRetries(4)
                          .withBackoffDelay(Duration.ofSeconds(2))
                          .withMaxBackoffDelay(Duration.ofSeconds(11)));
                   }));

voltStreamBuilder
         .consumeFromSource(...)
         .processWith(VoltProcedureCacheProcessorConfigBuilder.<Integer, String, EnhancedEvent>builder()
            .withVoltClientResourceName("primary-cluster")
            .withSql("SELECT * FROM events WHERE id=?")
            .withParamsMapper(i -> new Object[]{i})
            .withResponseMapper((voltResponse, i) -> {
                if (voltResponse != null && voltResponse.advanceRow()) {
                   return voltResponse.getString("name");
                } else {
                   return computeDefault(i);
                }
            })
            .withComputeFunction((i, cached) -> {
                return new EnhancedEvent(i, cached);
            })
            .withMaximumSize(100000)
            .withExpireAfterWrite(Duration.ofMinutes(10))
            .withRetryBuilder(retryBuilder -> retryBuilder
                      .withRetries(4)
                      .withBackoffDelay(Duration.ofSeconds(2))
                      .withMaxBackoffDelay(Duration.ofSeconds(11)))
         )
         .processWith((EnhancedEvent event) -> {...});

private String computeDefault(int i) {...}

public record EnhancedEvent(int id, String name) {}
resources:
    - name: primary-cluster
      voltdb-client:
        servers: localhost:12122
        client:
          maxTransactionsPerSecond: 3000
          maxOutstandingTransactions: 3000
          requestTimeout: PT10S
          auth:
            user: Admin
            password: 2r2Ffafw3V
          trustStore:
            file: file.pem
            password: got2have

processor:
    voltdb-cache:
      voltClientResource: primary-cluster
      procedureName: "getEvent"
      maximumSize: 100000
      expireAfterWrite: PT10S