YAML API for Defining Pipelines¶
VoltSP provides a declarative YAML configuration language for defining streaming data pipelines without writing Java code. This document describes the structure and options available in the YAML configuration format.
Documentation for VoltSP operators such as sinks, processors, and sources applies equally to Java and YAML pipelines and includes examples for both.
Schema support¶
When writing YAML pipelines, use the official VoltSP schema. It drives your IDE's autocompletion and provides syntax hints and validation errors. The schema covers every available component and its configuration options, includes inline documentation, and lists the valid values for each field.
You can reference the schema using the "$schema" field:
$schema: "https://docs.voltdb.com/VoltSP/schemas/voltsp1.7.0.json"
or manually associate the document with that schema in your IDE.
Basic Structure¶
A VoltSP pipeline configuration consists of the following top-level sections:
version: 1 # Required: Configuration version (must be 1)
name: "pipeline-name" # Required: Pipeline name
source: { } # Required: Source configuration
processors: [ ] # Optional: Processing steps to apply
sink: { } # Required: Sink configuration
logging: { } # Optional: Log levels for the whole system as well as for specific loggers
Configuration Sections¶
Version¶
Must be 1. This field is required.
version: 1
Name¶
Pipeline name that will be visible in the logs as well as metrics. This field is required.
name: "my-pipeline"
Source Configuration¶
The source section defines where the pipeline gets its data. You must specify exactly one source. All sources
available to the Java DSL are supported.
Each source type has its own configuration parameters.
Processing Configuration¶
The processors section defines an array of data transformations that should be applied to the data delivered by the
source.
Sink Configuration¶
The sink section defines where the pipeline outputs its data. You must specify exactly one sink type. All sinks
available to the Java DSL are supported.
Calling code snippets from operator declaration¶
Some configuration options in VoltSP accept Function types, allowing you to provide custom logic for data extraction
or transformation. There are two ways to provide such logic: Java method references or JavaScript/Python code snippets.
Java method references¶
The most straightforward way to declare a function is using a Java method reference with the syntax
fully.qualified.ClassName::methodName.
For example, if an operator expects the following field:
Function<E, Long> idExtractor;
it can be configured in YAML as:
version: 1
name: TestPipeline
source:
  decodingSource:
    idExtractor: org.acme.Event::id
    ...
Instance methods on the input object¶
When the method is a no-argument instance method, VoltSP will call it directly on the input object. This works well with Java records:
public record Event(long id, String name) {}
idExtractor: org.acme.Event::id
nameExtractor: org.acme.Event::name
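For readers more familiar with the Java DSL, the two YAML values above behave much like ordinary unbound Java method references. The sketch below is plain Java and does not use any VoltSP API; the class name ExtractorDemo and the sample field values are made up for illustration.

```java
import java.util.function.Function;

// Hypothetical demo class, not part of VoltSP.
final class ExtractorDemo {

    // Same shape as the record shown above.
    record Event(long id, String name) {}

    // Plain-Java counterparts of the YAML values
    // org.acme.Event::id and org.acme.Event::name.
    static final Function<Event, Long> ID_EXTRACTOR = Event::id;
    static final Function<Event, String> NAME_EXTRACTOR = Event::name;

    public static void main(String[] args) {
        Event event = new Event(42L, "sensor-a");
        // Each extractor calls the accessor directly on the input object.
        System.out.println(ID_EXTRACTOR.apply(event));
        System.out.println(NAME_EXTRACTOR.apply(event));
    }
}
```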
Instance methods on a factory class¶
The method can also accept a parameter. In this case, VoltSP will instantiate the class and call the method with the input object:
version: 1
name: TestPipeline
source:
  decodingSource:
    idExtractor: org.acme.Factory::getId
    ...
public class Factory {
    public long getId(Event event) {
        return event.id();
    }
}
Important requirements for factory classes:
- The class must not be abstract
- The class must have a public no-argument constructor
- The constructor must not throw exceptions
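The three requirements above can be checked with standard Java reflection. The following is only a sketch of how such a check might look, not VoltSP's actual code: the class FactoryChecks and its nested sample factories are hypothetical, and the messages are modeled on the ones listed under Common errors.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;

// Hypothetical helper, not part of VoltSP.
final class FactoryChecks {

    // Sample classes, one per requirement.
    public static class GoodFactory { public GoodFactory() {} }
    public abstract static class AbstractFactory { public AbstractFactory() {} }
    public static class PrivateCtorFactory { private PrivateCtorFactory() {} }
    public static class ThrowingFactory {
        public ThrowingFactory() { throw new IllegalStateException("boom"); }
    }

    static Object instantiate(Class<?> type) {
        String name = type.getName();
        // Requirement 1: the class must not be abstract.
        if (Modifier.isAbstract(type.getModifiers())) {
            throw new IllegalArgumentException(
                    "Class '" + name + "' cannot be instantiated (abstract?)");
        }
        // Requirement 2: getConstructor() only finds public constructors.
        Constructor<?> constructor;
        try {
            constructor = type.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class '" + name + "' does not have a public no argument constructor", e);
        }
        // Requirement 3: the constructor must complete without throwing.
        try {
            return constructor.newInstance();
        } catch (InvocationTargetException e) {
            throw new IllegalArgumentException(
                    "The no argument constructor of class '" + name
                            + "' has thrown an exception", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                    "Class '" + name + "' cannot be instantiated (abstract?)", e);
        }
    }
}
```

Running a check like this against your own factory class in a unit test is one way to catch these problems before deploying a pipeline.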
Static methods¶
Methods can also be static, which avoids instantiation requirements:
public class Factory {
    public static Long getStaticId(Event event) {
        return event.id();
    }
}
idExtractor: org.acme.Factory::getStaticId
Static no-argument methods are also supported; in that case the input object is ignored:
public class Constants {
    public static String getDefaultName() {
        return "default";
    }
}
Method resolution rules¶
- The method name must be unambiguous - if the class has multiple methods with the same name (overloaded), configuration will fail
- The method must be public
- Whitespace around the class name and method name is trimmed automatically
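The four call shapes described in the preceding subsections, together with the rules above, can be sketched with plain Java reflection. This is an illustration under stated assumptions and not VoltSP's implementation: MethodRefResolver, its nested sample types, the describe method, and the choice to create the factory instance once at resolve time are all hypothetical.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not part of VoltSP.
final class MethodRefResolver {

    // Sample types mirroring the examples in this document.
    record Event(long id, String name) {}

    public static class Factory {
        public long getId(Event event) { return event.id(); }
        public static String describe(Event event) { return "event-" + event.id(); }
        public static String getDefaultName() { return "default"; }
    }

    static Function<Object, Object> resolve(Class<?> type, String rawMethodName) {
        String name = rawMethodName.trim(); // surrounding whitespace is ignored
        List<Method> candidates = Arrays.stream(type.getMethods()) // public methods only
                .filter(m -> !m.isBridge() && m.getName().equals(name))
                .toList();
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException(
                    "Java class '" + type.getName() + "' does not have method '" + name + "'");
        }
        if (candidates.size() > 1) { // overloaded name
            throw new IllegalArgumentException("Ambiguous Java method name: " + name);
        }
        Method method = candidates.get(0);
        boolean isStatic = Modifier.isStatic(method.getModifiers());
        int arity = method.getParameterCount();

        if (arity == 0 && isStatic) {
            return input -> invoke(method, null, new Object[0]);  // input is ignored
        }
        if (arity == 0) {
            return input -> invoke(method, input, new Object[0]); // called on the input
        }
        if (arity == 1) {
            // Static methods need no receiver; otherwise create the factory up front.
            Object receiver = isStatic ? null : newInstance(type);
            return input -> invoke(method, receiver, new Object[] {input});
        }
        throw new IllegalArgumentException("Unsupported parameter count for method " + name);
    }

    private static Object newInstance(Class<?> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + type.getName(), e);
        }
    }

    private static Object invoke(Method method, Object receiver, Object[] args) {
        try {
            return method.invoke(receiver, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Call to " + method.getName() + " failed", e);
        }
    }
}
```

In this sketch, all lookups and the factory instantiation happen inside resolve, which matches the idea that configuration problems surface at startup rather than while data is flowing.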
Common errors¶
| Error | Cause |
|---|---|
| "Java class 'X' could not be found on class path" | The fully qualified class name is incorrect or the class is not available |
| "Java class 'X' does not have method 'Y'" | The method name is misspelled or doesn't exist |
| "Ambiguous Java method name" | The class has multiple methods with the same name; rename or use a wrapper |
| "Class 'X' cannot be instantiated (abstract?)" | Non-static method on an abstract class |
| "Class 'X' does not have a public no argument constructor" | The class has only private/protected constructors or requires arguments |
| "The no argument constructor of class 'X' has thrown an exception" | The constructor threw an exception during instantiation |
Note: All Java method reference errors are detected during the configuration phase at startup. If any error occurs, VoltSP will fail to start and report the specific configuration issue.
JavaScript and Python code execution¶
If the value does not match the Java method reference pattern (ClassName::methodName), it will be passed to the
JavaScript engine for evaluation. This allows inline scripting:
version: 1
name: TestPipeline
source:
  decodingSource:
    idExtractor: |
      function process(input) {
        return input.id();
      }
    ...
The code snippet is evaluated once during the configuration phase.
Note: Syntax errors in JavaScript or Python code snippets are detected during the configuration phase at startup. If the script contains syntax errors, VoltSP will fail to start and report the parsing error.
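The decision between "Java method reference" and "inline script" described in this section can be pictured as a pattern match on the configured string. The sketch below is an assumption about how such a check could be written; the exact pattern VoltSP uses is not documented here, and FunctionValueClassifier and MethodRef are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of VoltSP.
final class FunctionValueClassifier {

    record MethodRef(String className, String methodName) {}

    // Assumed pattern: a dotted Java class name, "::", and a method name,
    // with optional whitespace around each part.
    private static final Pattern METHOD_REF = Pattern.compile(
            "\\s*([A-Za-z_$][\\w$]*(?:\\.[A-Za-z_$][\\w$]*)*)\\s*::\\s*([A-Za-z_$][\\w$]*)\\s*");

    /** Returns the parsed reference, or empty if the value should be treated as a script. */
    static Optional<MethodRef> parseMethodRef(String value) {
        Matcher matcher = METHOD_REF.matcher(value);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        return Optional.of(new MethodRef(matcher.group(1), matcher.group(2)));
    }
}
```

With this pattern, a value such as org.acme.Event::id is parsed as a method reference, while a multi-line function body does not match and would be handed to the script engine.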
Source Types¶
The following are examples of simple source configurations.
File Source¶
Reads data from a file:
source:
  file:
    path: "input.txt" # Required: Path to input file
Stdin Source¶
Reads data from standard input:
source:
  stdin: { }
Collection Source¶
Reads from a static collection of strings:
source:
  collection:
    elements: # Required: Array of strings
      - "element1"
      - "element2"
Network Source¶
Reads from network:
source:
  network:
    type: "UDP" # Required: UDP or TCP
    address: "0.0.0.0:12345" # Required: Port number or address:port
    decoder: "lines" # Required: Decoder type (none/identity/line/bytes)
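To try out a UDP network source locally, a small test sender can be handy. The sketch below uses only the JDK and no VoltSP API; UdpLineSender is a hypothetical name, and it assumes that the line-oriented decoder expects newline-terminated UTF-8 text, which is not confirmed by this document.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical test helper, not part of VoltSP.
final class UdpLineSender {

    /** Sends one newline-terminated UTF-8 line as a single UDP datagram. */
    static void send(String host, int port, String line) throws IOException {
        byte[] payload = (line + "\n").getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(
                    payload, payload.length, InetAddress.getByName(host), port));
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 12345 matches the address used in the source configuration above.
        send("127.0.0.1", 12345, "hello from the test sender");
    }
}
```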
Sink Types¶
Kafka Sink¶
Outputs to a Kafka topic:
sink:
  kafka:
    bootstrapServers: "localhost:9092"
    topicName: "test"
    keySerializer: org.apache.kafka.common.serialization.StringSerializer
    valueSerializer: org.apache.kafka.common.serialization.StringSerializer
Processor Types¶
Processors in YAML configurations are predefined transformations that, just like sinks and sources, have their own configuration.
Some of them allow executing arbitrary code (JavaScript, Python, or Ruby), while others provide specific functionality
such as caching or AI inference.
processors:
  - javascript:
      script: |
        function process(input) {
          var VoltRequest = Java.type('org.voltdb.stream.plugin.volt.api.VoltRequest');
          return new VoltRequest(input, ["3", "4"]);
        }
  - voltdb-cache:
      voltClientResource: "us-east-cluster"
Resources¶
Resources represent integrations with external systems or shared clients that your pipeline needs during processing. They encapsulate connection management, credentials, and reusable clients (for example, HTTP client, VoltDB client, S3 client).
resources:
  - name: voltdb
    voltdb-client:
      servers:
        - voltdb-us-central:21212
# ...
sink:
  voltdb-procedure:
    voltClientResource: "voltdb"
Logging¶
The logging section allows you to configure log levels for the entire system and specific loggers. Both globalLevel
and loggers are optional.
The logger names and level naming follow the log4j2 conventions.
- globalLevel: Sets the root log level for the entire application. Valid values are: TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF, ALL
- loggers: A map of logger names to their specific log levels. This allows fine-grained control over logging for specific packages or classes.
Warning: YAML-defined logging configuration is applied only after the pipeline starts. To influence the startup-time logging of the core system, use CLI/Helm parameters directly.
version: 1
name: "logging-test"
source:
  collection:
    elements:
      - "Hello world!"
sink:
  stdout: { }
logging:
  globalLevel: "TRACE"
  loggers:
    "org.voltdb": "DEBUG"
Complete Examples¶
Simple File Processing Pipeline¶
$schema: "https://docs.voltdb.com/VoltSP/schemas/voltsp1.7.0.json"
version: 1
name: "file-processor"
source:
  file:
    path: "input.txt"
processors:
  - javascript:
      script: |
        function process(input) {
          if (typeof input === 'string') {
            return input.toUpperCase();
          }
          return input;
        }
sink:
  file:
    dirPath: "/tmp"
Kafka to VoltDB Pipeline¶
$schema: "https://docs.voltdb.com/VoltSP/schemas/voltsp1.7.0.json"
version: 1
name: "kafka-to-voltdb"
resources:
  - name: voltdb
    voltdb-client:
      servers:
        - voltdb-us-central:21212
source:
  kafka:
    bootstrapServers:
      - "kafka1:9092"
      - "kafka2:9092"
    topicNames:
      - "incoming-data"
    groupId: "processor-group"
    startingOffset: latest
    keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
    valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
processors:
  - javascript:
      script: |
        var VoltProcedureRequest = Java.type('org.voltdb.stream.plugin.volt.api.VoltProcedureRequest');
        function process(input) {
          // Transform message
          var message = JSON.parse(input);
          return new VoltProcedureRequest("ProcessFrame", ...);
        }
sink:
  voltdb-procedure:
    voltClientResource: "voltdb"
Network to Network Pipeline¶
$schema: "https://docs.voltdb.com/VoltSP/schemas/voltsp1.7.0.json"
version: 1
name: "network-relay"
source:
  network:
    type: udp
    address: "0.0.0.0:12345"
    decoder: "lines"
processors: [ ]
sink:
  network:
    type: udp
    address: "target-host:54321"
Testing pipelines¶
Users are encouraged to write Java test harnesses that verify the correctness of pipelines, whether they are defined
in Java code or YAML. The volt-stream-testcontainer module provides tools for setting up a pipeline's runtime
environment and running the pipeline against a Docker image of VoltSP.