YAML API for Defining Pipelines

VoltSP provides a declarative YAML configuration language for defining streaming data pipelines without writing Java code. This document describes the structure and options available in the YAML configuration format.

Choosing Between YAML and Java for Pipeline Definition

While defining execution steps in YAML can be simpler and more concise, it is recommended to implement complex pipelines in Java for better performance, maintainability, and reliability.

Key advantages of using Java include:

  • Compile-time validation – Java is a compiled language, allowing errors to be detected before execution.
  • Modular structure – Source code can be organized into multiple files, improving readability and long-term maintainability.
  • Local testing – Pipelines can be executed and verified locally before deployment to the target environment.
  • Direct JVM execution – Java pipelines run directly on the Java Virtual Machine, whereas logic defined in YAML is ultimately interpreted on top of it, which can introduce performance overhead.

Basic Structure

A VoltSP pipeline configuration requires the following main sections:

version: 1              # Required: Configuration version (must be 1)
name: "pipeline-name"   # Required: Pipeline name
source: { }             # Required: Source configuration
processors: { }         # Optional: Processing steps to apply
sink: { }               # Required: Sink configuration
logging: { }            # Optional: Logging configuration
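
For example, a minimal sketch of a configuration that reads lines from standard input and writes them to a directory (the pipeline name here is arbitrary; the stdin source and file sink are described below):

version: 1
name: "stdin-to-file"

source:
    stdin: { }

sink:
    file:
        dirPath: "/tmp"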

Schema

A JSON schema is available for the pipeline configuration, covering all supported sections and components. Import it into your favourite IDE for syntax suggestions and validation.

Configuration Sections

Version

Must be 1. This field is required.

version: 1

Name

The pipeline name, which will be visible in logs as well as in metrics. This field is required.

name: "my-pipeline"

Source Configuration

The source section defines where the pipeline gets its data. You must specify exactly one source. All sources available to the Java DSL are supported.

Each source type has its own configuration parameters.
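
For example, the Kafka source used in the Kafka to VoltDB pipeline later in this document takes broker addresses, topic names, and consumer settings:

source:
    kafka:
        bootstrapServers:
            - "kafka1:9092"
        topicNames:
            - "incoming-data"
        groupId: "processor-group"
        startingOffset: "LATEST"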

Processing Configuration

The processors section defines an array of data transformations that are applied to data coming from the source.
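
For example, a sketch of a two-step chain, assuming processors run in the order they are listed (the individual processor types and their parameters are described under Processor Types below):

processors:
    -   javascript:
            script: |
                function process(input) {
                    return input.trim();
                }
    -   ruby:
            code: |
                message.upcase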

Sink Configuration

The sink section defines where the pipeline outputs its data. You must specify exactly one sink type. All sinks available to the Java DSL are supported.
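
For example, a network sink that forwards output over UDP, as used in the Network to Network pipeline shown later:

sink:
    network:
        type: "UDP"
        address: "target-host:54321"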

Source Types

The following are examples of simple source configurations.

File Source

Reads data from a file:

source:
    file:
        path: "input.txt"       # Required: Path to input file

Stdin Source

Reads data from standard input:

source:
    stdin: { }

Collection Source

Reads from a static collection of strings:

source:
    collection:
        elements: # Required: Array of strings
            - "element1"
            - "element2"

Network Source

Reads from network:

source:
    network:
        type: "UDP"                        # Required: UDP or TCP
        address: "0.0.0.0:12345"           # Required: Port number or address:port
        decoder: "lines"                   # Required: Decoder type (none/identity/line/bytes)

Beats Source

Reads from Elastic Beats:

source:
    beats:
        address: "0.0.0.0:514"             # Required: Listen address
        clientInactivityTimeout: "PT30S"   # Optional: Connection idle timeout (ISO8601 duration)

Sink Types

VoltDB Sink

Outputs to VoltDB:

sink:
    voltdb-procedure:
        procedureName: "MyStoredProc"      # Required: Stored procedure name
        servers: "voltdb-host:21212"       # Required: VoltDB host
        client:
            retries: 3                     # Optional: Number of retries

Processor Types

Processors can be written in multiple languages and are defined in the pipeline's processors array. Each processor must specify its language and code:

processors:
    -   javascript:
            script: |
                function process(input) {
                    if (typeof input === 'string') {
                        return input.toUpperCase();
                    }
                    return input;
                }
    -   python:
            code: |
                import re
                def process(message):
                    return message.lower()
                process(message)
    -   ruby:
            code: |
                message.reverse

Complete Examples

Simple File Processing Pipeline

version: 1
name: "file-processor"

source:
    file:
        path: "input.txt"

processors:
    -   javascript:
            script: |
                function process(input) {
                    if (typeof input === 'string') {
                        return input.toUpperCase();
                    }
                    return input;
                }

sink:
    file:
        dirPath: "/tmp"

Kafka to VoltDB Pipeline

version: 1
name: "kafka-to-voltdb"

source:
    kafka:
        bootstrapServers:
            - "kafka1:9092"
            - "kafka2:9092"
        topicNames:
            - "incoming-data"
        groupId: "processor-group"
        startingOffset: "LATEST"

processors:
    -   javascript:
            script: |
                function process(input) {
                    // Transform message
                    return JSON.parse(input);
                }

sink:
    voltdb-procedure:
        procedureName: "ProcessData"
        servers: "voltdb-host:21212"
        client:
            retries: 3

logging:
    globalLevel: "INFO"
    loggers:
        org.voltdb: "DEBUG"

Network to Network Pipeline

version: 1
name: "network-relay"

source:
    network:
        type: "UDP"
        address: "0.0.0.0:12345"
        decoder: "lines"

sink:
    network:
        type: "UDP"
        address: "target-host:54321"