Pipeline Configuration
Pipelines in VoltSp can be defined either directly in Java or through YAML definition files. This section demonstrates both approaches and explains how to configure, parameterize, and externalize runtime settings for different environments.
Basic Example
Java Definition
public class BasicPipeline implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder builder) {
        builder
            .withName("Elements to Blackhole")
            // Emit two fixed elements and discard them
            .consumeFromSource(Sources.collection("one", "two"))
            .terminateWithSink(Sinks.blackhole());
    }
}
Equivalent YAML Definition
version: 1
name: "Elements to Blackhole"
source:
  collection:
    elements:
      - "one"
      - "two"
sink:
  blackhole: {}
Although simple, this pipeline is fully functional. However, it hardcodes its input data, which makes it unsuitable for deployment across different environments. To see which settings typically need to vary between environments, let's consider a more realistic example.
Example: HTTP Sink Pipeline
import java.time.Duration;
import java.util.Map;

public class BasicPipeline implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder builder) {
        builder
            .withName("Send HTTP Requests")
            // Input data is hardcoded
            .consumeFromSource(Sources.collection("one", "two"))
            // Turn each element into an HTTP POST request
            .processWith(element -> new HttpUpdateRequest("/process", HttpMethod.POST, ...))
            .terminateWithSink(HttpSinkConfigBuilder.builder()
                .withHttpClientBuilder(b -> b
                    // Environment-specific connection settings, hardcoded here
                    .withAddress("localhost", 9990)
                    .withConnectTimeout(Duration.ofSeconds(10))
                    .withRequestTimeout(Duration.ofSeconds(1))
                    .withMinimalProtocol(HttpProtocol.HTTP_11)
                    .withMaxContentSize(MemoryUnit.ofKiloBytes(100))
                    .withSocketOptions(Map.of("SO_SNDBUF", "10240"))
                )
                .withExceptionHandler(this::handleExceptions));
    }

    private void handleExceptions(...) {}
}
This pipeline sends HTTP POST requests for each element in the input collection. However, several configuration values depend on the environment:
- Input data
- Remote host and port
- Connection and request timeouts
- Maximum content size
- Socket options
Configuration Options
VoltSp supports configuration through command-line arguments, Docker mounts, or Helm values.
Command-Line Interface
For standalone (bare-metal) deployments, properties can be supplied using:
- -c/--config: path to a YAML configuration file
- -cs/--configSecure: path to a secure YAML file (for sensitive data)
- --system-property: key-value pairs, e.g.
  --system-property KEY1=value1 --system-property KEY2=value2
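The repeated --system-property option produces a set of key-value pairs. As a minimal sketch of that parsing, here is a hypothetical helper (SystemPropertyArgs is not part of VoltSp) that assumes each key is separated from its value at the first =, so values may themselves contain =.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not VoltSp's CLI parser: collects repeated
// "--system-property KEY=value" arguments into an ordered map.
class SystemPropertyArgs {

    static Map<String, String> parse(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (!args[i].equals("--system-property")) {
                continue; // other options such as -c or -cs are ignored here
            }
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("--system-property requires KEY=value");
            }
            String pair = args[++i];
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected KEY=value but got: " + pair);
            }
            // Split on the first '=' only, so the value may contain '='
            props.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return props;
    }

    public static void main(String[] args) {
        String[] example = {
            "-c", "config.yaml",
            "--system-property", "KEY1=value1",
            "--system-property", "KEY2=value2"
        };
        System.out.println(parse(example)); // {KEY1=value1, KEY2=value2}
    }
}
```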
💡 Note: String fields defined in configSecure.yaml are treated as sensitive. Their values are never logged and are suitable for storing passwords or tokens.
Running with Docker
The Docker image accepts the same command-line options. Expected mount locations:
- /etc/voltsp/configuration.yaml: standard configuration
- /etc/voltsp/configurationSecure.yaml: secure configuration
System properties can be provided via the JAVA_OPTS environment variable.
Helm Deployment
VoltSp can also be configured via Helm values:
streaming:
  pipeline:
    configuration:
    configurationSecure:
    javaProperties:
      key: value
Example: Externalized Configuration
Let's rewrite the previous example to read its environment-specific values from a configuration file.
Configuration file:
elements:
  - "one"
  - "two"
http:
  host: "localhost"
  port: 9990
  connectionTimeout: 10
  requestTimeout: 1
  maxContentSize: 100
  socket:
    sendBufferSize: 10240
Pipeline definition:
import java.time.Duration;
import java.util.List;
import java.util.Map;

public class BasicPipeline implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder builder) {
        // Read environment-specific values from the configuration file,
        // falling back to the previously hardcoded values where a key is missing
        ConfigurationContext config = builder.getExecutionContext().configurator();
        List<String> elements = config.findByPath("elements").asList(String.class);
        String host = config.findByPath("http.host").orElse("localhost");
        int port = config.findByPath("http.port").orElse(9990);
        int connectTimeout = config.findByPath("http.connectionTimeout").orElse(10);
        int requestTimeout = config.findByPath("http.requestTimeout").orElse(1);
        int maxContentSize = config.findByPath("http.maxContentSize").orElse(100);
        // No default is given for this value
        int sendBufferSize = config.findByPath("http.socket.sendBufferSize").asInt();

        builder
            .withName("Send HTTP Requests")
            .consumeFromSource(Sources.collection(elements))
            .processWith(element -> new HttpUpdateRequest("/process", HttpMethod.POST, ...))
            .terminateWithSink(HttpSinkConfigBuilder.builder()
                .withHttpClientBuilder(b -> b
                    .withAddress(host, port)
                    .withConnectTimeout(Duration.ofSeconds(connectTimeout))
                    .withRequestTimeout(Duration.ofSeconds(requestTimeout))
                    .withMinimalProtocol(HttpProtocol.HTTP_11)
                    .withMaxContentSize(MemoryUnit.ofKiloBytes(maxContentSize))
                    .withSocketOptions(Map.of("SO_SNDBUF", Integer.toString(sendBufferSize)))
                )
                .withExceptionHandler(this::handleExceptions));
    }

    private void handleExceptions(...) {}
}
This approach allows the same pipeline code to be reused across test and production environments simply by replacing the configuration file. However, reading and mapping every value by hand becomes verbose for larger setups.
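The findByPath calls above address nested YAML keys with dot-separated paths such as http.socket.sendBufferSize. To make that lookup concrete, the sketch below walks a nested Map (the shape a YAML parser typically produces) one segment at a time and falls back to a default when a key is absent. ConfigPaths and its methods are hypothetical; this is not how ConfigurationContext is implemented.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration of dot-path lookup over parsed YAML,
// not VoltSp's ConfigurationContext.
class ConfigPaths {

    // Walks nested maps one path segment at a time; empty if any segment is missing
    static Optional<Object> find(Map<String, Object> root, String path) {
        Object current = root;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map<?, ?> map) || !map.containsKey(segment)) {
                return Optional.empty();
            }
            current = map.get(segment);
        }
        return Optional.ofNullable(current);
    }

    // Typed lookup with a default, similar in spirit to findByPath(...).orElse(...)
    static int intOrElse(Map<String, Object> root, String path, int defaultValue) {
        return find(root, path)
                .map(v -> v instanceof Number n ? n.intValue() : Integer.parseInt(v.toString()))
                .orElse(defaultValue);
    }

    public static void main(String[] args) {
        // Mirrors the configuration file above as nested maps
        Map<String, Object> config = Map.of(
                "elements", List.of("one", "two"),
                "http", Map.of(
                        "host", "localhost",
                        "port", 9990,
                        "socket", Map.of("sendBufferSize", 10240)));

        System.out.println(intOrElse(config, "http.port", 8080));               // 9990
        System.out.println(intOrElse(config, "http.requestTimeout", 1));        // 1 (key missing)
        System.out.println(intOrElse(config, "http.socket.sendBufferSize", 0)); // 10240
    }
}
```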
Operator Builders
VoltSp provides specialized builder classes for most built-in operators. These simplify configuration by automatically mapping configuration paths and providing sensible defaults.
For example, the HTTP sink can be configured through HttpSinkConfigBuilder, which automatically maps fields from configuration files.
Configuration:
elements:
  - "one"
  - "two"
sink:
  http:
    httpClient:
      address: "localhost:9990"
      minimalProtocol: HTTP_11
      connectTimeout: 10s
      requestTimeout: 1s
      maxContentSize: 100kb
      socket:
        SO_SNDBUF: "10240"
        SO_KEEPALIVE: "true"
    headers:
      Content-Type: application/json
Pipeline:
import java.util.List;

public class BasicPipeline implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder builder) {
        ConfigurationContext config = builder.getExecutionContext().configurator();
        List<String> elements = config.findByPath("elements").asList(String.class);

        builder
            .withName("Send HTTP Requests")
            .consumeFromSource(Sources.collection(elements))
            .processWith(element -> new HttpUpdateRequest("/process", HttpMethod.POST, ...))
            // HTTP client settings come from the sink.http section of the configuration
            .terminateWithSink(HttpSinkConfigBuilder.builder()
                .withExceptionHandler(this::handleExceptions));
    }

    private void handleExceptions(...) {}
}
💡 Note: Not all operators support builder-based configuration, and not all fields can be defined externally.
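The builder configuration above expresses durations and sizes as human-readable strings such as 10s and 100kb. This section does not list which units VoltSp accepts, so the following is only a sketch of how such values can be parsed into java.time.Duration and byte counts. HumanUnits is a hypothetical class, and both the unit set (ms, s, m for durations; b, kb, mb for sizes) and 1 kb = 1024 bytes are assumptions.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for values such as "10s" or "100kb", not VoltSp code.
// Assumed units: ms, s, m for durations; b, kb, mb for sizes (1 kb = 1024 bytes).
class HumanUnits {

    private static final Pattern VALUE = Pattern.compile("(\\d+)\\s*([a-z]+)");

    static Duration parseDuration(String text) {
        Matcher m = match(text);
        long amount = Long.parseLong(m.group(1));
        return switch (m.group(2)) {
            case "ms" -> Duration.ofMillis(amount);
            case "s" -> Duration.ofSeconds(amount);
            case "m" -> Duration.ofMinutes(amount);
            default -> throw new IllegalArgumentException("Unknown duration unit: " + text);
        };
    }

    static long parseBytes(String text) {
        Matcher m = match(text);
        long amount = Long.parseLong(m.group(1));
        return switch (m.group(2)) {
            case "b" -> amount;
            case "kb" -> amount * 1024;
            case "mb" -> amount * 1024 * 1024;
            default -> throw new IllegalArgumentException("Unknown size unit: " + text);
        };
    }

    // Normalizes case and surrounding whitespace, then splits "<number><unit>"
    private static Matcher match(String text) {
        Matcher m = VALUE.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Expected <number><unit> but got: " + text);
        }
        return m;
    }

    public static void main(String[] args) {
        System.out.println(parseDuration("10s")); // PT10S
        System.out.println(parseBytes("100kb"));  // 102400
    }
}
```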
Configuration Interpolation
VoltSp supports dynamic configuration values using interpolation.
Supported forms:
- Environment variables: ${env:VAR} or ${env:VAR:DEFAULT}
- System properties: ${VAR} or ${VAR:DEFAULT} (set with -DVAR=value)
- Escaped literals: ${{ not ${interpolated} string }} → not ${interpolated} string
- Prefix/suffix substitution: e.g. us-${REGION}-1 → us-east-1 if REGION=east
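To make the rules above concrete, here is a minimal sketch of a resolver for the environment-variable, system-property, escaped-literal, and prefix/suffix forms. Interpolator is a hypothetical class, not VoltSp's interpolation service. It takes the environment and system-property lookups as functions so it can be exercised without touching the real environment, it splits a default at the first colon (so defaults such as URLs survive), and it does not handle the secret: or aws: prefixes.

```java
import java.util.function.Function;

// Hypothetical sketch of the interpolation rules; not VoltSp's resolver.
// The secret: and aws: prefixes are deliberately not handled.
class Interpolator {

    private final Function<String, String> env;        // environment variable lookup
    private final Function<String, String> properties; // system property lookup

    Interpolator(Function<String, String> env, Function<String, String> properties) {
        this.env = env;
        this.properties = properties;
    }

    String resolve(String input) {
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (i < input.length()) {
            if (input.startsWith("${{", i)) {
                // Escaped literal: copy the inner text verbatim, without interpolating it
                int end = input.indexOf("}}", i + 3);
                if (end < 0) throw new IllegalArgumentException("Unclosed ${{ in: " + input);
                out.append(input.substring(i + 3, end).trim());
                i = end + 2;
            } else if (input.startsWith("${", i)) {
                int end = input.indexOf('}', i + 2);
                if (end < 0) throw new IllegalArgumentException("Unclosed ${ in: " + input);
                out.append(lookup(input.substring(i + 2, end)));
                i = end + 1;
            } else {
                // Plain text, including any prefix or suffix around a placeholder
                out.append(input.charAt(i++));
            }
        }
        return out.toString();
    }

    // Handles "env:NAME[:DEFAULT]" and "NAME[:DEFAULT]"; the default may contain ':'
    private String lookup(String expression) {
        boolean fromEnv = expression.startsWith("env:");
        String rest = fromEnv ? expression.substring(4) : expression;
        int colon = rest.indexOf(':');
        String name = colon < 0 ? rest : rest.substring(0, colon);
        String fallback = colon < 0 ? null : rest.substring(colon + 1);
        String value = (fromEnv ? env : properties).apply(name);
        if (value != null) return value;
        if (fallback != null) return fallback;
        throw new IllegalStateException("No value or default for ${" + expression + "}");
    }

    public static void main(String[] args) {
        Interpolator resolver = new Interpolator(System::getenv, System::getProperty);
        System.out.println(resolver.resolve("${env:HOST:localhost}:${env:PORT:9990}"));
    }
}
```

With neither HOST nor PORT set in the environment, the address in main falls back to localhost:9990.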
Secret Values
Secrets can be marked explicitly:
- ${secret:env:VAR}
- ${secret:VAR:DEFAULT}
💡 Note: Secret values cannot be combined with prefix/suffix interpolation. They are never logged or exposed in plaintext.
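The note does not describe how VoltSp guards secret values. One common approach is a wrapper type whose toString() returns a mask, so a secret cannot leak through logging or string concatenation, paired with a check that a ${secret:...} placeholder makes up the whole value. The following is a minimal sketch of that idea; SecretValue and isStandaloneSecret are hypothetical names.

```java
// Hypothetical sketch of a guarded secret, not VoltSp's secret type.
final class SecretValue {

    private final String value;

    SecretValue(String value) {
        this.value = value;
    }

    // Reading the plaintext requires an explicit call
    String reveal() {
        return value;
    }

    // Logging and string concatenation only ever see a mask
    @Override
    public String toString() {
        return "******";
    }

    // True only when the whole raw value is a single ${secret:...} placeholder,
    // so a value with a prefix or suffix such as "user-${secret:env:DB_PASSWORD}" is rejected
    static boolean isStandaloneSecret(String raw) {
        return raw.startsWith("${secret:") && raw.indexOf('}') == raw.length() - 1;
    }

    public static void main(String[] args) {
        SecretValue password = new SecretValue("s3cr3t");
        System.out.println("password=" + password);                       // password=******
        System.out.println(isStandaloneSecret("us-${secret:env:TOKEN}")); // false
    }
}
```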
Configuration:
elements:
  - "one"
  - "two"
sink:
  http:
    httpClient:
      address: "${env:HOST:localhost}:${env:PORT:9990}"
      minimalProtocol: HTTP_11
      connectTimeout: 10s
      requestTimeout: 1s
      maxContentSize: 100kb
      socket:
        SO_SNDBUF: "10240"
        SO_KEEPALIVE: "true"
    headers:
      Content-Type: application/json
AWS Secrets Manager Support
The interpolation service can fetch secrets from AWS Secrets Manager.
By default, the Secrets Manager client uses the AWS default credentials provider chain (see https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html).
Alternatively, credentials can be supplied through the custom environment variables AWS_SECRET_MANAGER_ACCESS_KEY and AWS_SECRET_MANAGER_SECRET_KEY.
The endpoint and region can be overridden with additional environment variables or Java system properties: AWS_SECRET_MANAGER_ENDPOINT sets the endpoint, and AWS_SECRET_MANAGER_REGION or AWS_DEFAULT_REGION sets the region.
If no region is set, it defaults to Region.US_EAST_1.
If the Secrets Manager client fails to initialize, it falls back to validation mode and fails only when an AWS secret is actually requested.
The aws resolver can be combined with the secret resolver, but this is not required: all string values returned from AWS Secrets Manager are treated as secrets and are already guarded.
Configuration:
elements:
  - "one"
  - "two"
other:
  service:
    key: "id:${aws:key-name:key-default}"
    password: "${secret:aws:secret-name:}"
    # password: "${aws:secret-name:}" has the same effect
sink:
  http:
    httpClient:
      address: "${env:HOST:localhost}:${env:PORT:9990}"
      minimalProtocol: HTTP_11
      connectTimeout: 10s
      requestTimeout: 1s
      maxContentSize: 100kb
      socket:
        SO_SNDBUF: "10240"
        SO_KEEPALIVE: "true"
    headers:
      Content-Type: application/json
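The region and failure rules described above for AWS Secrets Manager fit in a few lines. The sketch below is hypothetical (AwsSecretsSettings is not a VoltSp class) and uses plain strings instead of the AWS SDK's Region type to stay self-contained. It assumes AWS_SECRET_MANAGER_REGION takes precedence over AWS_DEFAULT_REGION, since the text lists them in that order without stating a precedence, and it models validation mode as a resolver that defers the setup error until an aws value is requested.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of the documented fallback rules; not VoltSp's implementation.
class AwsSecretsSettings {

    // Assumed precedence: the dedicated variable first, then the generic AWS one
    static String region(Function<String, String> settings) {
        return Optional.ofNullable(settings.apply("AWS_SECRET_MANAGER_REGION"))
                .or(() -> Optional.ofNullable(settings.apply("AWS_DEFAULT_REGION")))
                .orElse("us-east-1"); // documented default: Region.US_EAST_1
    }

    // An endpoint override is optional; empty means "use the SDK default"
    static Optional<String> endpoint(Function<String, String> settings) {
        return Optional.ofNullable(settings.apply("AWS_SECRET_MANAGER_ENDPOINT"));
    }

    // Validation mode: a failed client setup is remembered, and the error
    // surfaces only when an aws value is actually requested
    static Function<String, String> resolver(Function<String, String> client, RuntimeException setupError) {
        if (setupError == null) {
            return client;
        }
        return name -> {
            throw new IllegalStateException("Cannot resolve aws secret '" + name + "'", setupError);
        };
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of("AWS_DEFAULT_REGION", "eu-west-1");
        System.out.println(region(env::get));   // eu-west-1
        System.out.println(endpoint(env::get)); // Optional.empty
    }
}
```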
Configuration vs. Pipeline Definition in YAML
Although YAML configuration files and YAML pipeline definitions share some structural similarities, they serve different purposes within VoltSp.
- YAML Pipeline Definition (also referred to as the YAML API) defines the structure and behavior of the pipeline itself. It requires a strict schema and supports only operators that provide corresponding builder classes.
- YAML Configuration files, on the other hand, parameterize a pipeline, allowing values such as host, port, or timeouts to be defined externally.
In summary, the YAML API defines the pipeline (similar to the Java API), while configuration files provide runtime properties that any pipeline, regardless of how it is defined, can read and apply.
Example:
YAML definition file:
version: 1
name: "Elements to File"
source:
  collection:
    elements:
      - "one"
      - "two"
sink:
  file:
    dirPath: ${outputDir}
YAML configuration file:
sink:
  file:
    delimiter: ";;"
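Interpolation also works in YAML definition files: ${outputDir} is resolved like any other placeholder, in this case from the outputDir system property. Both files configure the same file sink operator: the definition file sets dirPath and the configuration file sets delimiter.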
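This section does not specify how VoltSp combines the two files or which one wins when both set the same key. The sketch below therefore illustrates only the non-conflicting case shown in the example: it deep-merges the two nested maps and, rather than guessing a precedence, rejects a key defined in both. SettingsMerge is a hypothetical class, and ${outputDir} is assumed to be already resolved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of combining sink settings from a YAML definition
// file with those from a configuration file. Not VoltSp's merge logic.
class SettingsMerge {

    // Deep-merges two nested maps; a key present in both with non-map values is
    // reported as a conflict instead of silently picking a winner
    @SuppressWarnings("unchecked")
    static Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b, String path) {
        Map<String, Object> result = new LinkedHashMap<>(a);
        for (Map.Entry<String, Object> e : b.entrySet()) {
            String key = e.getKey();
            String fullKey = path.isEmpty() ? key : path + "." + key;
            Object existing = result.get(key);
            if (existing == null) {
                result.put(key, e.getValue());
            } else if (existing instanceof Map && e.getValue() instanceof Map) {
                result.put(key, merge((Map<String, Object>) existing,
                        (Map<String, Object>) e.getValue(), fullKey));
            } else {
                throw new IllegalArgumentException("Key defined in both files: " + fullKey);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // ${outputDir} assumed to be resolved already, e.g. from -DoutputDir=/tmp/out
        Map<String, Object> definition = Map.of("sink", Map.of("file", Map.of("dirPath", "/tmp/out")));
        Map<String, Object> configuration = Map.of("sink", Map.of("file", Map.of("delimiter", ";;")));
        System.out.println(merge(definition, configuration, ""));
    }
}
```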