Voltdb-cache¶
The voltdb-cache processor caches VoltDB server responses, keyed by the input parameters of a request.
Caching can dramatically reduce network calls.
Note that the input object must correctly implement the equals() and hashCode() contract defined by Java, because it is used as a key in the underlying cache. The two methods must be overridden together so that objects behave correctly in collections such as HashMap and HashSet. The contract states that two objects considered equal by equals() must return the same hash code, and that equals() must be reflexive, symmetric, transitive, and consistent, and must return false when compared with null.
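A minimal sketch of an input type that satisfies this contract is shown below. The class name AccountLookup and its fields are hypothetical and used only for illustration; any type that derives equals() and hashCode() from the same fields behaves the same way.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical input type; the name and fields are illustrative only.
final class AccountLookup {
    private final long accountId;
    private final String region;

    AccountLookup(long accountId, String region) {
        this.accountId = accountId;
        this.region = region;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;                      // reflexive
        if (!(other instanceof AccountLookup)) return false; // also covers null
        AccountLookup that = (AccountLookup) other;
        return accountId == that.accountId && Objects.equals(region, that.region);
    }

    @Override
    public int hashCode() {
        // Built from the same fields as equals(), so equal objects share a hash code.
        return Objects.hash(accountId, region);
    }
}

final class AccountLookupDemo {
    // Returns true when a second, logically equal instance finds the stored entry.
    static boolean equalKeyHits() {
        Map<AccountLookup, String> cache = new HashMap<>();
        cache.put(new AccountLookup(42L, "eu"), "cached-response");
        return cache.containsKey(new AccountLookup(42L, "eu"));
    }
}
```

A Java record with the same two components would get equivalent equals() and hashCode() implementations generated by the compiler.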
A "double miss" occurs when a lookup request fails to find a value in both the cache and the underlying database. In this scenario, the requested key is neither cached nor persisted and therefore cannot be resolved by the system. Such requests are not processed further by the pipeline but may be routed to a dead-letter queue (DLQ) for handling.
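The lookup order described above can be modelled with a toy sketch. This is not the processor's implementation: plain collections stand in for the cache, the database, and the dead-letter queue, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy model only: plain collections stand in for the cache, the database, and the DLQ.
final class DoubleMissSketch {
    final Map<Integer, String> cache = new HashMap<>();
    final Map<Integer, String> database = new HashMap<>();
    final List<Integer> deadLetters = new ArrayList<>();

    Optional<String> lookup(int key) {
        String cached = cache.get(key);
        if (cached != null) {
            return Optional.of(cached);   // cache hit
        }
        String stored = database.get(key);
        if (stored != null) {
            cache.put(key, stored);       // cache miss, database hit: remember the value
            return Optional.of(stored);
        }
        deadLetters.add(key);             // double miss: neither cached nor persisted
        return Optional.empty();
    }
}
```

In this model a double miss produces no downstream value and the key is recorded separately, which mirrors the optional dead-letter routing.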
Users can rely on VoltSP standard configuration options to customise this processor.
As with the VoltDB-flavored sinks, this processor works with arrays of Objects that correspond to stored procedure arguments.
For example, to invoke a stored procedure that has a public long run(long id, int anotherId, String someString) method, the Object[] should contain a Long, an Integer, and a String, or compatible types as defined in the VoltDB documentation.
In case of a system error, this processor retries procedure execution with exponential backoff. The details of this behaviour are configurable through the retry configuration.
The voltdb-cache operator requires a VoltDBResource to be configured either in YAML or in Java; see the examples.
.processWith(VoltProcedureCacheProcessorConfigBuilder.builder()
    .withVoltClientResource(value)
    .withProcedureName(value)
    .withSql(value)
    .withParamsMapper(value)
    .withKeyMapper(value)
    .withResponseMapper(value)
    .withComputeFunction(value)
    .withMaximumSize(value)
    .withExpireAfterWrite(value)
    .withRetry(builder -> builder
        .withRetries(value)
        .withBackoffDelay(value)
        .withMaxBackoffDelay(value)
    )
    .withExceptionHandler(value)
)
processor:
  voltdb-cache:
    voltClientResource: value
    procedureName: value
    sql: value
    paramsMapper: value
    keyMapper: value
    responseMapper: value
    computeFunction: value
    maximumSize: value
    expireAfterWrite: value
    retry:
      retries: value
      backoffDelay: value
      maxBackoffDelay: value
    exceptionHandler: value
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-volt-api</artifactId>
    <version>1.7.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-volt-api', version: '1.7.0'
Properties¶
voltClientResource¶
Client resource reference to be used when connecting to the VoltDB cluster. Required.
Type: object
procedureName¶
Name of the stored procedure to call.
Type: string
Default value: @AdHoc
sql¶
SQL query executed by the @AdHoc procedure.
Type: string
paramsMapper¶
Given the input event, this mapper returns the parameters for the named stored procedure or SQL query. Required.
Type: object
keyMapper¶
A function that determines how the cache key is computed from the input. The returned Object must properly implement the equals() and hashCode() contract to ensure correct cache behavior. If no custom key function is provided, the cache key is generated automatically from the output of paramsMapper and, if applicable, the SQL query.
Type: object
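One reason a key function matters is that a raw Object[] does not override equals() or hashCode(), so two arrays with identical contents are different map keys. The sketch below shows this with a plain HashMap and one possible value-based key, a List copy of the parameters. The class and method names are hypothetical, and it does not claim to show how the processor builds its automatic key.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ArrayKeySketch {
    // Copies the parameter array into a List, which compares element by element.
    static List<Object> toKey(Object[] params) {
        return Arrays.asList(params.clone());
    }

    // Arrays inherit identity-based equals(), so an equal-looking array misses.
    static boolean arrayKeyHits() {
        Map<Object, String> cache = new HashMap<>();
        cache.put(new Object[]{1L, "a"}, "value");
        return cache.containsKey(new Object[]{1L, "a"});
    }

    // Lists implement value-based equals() and hashCode(), so the lookup hits.
    static boolean listKeyHits() {
        Map<Object, String> cache = new HashMap<>();
        cache.put(toKey(new Object[]{1L, "a"}), "value");
        return cache.containsKey(toKey(new Object[]{1L, "a"}));
    }
}
```

A custom keyMapper is also useful when only part of the input should identify a cache entry, for example an id without a timestamp.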
responseMapper¶
This function maps a VoltDB response to a cacheable object. The object is cached under the key derived from the Object[] returned by paramsMapper. The function must accept an empty VoltTable result for cases where the given parameters cannot be mapped to a valid response; in such cases it can return a fallback value, and that value will be cached. The function must return a value, but only a non-null value is cached.
Required.
Type: object
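The "only non-null values are cached" rule can be illustrated by analogy with the JDK's ConcurrentHashMap.computeIfAbsent, which records no mapping when the loader returns null. This is a sketch of the semantics only, with hypothetical names, and says nothing about which cache the processor uses internally.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class NonNullCachingSketch {
    private final Map<Integer, String> cache = new ConcurrentHashMap<>();
    int loads = 0; // counts how often the loader (the stand-in for a database call) runs

    String get(int key, Function<Integer, String> loader) {
        // computeIfAbsent stores the result only when it is non-null.
        return cache.computeIfAbsent(key, k -> {
            loads++;
            return loader.apply(k);
        });
    }
}
```

With this behaviour, a mapper that returns null for an empty result causes a fresh call on every request for that key, while a mapper that returns a fallback value is called once and the fallback is served from the cache afterwards.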
computeFunction¶
This function computes the result event for the next operator in the pipeline. Required.
Type: object
maximumSize¶
The maximum number of entries the cache may hold.
Type: number
Default value: 100000
expireAfterWrite¶
Each entry is automatically removed from the cache once this duration has elapsed after the entry's creation or the most recent replacement of its value.
Type: object
Default value: PT10M
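Duration values such as PT10M follow the ISO-8601 duration format, which java.time.Duration can parse. The sketch below shows how the default values that appear on this page read as Duration objects; the helper class is hypothetical, and the assumption that the configuration loader uses the same parsing rules is not confirmed by this page.

```java
import java.time.Duration;

final class DurationFormatSketch {
    // Parses an ISO-8601 duration string such as "PT10M" or "PT0.2S".
    static Duration parse(String iso) {
        return Duration.parse(iso);
    }

    public static void main(String[] args) {
        System.out.println(parse("PT10M").toMinutes());  // expireAfterWrite default, in minutes
        System.out.println(parse("PT0.2S").toMillis());  // retry.backoffDelay default, in milliseconds
        System.out.println(parse("PT3S").getSeconds());  // retry.maxBackoffDelay default, in seconds
    }
}
```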
retry¶
Configuration for retrying failed operations, including the number of retries and backoff delays.
Type: object
Fields of retry:
retry.retries¶
Number of retry attempts after a request failure.
Type: number
Default value: 3
retry.backoffDelay¶
Initial delay before the first retry attempt.
Type: object
Default value: PT0.2S
retry.maxBackoffDelay¶
Maximum delay between consecutive retry attempts.
Type: object
Default value: PT3S
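To see how the three retry fields interact, the sketch below computes a delay schedule. It assumes the delay doubles after each attempt and is capped at maxBackoffDelay; the actual growth factor used by the processor is not stated on this page, and the class name is hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class BackoffSketch {
    // Assumption: the delay doubles after each attempt and never exceeds maxDelay.
    static List<Duration> delays(int retries, Duration initial, Duration maxDelay) {
        List<Duration> result = new ArrayList<>();
        Duration next = min(initial, maxDelay);
        for (int attempt = 0; attempt < retries; attempt++) {
            result.add(next);
            next = min(next.multipliedBy(2), maxDelay);
        }
        return result;
    }

    private static Duration min(Duration a, Duration b) {
        return a.compareTo(b) <= 0 ? a : b;
    }
}
```

Under that doubling assumption, the defaults (3 retries, PT0.2S, PT3S) give waits of 200 ms, 400 ms, and 800 ms, so the cap only takes effect when more retries are configured.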
exceptionHandler¶
A custom exception handler enabling interception of all errors related to this processor.
Type: object
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.
Usage Examples¶
// if the resource is configured in yaml
voltStreamBuilder.configureResource("primary-cluster", VoltDBResourceConfigBuilder.class);

// or configure the resource in Java
voltStreamBuilder.configureResource("primary-cluster",
    VoltDBResourceConfigBuilder.class,
    new Consumer<VoltDBResourceConfigBuilder>() {
        @Override
        public void consume(VoltDBResourceConfigBuilder configurator) {
            configurator
                .addToServers("localhost", 12122)
                .withClientBuilder(builder -> {
                    builder.withMaxOutstandingTransactions(42000);
                    builder.withMaxTransactionsPerSecond(23);
                    builder.withRequestTimeout(Duration.ofSeconds(5));
                    builder.withAuthBuilder(authBuilder -> authBuilder
                        .withUsername("admin")
                        .withPassword("admin123"));
                    builder.withSslBuilder(sslBuilder -> sslBuilder
                        .withTrustStoreFile("c:/Users32/trust.me")
                        .withTrustStorePassword("got2have"));
                    builder.withRetryBuilder(retryBuilder -> retryBuilder
                        .withRetries(4)
                        .withBackoffDelay(Duration.ofSeconds(2))
                        .withMaxBackoffDelay(Duration.ofSeconds(11)));
                });
        }
    });
voltStreamBuilder
    .consumeFromSource(...)
    .processWith(VoltProcedureCacheProcessorConfigBuilder.<Integer, String, EnhancedEvent>builder()
        .withVoltClientResourceName("primary-cluster")
        .withSql("SELECT * FROM events WHERE id=?")
        // the input integer becomes the single query parameter
        .withParamsMapper(i -> new Object[]{i})
        // map the first row to a name, or fall back when the result is empty
        .withResponseMapper((voltResponse, i) -> {
            if (voltResponse != null && voltResponse.advanceRow()) {
                return voltResponse.getString("name");
            } else {
                return computeDefault(i);
            }
        })
        // combine the input with the cached value for the next operator
        .withComputeFunction((i, cached) -> {
            return new EnhancedEvent(i, cached);
        })
        .withMaximumSize(100000)
        .withExpireAfterWrite(Duration.ofMinutes(10))
        .withRetryBuilder(retryBuilder -> retryBuilder
            .withRetries(4)
            .withBackoffDelay(Duration.ofSeconds(2))
            .withMaxBackoffDelay(Duration.ofSeconds(11)))
    )
    .processWith((EnhancedEvent event) -> {...});

private String computeDefault(int i) {...}

public record EnhancedEvent(int id, String name) {}
resources:
  - name: primary-cluster
    voltdb-client:
      servers: localhost:12122
      client:
        maxTransactionsPerSecond: 3000
        maxOutstandingTransactions: 3000
        requestTimeout: PT10S
        auth:
          user: Admin
          password: 2r2Ffafw3V
        trustStore:
          file: file.pem
          password: got2have
processor:
  voltdb-cache:
    voltClientResource: primary-cluster
    procedureName: "getEvent"
    maximumSize: 100000
    expireAfterWrite: PT10S
server: %s