Skip to content

Advanced: Custom Sources, Sinks, and Processors

This section provides information about creating and using custom sources, sinks, and processors. By including the VoltSP connectors API in your Maven POM file, you automatically get access to the standard set of sources and sinks (as described in the reference section). These include sources and sinks for common data sources such as Kafka and Volt Active Data. For example:

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-connectors-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>

If the set of out-of-the-box operators do not meet your needs, you can create and use your own custom sources, sinks, and processors. The following sections explain how to create, build, and test your custom operators.

Writing a Custom Operator Using the Stream Extension API

Operators are the components that do the actual work of the VoltSP pipeline. Sources and sinks are a special case of the VoltSP operator that act as the origin and end points of the pipeline. Other operators are the processors that analyze, evaluate, and modify the data as it passes through the pipeline.

All standard operators are implemented as an extension of the public extension API. By designing and coding your own extensions of the public API you can create new operators once and reuse them in different pipelines.

An Example of a Custom Operator

So let's take a look at an example of a custom operator. In this case, it is a sink that simply writes messages to a file.

package org.voltdb.stream.execution;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.Sources;
import org.voltdb.stream.api.extension.CommitResult;
import org.voltdb.stream.api.pipeline.VoltPipeline;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltStreamSink;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class WriteToAFilePipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        stream
                .withName("read into a file")
                .consumeFromSource(Sources.generate(WriteToAFilePipeline::createNext))
                .terminateWithSink(new ToFileSink());
    }

    private static String createNext() {
        return ...;
    }
}

class ToFileSink implements VoltStreamSink<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger("FILE_SINK");

    private static final String LINE_TERMINATOR = "\n";

    private final String targetFile = "/tmp/target.file";
    private final Path path = Paths.get(targetFile);

    private BufferedOutputStream bos;

    @Override
    public void configure(ExecutionContext context) {
        try {
            Files.createFile(path);
            File file = path.toFile();
            FileOutputStream fos = new FileOutputStream(file);
            bos = new BufferedOutputStream(fos, 1024);
        } catch (IOException e) {
            throw new IllegalStateException("Could not create " + targetFile + " for write.", e);
        }
    }

    @Override
    public void consume(String input, ExecutionContext context) {
        try {
            byte[] bytes = input.getBytes(Charset.defaultCharset());
            bos.write(bytes);
            bos.write(LINE_TERMINATOR.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            LOGGER.warn("Could not write " + input + " to " + targetFile, e);
        }
    }

    @Override
    public CommitResult commit(long batchId, ExecutionContext context) {
        try {
            bos.flush();
            return CommitResult.COMMITTED;
        } catch (IOException e) {
            throw new RuntimeException("Could not flush buffered data to " + targetFile, e);
        }
    }
}
  1. First, the operator defines where the consumed messages from upstream source should be written.
  2. During the configure stage, the sink makes sure the file exists.
  3. During the consume stage, the sink extracts the bytes from the incoming String message and writes them to that file.
  4. At runtime, the source generates string messages in fixed size batches and sends them to a sink. After a batch is consumed, VoltSP commits all operators in the pipeline.

For more information about committing an operator see Committing Operator Execution.

Designing Operators for Reusability

As written our custom sink is not easily reusable. It needs to be provided in the pipeline jar; It cannot be configured the same way as build-in operators. Built-in operators support many configuration features that are initially missing from a user written custom sink.

There is an easy way for VoltSP tooling to generate missing functionality such as autoconfiguration of operator from yaml. To do this you need to define an immutable configuration record class and use VoltSP maven plugin to generate needed boilerplate code. The boilerplate will generate a ConfigBuilder class and a corresponding jar files allowing anyone to:

  • drop an extension jar with operator implementation into any VoltSP installation allowing it to be used by anyone
  • api jar file to be used during pipeline development to (auto)configure the operator just like a builtin one.

1 Create a Configuration Record

A configuration record lists every tunable option for the operator using plain Java types.

package org.voltdb.stream.plugin.beats.api;

import org.voltdb.stream.api.pipeline.ExceptionHandler;
import org.voltdb.stream.processor.VoltSP;
import org.voltdb.stream.processor.VoltSP.Documentation.Field;

import java.nio.file.Path;

@VoltSP.Sink(name = "custom-file", implementation = "org.acme.FileSink")
public record FileSinkConfig(
        @Field(required = true, description = "Target directory")
        Path directory,
        @Field(defaultValue = "10MB", description = "Max file size before rollover")
        long maxSize,
        ExceptionHandler exceptionHandler) {
}
* The name of the @Sink is used during autoconfiguration to read YAML entries from corresponding part of the YAML structure. It should be unique. * The implementation tells the VoltSP plugin system which operator to create. * @Field marks required options and declares defaults. The framework will validate default values at build time and fail build if they are invalid. * Framework supports types like HostAndPort, Duration, and Path and generates extra methods for ease of use of the builder. It also supports collections of primitive types and nested records.

2 Accept configuration record in the implementation class

Every plugin has a single public constructor that may accept the config record and/or a logger. Pair it with the record via @VoltSP.Source or @VoltSP.Sink.

@VoltSP.Sink(name = "custom-file", implementation = "org.acme.FileSink")
public record FileSinkConfig() {
}

public class FileSink implements VoltStreamSink<byte[]> {
    private final FileSinkConfig cfg;

    public FileSink(Logger log, FileSinkConfig cfg) {
        this.cfg = cfg;
    }
    // implement configure / consume / commit…
}

3 Build — and Let the Tools Work

Including the Maven plugin org.voltdb:volt-stream-maven-plugin plus the dependency volt-stream-plugin-infrastructure automatically generates:

  • A fluent builder (FileSinkConfigBuilder) with validation.
  • Markdown documentation describing every field and its defaults.
  • Stub classes used for plugin discovery at runtime.

You never hand‑write a separate builder or configurator.

4 Use the Generated Builder in a Pipeline

stream
    .consumeFromSource(...)
    .terminateWithSink(
        FileSinkConfigBuilder.builder()
            .withDirectory(Paths.get("/data/out"))
    );

When the pipeline starts VoltSP invokes build method fo the genrated builder and injects it into the FileSink.

5 Packaging & Deployment

The build produces two artifacts:

Jar What it contains
implementation jar The operator class(es) and config record
API jar Only the record, generated builder and supporting enums—everything users need at compile time
VoltSP loads implementation jars dropped into the plugins/ directory at startup; pipelines depend only on the lightweight API jar.

6 Testing Made Simple

Because a config is just a Plain Old Java Record, unit tests can instantiate it directly or use the builder—no reflection or environment wiring required.

var cfg = FileSinkConfigBuilder.builder()
        .withDirectory(tempDir)
        .build();
var sink = new FileSink(LoggerFactory.getLogger("test"), cfg);

Testing Pipelines with Custom Operators

Dividing an operator into a configurator and logic classes has one more advantage. During simple unit testing, one can choose to replace one implementation with another. For example, when the Kafka source is used in a pipeline, it can be replaced by a simple source that generates the desired messages and rest of the pipeline can be tested in isolation.

To include customer operators in tests, you first establish the test dependency:

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-api-test</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>

Next, define the pipeline with a generic source (IterableSource):

public class WriteToAFilePipeline implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder stream) {
        stream
                .withName("read into a file")
                .consumeFromSource(IterableSource.iterate(new EventMessage(2)))
                .processWith(..) // important business transformation to test
                .processWith(..) 
                .terminateWithSink(AFileSinkConfigurator.toFile()
                .withEncoder((event) -> event.getId()));
    }
}

Designing Operators For Parallel Processing

Writing a custom operator can solve many problems. Keep in mind that VoltSP will try to use as many CPU cores as possible and for each CPU core will create an independent worker process. In turn, a worker process will create its own copy of the pipeline and its own instances of defined operators.

Even though a pipeline and operators are executed by a single thread, if not configured properly, a custom operator may start competing for the same resources as other copies of the same operator. For example the AFileSink creates a single file defined by a configurator. All workers will try to write to the same file; this can lead to data corruption. A wiser implementation accepts a directory path and each worker runs a sink operator that creates a unique file within the directory.

Designing the Commit Action for Custom Operators

A commit action can be as easy as returning CommitResult.COMMITTED or it can be more complex. For example, the operator may need to wait before completing the result. In zn asynchronous scenario, a custom operator must override the commit method and ask for a new commitResult, for example:

@Override
public CommitResult commit(long batchId, ExecutionContext context) {
    var result = context.execution().nextCommitResult();
    // the default timeout can be overridden
    // var result = context.execution().nextCommitResult(Duration.ofMillis(10));

    return result;
}

By default, VoltSP waits for 10 seconds before invalidating a created result. This timeout can be changed by setting the voltsp.commit.async.timeout.ms property.

Only source and sink operators are commited, as they are the operators that interact with external processes and manage resources associated with those systems. Processors are executed locally and assumed to not be holding any state. The commit cycle is executed immediately after the batch processing cycle finishes. VoltSP tries to fully complete the commit cycle before starting the next batch processing cycle. This may yield smaller throughput, but keeps data consistent.