Pipeline Configuration

Pipelines in VoltSp can be defined either directly in Java or through YAML definition files. This section demonstrates both approaches and explains how to configure, parameterize, and externalize runtime settings for different environments.


Basic Example

Java Definition

public class BasicPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder builder) {
        builder
            .withName("Elements to Blackhole")
            .consumeFromSource(Sources.collection("one", "two"))
            .terminateWithSink(Sinks.blackhole());
    }
}

Equivalent YAML Definition

version: 1
name: "Elements to Blackhole"

source:
  collection:
    elements:
      - "one"
      - "two"

sink:
  blackhole: {}

Although simple, this pipeline is fully functional. However, it hardcodes its input data, which makes it unsuitable for deployment across different environments. To address this, let’s consider a more flexible example.


Example: HTTP Sink Pipeline

public class BasicPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder builder) {
        builder
            .withName("Send HTTP Requests")
            .consumeFromSource(Sources.collection("one", "two"))
            .processWith(element -> new HttpUpdateRequest("/process", HttpMethod.POST, ...))
            .terminateWithSink(HttpSinkConfigBuilder.builder()
                .withHttpClientBuilder(b -> b
                    .withAddress("localhost", 9990)
                    .withConnectTimeout(Duration.ofSeconds(10))
                    .withRequestTimeout(Duration.ofSeconds(1))
                    .withMinimalProtocol(HttpProtocol.HTTP_11)
                    .withMaxContentSize(MemoryUnit.ofKiloBytes(100))
                    .withSocketOptions(Map.of("SO_SNDBUF", "10240"))
                )
                .withExceptionHandler(this::handleExceptions));
    }

    private void handleExceptions(...) {}
}

This pipeline sends HTTP POST requests for each element in the input collection. However, several configuration values depend on the environment:

  • Input data
  • Remote host and port
  • Connection and request timeouts
  • Maximum content size
  • Socket options

Configuration Options

VoltSp supports configuration through command-line arguments, Docker mounts, or Helm values.

Command-Line Interface

For standalone (bare-metal) deployments, properties can be supplied using:

  • -c / --config – path to a YAML configuration file
  • -cs / --configSecure – path to a secure YAML file (for sensitive data)
  • --system-property – key-value pairs, e.g.
--system-property KEY1=value1 --system-property KEY2=value2

💡 Note String fields defined in configSecure.yaml are treated as sensitive. Their values are never logged, which makes the file suitable for storing passwords or tokens.


Running with Docker

Docker uses the same CLI interface. Expected mount locations:

  • /etc/voltsp/configuration.yaml – standard configuration
  • /etc/voltsp/configurationSecure.yaml – secure configuration
  • System properties can be provided via the JAVA_OPTS environment variable.

Helm Deployment

VoltSp can also be configured via Helm values:

streaming:
  pipeline:
    configuration:
    configurationSecure:
  javaProperties:
    key: value

Example: Externalized Configuration

Let's rewrite the previous example so that it reads its settings from a configuration file.

Configuration file:

elements:
  - "one"
  - "two"

http:
  host: "localhost"
  port: 9990
  connectionTimeout: 10
  requestTimeout: 1
  maxContentSize: 100
  socket:
    sendBufferSize: 10240

Pipeline definition:

public class BasicPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder builder) {
        ConfigurationContext config = builder.getExecutionContext().configurator();
        List<String> elements = config.findByPath("elements").asList(String.class);
        String host = config.findByPath("http.host").orElse("localhost");
        int port = config.findByPath("http.port").orElse(9990);
        int connectTimeout = config.findByPath("http.connectionTimeout").orElse(10);
        int requestTimeout = config.findByPath("http.requestTimeout").orElse(1);
        int maxContentSize = config.findByPath("http.maxContentSize").orElse(100);
        int sendBufferSize = config.findByPath("http.socket.sendBufferSize").asInt();

        builder
            .withName("Send HTTP Requests")
            .consumeFromSource(Sources.collection(elements))
            .processWith(element -> new HttpUpdateRequest("/process", HttpMethod.POST, ...))
            .terminateWithSink(HttpSinkConfigBuilder.builder()
                .withHttpClientBuilder(b -> b
                    .withAddress(host, port)
                    .withConnectTimeout(Duration.ofSeconds(connectTimeout))
                    .withRequestTimeout(Duration.ofSeconds(requestTimeout))
                    .withMinimalProtocol(HttpProtocol.HTTP_11)
                    .withMaxContentSize(MemoryUnit.ofKiloBytes(maxContentSize))
                    .withSocketOptions(Map.of("SO_SNDBUF", Integer.toString(sendBufferSize)))
                )
                .withExceptionHandler(this::handleExceptions));
    }

    private void handleExceptions(...) {}
}

This approach allows the same pipeline code to be reused across test and production environments simply by replacing the configuration file. However, it can be verbose for larger setups.
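
To make the dotted-path lookup with a fallback value more concrete, here is a minimal, self-contained sketch of the idea. It is not the VoltSp ConfigurationContext: the class PathLookupSketch and its findByPath method are hypothetical stand-ins that walk a nested map, and the fallback of 1 mirrors the request timeout default used above.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a configuration lookup; not the VoltSp API. */
final class PathLookupSketch {

    /** Walks a nested map one dot-separated segment at a time. */
    static Optional<Object> findByPath(Map<String, Object> root, String path) {
        Object current = root;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return Optional.empty(); // reached a leaf before the path ended
            }
            current = ((Map<?, ?>) current).get(segment);
            if (current == null) {
                return Optional.empty(); // segment is missing
            }
        }
        return Optional.of(current);
    }

    public static void main(String[] args) {
        // Mirrors part of the configuration file shown earlier.
        Map<String, Object> config = Map.of(
            "elements", List.of("one", "two"),
            "http", Map.of("host", "localhost", "port", 9990));

        // Present in the map, so the fallback is ignored.
        Object port = findByPath(config, "http.port").orElse(8080);
        // Absent from the map, so the fallback applies.
        Object requestTimeout = findByPath(config, "http.requestTimeout").orElse(1);

        System.out.println("port=" + port + ", requestTimeout=" + requestTimeout);
    }
}
```

Returning an empty Optional for a missing segment is what lets the caller choose between a default (orElse) and a hard failure, which is the distinction the pipeline above draws between optional and required settings.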


Operator Builders

VoltSp provides specialized builder classes for most built-in operators. These simplify configuration by automatically mapping configuration paths and providing sensible defaults.

For example, the HTTP sink can be configured through HttpSinkConfigBuilder, which automatically maps fields from configuration files.

Configuration:

elements:
  - "one"
  - "two"

sink:
  http:
    httpClient:
      address: "localhost:9990"
      minimalProtocol: HTTP_11
      connectTimeout: 10s
      requestTimeout: 1s
      maxContentSize: 100kb
      socket:
        SO_SNDBUF: "10240"
        SO_KEEPALIVE: "true"
    headers:
      Content-Type: application/json

Pipeline:

public class BasicPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder builder) {
        ConfigurationContext config = builder.getExecutionContext().configurator();
        List<String> elements = config.findByPath("elements").asList(String.class);

        builder
            .withName("Send HTTP Requests")
            .consumeFromSource(Sources.collection(elements))
            .processWith(element -> new HttpUpdateRequest("/process", HttpMethod.POST, ...))
            .terminateWithSink(HttpSinkConfigBuilder.builder()
                .withExceptionHandler(this::handleExceptions));
    }

    private void handleExceptions(...) {}
}

💡 Note Not all operators support builder-based configuration, and not all fields can be defined externally.


Configuration Interpolation

VoltSp supports dynamic configuration values using interpolation.

Supported forms:

  • Environment variables – ${env:VAR} or ${env:VAR:DEFAULT}
  • System properties – ${VAR} or ${VAR:DEFAULT} (set with -DVAR=value)
  • Escaped literals – ${{ not ${interpolated} string }} → not ${interpolated} string
  • Prefix/suffix substitution – e.g. us-${REGION}-1 → us-east-1 if REGION=east
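
The placeholder rules above can be illustrated with a small resolver. This is only a sketch under stated assumptions, not VoltSp's interpolation service: the class InterpolationSketch and its resolve method are hypothetical, environment variables and system properties are passed in as plain maps so the example is deterministic, and escaped literals and secret placeholders are deliberately left out.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical resolver for ${env:VAR[:DEFAULT]} and ${VAR[:DEFAULT]} placeholders. */
final class InterpolationSketch {

    // Matches ${...} without nested braces; escaped ${{ ... }} literals are not handled here.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^{}]+)\\}");

    static String resolve(String template, Map<String, String> env, Map<String, String> props) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String body = matcher.group(1);
            boolean fromEnv = body.startsWith("env:");
            String rest = fromEnv ? body.substring("env:".length()) : body;
            // Split into NAME and an optional DEFAULT at the first colon.
            String[] parts = rest.split(":", 2);
            String value = (fromEnv ? env : props).get(parts[0]);
            if (value == null) {
                if (parts.length < 2) {
                    throw new IllegalArgumentException("No value or default for " + parts[0]);
                }
                value = parts[1];
            }
            // Text around the placeholder is preserved, which gives prefix/suffix substitution.
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of("HOST", "example.internal");
        Map<String, String> props = Map.of("REGION", "east");

        // HOST is set, PORT is not, so PORT falls back to its default.
        System.out.println(resolve("${env:HOST:localhost}:${env:PORT:9990}", env, props));
        // prints "example.internal:9990"
        System.out.println(resolve("us-${REGION}-1", env, props));
        // prints "us-east-1"
    }
}
```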

Secret Values

Secrets can be marked explicitly:

  • ${secret:env:VAR}
  • ${secret:VAR:DEFAULT}

💡 Note Secret values cannot be combined with prefix/suffix interpolation. They are never logged or exposed in plaintext.
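
One common way to keep a value out of logs is to wrap it in a type whose toString() returns a mask. The sketch below shows that general technique only; it is not how VoltSp implements secrets, and the names SecretSketch, Secret, and reveal() are hypothetical.

```java
import java.util.Objects;

final class SecretSketch {

    /** Hypothetical wrapper that keeps a sensitive string out of log output. */
    static final class Secret {
        private final String value;

        Secret(String value) {
            this.value = Objects.requireNonNull(value);
        }

        /** Explicit accessor for the code that really needs the plaintext. */
        String reveal() {
            return value;
        }

        /** String concatenation and loggers call this, so they only ever see the mask. */
        @Override
        public String toString() {
            return "******";
        }
    }

    public static void main(String[] args) {
        Secret password = new Secret("s3cr3t");
        System.out.println("password=" + password); // prints "password=******"
    }
}
```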

Configuration:

elements:
  - "one"
  - "two"

sink:
  http:
    httpClient:
      address: "${env:HOST:localhost}:${env:PORT:9990}"
      minimalProtocol: HTTP_11
      connectTimeout: 10s
      requestTimeout: 1s
      maxContentSize: 100kb
      socket:
        SO_SNDBUF: "10240"
        SO_KEEPALIVE: "true"
    headers:
      Content-Type: application/json

AWS Secrets Manager Support

The interpolation service can fetch secrets from AWS Secrets Manager. By default, the secret manager client uses the AWS default credentials provider chain (see https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html). Credentials can also be supplied through the custom environment variables AWS_SECRET_MANAGER_ACCESS_KEY and AWS_SECRET_MANAGER_SECRET_KEY.

If the corresponding environment variables or Java properties are set, the endpoint is overridden with AWS_SECRET_MANAGER_ENDPOINT and the region with AWS_SECRET_MANAGER_REGION or AWS_DEFAULT_REGION. The default region is Region.US_EAST_1.

If the secret manager fails to configure itself, it falls back to validation mode and fails only when an AWS secret is actually requested.

The aws resolver can be combined with the secret resolver, but this is optional: all string values returned from AWS Secrets Manager are already treated as secrets and guarded accordingly.
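
The region fallback described above can be sketched as a simple precedence check. This is an illustration only: the class RegionResolutionSketch is hypothetical, the settings are passed in as a map rather than read from the real environment, and the order in which the two variables are consulted (the specific one first) is an assumption that the text does not state.

```java
import java.util.Map;

final class RegionResolutionSketch {

    // String form of Region.US_EAST_1, the documented default.
    static final String DEFAULT_REGION = "us-east-1";

    /** Returns the first non-blank setting, or the default if neither is set. */
    static String resolveRegion(Map<String, String> settings) {
        // Assumed order: the secret-manager-specific variable wins over the general one.
        for (String key : new String[] {"AWS_SECRET_MANAGER_REGION", "AWS_DEFAULT_REGION"}) {
            String value = settings.get(key);
            if (value != null && !value.isBlank()) {
                return value;
            }
        }
        return DEFAULT_REGION;
    }

    public static void main(String[] args) {
        System.out.println(resolveRegion(Map.of("AWS_DEFAULT_REGION", "eu-west-1"))); // prints "eu-west-1"
        System.out.println(resolveRegion(Map.of())); // prints "us-east-1"
    }
}
```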

Configuration:

elements:
  - "one"
  - "two"

other:
  service:
    key: "id:${aws:key-name:key-default}"
    password: "${secret:aws:secret-name:}"
    # password: "${aws:secret-name:}" has the same effect

sink:
  http:
    httpClient:
      address: "${env:HOST:localhost}:${env:PORT:9990}"
      minimalProtocol: HTTP_11
      connectTimeout: 10s
      requestTimeout: 1s
      maxContentSize: 100kb
      socket:
        SO_SNDBUF: "10240"
        SO_KEEPALIVE: "true"
    headers:
      Content-Type: application/json

Configuration vs. Pipeline Definition in YAML

Although YAML-based configuration and YAML pipeline definitions share some structural similarities, they serve different purposes within VoltSp.

  • YAML Pipeline Definition (also referred to as the YAML API) defines the structure and behavior of the pipeline itself. It requires a strict schema and supports only operators that provide corresponding builder classes.
  • YAML Configuration files, on the other hand, are used to parameterize a pipeline, allowing values such as host, port, or timeouts to be defined externally.

In summary, the YAML API focuses on pipeline definition (similar to the Java API), while configuration files provide runtime properties that any pipeline, regardless of how it is defined, can read and apply.

Example:

YAML definition file

version: 1
name: "Elements to File"
source:
  collection:
    elements:
    - "one"
    - "two"
sink:
  file:
    dirPath: ${outputDir}

YAML configuration file

sink:
  file:
    delimiter: ";;"

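Interpolation also works in YAML definition files. In this example, both files contribute to the configuration of the file sink operator: the definition file sets dirPath, and the configuration file sets delimiter.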