Developing VoltSP Pipelines

Each pipeline consists of a data source, one or more functions that operate on the data, and ends by sending the resulting record to a data target or sink. You describe the structure of your pipeline using a Domain Specific Language (DSL), written in Java. The DSL includes classes and methods that define the structure of your pipeline and can be compiled into the actual runtime code.

For VoltSP, the DSL describes the three primary components of the pipeline: the source, the functions, and the sink. Like so:

stream
   .consumeFromSource( [ . . . ] )  ❶
       .processWith( [ . . . ] )    ❷
       .processWith( [ . . . ] )
       .processWith( [ . . . ] )
           [ . . . ]
   .terminateWithSink( [ . . . ] )  ❸

You can see these components highlighted in an actual example below:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.voltdb.stream.api.Consumer;
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.ExecutionContext.ConfigurationContext;
import org.voltdb.stream.api.Sinks;
import org.voltdb.stream.api.Sources;
import org.voltdb.stream.api.pipeline.VoltFunction;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltPipeline;

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        ConfigurationContext configurator = stream.getExecutionContext().configurator();
        int messageCount = configurator.findByPath("producer.message.count").asInt();
        int messageBytesLength = configurator.findByPath("producer.message.bytes").asInt();

        stream
                .withName("event producer")
        ❶       .consumeFromSource(Sources.generateAtRate(100, () -> new EventMessage(messageBytesLength)))
        ❷       .processWith(new VoltFunction<EventMessage, EventMessage>() {
                    private int count = 0;
                    @Override
                    public void process(EventMessage input, Consumer<EventMessage> consumer, ExecutionContext context) {
                        if (count > messageCount) {
                            context.execution().cancel();
                        }
                        count++;
                        consumer.consume(input);
                    }
                })
        ❸       .terminateWithSink(Sinks.kafka().accepting(EventMessage.class)
                        .withKeyExtractor(EventMessage::getSessionId)
                        .withValueSerializer(KafkaAvroSerializer.class));
    }

    private static class EventMessage {
        private long sessionId; 

        public EventMessage(int messageBytesLength) {
        }

        public long getSessionId() {
            return sessionId;
        }
    }
}

This is a simplified example:

  • The source produces EventMessage data objects at a given rate of transactions per second (TPS).
  • The function checks how many messages are produced and stops the pipeline if the threshold is reached.
  • The sink publishes events to a Kafka broker (see the YAML configuration).

Setting Up Your Development Environment

You could start your VoltSP pipeline project from scratch, setting up the necessary folder structure, creating Java source files, and defining the Maven dependencies and Helm properties by hand. But it is much easier to start with a template, and the quick start example described in Downloading the Sample Application can be used for just that. Follow the instructions for downloading the quick start, specifying your organization's ID as the group ID and your pipeline name as the artifact ID. For example, if you create a pipeline called mydatapipe for acme.org, the resulting template has the following folder structure:

mydatapipe
- src
  - main
    - java
      - org
        - acme
    - resources
  - test
    - java
      - org
        - acme
    - resources

The following are the key files you will use for creating your own pipeline from the quick start sample:

  • mydatapipe/pom.xml — The Maven project file for building the pipeline.
  • mydatapipe/src/main/java/org/acme/*.java — Pipeline definition files you can revise and reuse to match your pipeline's source, sink, and functions.
  • mydatapipe/src/main/resources/*.yaml — Helm property files you can use to describe the data resources the pipeline requires, such as Kafka streams, Volt Active Data databases, and their properties.

The pom.xml defines dependencies that are later resolved by Maven. The most important dependency is on VoltSP itself:

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-api</artifactId>
    <version>1.1.0</version>
</dependency>

Choosing the Source and Destination for the Pipeline

The first thing the pipeline needs is a data source to produce a flow of data for the pipeline to operate on. The source can be as simple as an inline function or much more complicated, such as an operator that integrates with a remote system. The same is true of the sink, which can be as simple or as complex as you need. Let's look at a basic example:

import org.voltdb.stream.api.Consumer;
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltPipeline;
import org.voltdb.stream.api.pipeline.VoltStreamSink;
import org.voltdb.stream.api.pipeline.VoltStreamSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        Random random = ThreadLocalRandom.current();

        stream
                .withName("event producer")
                .consumeFromSource(new VoltStreamSource<EventMessage>() {
                    @Override
                    public void process(long batchId, Consumer<EventMessage> consumer, ExecutionContext context) {
                        var event = new EventMessage(random.nextInt());
                        consumer.consume(event);
                    }
                })
                .terminateWithSink(new VoltStreamSink<EventMessage>() {
                    private List<EventMessage> messages = new ArrayList<>();
                    @Override
                    public void consume(EventMessage output) {
                        messages.add(output);
                    }
                });
    }

    private static class EventMessage {
        public EventMessage(int messageBytesLength) {
        }
    }
}

For each call to the process method, the data source creates a new EventMessage, which the pipeline sends downstream. Since there are no processor operations in the example, the data is sent directly to a sink via a Consumer binding.

The consumer pattern is very useful because it allows the source to introduce zero, one, or more messages into the data stream without any dependency on which operator comes next. This makes the pipeline easy to change as business needs evolve. (In this example, there are no other processors to filter or transform the data; it is consumed as is by the sink. Normally, as we will see later, additional operators would be included between the source and the sink.)
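
For instance, here is a minimal sketch of a source that drains an in-memory queue, emitting nothing when the queue is empty and several messages when items are pending. The queue is only a stand-in for whatever the real source reads from (a file, an external queueing system, a REST API, and so on).

.consumeFromSource(new VoltStreamSource<EventMessage>() {
    // Stand-in for the real input; in practice this would be fed by a file,
    // an external queueing system, a REST API, or similar.
    private final java.util.Queue<EventMessage> pending = new java.util.ArrayDeque<>();

    @Override
    public void process(long batchId, Consumer<EventMessage> consumer, ExecutionContext context) {
        // Emit zero, one, or more messages, depending on what is available.
        EventMessage next;
        while ((next = pending.poll()) != null) {
            consumer.consume(next);
        }
    }
})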

Batch Processing

In the previous example, notice the use of long batchId in the argument list to the process method. Every time the system calls the process method, it starts a new batch. The batch ID is globally unique. Batching is a well-established pattern in data processing, because handling multiple records at a time is almost always a good performance optimization.

The source generates messages using its own business logic, whether that means reading records from a local file or a remote source, pulling data from external data sources such as queueing systems or databases, or invoking REST APIs.

All those operations can be executed in batches. The more messages sent downstream in a single batch, the higher the pipeline's throughput. The downside of batching is that the more data read in at a time, the more could be lost if processing fails. Whether to retry the batch or discard it so the next batch can be processed is a business decision. See the section on Handling Exceptions for more information.

NOTE: If the downstream system cannot process the data and keeps failing, the stream halts and waits for the downstream system to become operational again.

In the following example, the source's process method has been expanded to produce multiple records, up to a maximum batch size (set to 1,000).

import org.voltdb.stream.api.Consumer;
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltPipeline;
import org.voltdb.stream.api.pipeline.VoltStreamSink;
import org.voltdb.stream.api.pipeline.VoltStreamSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        Random random = ThreadLocalRandom.current();

        stream
                .withName("event producer")
                .consumeFromSource(new VoltStreamSource<EventMessage>() {
                    private static final int MAX_BATCH_SIZE = 1000;
                    @Override
                    public void process(long batchId, Consumer<EventMessage> consumer, ExecutionContext context) {
                        for (int counter = 0; !context.execution().isCanceled() && counter < MAX_BATCH_SIZE; counter++) {
                            var event = new EventMessage(random.nextInt());
                            consumer.consume(event);
                        }
                    }
                })
                ...;
    }

    private static class EventMessage {
        public EventMessage(int messageBytesLength) {
        }
    }
}

After each batch is processed, the system performs any necessary housekeeping, assigns a new batch ID, and calls the source's process method again, repeating this cycle until the pipeline is stopped.

Defining the Business Functions

The source and sink define where the stream starts and ends. But it is the functions in the middle that do the real work of transforming the data into actionable business decisions.

The functions are executed sequentially, as specified in the pipeline definition, and can be any function or method you choose. For example, a function can filter out incoming data so the message is not sent on to the consumer. It can transform the data, enhance it, or even create multiple messages from a single message (such as a money transfer being divided into two transactions: a debit and a credit for the two users taking part in the original transfer operation).
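
As a sketch of the one-to-many case, a function along these lines could split each transfer into two ledger entries. The Transfer and LedgerEntry types and their accessors are hypothetical; the point is that the function simply calls consumer.consume() once for each outgoing message.

.processWith(new VoltFunction<Transfer, LedgerEntry>() {
    @Override
    public void process(Transfer input, Consumer<LedgerEntry> consumer, ExecutionContext context) {
        // One incoming transfer produces two outgoing records:
        // a debit for the sender and a credit for the receiver.
        consumer.consume(new LedgerEntry(input.getFromAccount(), -input.getAmount()));
        consumer.consume(new LedgerEntry(input.getToAccount(), input.getAmount()));
    }
})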

In the following example from the VoltSP quick start sample, the random-to-kafka pipeline uses an inline function to convert the text to uppercase:

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        stream
                .withName("event producer")
                .consumeFromSource(...)
                .processWith(new VoltFunction<String, String>() {
                    @Override
                    public void process(String input, Consumer<String> consumer, ExecutionContext context) {
                        consumer.consume(input.toUpperCase());
                    }
                })
                .terminateWithSink(...);
    }
}

This can be further simplified by using a Java method reference:

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        stream
                .withName("event producer")
                .consumeFromSource(...)
                .processWith(String::toUpperCase)
                .terminateWithSink(...);
    }
}

Any operator, but especially a function, is a good place to decide whether the flow of data should continue. For example:

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        stream
                .withName("event producer")
                .consumeFromSource(new VoltStreamSource<EventMessage>() {...})
                .processWith(new VoltFunction<EventMessage, EventMessage>() {
                    @Override
                    public void process(EventMessage input, Consumer<EventMessage> consumer, ExecutionContext context) {
                        if (shouldContinue()) {
                            consumer.consume(input);
                        } else {
                            context.execution().cancel();
                        }
                    }
                })
                .terminateWithSink(new VoltStreamSink<EventMessage>() {...});
    }

    private static class EventMessage {
    }
}

For convenience, VoltSP includes an implementation, org.voltdb.stream.function.CancelingFunction, that can be configured to pass through a defined number of messages and then cancel processing. Using a dedicated operator like this keeps responsibilities cleanly separated and makes the operators easier to reuse and combine in a pipeline.
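
A pipeline might use it along the lines of the following sketch. The constructor argument shown here (the number of messages to pass before canceling) is an assumption; check the API documentation for the exact way to configure the limit.

stream
        .withName("event producer")
        .consumeFromSource(...)
        // Assumed configuration: pass through 10,000 messages, then cancel the pipeline.
        .processWith(new CancelingFunction<>(10_000))
        .terminateWithSink(...);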

For more complex processing, you can include the function source code separately elsewhere in the pipeline definition file or define and build it as a separate class.
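
For example, the uppercase step shown earlier could be promoted to a standalone class with the same VoltFunction signature, and then referenced from the pipeline definition with .processWith(new UppercaseFunction()). A minimal sketch, using the same imports as the earlier examples:

import org.voltdb.stream.api.Consumer;
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.pipeline.VoltFunction;

public class UppercaseFunction implements VoltFunction<String, String> {

    @Override
    public void process(String input, Consumer<String> consumer, ExecutionContext context) {
        // Transform the record and pass it downstream.
        consumer.consume(input.toUpperCase());
    }
}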

Handling Exceptions

VoltSP lets you define a global exception handler for the pipeline. All exceptions from pipeline processing are routed to this handler. By default, the handler just logs the error and any affected records.

NOTE: Some sources include their own exception handling, both because error handling is very specific to a source's implementation and because, if an exception occurs before any messages have been inserted into the pipeline, there are no side effects and the operation can be repeated without data loss.

The following is an example of a custom error handler that checks for constraint violations.

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {

        stream.onError()
              .setExceptionHandler(new ExceptionHandler() {
                  @Override
                  public void handle(List<?> records, ExecutionContext context, Throwable throwable) {
                      if (throwable instanceof ConstraintFailureException) {
                          return;
                      } else {
                          Logger.error("Got failure when processing {}.", records);
                      }
                  }
              });

        stream
                .withName("event producer")
                .consumeFromSource(new VoltStreamSource<EventMessage>() {...})
                .terminateWithSink(new VoltStreamSink<EventMessage>() {...});
    }

    private static class EventMessage {
    }
}

Using Alternate Sinks

You can define and use additional sinks in your pipeline definition. Often these sinks are used in conjunction with error handling (for example, routing bad records to an alternate stream). However, they can also be useful when directing data to different destinations based on their content.

Data to these sinks can be routed at any time by calling context.execution().emit("name", object) as in the following example:

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {

        stream.onError()
              .addNamedSink("file", new VoltStreamSink<EventMessage>() {
                  @Override
                  public void consume(EventMessage output) {
                      // Placeholder: write the failed record to a file or other store.
                      File.write(output);
                  }
              })
              .setExceptionHandler(new ExceptionHandler() {
                  @Override
                  public void handle(List<?> records, ExecutionContext context, Throwable throwable) {
                      if (throwable instanceof ConstraintFailureException) {
                          return;
                      } else {
                          for(Object record : records) {
                              context.execution().emit("file", record);
                          }
                      }
                  }
              });

        stream
                .withName("mediation event producer")
                .consumeFromSource(new VoltStreamSource<EventMessage>() {...})
                .terminateWithSink(new VoltStreamSink<EventMessage>() {...});
    }

    private static class EventMessage {
    }
}
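
Named sinks are not limited to error handling. Because context.execution().emit() can be called from any operator, a processing function can also route records by content, assuming the named sink has been registered with addNamedSink() as shown above. In this sketch, the isPriority() check is a hypothetical predicate on the message:

.processWith(new VoltFunction<EventMessage, EventMessage>() {
    @Override
    public void process(EventMessage input, Consumer<EventMessage> consumer, ExecutionContext context) {
        if (input.isPriority()) {
            // Route priority events to the sink registered under the name "priority".
            context.execution().emit("priority", input);
        } else {
            // Everything else continues down the main pipeline.
            consumer.consume(input);
        }
    }
})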

It is important to note, however, that there is a difference between the handling of the main sink defined by the pipeline and alternate sinks defined by an error handler. When committing to the main sink, the sink operator is asked to commit the batch and return either a successful CommitResult or a CommitResult representing a failure. In the failure case, the operator must return a meaningful cause and the data associated with the error. That information is passed to the exception handler, and the batch is considered committed.

The exception handler may decide to emit failed records to another sink, and the system will then try to commit to those additional sinks as well. If an additional sink fails to return success, the system keeps trying to commit the data; otherwise data could be lost. Only once all additional sinks have committed does the system consider the batch completed.

In addition, there is a default timeout associated with each commit. A CommitResult represents a future event and can be completed or failed asynchronously from another thread. For example:

long batchId = 5L;
CompletableCommitResult commitResult = context.execution().nextCommitResult();

// from other thread, at other time
commitResult.complete(batchId);

The default timeout is 10 seconds and can be modified by assigning a value to the volt.stream.commit.async.timeout.ms property. When the timeout occurs, an org.voltdb.stream.api.extension.CommitResult.CommitTimeoutException is thrown and the system calls the exception handler, passing all messages that were sent to the main sink along with the exception.