Sinks

Sinks are the components of a streaming pipeline responsible for sending processed data to an external destination or endpoint. They define how and where the data is delivered, supporting a variety of use cases such as network communication, file storage, database interactions, and more.

Each sink provides different configuration options to suit specific requirements, such as protocol selection, error handling, and destination customization. TLS/SSL configuration is standardized across the sinks that support it.

The following are the sinks currently available for VoltSP. (See Custom Sources, Sinks, and Processors for information on defining your own custom sink.)

  • Blackhole Sink: Discards all incoming data, useful for testing purposes.
  • Elasticsearch Sink: Integrates with downstream Elasticsearch clusters (supports version 7 and 8).
  • File Sink: Writes streamed data items to separate files in a specified directory.
  • Iterable Sink: Stores all streamed data in an in-memory list for later use or testing.
  • Kafka Sink: Sends data to an Apache Kafka topic.
  • Network Sink: Sends byte data over a network using either UDP or TCP protocols.
  • SingleFile Sink: Writes all streamed data into a single output file, ensuring data durability.
  • Stdout Sink: Outputs streamed data to the console for debugging or logging purposes.
  • Syslog Sink: Sends log messages to a remote syslog server in RFC3164 format.
  • VoltDB Sink: Integrates with VoltDB using stored procedure calls or bulk inserts.

Transactional Semantics

An essential aspect of any sink is its transactional semantics. Sinks like Kafka and VoltDB track which messages have been committed and report this information back to the pipeline source. For example, when using a Kafka source, VoltSP guarantees that no messages are committed at the source until they have been persisted by the sink. In this case, the entire pipeline ensures no data loss and achieves at-least-once processing semantics.
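
The sketch below shows how such a pipeline might be wired, connecting a Kafka source to the VoltDB procedure sink described later in this section. The VoltPipeline and VoltStreamBuilder names and the KafkaStreamSourceConfigurator source counterpart are assumptions modeled on the configurator style used throughout this section; consult the pipeline API reference for the exact types.

public class KafkaToVoltPipeline implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder stream) {
        stream
            .withName("kafka-to-voltdb")
            // Offsets are committed at the source only after the sink
            // reports the corresponding messages as persisted.
            .consumeFromSource(KafkaStreamSourceConfigurator.aConsumer()
                    .withTopicNames("topicName"))
            .terminateWithSink(ProcedureVoltStreamSinkConfigurator.aSink()
                    .withProcedureName("my_procedure"));
    }
}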

Configuring Sinks

There are three approaches to configuring sinks, as described in the following sections:

Java Code Configuration

Sinks can be configured directly using Java code. For example, the following Java snippet hard-codes the address of the Kafka bootstrap server:

KafkaStreamSinkConfigurator.accepting(String.class)
    .withBootstrapServers("statically.configured.address")
    .withTopicName("topicName")
    .withValueSerializer(StringSerializer.class);

Environment Variable Interpolation

To avoid hardcoding parameters, configuration values can be provided externally and interpolated using environment variables. For example:

KafkaStreamSinkConfigurator.accepting(String.class)
    .withBootstrapServers("${bootstrap.servers}");

Helm-Based Auto-Configuration

Most sinks also support automatic configuration when running in Kubernetes. Helm values provided under streaming.pipeline.configuration that match the documented format are injected automatically, reducing the amount of code needed to set up sinks.

Taking the Kafka sink as an example, some parameters can be provided by Helm and therefore omitted in the code:

KafkaStreamSinkConfigurator.accepting(ByteBuffer.class)
    // Omitting .withBootstrapServers(...) and others
    .withValueSerializer(ByteBufferSerializer.class);

The omitted values are then supplied in the Helm chart:

streaming:
  pipeline:
    configuration:
      sink:
        kafka:
          topicName: topicA
          bootstrapServers:
            - serverA
            - serverB

This approach allows for a flexible combination of code-based, environment-based, and Helm-based configuration to suit different deployment scenarios and requirements. See the section on Helm Configuration Options for alternative ways to configure sink properties in Helm.
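
For instance, a single configurator can mix all three approaches. In the following sketch the serializer is fixed in code, the bootstrap servers are interpolated from the environment, and the topic name is left to Helm auto-configuration:

KafkaStreamSinkConfigurator.accepting(String.class)
    // Interpolated from the environment at deployment time:
    .withBootstrapServers("${bootstrap.servers}")
    // Fixed in code; the topic name is omitted and supplied via Helm values:
    .withValueSerializer(StringSerializer.class);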


Kafka Sink

Overview

The kafka sink sends data to an Apache Kafka topic. It supports a variety of configurations such as specifying the topic name, bootstrap servers, key and value serializers, SSL configuration, and more. This sink is ideal for integrating with Kafka for real-time data streaming.

Configuration Options

Helm Configuration (YAML)

sink:
  kafka:
    topicName: "my-topic"
    bootstrapServers: "kafka.example.com:9092"
    schemaRegistry: "http://registry.example.com"
    properties:
      key1: value1

Java Configuration

KafkaStreamSinkConfigurator.accepting(String.class)
    .withTopicName("my-topic")
    .withBootstrapServers("kafka.example.com:9092")
    .withSchemaRegistry("http://registry.example.com")
    .withProperty("key1", "value1")
    .withHeaders(string -> List.of(new RecordHeader("encoding", ...)))
    .withSSL(
        KafkaStreamSslConfiguration.builder()
            .withTruststore("/path/to/truststore.jks", "truststore-password")
            .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
            .build()
    );

API Details

Methods

KafkaStreamSinkConfigurator

  • withTopicName(String topicName)
    Sets the Kafka topic name where data will be sent.

  • withBootstrapServers(String bootstrapServers)
    Sets the Kafka bootstrap server(s) for connecting to the Kafka cluster.

  • withSchemaRegistry(String schemaRegistryUrl)
    Configures the schema registry URL for Avro serialization.

  • withProperty(String key, String value)
  • withProperty(String key, long value)
  • withProperty(String key, int value)
  • withProperty(String key, boolean value)
    Adds or overrides a specific Kafka client property, allowing customization for advanced configurations.

  • withValueSerializer(String serializerClass)
  • withValueSerializer(Class<? extends Serializer<?>> serializerClass)
    Defines the serializer class for serializing the value part of the Kafka messages.

  • withKeyExtractor(Function<T, Long> extractorFunction)
    Specifies a function to extract a long-type key from the input data for partitioning and routing messages.

  • withKeyExtractor(Function<T, K> extractorFunction, Class<? extends Serializer<K>> serializerClass)
    Configures a custom key extractor function along with a serializer for serializing the key.

  • withValueExtractor(Function<T, V> extractorFunction, Class<? extends Serializer<?>> serializerClass)
    Configures a custom value extractor function along with a serializer for serializing the value.

  • withHeaders(Function<T, Iterable<Header>> extractorFunction)
    Specifies a function that generates message headers for each data element being sent.

  • withExceptionHandler(ExceptionHandler exceptionHandler)
    Allows a custom exception handler to process errors during execution.

  • withSSL(KafkaStreamSslConfiguration sslConfiguration)
    Configures SSL settings for secure communication with Kafka.
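
As a brief illustration of the extractor methods above, the following sketch partitions messages by a numeric key taken from a hypothetical Order record (the record type and its fields are invented for this example):

// Hypothetical element type flowing through the pipeline.
record Order(long customerId, String payload) {}

KafkaStreamSinkConfigurator.accepting(Order.class)
    .withTopicName("orders")
    .withBootstrapServers("${bootstrap.servers}")
    // Partition and route messages by the customer id (long key).
    .withKeyExtractor(Order::customerId)
    // Send only the payload field as the message value.
    .withValueExtractor(Order::payload, StringSerializer.class);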

KafkaStreamSslConfiguration.Builder allows configuring SSL/TLS for secure communication.

  • withTruststore()
  • withTruststore(String truststoreFile, String truststorePassword)
    Configures the truststore for verifying server certificates. When deployed to a Kubernetes cluster, invoking the no-argument version of this method retrieves the necessary configuration from the environment.

  • withKeystore()
  • withKeystore(String keyStoreFile, String keyStorePassword, String keyPassword)
    Configures the keystore for client certificate authentication. When deployed to a Kubernetes cluster, invoking the no-argument version of this method retrieves the necessary configuration from the environment.

  • withSSLProtocols(TLSProtocols protocol)
    Sets the SSL protocol version.

  • withSSLEnabledProtocols(TLSProtocols... protocols)
    Configures multiple SSL protocols for enhanced security.

  • build()
    Creates the KafkaStreamSslConfiguration instance.


VoltDB Bulk Sink

Overview

The voltdb sink is used to insert data into a VoltDB table in bulk. It supports configurations such as batch size, flush interval, table name, and the type of bulk operation. This sink is suitable for efficiently inserting large volumes of data into VoltDB.

Configuration Options

Helm Configuration (YAML)

sink:
  voltdb:
    tableName: "my_table"
    batchSize: 100000
    flushInterval: 5000
    operationType: "INSERT"

Java Configuration

BulkInsertVoltStreamSinkConfigurator.aSink()
    .withTableName("my_table")
    .withBatchSize(100000)
    .withFlushInterval(5000)
    .withOperationType(VoltBulkOperationType.INSERT);

API Details

Methods

  • withTableName(String tableName)
    Specifies the name of the VoltDB table to insert data into.

  • withBatchSize(int batchSize)
    Sets the maximum number of records in a batch before data is inserted.

  • withFlushInterval(int flushInterval)
    Specifies the interval (in milliseconds) to flush data to VoltDB.

  • withOperationType(VoltBulkOperationType operationType)
    Configures the type of bulk operation to perform (INSERT or UPSERT).
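
The batch size and flush interval work together: typically a batch is sent as soon as it fills up, while the flush interval bounds how long a partially filled batch may wait. A lower-latency configuration might look like this (the values are illustrative, not recommendations):

BulkInsertVoltStreamSinkConfigurator.aSink()
    .withTableName("my_table")
    // Insert as soon as 1,000 records accumulate...
    .withBatchSize(1_000)
    // ...or after one second, whichever comes first.
    .withFlushInterval(1_000)
    .withOperationType(VoltBulkOperationType.UPSERT);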


VoltDB Procedure Sink

Overview

The voltdb sink can also be used to invoke stored procedures in VoltDB. This type of sink configuration allows calling a stored procedure with configurable retries, making it suitable for dynamic or transactional data handling.

Configuration Options

Helm Configuration (YAML)

sink:
  voltdb:
    procedureName: "my_procedure"
    retries: 3

Java Configuration

ProcedureVoltStreamSinkConfigurator.aSink()
    .withProcedureName("my_procedure")
    .withRetry(3);

API Details

Methods

  • withProcedureName(String procedureName)
    Specifies the name of the VoltDB stored procedure to invoke.

  • withRetry(int retries)
    Configures the number of retry attempts for invoking the procedure.


Network Sink

Overview

The network sink is a socket-based sink that sends byte data over the network via UDP or TCP. It supports Helm-based auto-configuration.

Configuration Options

Helm Configuration (YAML)

sink:
  network:
    type: udp
    address: 10.11.12.13:34567

Java Configuration

NetworkStreamSinkConfigurator sinkConfig = new NetworkStreamSinkConfigurator()
    .withType(NetworkType.UDP)
    .withAddress("127.0.0.1:3000");

API Details

Methods

  • withType(NetworkType type) Specifies the network protocol to use (UDP or TCP).

  • withAddress(String address) Sets the target address in the format host:port.

  • withExceptionHandler(ExceptionHandler exceptionHandler) Allows a custom exception handler to process errors during execution.
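
The TCP variant of the earlier example differs only in the NetworkType passed to withType:

NetworkStreamSinkConfigurator tcpConfig = new NetworkStreamSinkConfigurator()
    .withType(NetworkType.TCP)
    .withAddress("10.11.12.13:34567");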


Elasticsearch Sink

Overview

The elasticsearch sink is used to send data to an Elasticsearch cluster. It supports versions 7 and 8 and uses the official low-level client, with features such as:

  • load balancing across all available nodes
  • failover in case of node failures
  • persistent connections
  • SSL/TLS
  • bulk mode

Configuration Options

Helm Configuration (YAML)

sink:
  elastic:
    host: "elasticsearch.example.com"
    port: 9200
    indexName: "my-index"
    auth:
      username: "user"
      password: "password"
    truststore:
      file: "/path/to/truststore.jks"
      password: "truststore-password"
    keystore:
      file: "/path/to/keystore.jks"
      password: "keystore-password"
      key-password: "key-password"
    request-headers:
      Content-Type: "application/json"
    request-parameters:
      timeout: "1m"

When the CA certificate is available as a PEM-encoded file, set sink.elastic.truststore.cert-file in the configuration. Note that truststore.cert-file and truststore.file are mutually exclusive and cannot be used together.

Java Configuration

ElasticSearchSinkConfigurator.accepting(StringEntity.class)
    .withHost("elasticsearch.example.com")
    .withPort(9200)
    .withIndex("my-index")
    .withCredentials("user", "password")
    .withSSL(
        ElasticSearchSslConfiguration.builder()
            .withTruststore("/path/to/truststore.jks", "truststore-password")
            .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
            .build()
    )
    .addRequestHeader("Content-Type", "application/json")
    .addRequestParameter("refresh", "true");

API Details

Methods

ElasticSearchSinkConfigurator

  • withHost(String host)
    Sets the Elasticsearch server hostname.

  • withPort(int port)
    Sets the port for connecting to Elasticsearch. Default is 9200.

  • withIndex(String indexName)
    Specifies the index name to store the data.

  • addRequestHeader(String name, String value)
    Adds a custom request header.

  • addRequestParameter(String name, String value)
    Adds a custom request parameter to Elasticsearch requests.

  • withExceptionHandler(ExceptionHandler exceptionHandler)
    Allows a custom exception handler to process errors during execution.

  • withNoopHostnameVerifier()
    Configures the sink to use a "no-op" hostname verifier, effectively disabling hostname verification during SSL/TLS communication.

  • withHostnameVerifier(HostnameVerifier hostnameVerifier)
    Configures the sink with a custom HostnameVerifier for SSL/TLS communication.

  • withCredentials()
  • withCredentials(String username, String password)
    Sets the authentication credentials. When deployed to a Kubernetes cluster, invoking the no-argument version of this method retrieves the necessary configuration from the environment.

  • withSSL(ElasticSearchSslConfiguration sslConfiguration)
  • withSSL(SSLContext sslContext)
    Configures SSL settings for secure communication with Elasticsearch.

ElasticSearchSslConfiguration.Builder allows configuring SSL/TLS for secure communication.

  • withTruststore()
  • withTruststore(String truststoreFile, String truststorePassword)
  • withTruststoreFromCrt(String pemEncodedFile)
    Configures the truststore for verifying server certificates. When deployed to a Kubernetes cluster, invoking the no-argument version of this method retrieves the necessary configuration from the environment.

  • ignoreSslValidation() Disables server certificate verification.

  • withKeystore()
  • withKeystore(String keyStoreFile, String keyStorePassword, String keyPassword)
    Configures the keystore for client certificate authentication. When deployed to a Kubernetes cluster, invoking the no-argument version of this method retrieves the necessary configuration from the environment.

  • build()
    Creates the ElasticSearchSslConfiguration instance.
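
For local development against a cluster with self-signed certificates, certificate verification can be skipped altogether. A sketch, not suitable for production:

ElasticSearchSinkConfigurator.accepting(StringEntity.class)
    .withHost("localhost")
    .withPort(9200)
    .withIndex("dev-index")
    .withSSL(
        ElasticSearchSslConfiguration.builder()
            // Development only: skip server certificate verification.
            .ignoreSslValidation()
            .build()
    );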


Syslog Sink

Overview

The syslog sink is used to send log messages to a remote syslog server in RFC3164 format. It supports Helm-based auto-configuration and allows customization of message facility, severity, hostname, and other syslog-specific settings.

Configuration Options

Helm Configuration (YAML)

sink:
  syslog:
    host: "syslog.example.com"
    port: 514
    message:
      facility: USER
      severity: NOTICE
      hostname: "my-host"
      tag: "my-app"

Java Configuration

SyslogSinkConfigurator.aSink()
    .withHostAndPort("syslog.example.com", 514)
    .withMessageConfiguration(
        new SyslogRFC3164MessageConfigurator()
            .withFacility(SyslogMessageFacility.USER)
            .withSeverity(SyslogMessageSeverity.NOTICE)
            .withHostname("my-host")
            .withTag("my-app")
    );
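
With the configuration above, each message carries the RFC3164 priority value PRI = facility × 8 + severity. USER has facility code 1 and NOTICE has severity code 5, so PRI = 1 × 8 + 5 = 13, and the emitted message looks roughly like this (the timestamp and payload are illustrative):

<13>Oct 11 22:14:15 my-host my-app: example payload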

API Details

SyslogRFC3164MessageConfigurator Options

  • withFacility(SyslogMessageFacility facility)
    Sets the facility for the syslog message, such as USER, DAEMON, etc.

  • withSeverity(SyslogMessageSeverity severity)
    Sets the severity level for the syslog message, such as NOTICE, ERROR, etc.

  • withHostname(String hostname)
    Sets the hostname to be included in the syslog message. Must contain only ASCII characters.

  • withTag(String tag)
    Sets the tag to be included in the syslog message. Must contain only ASCII characters.

Available Facilities and Severities

  • Facilities: KERN, USER, MAIL, DAEMON, AUTH, SYSLOG, LPR, NEWS, UUCP, CRON, AUTHPRIV, FTP, NTP, AUDIT, ALERT, CLOCK, LOCAL0, LOCAL1, LOCAL2, LOCAL3, LOCAL4, LOCAL5, LOCAL6, LOCAL7
  • Severities: EMERGENCY, ALERT, CRITICAL, ERROR, WARNING, NOTICE, INFORMATIONAL, DEBUG

Methods

  • withHostAndPort(String host, int port)
    Sets the target syslog server's hostname and port.

  • withMessageConfiguration(SyslogRFC3164MessageConfigurator messageConfigurator)
    Configures the syslog message details, such as facility, severity, hostname, and tag.


SingleFile Sink

The singlefile sink writes stream items to a single local output file using buffered I/O. It ensures data durability by performing an fsync every few lines.

It requires parallelism to be set to 1, since only a single thread can write to the file.

Configuration Options

Java Configuration

SingleFileSink sink = new SingleFileSink("/path/to/output.txt");

Blackhole Sink

The blackhole sink discards all the data it consumes. It is mainly used for testing or scenarios where data output is not required.

Configuration Options

Java Configuration

BlackholeSink<String> sink = new BlackholeSink<>();

File Sink

The file sink writes streamed data items to separate files within a specified directory. Each instance creates a new file with a unique name in the designated directory. This sink supports parallelism higher than 1.

Configuration Options

Java Configuration

FileSink<String> sink = new FileSink<>("/path/to/output/directory");

Iterable Sink

The iterable sink captures all stream items into an in-memory list. This is useful for testing or scenarios where bounded streams can be held in memory.

Configuration Options

Java Configuration

IterableSink<String> sink = new IterableSink<>();

Stdout Sink

The stdout sink writes streamed data items to the standard output (console). It is useful for debugging or logging streamed data.

Configuration Options

Java Configuration

StdoutSink<String> sink = new StdoutSink<>(Objects::toString);