YAML API for Defining Pipelines¶
VoltSP provides a declarative YAML configuration language for defining streaming data pipelines without writing Java code. This document describes the structure and options available in the YAML configuration format.
Choosing Between YAML and Java for Pipeline Definition¶
While defining execution steps in YAML can be simpler and more concise, it is recommended to implement complex pipelines in Java for better performance, maintainability, and reliability.
Key advantages of using Java include:
- Compile-time validation – Java is a compiled language, allowing errors to be detected before execution.
- Modular structure – Source code can be organized into multiple files, improving readability and long-term maintainability.
- Local testing – Pipelines can be executed and verified locally before deployment to the target environment.
- Direct JVM execution – Java pipelines run natively on the JVM, whereas logic defined in YAML is ultimately interpreted on top of the Java Virtual Machine, which can introduce performance overhead.
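The local-testing point applies to YAML pipelines as well: before deploying, a small script can assert that a parsed configuration contains the required sections. A minimal sketch, assuming plain dictionaries stand in for the parsed YAML (the key names come from the basic structure described below; this helper is illustrative, not part of VoltSP):

```python
# Minimal structural check for a parsed pipeline configuration.
# Plain dicts stand in for YAML parsed with any YAML library.

REQUIRED_KEYS = {"version", "name", "source", "sink"}

def validate(config: dict) -> list[str]:
    """Return a list of problems; an empty list means the basic structure is OK."""
    problems = [f"missing required section: {k!r}"
                for k in sorted(REQUIRED_KEYS - config.keys())]
    if "version" in config and config["version"] != 1:
        problems.append("version must be 1")
    return problems

config = {
    "version": 1,
    "name": "my-pipeline",
    "source": {"stdin": {}},
    "sink": {"file": {"dirPath": "/tmp"}},
}
print(validate(config))  # [] — structure is valid
```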
Basic Structure¶
A VoltSP pipeline configuration requires the following main sections:
version: 1 # Required: Configuration version (must be 1)
name: "pipeline-name" # Required: Pipeline name
source: { } # Required: Source configuration
processors: { } # Optional: Processing steps to apply
sink: { } # Required: Sink configuration
logging: { } # Optional: Logging configuration
Schema¶
A JSON schema is available for the pipeline configuration, covering all supported sections and components. Import it into your favourite IDE for syntax suggestions and validation.
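For example, editors that use the YAML language server can pick the schema up from a modeline at the top of the pipeline file. A sketch (the schema path is a placeholder; substitute the location where you saved the downloaded schema):

```yaml
# yaml-language-server: $schema=./voltsp-pipeline-schema.json   # placeholder path
version: 1
name: "my-pipeline"
```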
Configuration Sections¶
Version¶
Must be 1. This field is required.
version: 1
Name¶
Pipeline name that will be visible in the logs as well as metrics. This field is required.
name: "my-pipeline"
Source Configuration¶
The source section defines where the pipeline gets its data. You must specify exactly one source. All sources available to the Java DSL are supported.
Each source type has its own configuration parameters.
Processing Configuration¶
The processors section defines an array of data transformations that are applied, in order, to data from the source.
Sink Configuration¶
The sink section defines where the pipeline outputs its data. You must specify exactly one sink type. All sinks available to the Java DSL are supported.
Source Types¶
The following are examples of simple source configurations.
File Source¶
Reads data from a file:
source:
  file:
    path: "input.txt"    # Required: Path to input file
Stdin Source¶
Reads data from standard input:
source:
  stdin: { }
Collection Source¶
Reads from a static collection of strings:
source:
  collection:
    elements:            # Required: Array of strings
      - "element1"
      - "element2"
Network Source¶
Reads from network:
source:
  network:
    type: "UDP"               # Required: UDP or TCP
    address: "0.0.0.0:12345"  # Required: Port number or address:port
    decoder: "lines"          # Required: Decoder type (none/identity/lines/bytes)
Beats Source¶
Reads from Elastic Beats:
source:
  beats:
    address: "0.0.0.0:514"            # Required: Listen address
    clientInactivityTimeout: "PT30S"  # Optional: Connection idle timeout (ISO-8601 duration)
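ISO-8601 durations such as PT30S read as: P introduces the period, T the time part, and each number carries a unit letter (H, M, S). A tiny illustrative parser for the time-only subset used here, to show how the format decomposes (not part of VoltSP):

```python
import re

def parse_iso8601_time_duration(text: str) -> float:
    """Parse the 'PTnHnMnS' subset of ISO-8601 durations into seconds."""
    match = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?", text)
    if match is None or not any(match.groups()):
        raise ValueError(f"not a supported ISO-8601 time duration: {text!r}")
    hours, minutes, seconds = (float(g) if g else 0.0 for g in match.groups())
    return hours * 3600 + minutes * 60 + seconds

print(parse_iso8601_time_duration("PT30S"))  # 30.0
```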
Sink Types¶
VoltDB Sink¶
Outputs to VoltDB:
sink:
  voltdb-procedure:
    procedureName: "MyStoredProc"  # Required: Stored procedure name
    servers: "voltdb-host:21212"   # Required: VoltDB host
    client:
      retries: 3                   # Optional: Number of retries
Processor Types¶
Processors can be written in multiple languages and are defined in the pipeline's processors array. Each processor must specify its language and code:
processors:
  - javascript:
      script: |
        function process(input) {
          if (typeof input === 'string') {
            return input.toUpperCase();
          }
          return input;
        }
  - python:
      code: |
        def process(message):
            return message.lower()
        process(message)
  - ruby:
      code: |
        message.reverse
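Because processor bodies are ordinary scripts, the transformation logic can be unit-checked outside the pipeline before it is embedded in YAML. A sketch mirroring the Python processor above (the test harness around it is illustrative; at runtime VoltSP supplies the message argument):

```python
# Function body matches the Python processor above; here we feed it
# sample inputs ourselves instead of receiving them from the pipeline.
def process(message: str) -> str:
    return message.lower()

for sample in ["Hello", "WORLD", "already lower"]:
    print(process(sample))
```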
Complete Examples¶
Simple File Processing Pipeline¶
version: 1
name: "file-processor"
source:
  file:
    path: "input.txt"
processors:
  - javascript:
      script: |
        function process(input) {
          if (typeof input === 'string') {
            return input.toUpperCase();
          }
          return input;
        }
sink:
  file:
    dirPath: "/tmp"
Kafka to VoltDB Pipeline¶
version: 1
name: "kafka-to-voltdb"
source:
  kafka:
    bootstrapServers:
      - "kafka1:9092"
      - "kafka2:9092"
    topicNames:
      - "incoming-data"
    groupId: "processor-group"
    startingOffset: "LATEST"
processors:
  - javascript:
      script: |
        function process(input) {
          // Parse the incoming JSON message
          return JSON.parse(input);
        }
sink:
  voltdb-procedure:
    procedureName: "ProcessData"
    servers: "voltdb-host:21212"
    client:
      retries: 3
logging:
  globalLevel: "INFO"
  loggers:
    org.voltdb: "DEBUG"
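The JavaScript processor in this example parses each Kafka record as JSON before it reaches the sink. That contract (input is a JSON document, output is the parsed structure) can be prototyped with Python's standard json module before wiring up Kafka; a sketch, with a made-up sample record:

```python
import json

def process(message: str) -> dict:
    """Mirror of the JSON-parsing processor: decode one record."""
    return json.loads(message)

record = '{"id": 7, "value": "sensor-a"}'   # hypothetical sample record
print(process(record))  # {'id': 7, 'value': 'sensor-a'}
```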
Network to Network Pipeline¶
version: 1
name: "network-relay"
source:
  network:
    type: "UDP"
    address: "0.0.0.0:12345"
    decoder: "lines"
sink:
  network:
    type: "UDP"
    address: "target-host:54321"
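To exercise a UDP source like the one above locally, you can send newline-terminated datagrams with any UDP client, matching the "lines" decoder. A self-contained sketch using Python's standard socket module (the receiver here is a stand-in for a running pipeline, which would normally be listening on port 12345):

```python
import socket

def send_line(host: str, port: int, line: str) -> None:
    """Send one newline-terminated datagram, matching the 'lines' decoder."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto((line + "\n").encode("utf-8"), (host, port))

# Stand-in receiver so the sketch is self-contained; a pipeline with a
# UDP network source would take its place in a real test.
receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
receiver.bind(("127.0.0.1", 0))      # ephemeral port instead of 12345
port = receiver.getsockname()[1]
send_line("127.0.0.1", port, "hello relay")
data, _ = receiver.recvfrom(4096)
print(data)  # b'hello relay\n'
receiver.close()
```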