Custom Sources, Sinks, and Processors
This section provides information about creating and using custom sources, sinks, and processors. By including the VoltSP connectors API in your Maven POM file, you automatically get access to the standard set of sources and sinks (as described in the reference section). These include sources and sinks for common data sources such as Kafka and Volt Active Data. For example:
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-connectors-api</artifactId>
<version>1.2.0</version>
</dependency>
If the set of out-of-the-box operators do not meet your needs, you can create and use your own custom sources, sinks, and processors. The following sections explain how to create, build, and test your custom operators.
Writing a Custom Operator Using the Stream Extension API
Operators are the components that do the actual work of the VoltSP pipeline. Sources and sinks are a special case of the VoltSP operator that act as the origin and end points of the pipeline. Other operators are the processors that analyze, evaluate, and modify the data as it passes through the pipeline.
All standard operators are implemented as an extension of the public extension API. By designing and coding your own extensions of the public API you can create new operators once and reuse them in different pipelines.
An Example of a Custom Operator
So let's take a look at an example of a custom operator. In this case, it is a sink that simply writes messages to a file.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltPipeline;
import org.voltdb.stream.api.pipeline.VoltStreamSink;
import org.voltdb.stream.source.IterableSource;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class WriteToAFilePipeline implements VoltPipeline {
@Override
public void define(VoltStreamBuilder stream) {
stream
.withName("read into a file")
.consumeFromSource(IterableSource.generate(createNext()))
.terminateWithSink(new VoltStreamSink<String>() {
private static final Logger LOG = LoggerFactory.getLogger("FILE_SINK");
private final String targetFile = "/tmp/target.file";
private final Path path = Paths.get(targetFile);
private final String lineTerminator = "\n";
private BufferedOutputStream bos;
@Override
public void configure(ExecutionContext context) {
try {
Files.createFile(path);
File file = path.toFile();
FileOutputStream fos = new FileOutputStream(file);
bos = new BufferedOutputStream(fos, 1024);
} catch(IOException e) {
throw new IllegalStateException("Could not create " + targetFile + " for write.", e);
}
}
@Override
public void consume(String input, ExecutionContext context) {
try {
byte[] bytes = input.getBytes(Charset.defaultCharset());
bos.write(bytes);
bos.write(lineTerminator.getBytes(StandardCharsets.UTF_8));
} catch(IOException e) {
LOG.warn("Could not write " + input + " to " + targetFile, e);
}
}
@Override
public CommitResult commit(long batchId, ExecutionContext context) {
try {
bos.flush();
return CommitResult.COMMITTED;
} catch(IOException e) {
throw new RuntimeException("Could not flush buffered data to " + targetFile, e);
}
}
});
}
private static String createNext() {
...
}
}
- First, the operator defines where the consumed messages from upstream source should be written.
- During the
configure
stage, the sink makes sure the file exists. - During the
consume
stage, the sink extracts the bytes from the incoming String message and writes them to an open file. - At runtime, the source generates string messages in fixed size batches and sends them to a sink. After a batch is consumed, VoltSP commits all operators in the pipeline.
For more information about committing an operator see Committing Operator Execution.
Designing Operators for Reusability
As written our custom sink is not reusable. There is no way to configure a target file, a path to a file, the line terminator, or the buffer size. It is possible to extract the sink operator and publish it as an implementation class.
However, if we want to add more configuration options, such as an exception handler or a custom encoder, the constructor will grow and not be very readable. Instead, we recommend using a configurator/builder pattern.
For example, the Kafka source has 22 different options, many of which can be configured via properties. That would be too many to effectively put on the class's constructor.
Instead, you can add a Configurator. For our example, we can add the Configurator AFileSinkConfigurator
that accepts at least five configuration options:
package org.acme.operators;
public class AFileSinkConfigurator implements VoltStreamSinkConfigurator<String> {
public static AFileSinkConfigurator toFile() {
return new AFileSinkConfigurator();
}
private String targetFile;
private String lineTerminator;
private int bufferSize;
private ExceptionHandler exceptionHandler;
private Function<Object, byte[]> encoder;
AFileSinkConfigurator() {
Configuration configuration = getConfiguration();
targetFile = configuration.findByPath("sink.file.path").asString();
lineTerminator = configuration.findByPath("sink.file.lineTerminator").asString();
bufferSize = configuration.findByPath("sink.file.bufferSize").asInt();
// define default exception handler and encoder
// ...
}
public AFileSinkConfigurator withTargetFile(String targetFile) {}
public AFileSinkConfigurator withLineTerminator(String lineTerminator) {}
public AFileSinkConfigurator withBufferSize(int bufferSize) {}
public AFileSinkConfigurator withExceptionHAndler(ExceptionHandler exceptionHandler) {}
public AFileSinkConfigurator withEncoder(Function<Object, byte[]> encoder) {}
public String getTargetFile() {
return targetFile;
}
public String getLineTerminator() {
return lineTerminator;
}
public int getBufferSize() {
return bufferSize;
}
public ExceptionHandler getExceptionHandler() {
return exceptionHandler;
}
public Function<Object, byte[]> getEncoder() {
return encoder;
}
}
By using a configurator pattern we can add any number of configuration options and set them via java code, system property, or the helm properties that is exposed to the configurator.
To finish the pattern, we modify the AFileSink.java
to accept the configurator as a single parameter.
package org.acme.operators;
public class AFileSink implements VoltStreamSink<String> {
private static final Logger LOG = LoggerFactory.getLogger("FILE_SINK");
private final Path path;
private final AFileSinkConfigurator configurator;
private BufferedOutputStream bos;
AFileSink(AFileSinkConfigurator configurator) {
this.configurator = configurator;
path = Paths.get(configurator.getTargetFile());
}
@Override
public void configure(ExecutionContext context) {
try {
Files.createFile(path);
File file = path.toFile();
FileOutputStream fos = new FileOutputStream(file);
bos = new BufferedOutputStream(fos, configurator.getBufferSize());
} catch(IOException e) {
throw new IllegalStateException("Could not create " + configurator.getTargetFile() + " for write.", e);
}
}
@Override
public void consume(String input, ExecutionContext context) {
try {
byte[] bytes = input.getBytes(Charset.defaultCharset());
bos.write(bytes);
bos.write(configurator.getLineTerminator().getBytes(StandardCharsets.UTF_8));
} catch(IOException e) {
LOG.warn("Could not write " + input + " to " + configurator.getTargetFile(), e);
}
}
@Override
public CommitResult commit(long batchId, ExecutionContext context) {
try {
bos.flush();
} catch(IOException e) {
configurator.getExceptionHandler().handle(Collections.emptyList(), context, e);
}
return CommitResult.COMMITTED;
}
}
Because the configurator handles the configuration of other properties, the pipeline definition can focus on those aspects that are important for this specific pipeline.
public class WriteToAFilePipeline implements VoltPipeline {
@Override
public void define(VoltStreamBuilder stream) {
stream
.withName("read into a file")
.consumeFromSource(IterableSource.iterate(new EventMessage()))
.terminateWithSink(AFileSinkConfigurator.toFile()
.withEncoder((event) -> event.getId()));
}
}
The final step is register the custom operator with the pipeline using registerExtension
. This example
binds the AFileSinkConfigurator
class to the name AFileSink
at runtime.
Note that you must also include the package including the custom sink.
In this case, org.acme.operators.Extension
.
package org.acme.operators;
public class Extension implements VoltStreamExtension {
@Override
public void registerExtension(VoltEnvironment environment) {
environment.register(AFileSinkConfigurator.class, AFileSink::new);
}
}
When building the pipeline, you include the custom operator extension using
the standard Java "service loader" mechanism. This is done by adding
the file org.voltdb.stream.api.extension.VoltStreamExtension
to the appropriate location within
the source code hierarchy:
src
main
resources
META-INF
services
org.voltdb.stream.api.extension.VoltStreamExtension
This file contains information on how to configure the extension; that is,
the org.acme.operators.Extension
class name.
If more extension classes are being added, include each fully qualified name on a separate line
of the org.voltdb.stream.api.extension.VoltStreamExtension
file.
Testing Pipelines with Custom Operators
Dividing an operator into a configurator and logic classes has one more advantage. During simple unit testing, one can choose to replace one implementation with another. For example, when the Kafka source is used in a pipeline, it can be replaced by a simple source that generates the desired messages and rest of the pipeline can be tested in isolation.
To include customer operators in tests, you first establish the test dependency:
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-api-test</artifactId>
<version>1.2.0</version>
</dependency>
Next, define the pipeline with a generic source (IterableSource
):
public class WriteToAFilePipeline implements VoltPipeline {
@Override
public void define(VoltStreamBuilder stream) {
stream
.withName("read into a file")
.consumeFromSource(IterableSource.iterate(new EventMessage(2)))
.processWith(..) // important business transformation to test
.processWith(..)
.terminateWithSink(AFileSinkConfigurator.toFile()
.withEncoder((event) -> event.getId()));
}
}
Then, using the junit framework you can set up the test to apply the appropriate source as part of the test definition:
// given
TestVoltEnvironment voltEnvironment = TestVoltEnvironment.get(new WriteToAFilePipeline());
voltEnvironment.register(AFileSinkConfigurator.class,
(ConfigurableSourceProducer<AFileSinkConfigurator>) configurator -> {
assertThat(configurator.getGroupId()).isEqualTo("voltdb");
return new IterableSink<>();
});
// when
voltEnvironment.execute();
// then
List<EventMessage> emittedValues = voltEnvironment.getEmittedValuesFor(TestVoltEnvironment.SINK, EventMessage.class);
assertThat(emittedValues).containsOnly(new EventMessage(2));
Designing Operators For Parallel Processing
Writing a custom operator can solve many problems. Keep in mind that VoltSP will try to use as many CPU cores as possible and for each CPU core will create a independent worker process. In turn, a worker process will create its own copy of the pipeline and its own instances of defined operators.
Even though a pipeline and operators are executed by a single thread, if not configured properly,
a custom operator may start competing for the same resources as other copies of the same operator.
For example the AFileSink
creates a single file defined by a configurator. All workers will try
write to the same file; this can lead to data corruption.
A wiser implementation accepts a directory path and each worker runs a sink operator that creates a unique file within the directory.
Designing the Commit Action for Custom Operators
A commit action can be as easy as returning CommitResult.COMMITTED
or it can be more complex. For example, the operator may need to wait before completing the result.
In zn asynchronous scenario, a custom operator must override the commit
method and ask for a new commitResult
, for example:
@Override
public CommitResult commit(long batchId, ExecutionContext context) {
var result = context.execution().nextCommitResult();
// the default timeout can be overridden
// var result = context.execution().nextCommitResult(Duration.ofMillis(10));
return result;
}
By default, VoltSP waits for 10 seconds before invalidating a created result.
This timeout can be changed by setting the volt.stream.commit.async.timeout.ms
property.
Only source and sink operators are commited, as they are the operators that interact with external processes and manage resources associated with those systems. Processors are executed locally and assumed to not be holding any state. The commit cycle is executed immediately after the batch processing cycle finishes. VoltSP tries to fully complete the commit cycle before starting the next batch processing cycle. This may yield smaller throughput, but keeps data consistent.