YAML API for Defining Pipelines

VoltSP provides a declarative YAML configuration language for defining streaming data pipelines without writing Java code. This document describes the structure and options available in the YAML configuration format.

Documentation for VoltSP operators (sinks, processors, sources, and so on) applies equally to Java and YAML pipelines and includes examples for both.

Schema support

When writing YAML pipelines, use the official VoltSP schema. It drives your IDE's autocomplete and provides syntax hints and validation errors. The schema describes all available components and their configuration options, includes inline documentation, and lists the valid values for each field.

You can reference the schema using the "$schema" field:

$schema: "https://docs.voltdb.com/VoltSP/schemas/voltsp1.7.0.json"

or manually associate the document with that schema in your IDE.

Basic Structure

A VoltSP pipeline configuration consists of the following top-level sections, some required and some optional:

version: 1              # Required: Configuration version (must be 1)
name: "pipeline-name"   # Required: Pipeline name
source: { }             # Required: Source configuration
processors: [ ]         # Optional: Array of processing steps to apply
sink: { }               # Required: Sink configuration
logging: { }            # Optional: Log levels for the system as well as for specific loggers

Configuration Sections

Version

Must be 1. This field is required.

version: 1

Name

Pipeline name that will be visible in the logs as well as metrics. This field is required.

name: "my-pipeline"

Source Configuration

The source section defines where the pipeline gets its data. You must specify exactly one source. All sources available to the Java DSL are supported.

Each source type has its own configuration parameters.

Processing Configuration

The processors section defines an array of data transformations that should be applied to the data delivered by the source.

Sink Configuration

The sink section defines where the pipeline outputs its data. You must specify exactly one sink type. All sinks available to the Java DSL are supported.

Calling code snippets from operator declaration

Some configuration options in VoltSP accept Function types, allowing you to provide custom logic for data extraction or transformation. There are two ways to provide such logic: Java method references or JavaScript/Python code snippets.

Java method references

The most straightforward way to declare a function is using a Java method reference with the syntax fully.qualified.ClassName::methodName.

For example, if an operator expects:

Function<E, Long> idExtractor;

then such a parameter can be set in YAML as:

version: 1
name: TestPipeline
source:
  decodingSource:
    idExtractor: org.acme.Event::id
...

Note: The method is extracted from the fully qualified class during the configuration phase and will be cached to improve performance.

Instance methods on the input object

When the method is a no-argument instance method, VoltSP will call it directly on the input object. This works well with Java records:

public record Event(long id, String name) {}

idExtractor: org.acme.Event::id
nameExtractor: org.acme.Event::name

Instance methods on a factory class

The method can also accept a parameter. In this case, VoltSP will instantiate the class and call the method with the input object:

version: 1
name: TestPipeline
source:
  decodingSource:
    idExtractor: org.acme.Factory::getId
...

and its implementation:

public class Factory {
    public long getId(Event event) {
        return event.id();
    }
}

Important requirements for factory classes:

  • The class must not be abstract
  • The class must have a public no-argument constructor
  • The constructor must not throw exceptions

Static methods

Methods can also be static, which avoids instantiation requirements:

public class Factory {
    public static Long getStaticId(Event event) {
        return event.id();
    }
}

idExtractor: org.acme.Factory::getStaticId

Static no-argument methods are also supported – the input object will be ignored:

public class Constants {
    public static String getDefaultName() {
        return "default";
    }
}
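
Following the pattern of the earlier examples, such a method could presumably be referenced as shown below. The org.acme package is an assumption, since the Constants snippet above does not show one, and nameExtractor is reused from the earlier record example purely for illustration.

```yaml
# Assumed package: org.acme. The input object is ignored; the extractor always yields "default".
nameExtractor: org.acme.Constants::getDefaultName
```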

Method resolution rules

  • The method name must be unambiguous - if the class has multiple methods with the same name (overloaded), configuration will fail
  • The method must be public
  • Whitespace around the class name and method name is trimmed automatically
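
The rules above can be sketched with plain Java reflection. This is only an illustration under stated assumptions, not VoltSP's actual implementation: the class MethodRefSketch, its resolve method, and the nested Event and Factory types are hypothetical names, Event is a plain class rather than a record to keep the sketch independent of the Java version, and the error messages are copied from this page only to show where each condition would arise.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical resolver for "ClassName::methodName" values; not VoltSP code. */
public class MethodRefSketch {

    public static Function<Object, Object> resolve(String reference) {
        String[] parts = reference.split("::");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not a method reference: " + reference);
        }
        // Whitespace around the class name and the method name is trimmed.
        String className = parts[0].trim();
        String methodName = parts[1].trim();

        Class<?> type;
        try {
            type = Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "Java class '" + className + "' could not be found on class path", e);
        }

        // getMethods() returns public methods only, so non-public methods never match.
        List<Method> candidates = Arrays.stream(type.getMethods())
                .filter(m -> m.getName().equals(methodName))
                .collect(Collectors.toList());
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException(
                    "Java class '" + className + "' does not have method '" + methodName + "'");
        }
        if (candidates.size() > 1) {
            throw new IllegalArgumentException("Ambiguous Java method name: " + methodName);
        }

        Method method = candidates.get(0);
        if (method.getParameterCount() > 1) {
            throw new IllegalArgumentException("Method takes more than one parameter: " + methodName);
        }
        boolean isStatic = Modifier.isStatic(method.getModifiers());
        boolean takesInput = method.getParameterCount() == 1;
        // Only a non-static method that takes the input needs a factory instance, created once.
        Object factory = (!isStatic && takesInput) ? instantiate(type) : null;

        return input -> {
            try {
                if (isStatic) {
                    return takesInput ? method.invoke(null, input) : method.invoke(null);
                }
                // Instance method: call it on the factory, or directly on the input object.
                return takesInput ? method.invoke(factory, input) : method.invoke(input);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Call to " + methodName + " failed", e);
            }
        };
    }

    private static Object instantiate(Class<?> type) {
        String name = type.getName();
        if (Modifier.isAbstract(type.getModifiers())) {
            throw new IllegalArgumentException("Class '" + name + "' cannot be instantiated (abstract?)");
        }
        try {
            return type.getConstructor().newInstance();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class '" + name + "' does not have a public no argument constructor", e);
        } catch (InvocationTargetException e) {
            throw new IllegalArgumentException(
                    "The no argument constructor of class '" + name + "' has thrown an exception", e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Class '" + name + "' cannot be instantiated", e);
        }
    }

    /** Hypothetical input type, standing in for the Event record used on this page. */
    public static class Event {
        private final long id;
        public Event(long id) { this.id = id; }
        public long id() { return id; }
    }

    /** Hypothetical factory with one instance method and one static method. */
    public static class Factory {
        public long getId(Event event) { return event.id(); }
        public static Long getStaticId(Event event) { return event.id(); }
    }
}
```

In this sketch, resolving a reference to Event's id method calls it on the input object, resolving Factory's getId instantiates Factory once and passes the input to it, and a reference such as java.lang.String::valueOf is rejected as ambiguous because valueOf is overloaded. All checks run inside resolve, which mirrors the idea that problems surface during configuration rather than while data is flowing.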

Common errors

Error | Cause
"Java class 'X' could not be found on class path" | The fully qualified class name is incorrect or the class is not available
"Java class 'X' does not have method 'Y'" | The method name is misspelled or doesn't exist
"Ambiguous Java method name" | The class has multiple methods with the same name; rename or use a wrapper
"Class 'X' cannot be instantiated (abstract?)" | Non-static method on an abstract class
"Class 'X' does not have a public no argument constructor" | The class has only private/protected constructors or requires arguments
"The no argument constructor of class 'X' has thrown an exception" | The constructor threw an exception during instantiation

Note: All Java method reference errors are detected during the configuration phase at startup. If any error occurs, VoltSP will fail to start and report the specific configuration issue.
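
For the "Ambiguous Java method name" case, the wrapper approach might look like the sketch below. All names here (OverloadWrapperSketch, Parser, ParserWrapper and their methods) are hypothetical and exist only to illustrate the idea: an overloaded method cannot be referenced by name alone, so a small wrapper exposes each overload under a unique name.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical example of working around an overloaded method name. */
public class OverloadWrapperSketch {

    /** A reference to Parser::parse would be ambiguous because parse is overloaded. */
    public static class Parser {
        public static long parse(String text) {
            return Long.parseLong(text.trim());
        }

        public static long parse(byte[] bytes) {
            return parse(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    /** Each wrapper method has a unique name, so it can be referenced unambiguously. */
    public static class ParserWrapper {
        public static long parseText(String text) {
            return Parser.parse(text);
        }

        public static long parseBytes(byte[] bytes) {
            return Parser.parse(bytes);
        }
    }
}
```

A configuration value would then point at ParserWrapper's parseText or parseBytes method instead of Parser's parse method. Because the wrapper methods are static, none of the factory class instantiation requirements apply.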

JavaScript and Python code execution

If the value does not match the Java method reference pattern (ClassName::methodName), it will be passed to the JavaScript engine for evaluation. This allows inline scripting:

version: 1
name: TestPipeline
source:
  decodingSource:
    idExtractor: |
      function process(input) {
        return input.id();
      }
...

The code snippet is evaluated once during the configuration phase.

Note: Syntax errors in JavaScript or Python code snippets are detected during the configuration phase at startup. If the script contains syntax errors, VoltSP will fail to start and report the parsing error.
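
The choice between a Java method reference and an inline script can be pictured as a simple pattern check. The sketch below is a guess at the idea, not VoltSP's actual rule: the class FunctionValueKind, the Kind enum, and the exact regular expression (ASCII Java identifiers separated by dots, then "::", then a method name, with optional surrounding whitespace) are all assumptions made for illustration.

```java
import java.util.regex.Pattern;

/** Hypothetical classifier for Function-typed configuration values. */
public class FunctionValueKind {

    public enum Kind { JAVA_METHOD_REFERENCE, SCRIPT }

    // Assumed shape of an identifier; the real pattern used by VoltSP may differ.
    private static final String ID = "[A-Za-z_$][A-Za-z0-9_$]*";

    // Dotted class name, "::", method name; whitespace is tolerated around both names.
    private static final Pattern METHOD_REF =
            Pattern.compile("\\s*" + ID + "(\\." + ID + ")*\\s*::\\s*" + ID + "\\s*");

    public static Kind classify(String value) {
        // matches() requires the whole value to fit the pattern, so multi-line scripts fall through.
        return METHOD_REF.matcher(value).matches() ? Kind.JAVA_METHOD_REFERENCE : Kind.SCRIPT;
    }
}
```

Under these assumptions, a value such as org.acme.Event::id is classified as a method reference, while the multi-line function from the YAML example above is classified as a script and would be handed to the scripting engine.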

Source Types

Below are some examples of simple source configurations.

File Source

Reads data from a file:

source:
    file:
        path: "input.txt"       # Required: Path to input file

Stdin Source

Reads data from standard input:

source:
    stdin: { }

Collection Source

Reads from a static collection of strings:

source:
    collection:
        elements: # Required: Array of strings
            - "element1"
            - "element2"

Network Source

Reads data from the network:

source:
    network:
        type: "UDP"                        # Required: UDP or TCP
        address: "0.0.0.0:12345"           # Required: Port number or address:port
        decoder: "lines"                   # Required: Decoder type (none/identity/line/bytes)

Sink Types

Kafka Sink

Outputs to a Kafka topic:

sink:
    kafka:
        bootstrapServers: "localhost:9092"
        topicName: "test"
        keySerializer: org.apache.kafka.common.serialization.StringSerializer
        valueSerializer: org.apache.kafka.common.serialization.StringSerializer

Processor Types

Processors in YAML configurations are predefined transformations that, like sinks and sources, have their own configuration. Some of them execute arbitrary code (JavaScript, Python, or Ruby), while others provide specific functionality such as caching or AI inference.

processors:
    -   javascript:
            script: |
                function process(input) {
                  var VoltRequest = Java.type('org.voltdb.stream.plugin.volt.api.VoltRequest');
                  return new VoltRequest(input, ["3", "4"]);
                }
    -   voltdb-cache:
            voltClientResource: "us-east-cluster"

Resources

Resources represent integrations with external systems or shared clients that your pipeline needs during processing. They encapsulate connection management, credentials, and reusable clients (for example, HTTP client, VoltDB client, S3 client).

resources:
    -   name: voltdb
        voltdb-client:
            servers:
                - voltdb-us-central:21212

# ...

sink:
    voltdb-procedure:
        voltClientResource: "voltdb"

Logging

The logging section allows you to configure log levels for the entire system and specific loggers. Both globalLevel and loggers are optional.

The logger names and level naming follow the log4j2 conventions.

  • globalLevel - Sets the root log level for the entire application. Valid values are: TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF, ALL
  • loggers - A map of logger names to their specific log levels. This allows fine-grained control over logging for specific packages or classes.

Warning: YAML-defined logging configuration is applied only after the pipeline starts. To influence startup-time logging of the core system, use CLI/Helm parameters directly.

version: 1
name: "logging-test"

source:
    collection:
        elements:
            - "Hello world!"

sink:
    stdout: { }

logging:
    globalLevel: "TRACE"
    loggers:
        "org.voltdb": "DEBUG"

Complete Examples

Simple File Processing Pipeline

$schema: "https://docs.voltdb.com/VoltSP/schemas/voltsp1.7.0.json"

version: 1
name: "file-processor"

source:
    file:
        path: "input.txt"

processors:
    -   javascript:
            script: |
                function process(input) {
                    if (typeof input === 'string') {
                        return input.toUpperCase();
                    }
                    return input;
                }

sink:
    file:
        dirPath: "/tmp"

Kafka to VoltDB Pipeline

$schema: "https://docs.voltdb.com/VoltSP/schemas/voltsp1.7.0.json"

version: 1
name: "kafka-to-voltdb"

resources:
    -   name: voltdb
        voltdb-client:
            servers:
                - voltdb-us-central:21212

source:
    kafka:
        bootstrapServers:
            - "kafka1:9092"
            - "kafka2:9092"
        topicNames:
            - "incoming-data"
        groupId: "processor-group"
        startingOffset: latest
        keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
        valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer

processors:
    -   javascript:
            script: |
                var VoltProcedureRequest = Java.type('org.voltdb.stream.plugin.volt.api.VoltProcedureRequest');

                function process(input) {
                    // Transform message
                    var message = JSON.parse(input);

                    return new VoltProcedureRequest("ProcessFrame", ...);
                }

sink:
    voltdb-procedure:
        voltClientResource: "voltdb"

Network to Network Pipeline

$schema: "https://docs.voltdb.com/VoltSP/schemas/voltsp1.7.0.json"

version: 1
name: "network-relay"

source:
    network:
        type: udp
        address: "0.0.0.0:12345"
        decoder: "lines"

processors: [ ]

sink:
    network:
        type: udp
        address: "target-host:54321"

Testing pipelines

Users are encouraged to write Java test harnesses that verify the correctness of pipelines, whether they are defined in Java code or YAML. The volt-stream-testcontainer module provides tools for setting up a pipeline's runtime environment and running the pipeline against a Docker image of VoltSP.