Resources
Resources represent integrations with external systems or shared clients that your pipeline needs during processing. They encapsulate connection management, credentials, and reusable clients (for example, an HTTP client, a VoltDB client, or an S3 client).
A single resource may be shared by all workers or created per worker thread depending on the resource implementation.
Typical examples:
- HTTP client for calling a REST service
- VoltDB client connection pool used by processors or emitters
- AWS S3 client
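The difference between a resource shared by all workers and one created per worker thread can be pictured with plain Java. The sketch below is only an illustration under stated assumptions: `ResourceScopeSketch`, `FakeClient`, and `distinctClientsSeen` are hypothetical names and are not part of the pipeline framework, which decides the actual scope inside each resource implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical illustration: none of these types belong to the pipeline framework.
final class ResourceScopeSketch {

    /** Stand-in for an expensive client (HTTP, database, object storage). */
    static final class FakeClient {
    }

    /**
     * Starts the given number of worker threads, lets each one look up its
     * client twice, and returns how many distinct client instances were seen.
     */
    static int distinctClientsSeen(Supplier<FakeClient> lookup, int workers) {
        Set<FakeClient> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                seen.add(lookup.get());
                // Second lookup on the same thread; with both strategies
                // used in main it returns the same client again.
                seen.add(lookup.get());
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // Shared: one instance handed to every worker.
        FakeClient shared = new FakeClient();
        System.out.println("shared: " + distinctClientsSeen(() -> shared, 4));

        // Per worker: each thread lazily creates and keeps its own instance.
        ThreadLocal<FakeClient> perWorker = ThreadLocal.withInitial(FakeClient::new);
        System.out.println("per worker: " + distinctClientsSeen(perWorker::get, 4));
    }
}
```

With four workers, the shared lookup yields one distinct client and the per-worker lookup yields four. A shared instance suits thread-safe clients with internal pooling; a per-thread instance avoids contention when a client is not safe to share.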
Resources can be declared and named separately from the operators that use them, in Java or in YAML:
public class VoltDbResourceExample implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder stream) {
        // Configure the resource (name must match what processors reference)
        stream.configureResource("voltdb-primary", VoltDBResourceConfigBuilder.class, cfg ->
            cfg.withServers(...)
        );
    }
}
version: 1
name: VoltDbClientResourceExample
resources:
  - name: voltdb-primary
    voltdb-client:
      servers: localhost:21212
Components can then reference a declared resource by its name:
public class VoltDbResourceExample implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder stream) {
        ...
        // Use the resource from a processor (example: VoltDB cache)
        stream
            .consumeFromSource(Sources.collection(1, 2, 3))
            .processWith(VoltProcedureCacheConfigBuilder.<Integer>builder()
                .withVoltClientResourceRef("voltdb-primary")
                .withProcedureName("getEvent")
                .withProcParamsMapper(i -> new Object[]{ i })
            )
            .terminateWithSink(Sinks.stdout());
    }
}
...
processors:
  - voltdb-cache:
      voltClientResourceRef: voltdb-primary
      procedureName: "getEvent"
      expireAfterWrite: PT10M
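Referencing a resource by name implies some form of lookup from the declared name to a client instance. The sketch below shows one way such a lookup could behave, including what happens when the referenced name does not match any declaration. It is an assumption-laden illustration: `NamedResourceSketch`, `declare`, `resolve`, and `FakeClient` are hypothetical names, and the framework's real mechanism may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of name-based resource lookup; not the framework's API.
// Not thread-safe: kept single-threaded to focus on the naming rule.
final class NamedResourceSketch {

    /** Stand-in for a real client such as a database connection pool. */
    static final class FakeClient {
    }

    private final Map<String, Supplier<?>> factories = new HashMap<>();
    private final Map<String, Object> instances = new HashMap<>();

    /** Registers a factory under a unique resource name. */
    void declare(String name, Supplier<?> factory) {
        if (factories.putIfAbsent(name, factory) != null) {
            throw new IllegalArgumentException("Duplicate resource name: " + name);
        }
    }

    /** Returns the resource declared under the name, creating it on first use. */
    <T> T resolve(String name, Class<T> type) {
        Supplier<?> factory = factories.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown resource: " + name);
        }
        Object instance = instances.computeIfAbsent(name, key -> factory.get());
        return type.cast(instance);
    }

    public static void main(String[] args) {
        NamedResourceSketch resources = new NamedResourceSketch();
        resources.declare("voltdb-primary", FakeClient::new);

        // Two operators referencing the same name receive the same instance.
        FakeClient first = resources.resolve("voltdb-primary", FakeClient.class);
        FakeClient second = resources.resolve("voltdb-primary", FakeClient.class);
        System.out.println(first == second);

        try {
            resources.resolve("voltdb-primry", FakeClient.class); // misspelled on purpose
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch a misspelled reference fails immediately with a message naming the missing resource, which is why the declared name and the referenced name need to match exactly.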
If a resource is used by exactly one operator, you can configure it inline inside that operator configuration instead of declaring it under resources.
Example of inlining a resource inside a sink:
public class VoltDbResourceExample implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder stream) {
        ...
        // Use the inlined resource in the sink (example: VoltDB procedure sink)
        stream
            .consumeFromSource(Sources.collection(1, 2, 3))
            .processWith(param -> VoltRequest.createWithParameters("TABLE_NAME", param))
            .terminateWithSink(
                ProcedureVoltSinkConfigBuilder.builder()
                    .withVoltClientResourceBuilder(
                        VoltDBResourceConfigBuilder.builder()
                            .addServers(HostAndPort.fromParts("localhost", 12122))
                    )
            );
    }
}
...
sink:
  voltdb-procedure:
    voltClientResource:
      servers: localhost:12122