Table of Contents
- Active(SP) Stream Data Processing
- See Active(SP) in Action
- How VoltSP Stream Processing Works
- Developing VoltSP Pipelines
- Helm Configuration Options
- Running VoltSP Pipelines
- Sinks
  - Transactional Semantics
  - Configuring Sinks
    - Java Code Configuration
    - Environment Variable Interpolation
    - Helm-Based Auto-Configuration
  - Kafka Sink
  - VoltDB Bulk Sink
  - VoltDB Procedure Sink
  - Network Sink
  - Elasticsearch Sink
  - Syslog Sink
    - Overview
    - Configuration Options
    - API Details
      - SyslogRFC3164MessageConfigurator Options
      - Available Facilities and Severities
  - SingleFile Sink
  - Blackhole Sink
  - File Sink
  - Iterable Sink
  - Stdout Sink
- Sources
- Custom Sources, Sinks, and Processors
Sinks
Sinks are the components of a streaming pipeline responsible for sending processed data to an external destination or endpoint. They define how and where the data is delivered, supporting a variety of use cases such as network communication, file storage, database interactions, and more.
Each sink provides different configuration options to suit specific requirements, such as protocol selection, error handling, and destination customization. TLS/SSL configuration for sinks that support it is standardized.
The following are the sinks currently available for VoltSP. (See Custom Sources, Sinks, and Processors for information on defining your own custom sink.)
- Blackhole Sink: Discards all incoming data, useful for testing purposes.
- Elasticsearch Sink: Integrates with downstream Elasticsearch clusters (supports version 7 and 8).
- File Sink: Writes streamed data items to separate files in a specified directory.
- Iterable Sink: Stores all streamed data in an in-memory list for later use or testing.
- Kafka Sink: Sends data to an Apache Kafka topic.
- Network Sink: Sends byte data over a network using either UDP or TCP protocols.
- SingleFile Sink: Writes all streamed data into a single output file, ensuring data durability.
- Stdout Sink: Outputs streamed data to the console for debugging or logging purposes.
- Syslog Sink: Sends log messages to a remote syslog server in RFC3164 format.
- VoltDB Sink: Integrates with VoltDB via stored procedure calls or bulk data inserts.
Transactional Semantics
An essential aspect of any sink is its transactional semantics. Sinks such as Kafka and VoltDB track which messages have been committed and report this information back to the pipeline source. For example, when using a Kafka source, VoltSP guarantees that no messages are committed at the source until they have been persisted by the sink. In this case, the entire pipeline prevents data loss and achieves at-least-once processing semantics.
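For example, the following is a minimal sketch of an at-least-once Kafka-to-Kafka pipeline; the pipeline-builder types (VoltPipeline, VoltStreamBuilder) and the Kafka source configurator belong to the sections on Developing VoltSP Pipelines and Sources, and their exact names are assumptions here, not part of this section:

// A minimal sketch of an at-least-once Kafka-to-Kafka pipeline.
// VoltPipeline, VoltStreamBuilder, and the source configurator are
// assumed from the pipeline and source documentation.
public class KafkaPassthroughPipeline implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder stream) {
        stream
            .withName("kafka-passthrough")
            .consumeFromSource(
                KafkaStreamSourceConfigurator.aConsumer()
                    .withTopicNames("input-topic")
                    .withBootstrapServers("${bootstrap.servers}")
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class))
            .terminateWithSink(
                // Source offsets are committed only after this sink has
                // persisted the messages, giving at-least-once delivery.
                KafkaStreamSinkConfigurator.accepting(String.class)
                    .withTopicName("output-topic")
                    .withBootstrapServers("${bootstrap.servers}")
                    .withValueSerializer(StringSerializer.class));
    }
}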
Configuring Sinks
There are three approaches to configuring sinks, as described in the following sections:
Java Code Configuration
Sinks can be configured directly in Java code. For example, the following hard-codes the address of the Kafka bootstrap server:
KafkaStreamSinkConfigurator.accepting(String.class)
    .withBootstrapServers("statically.configured.address")
    .withTopicName("topicName")
    .withValueSerializer(StringSerializer.class);
Environment Variable Interpolation
To avoid hardcoding parameters, configuration values can be provided externally and interpolated using environment variables. For example:
KafkaStreamSinkConfigurator.accepting(String.class)
    .withBootstrapServers("${bootstrap.servers}");
Helm-Based Auto-Configuration
Most sinks also support automatic configuration when running in Kubernetes. Helm values provided under streaming.pipeline.configuration that match the documented format are injected automatically, reducing the amount of code needed to set up sinks.
Taking the Kafka sink as an example, some parameters can be provided by Helm and therefore omitted in the code:
KafkaStreamSinkConfigurator.accepting(ByteBuffer.class)
    // Omitting .withBootstrapServers(...) and others
    .withValueSerializer(ByteBufferSerializer.class);
The omitted values are then supplied in Helm:
streaming:
  pipeline:
    configuration:
      sink:
        kafka:
          topicName: topicA
          bootstrapServers:
            - serverA
            - serverB
This approach allows for a flexible combination of code-based, environment-based, and Helm-based configuration to suit different deployment scenarios and requirements. See the section on Helm Configuration Options for alternative ways to configure sink properties in Helm.
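For instance, one configurator can combine all three mechanisms (a sketch, assuming the topic name accepts environment interpolation; ${topic.name} is an assumed name, not one defined by VoltSP):

KafkaStreamSinkConfigurator.accepting(String.class)
    // Interpolated from the environment.
    .withTopicName("${topic.name}")
    // Fixed in code: the serializer rarely varies between deployments.
    .withValueSerializer(StringSerializer.class);
    // Bootstrap servers are omitted entirely: they are injected from Helm
    // values under streaming.pipeline.configuration.sink.kafka.bootstrapServers.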
Kafka Sink
Overview
The kafka sink sends data to an Apache Kafka topic. It supports a variety of configuration options, such as the topic name, bootstrap servers, key and value serializers, SSL configuration, and more. This sink is ideal for integrating with Kafka for real-time data streaming.
Configuration Options
Helm Configuration (YAML)
sink:
  kafka:
    topicName: "my-topic"
    bootstrapServers: "kafka.example.com:9092"
    schemaRegistry: "http://registry.example.com"
    properties:
      key1: value1
Java Configuration
KafkaStreamSinkConfigurator.accepting(String.class)
    .withTopicName("my-topic")
    .withBootstrapServers("kafka.example.com:9092")
    .withSchemaRegistry("http://registry.example.com")
    .withProperty("key1", "value1")
    .withHeaders(string -> List.of(new RecordHeader("encoding", ...)))
    .withSSL(
        KafkaStreamSslConfiguration.builder()
            .withTruststore("/path/to/truststore.jks", "truststore-password")
            .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
            .build()
    );
API Details
Methods
KafkaStreamSinkConfigurator:
- withTopicName(String topicName): Sets the Kafka topic name where data will be sent.
- withBootstrapServers(String bootstrapServers): Sets the Kafka bootstrap server(s) for connecting to the Kafka cluster.
- withSchemaRegistry(String schemaRegistryUrl): Configures the schema registry URL for Avro serialization.
- withProperty(String key, String value) / withProperty(String key, long value) / withProperty(String key, int value) / withProperty(String key, boolean value): Adds or overrides a specific Kafka client property, allowing customization for advanced configurations.
- withValueSerializer(String serializerClass) / withValueSerializer(Class<? extends Serializer<?>> serializerClass): Defines the serializer class for serializing the value part of the Kafka messages.
- withKeyExtractor(Function<T, Long> extractorFunction): Specifies a function to extract a long-type key from the input data for partitioning and routing messages.
- withKeyExtractor(Function<T, K> extractorFunction, Class<? extends Serializer<K>> serializerClass): Configures a custom key extractor function along with a serializer for serializing the key.
- withValueExtractor(Function<T, K> extractorFunction, Class<? extends Serializer<?>> serializerClass): Configures a custom value extractor function along with a serializer for serializing the value.
- withHeaders(Function<T, Iterable<Header>> extractorFunction): Specifies a function that generates message headers for each data element being sent.
- withExceptionHandler(ExceptionHandler exceptionHandler): Allows a custom exception handler to process errors during execution.
- withSSL(KafkaStreamSslConfiguration sslConfiguration): Configures SSL settings for secure communication with Kafka.
KafkaStreamSslConfiguration.Builder allows configuring SSL/TLS for secure communication:
- withTruststore() / withTruststore(String truststoreFile, String truststorePassword): Configures the truststore for verifying server certificates. When deployed to a Kubernetes cluster, invoking the no-argument version retrieves the necessary configuration from the environment.
- withKeystore() / withKeystore(String keyStoreFile, String keyStorePassword, String keyPassword): Configures the keystore for client certificate authentication. When deployed to a Kubernetes cluster, invoking the no-argument version retrieves the necessary configuration from the environment.
- withSSLProtocols(TLSProtocols protocol): Sets the SSL protocol version.
- withSSLEnabledProtocols(TLSProtocols... protocols): Configures multiple SSL protocols for enhanced security.
- build(): Creates the KafkaStreamSslConfiguration.
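As an illustration of the key, value, and header hooks above, the following sketch keys messages by a numeric field and attaches a static header; the Event record and its fields are hypothetical, introduced only for this example:

// Hypothetical payload type used only for this example.
record Event(long sequence, String payload) {}

KafkaStreamSinkConfigurator.accepting(Event.class)
    .withTopicName("events")
    .withBootstrapServers("${bootstrap.servers}")
    // Partition and route messages by the event's sequence number.
    .withKeyExtractor(Event::sequence)
    // Send only the payload field, serialized as a string.
    .withValueExtractor(Event::payload, StringSerializer.class)
    // Attach a constant header to every outgoing record.
    .withHeaders(event -> List.of(
        new RecordHeader("encoding", "utf-8".getBytes(StandardCharsets.UTF_8))));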
VoltDB Bulk Sink
Overview
The voltdb sink is used to insert data into a VoltDB table in bulk. It supports configuration options such as batch size, flush interval, table name, and the type of bulk operation. This sink is suitable for efficiently inserting large volumes of data into VoltDB.
Configuration Options
Helm Configuration (YAML)
sink:
  voltdb:
    tableName: "my_table"
    batchSize: 100000
    flushInterval: 5000
    operationType: "INSERT"
Java Configuration
BulkInsertVoltStreamSinkConfigurator.aSink()
    .withTableName("my_table")
    .withBatchSize(100000)
    .withFlushInterval(5000)
    .withOperationType(VoltBulkOperationType.INSERT);
API Details
Methods
- withTableName(String tableName): Specifies the name of the VoltDB table to insert data into.
- withBatchSize(int batchSize): Sets the maximum number of records in a batch before data is inserted.
- withFlushInterval(int flushInterval): Specifies the interval (in milliseconds) at which data is flushed to VoltDB.
- withOperationType(VoltBulkOperationType operationType): Configures the type of bulk operation to perform (INSERT or UPSERT).
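Because the pipeline can provide at-least-once semantics (see Transactional Semantics), rows may be re-sent after a failure; choosing UPSERT keeps re-inserted rows idempotent. A sketch, with placeholder table name and sizes:

BulkInsertVoltStreamSinkConfigurator.aSink()
    .withTableName("events")
    // Insert a batch once 50,000 rows accumulate...
    .withBatchSize(50_000)
    // ...and flush pending rows at least every 2,000 ms.
    .withFlushInterval(2_000)
    // UPSERT makes re-delivered rows idempotent under at-least-once delivery.
    .withOperationType(VoltBulkOperationType.UPSERT);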
VoltDB Procedure Sink
Overview
The voltdb sink can also be used to invoke stored procedures in VoltDB. This type of sink configuration allows calling a stored procedure with configurable retries, making it suitable for dynamic or transactional data handling.
Configuration Options
Helm Configuration (YAML)
sink:
  voltdb:
    procedureName: "my_procedure"
    retries: 3
Java Configuration
ProcedureVoltStreamSinkConfigurator.aSink()
    .withProcedureName("my_procedure")
    .withRetry(3);
API Details
Methods
- withProcedureName(String procedureName): Specifies the name of the VoltDB stored procedure to invoke.
- withRetry(int retries): Configures the number of retry attempts for invoking the procedure.
Network Sink
Overview
The network sink is a socket-based sink that sends byte data over the network using either the UDP or TCP protocol. It supports Helm-based auto-configuration.
Configuration Options
Helm Configuration (YAML)
sink:
  network:
    type: udp
    address: 10.11.12.13:34567
Java Configuration
NetworkStreamSinkConfigurator sinkConfig = new NetworkStreamSinkConfigurator()
    .withType(NetworkType.UDP)
    .withAddress("127.0.0.1:3000");
API Details
Methods
- withType(NetworkType type): Specifies the network protocol to use (UDP, TCP, etc.).
- withAddress(String address): Sets the target address in the format host:port.
- withExceptionHandler(ExceptionHandler exceptionHandler): Allows a custom exception handler to process errors during execution.
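For connection-oriented delivery, the same configurator can target TCP instead; a minimal sketch with a placeholder address:

NetworkStreamSinkConfigurator tcpSink = new NetworkStreamSinkConfigurator()
    // TCP provides connection-oriented delivery; UDP is fire-and-forget.
    .withType(NetworkType.TCP)
    .withAddress("10.11.12.13:34567");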
Elasticsearch Sink
Overview
The elasticsearch sink sends data to an Elasticsearch cluster. It supports Elasticsearch versions 7 and 8 and uses the official low-level client, which provides features such as:
- load balancing across all available nodes
- failover in case of node failures
- persistent connections
- SSL/TLS
- bulk mode
Configuration Options
Helm Configuration (YAML)
sink:
  elastic:
    host: "elasticsearch.example.com"
    port: 9200
    indexName: "my-index"
    auth:
      username: "user"
      password: "password"
    truststore:
      file: "/path/to/truststore.jks"
      password: "truststore-password"
    keystore:
      file: "/path/to/keystore.jks"
      password: "keystore-password"
      key-password: "key-password"
    request-headers:
      Content-Type: "application/json"
    request-parameters:
      timeout: "1m"
When the CA certificate is available as a PEM-encoded file, set sink.elastic.truststore.cert-file in the configuration.
Note that truststore.cert-file cannot be used together with truststore.file; they are mutually exclusive.
Java Configuration
ElasticSearchSinkConfigurator.accepting(StringEntity.class)
    .withHost("elasticsearch.example.com")
    .withPort(9200)
    .withIndex("my-index")
    .withCredentials("user", "password")
    .withSSL(
        ElasticSearchSslConfiguration.builder()
            .withTruststore("/path/to/truststore.jks", "truststore-password")
            .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
            .build()
    )
    .addRequestHeader("Content-Type", "application/json")
    .addRequestParameter("refresh", "true");
API Details
Methods
ElasticSearchSinkConfigurator:
- withHost(String host): Sets the Elasticsearch server hostname.
- withPort(int port): Sets the port for connecting to Elasticsearch. Default is 9200.
- withIndex(String indexName): Specifies the index name to store the data.
- addRequestHeader(String name, String value): Adds a custom request header.
- addRequestParameter(String name, String value): Adds a custom request parameter to Elasticsearch requests.
- withExceptionHandler(ExceptionHandler exceptionHandler): Allows a custom exception handler to process errors during execution.
- withNoopHostnameVerifier(): Configures the producer to use a "no-op" hostname verifier, effectively disabling hostname verification during SSL/TLS communication.
- withHostnameVerifier(HostnameVerifier hostnameVerifier): Configures the producer with a custom HostnameVerifier for SSL/TLS communication.
- withCredentials() / withCredentials(String username, String password): Sets the authentication credentials. When deployed to a Kubernetes cluster, invoking the no-argument version retrieves the necessary configuration from the environment.
- withSSL(ElasticSearchSslConfiguration sslConfiguration) / withSSL(SSLContext sslContext): Configures SSL settings for secure communication with Elasticsearch.
ElasticSearchSslConfiguration.Builder allows configuring SSL/TLS for secure communication:
- withTruststore() / withTruststore(String truststoreFile, String truststorePassword) / withTruststoreFromCrt(String pemEncodedFile): Configures the truststore for verifying server certificates. When deployed to a Kubernetes cluster, invoking the no-argument version retrieves the necessary configuration from the environment.
- ignoreSslValidation(): Disables verification of server certificates.
- withKeystore() / withKeystore(String keyStoreFile, String keyStorePassword, String keyPassword): Configures the keystore for client certificate authentication. When deployed to a Kubernetes cluster, invoking the no-argument version retrieves the necessary configuration from the environment.
- build(): Creates the ElasticSearchSslConfiguration.
Syslog Sink
Overview
The syslog sink sends log messages to a remote syslog server in RFC3164 format. It supports Helm-based auto-configuration and allows customization of the message facility, severity, hostname, and other syslog-specific settings.
Configuration Options
Helm Configuration (YAML)
sink:
  syslog:
    host: "syslog.example.com"
    port: 514
    message:
      facility: USER
      severity: NOTICE
      hostname: "my-host"
      tag: "my-app"
Java Configuration
SyslogSinkConfigurator.aSink()
    .withHostAndPort("syslog.example.com", 514)
    .withMessageConfiguration(
        new SyslogRFC3164MessageConfigurator()
            .withFacility(SyslogMessageFacility.USER)
            .withSeverity(SyslogMessageSeverity.NOTICE)
            .withHostname("my-host")
            .withTag("my-app")
    );
API Details
SyslogRFC3164MessageConfigurator Options
- withFacility(SyslogMessageFacility facility): Sets the facility for the syslog message, such as USER, DAEMON, etc.
- withSeverity(SyslogMessageSeverity severity): Sets the severity level for the syslog message, such as NOTICE, ERROR, etc.
- withHostname(String hostname): Sets the hostname to be included in the syslog message. Must contain only ASCII characters.
- withTag(String tag): Sets the tag to be included in the syslog message. Must contain only ASCII characters.
Available Facilities and Severities
- Facilities: KERN, USER, MAIL, DAEMON, AUTH, SYSLOG, LPR, NEWS, UUCP, CRON, AUTHPRIV, FTP, NTP, AUDIT, ALERT, CLOCK, LOCAL0, LOCAL1, LOCAL2, LOCAL3, LOCAL4, LOCAL5, LOCAL6, LOCAL7
- Severities: EMERGENCY, ALERT, CRITICAL, ERROR, WARNING, NOTICE, INFORMATIONAL, DEBUG
Methods
- withHostAndPort(String host, int port): Sets the target syslog server's hostname and port.
- withMessageConfiguration(SyslogRFC3164MessageConfigurator messageConfigurator): Configures the syslog message details, such as facility, severity, hostname, and tag.
SingleFile Sink
The singlefile sink writes stream items to a single local output file using buffered I/O. It ensures data durability by performing an fsync every few lines.
It requires parallelism to be set to 1; only a single thread can write to it.
Configuration Options
Java Configuration
SingleFileSink sink = new SingleFileSink("/path/to/output.txt");
Blackhole Sink
The blackhole sink discards all the data it consumes. It is mainly used for testing or for scenarios where data output is not required.
Configuration Options
Java Configuration
BlackholeSink<String> sink = new BlackholeSink<>();
File Sink
The file sink writes streamed data items to separate files within a specified directory. Each instance creates a new file with a unique name in the designated directory. This sink supports parallelism higher than 1.
Configuration Options
Java Configuration
FileSink<String> sink = new FileSink<>("/path/to/output/directory");
Iterable Sink
The iterable sink captures all stream items in an in-memory list. This is useful for testing or for scenarios where bounded streams can be held in memory.
Configuration Options
Java Configuration
IterableSink<String> sink = new IterableSink<>();
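For example, a bounded test pipeline can terminate with this sink and then inspect the captured items. The loop below assumes, as the name suggests, that the sink itself is iterable; check the actual API before relying on this:

IterableSink<String> sink = new IterableSink<>();
// ... run a bounded pipeline that terminates with this sink ...
// Assumption: the sink exposes its in-memory list for iteration.
for (String item : sink) {
    System.out.println(item);
}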
Stdout Sink
The stdout sink writes streamed data items to standard output (the console). It is useful for debugging or logging streamed data.
Configuration Options
Java Configuration
StdoutSink<String> sink = new StdoutSink<>(Objects::toString);