VoltSP YAML Configuration Language

VoltSP provides a declarative YAML configuration language for defining streaming data pipelines without writing Java code. This document describes the structure and options available in the YAML configuration format.

Table of Contents

  • Basic Structure
  • Configuration Sections
  • Source Types
  • Sink Types
  • Processor Types
  • Complete Examples

Basic Structure

A VoltSP pipeline configuration requires the following main sections:

version: 1              # Required: Configuration version (must be 1)
name: "pipeline-name"   # Required: Pipeline name
source: { }             # Required: Source configuration
pipeline: { }           # Optional: Processing steps to apply
sink: { }               # Required: Sink configuration
logging: { }            # Optional: Logging configuration

Configuration Sections

Version

Must be 1. This field is required.

version: 1

Name

The pipeline name, which appears in logs and metrics. This field is required.

name: "my-pipeline"

Source Configuration

The source section defines where the pipeline gets its data. You must specify exactly one source type. Available source types include:

  • file: Read from a local file
  • stdin: Read from standard input
  • collection: Read from a static collection of elements defined inline
  • kafka: Read from a Kafka topic
  • network: Read from network (supports UDP and TCP)
  • beats: Receive data from a Filebeat agent

Each source type has its own configuration parameters.

Pipeline Configuration

The pipeline section defines processing configuration and any data transformations. It includes:

  • parallelism: Optional integer setting how many parallel threads process the data
  • processors: Optional array of processor configurations
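Taken together, a pipeline section combining both options looks like this (the processor code shown is illustrative):

pipeline:
  parallelism: 2
  processors:
    - javascript:
        code: |
          message.trim()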

Sink Configuration

The sink section defines where the pipeline outputs its data. You must specify exactly one sink type. Available sink types include:

  • voltdb: Output to VoltDB
  • kafka: Output to Kafka
  • file: Output to a file
  • directory: Output to files in a directory (if parallelism is greater than one, each thread writes to a separate file)
  • stdout: Output to standard output of the VoltSP process
  • network: Output to network (supports UDP and TCP)
  • blackhole: Discard all output
  • elasticsearch: Output to Elasticsearch
  • syslog: Output to Syslog using TCP and the RFC 3164 message format

Logging Configuration

Note: This section is not yet implemented.

The optional logging section configures logging behavior:

logging:
  globalLevel: "DEBUG"        # Global log level
  loggers:                    # Per-logger configuration
    "org.myapp": "TRACE"
    "org.thirdparty": "WARN"

Source Types

File Source

Reads data from a file:

source:
  file:
    path: "input.txt"       # Required: Path to input file
    delimiter: "\n"         # Optional: Record delimiter

Stdin Source

Reads data from standard input:

source:
  stdin: {}

Collection Source

Reads from a static collection of strings:

source:
  collection:
    elements:              # Required: Array of strings
      - "element1"
      - "element2"

Kafka Source

Reads from Kafka topics:

source:
  kafka:
    servers: "host1:9092,host2:9092"   # Required: Kafka bootstrap servers
    topic: "my-topic"                  # Required: Topic name
    consumer_group: "my-group"         # Required: Consumer group ID
    starting_offset: "LATEST"          # Required: Starting offset (LATEST/EARLIEST)

Network Source

Reads from network:

source:
  network:
    address: "12345"                   # Required: Port number or address:port
    type: "UDP"                        # Required: UDP or TCP
    decoder: "line"                    # Required: Decoder type (none/identity/line/bytes)

Beats Source

Reads from Elastic Beats:

source:
  beats:
    address: "0.0.0.0"                # Required: Listen address
    port: 5044                        # Required: Listen port
    idleTimeout: "PT30S"              # Optional: Connection idle timeout (ISO8601 duration)

Sink Types

VoltDB Sink

Outputs to VoltDB:

sink:
  voltdb:
    procedure: "MyStoredProc"         # Required: Stored procedure name
    host: "voltdb-host"              # Required: VoltDB host
    port: 21212                      # Required: VoltDB port
    retries: 3                       # Optional: Number of retries

Kafka Sink

Outputs to Kafka:

sink:
  kafka:
    servers: "host1:9092,host2:9092"  # Required: Kafka bootstrap servers
    topic: "output-topic"             # Required: Topic name

File Sink

Outputs to a single file:

sink:
  file:
    path: "output.txt"                # Required: Output file path

Directory Sink

Outputs to multiple files in a directory:

sink:
  directory:
    path: "/output/dir"               # Required: Output directory path
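
As noted under Sink Configuration, when pipeline parallelism is greater than one, each thread writes to its own file in this directory. A sketch combining the two settings:

pipeline:
  parallelism: 4

sink:
  directory:
    path: "/output/dir"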

Stdout Sink

Outputs to standard output:

sink:
  stdout: {}

Network Sink

Outputs to network:

sink:
  network:
    type: "UDP"                       # Required: UDP or TCP
    address: "host:port"              # Required: Target address

Elasticsearch Sink

Outputs to Elasticsearch:

sink:
  elasticsearch:
    host: "es-host"                   # Required: Elasticsearch host
    indexName: "my-index"             # Required: Index name
    port: 9200                        # Required: Elasticsearch port
    username: "user"                  # Required: Username
    password: "pass"                  # Required: Password
    payloadSizeInBytes: 5242880       # Required: Maximum payload size in bytes
    requestParameters:                # Optional: Additional request parameters
      timeout: "30s"
    requestHeaders:                   # Optional: Additional request headers
      Content-Type: "application/json"

Syslog Sink

Outputs to Syslog:

sink:
  syslog:
    host: "syslog-host"              # Required: Syslog host
    port: 514                        # Required: Syslog port
    facility: "USER"                 # Optional: Syslog facility
    severity: "NOTICE"               # Optional: Syslog severity
    hostname: "my-host"              # Optional: Source hostname
    tag: "my-app"                    # Optional: Message tag

Processor Types

Processors can be written in multiple languages and are defined in the pipeline's processors array. Each processor must specify its language and code:

pipeline:
  processors:
    - javascript:
        code: "message.toUpperCase()"
    - python:
        code: |
          import re
          def process(message):
              return message.lower()
          process(message)
    - ruby:
        code: |
          message.reverse
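
As the snippets above suggest, the incoming record is exposed to each processor as a `message` variable, and the value of the final expression becomes the processor's output. Outside of VoltSP, the Python processor's logic can be sanity-checked as a plain function (an illustrative sketch, not a VoltSP API):

```python
def process(message: str) -> str:
    # Mirrors the Python processor above: lowercase the incoming record
    return message.lower()

print(process("Element1"))  # element1
```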

Complete Examples

Simple File Processing Pipeline

version: 1
name: "file-processor"

source:
  file:
    path: "input.txt"

pipeline:
  parallelism: 1
  processors:
    - javascript:
        code: |
          message.toUpperCase();

sink:
  file:
    path: "output.txt"

Kafka to VoltDB Pipeline

version: 1
name: "kafka-to-voltdb"

source:
  kafka:
    servers: "kafka1:9092,kafka2:9092"
    topic: "incoming-data"
    consumer_group: "processor-group"
    starting_offset: "LATEST"

pipeline:
  parallelism: 4
  processors:
    - javascript:
        code: |
          // Transform message
          JSON.parse(message)

sink:
  voltdb:
    host: "voltdb-host"
    port: 21212
    procedure: "ProcessData"

logging:
  globalLevel: "INFO"
  loggers:
    org.voltdb: "DEBUG"

Network to Network Pipeline

version: 1
name: "network-relay"

source:
  network:
    type: "UDP"
    address: "12345"
    decoder: "line"

pipeline:
  parallelism: 1

sink:
  network:
    type: "UDP"
    address: "target-host:54321"