Resources

Resources represent integrations with external systems or shared clients that your pipeline needs during processing. They encapsulate connection management, credentials, and reusable clients (for example, an HTTP client, a VoltDB client, or an S3 client).

Depending on the resource implementation, a resource instance may be shared by all workers or created once per worker thread.

  • Typical examples:
    • HTTP client for calling a REST service
    • VoltDB client connection pool used by processors or emitters
    • AWS S3 client
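The difference between a resource shared by all workers and one created per worker thread can be illustrated in plain Java. The following is only a sketch of the two sharing models, not the framework's implementation: `FakeClient`, `SharedResource`, and `PerWorkerResource` are hypothetical names introduced here for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical illustration only; none of these types belong to the framework. */
public class ResourceScopeSketch {

    /** Stand-in for an expensive client such as an HTTP or database client. */
    static final class FakeClient {
        private static final AtomicInteger CREATED = new AtomicInteger();
        final int id = CREATED.incrementAndGet();
    }

    /** One instance shared by every worker thread; created lazily, at most once. */
    static final class SharedResource<T> {
        private final Supplier<T> factory;
        private T instance;

        SharedResource(Supplier<T> factory) {
            this.factory = factory;
        }

        synchronized T get() {
            if (instance == null) {
                instance = factory.get();
            }
            return instance;
        }
    }

    /** One instance per worker thread, for clients that are not thread-safe. */
    static final class PerWorkerResource<T> {
        private final ThreadLocal<T> instances;

        PerWorkerResource(Supplier<T> factory) {
            this.instances = ThreadLocal.withInitial(factory);
        }

        T get() {
            return instances.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SharedResource<FakeClient> shared = new SharedResource<>(FakeClient::new);
        PerWorkerResource<FakeClient> perWorker = new PerWorkerResource<>(FakeClient::new);

        // Each worker sees the same shared client but its own per-worker client.
        Runnable worker = () -> System.out.println(
                Thread.currentThread().getName()
                        + " shared=" + shared.get().id
                        + " perWorker=" + perWorker.get().id);

        Thread a = new Thread(worker, "worker-a");
        Thread b = new Thread(worker, "worker-b");
        a.start();
        b.start();
        a.join();
        b.join();
    }
}
```

A shared instance suits thread-safe clients with internal pooling, while a per-thread instance avoids contention for clients that must not be used concurrently. Which model applies to a given resource depends on its implementation.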

Resources can be declared and named separately from the operators that use them:

    public class VoltDbResourceExample implements VoltPipeline {
        @Override
        public void define(VoltStreamBuilder stream) {
            // Configure the resource (name must match what processors reference)
            stream.configureResource("voltdb-primary", VoltDBResourceConfigBuilder.class, cfg ->
                cfg.withServers(...)
            );
        }
    }

    version: 1
    name: VoltDbClientResourceExample

    resources:
        - name: voltdb-primary
          voltdb-client:
              servers: localhost:21212

Components can then reference a declared resource by its name:

    public class VoltDbResourceExample implements VoltPipeline {
        @Override
        public void define(VoltStreamBuilder stream) {
            ...
            // Use the resource from a processor (example: VoltDB cache)
            stream
                .consumeFromSource(Sources.collection(1, 2, 3))
                .processWith(VoltProcedureCacheConfigBuilder.<Integer>builder()
                        .withVoltClientResourceRef("voltdb-primary")
                        .withProcedureName("getEvent")
                        .withProcParamsMapper(i -> new Object[]{ i })
                )
                .terminateWithSink(Sinks.stdout());
        }
    }
    ... 

    processors:
      - voltdb-cache:
          voltClientResourceRef: voltdb-primary
          procedureName: "getEvent"
          expireAfterWrite: PT10M
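To make the declare-then-reference pattern concrete, here is a minimal plain-Java sketch of how name-based lookup could work. It is not the framework's API: `ResourceRegistrySketch`, `declare`, `resolve`, and `FakeClient` are hypothetical names, and lazy creation on first use is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical illustration only; this is not the framework's resource API. */
public class ResourceRegistrySketch {

    /** Stand-in for a real client; it only remembers its server list. */
    static final class FakeClient {
        final String servers;

        FakeClient(String servers) {
            this.servers = servers;
        }
    }

    // Not thread-safe; a real registry would need synchronization.
    private final Map<String, Supplier<?>> factories = new HashMap<>();
    private final Map<String, Object> instances = new HashMap<>();

    /** Declares a resource under a name; nothing is created yet. */
    public void declare(String name, Supplier<?> factory) {
        if (factories.putIfAbsent(name, factory) != null) {
            throw new IllegalArgumentException("Duplicate resource name: " + name);
        }
    }

    /** Resolves a reference by name, creating the resource on first use. */
    public <T> T resolve(String ref, Class<T> type) {
        Supplier<?> factory = factories.get(ref);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown resource reference: " + ref);
        }
        Object instance = instances.computeIfAbsent(ref, key -> factory.get());
        return type.cast(instance);
    }

    public static void main(String[] args) {
        ResourceRegistrySketch registry = new ResourceRegistrySketch();
        registry.declare("voltdb-primary", () -> new FakeClient("localhost:21212"));

        // Two operators referencing the same name receive the same instance.
        FakeClient first = registry.resolve("voltdb-primary", FakeClient.class);
        FakeClient second = registry.resolve("voltdb-primary", FakeClient.class);
        System.out.println(first == second); // prints "true"

        // A reference that matches no declared name fails fast.
        try {
            registry.resolve("voltdb-secondary", FakeClient.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch shows why the name in the reference must match the declared name exactly: the name is the only link between the declaration and the operators that use the resource.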

If a resource is used by exactly one operator, you can configure it inline in that operator's configuration instead of declaring it under resources.

Example of inlining a resource inside a sink:

    public class VoltDbResourceExample implements VoltPipeline {
        @Override
        public void define(VoltStreamBuilder stream) {
            ...
            // Use the inlined resource in the sink (example: VoltDB procedure sink)
            stream
                .consumeFromSource(Sources.collection(1, 2, 3))
                .processWith(param -> VoltRequest.createWithParameters("TABLE_NAME", param))
                .terminateWithSink(
                        ProcedureVoltSinkConfigBuilder.builder()
                                .withVoltClientResourceBuilder(
                                        VoltDBResourceConfigBuilder.builder()
                                                .addServers(HostAndPort.fromParts("localhost", 12122))
                                )
                );
        }
    }
    ...

    sink:
      voltdb-procedure:
        voltClientResource:
          servers: localhost:12122