VoltSP YAML Configuration Language
VoltSP provides a declarative YAML configuration language for defining streaming data pipelines without writing Java code. This document describes the structure and options available in the YAML configuration format.
Basic Structure
A VoltSP pipeline configuration requires the following main sections:
version: 1 # Required: Configuration version (must be 1)
name: "pipeline-name" # Required: Pipeline name
source: { } # Required: Source configuration
pipeline: { } # Optional: Processing steps to apply
sink: { } # Required: Sink configuration
logging: { } # Optional: Logging configuration
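The required/optional split above can be expressed as a small validation check. The following is an illustrative sketch only (the `validate` helper and its messages are hypothetical, not part of VoltSP); it assumes the YAML has already been parsed into a plain dict:

```python
# Sketch: check that a parsed VoltSP pipeline configuration has the
# required top-level sections and a supported version.
REQUIRED = {"version", "name", "source", "sink"}
OPTIONAL = {"pipeline", "logging"}

def validate(config: dict) -> list[str]:
    """Return a list of problems; an empty list means the config looks valid."""
    problems = [f"missing required section: {key}"
                for key in sorted(REQUIRED - config.keys())]
    problems += [f"unknown section: {key}"
                 for key in sorted(config.keys() - REQUIRED - OPTIONAL)]
    if config.get("version") not in (None, 1):
        problems.append("version must be 1")
    return problems

# A minimal valid configuration passes the check:
ok = {"version": 1, "name": "p", "source": {"stdin": {}}, "sink": {"stdout": {}}}
assert validate(ok) == []
```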
Configuration Sections
Version
Must be 1. This field is required.
version: 1
Name
Pipeline name that will be visible in the logs as well as in metrics. This field is required.
name: "my-pipeline"
Source Configuration
The source section defines where the pipeline gets its data. You must specify exactly one source type. Available source types include:
- file: Read from a local file
- stdin: Read from standard input
- collection: Read from a static collection of elements defined inline
- kafka: Read from a Kafka topic
- network: Read from network (supports UDP and TCP)
- beats: Receive data from a Filebeat agent
Each source type has its own configuration parameters.
Pipeline Configuration
The pipeline section defines processing configuration and any data transformations. It includes:
- parallelism: Optional value specifying pipeline parallelism
- processors: Optional array of processor configurations
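For example, a fragment combining both optional keys (the values here are illustrative, not defaults):

```yaml
pipeline:
  parallelism: 2              # Optional: Pipeline parallelism
  processors:                 # Optional: Applied in order
    - javascript:
        code: "message.trim()"
```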
Sink Configuration
The sink section defines where the pipeline outputs its data. You must specify exactly one sink type. Available sink types include:
- voltdb: Output to VoltDB
- kafka: Output to Kafka
- file: Output to a file
- directory: Output to files in a directory (if parallelism is greater than one, each thread writes to a separate file)
- stdout: Output to the standard output of the VoltSP process
- network: Output to network (supports UDP and TCP)
- blackhole: Discard all output
- elasticsearch: Output to Elasticsearch
- syslog: Output to Syslog using TCP and the RFC 3164 message format
Logging Configuration
Not yet implemented.
The optional logging section configures logging behavior:
logging:
  globalLevel: "DEBUG"        # Global log level
  loggers:                    # Per-logger configuration
    "org.myapp": "TRACE"
    "org.thirdparty": "WARN"
Source Types
File Source
Reads data from a file:
source:
  file:
    path: "input.txt"         # Required: Path to input file
    delimiter: "\n"           # Optional: Record delimiter
Stdin Source
Reads data from standard input:
source:
  stdin: {}
Collection Source
Reads from a static collection of strings:
source:
  collection:
    elements:                 # Required: Array of strings
      - "element1"
      - "element2"
Kafka Source
Reads from Kafka topics:
source:
  kafka:
    servers: "host1:9092,host2:9092"  # Required: Kafka bootstrap servers
    topic: "my-topic"                 # Required: Topic name
    consumer_group: "my-group"        # Required: Consumer group ID
    starting_offset: "LATEST"         # Required: Starting offset (LATEST/EARLIEST)
Network Source
Reads from network:
source:
  network:
    address: "12345"          # Required: Port number or address:port
    type: "UDP"               # Required: UDP or TCP
    decoder: "line"           # Required: Decoder type (none/identity/line/bytes)
Beats Source
Reads from Elastic Beats:
source:
  beats:
    address: "0.0.0.0"        # Required: Listen address
    port: 5044                # Required: Listen port
    idleTimeout: "PT30S"      # Optional: Connection idle timeout (ISO 8601 duration)
Sink Types
VoltDB Sink
Outputs to VoltDB:
sink:
  voltdb:
    procedure: "MyStoredProc" # Required: Stored procedure name
    host: "voltdb-host"       # Required: VoltDB host
    port: 21212               # Required: VoltDB port
    retries: 3                # Optional: Number of retries
Kafka Sink
Outputs to Kafka:
sink:
  kafka:
    servers: "host1:9092,host2:9092"  # Required: Kafka bootstrap servers
    topic: "output-topic"             # Required: Topic name
File Sink
Outputs to a single file:
sink:
  file:
    path: "output.txt"        # Required: Output file path
Directory Sink
Outputs to multiple files in a directory:
sink:
  directory:
    path: "/output/dir"       # Required: Output directory path
Stdout Sink
Outputs to standard output:
sink:
  stdout: {}
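Blackhole Sink
Discards all output, which can be useful for testing a source or pipeline without writing anywhere. This type is listed above but, assuming it takes an empty mapping like the stdin source and stdout sink, its configuration would be:

```yaml
sink:
  blackhole: {}
```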
Network Sink
Outputs to network:
sink:
  network:
    type: "UDP"               # Required: UDP or TCP
    address: "host:port"      # Required: Target address
Elasticsearch Sink
Outputs to Elasticsearch:
sink:
  elasticsearch:
    host: "es-host"               # Required: Elasticsearch host
    indexName: "my-index"         # Required: Index name
    port: 9200                    # Required: Elasticsearch port
    username: "user"              # Required: Username
    password: "pass"              # Required: Password
    payloadSizeInBytes: 5242880   # Required: Maximum payload size
    requestParameters:            # Optional: Additional request parameters
      timeout: "30s"
    requestHeaders:               # Optional: Additional request headers
      Content-Type: "application/json"
Syslog Sink
Outputs to Syslog:
sink:
  syslog:
    host: "syslog-host"       # Required: Syslog host
    port: 514                 # Required: Syslog port
    facility: "USER"          # Optional: Syslog facility
    severity: "NOTICE"        # Optional: Syslog severity
    hostname: "my-host"       # Optional: Source hostname
    tag: "my-app"             # Optional: Message tag
Processor Types
Processors can be written in multiple languages and are defined in the pipeline's processors array. Each processor must specify its language and code:
pipeline:
  processors:
    - javascript:
        code: "message.toUpperCase()"
    - python:
        code: |
          import re
          def process(message):
              return message.lower()
          process(message)
    - ruby:
        code: |
          message.reverse
Complete Examples
Simple File Processing Pipeline
version: 1
name: "file-processor"
source:
  file:
    path: "input.txt"
pipeline:
  parallelism: 1
  processors:
    - javascript:
        code: |
          message.toUpperCase();
sink:
  file:
    path: "output.txt"
Kafka to VoltDB Pipeline
version: 1
name: "kafka-to-voltdb"
source:
  kafka:
    servers: "kafka1:9092,kafka2:9092"
    topic: "incoming-data"
    consumer_group: "processor-group"
    starting_offset: "LATEST"
pipeline:
  parallelism: 4
  processors:
    - javascript:
        code: |
          // Transform message
          JSON.parse(message)
sink:
  voltdb:
    host: "voltdb-host"
    port: 21212
    procedure: "ProcessData"
logging:
  globalLevel: "INFO"
  loggers:
    org.voltdb: "DEBUG"
Network to Network Pipeline
version: 1
name: "network-relay"
source:
  network:
    type: "UDP"
    address: "12345"
    decoder: "line"
pipeline:
  parallelism: 1
sink:
  network:
    type: "UDP"
    address: "target-host:54321"