Sinks¶
Sinks are the components of a streaming pipeline responsible for sending processed data to an external destination or endpoint. They define how and where the data is delivered, supporting a variety of use cases such as network communication, file storage, database interactions, and more.
Each sink provides different configuration options to suit specific requirements, such as protocol selection, error handling, and destination customization. TLS/SSL configuration is standardized across the sinks that support it.
The following are the sinks currently available for VoltSP. (See Custom Sources, Sinks, and Processors for information on defining your own custom sink.)
- VoltDB Sink: Integrates with VoltDB using stored procedure calls or bulk inserts.
- Blackhole Sink: Discards all incoming data, useful for testing purposes.
- Elasticsearch Sink: Integrates with downstream Elasticsearch clusters (supports version 7 and 8, batch mode and streaming).
- File Sink: Writes streamed data items to separate files in a specified directory.
- Iterable Sink: Stores all streamed data in an in-memory list for later use or testing.
- Kafka Sink: Sends data to Apache Kafka topics.
- Network Sink: Sends byte data over a network using either UDP or TCP protocols.
- SingleFile Sink: Writes all streamed data into a single output file.
- Stdout Sink: Outputs streamed data to the console for debugging or logging purposes.
- Syslog Sink: Sends log messages to a remote syslog server in RFC3164 format.
- MQTT Sink: Publishes messages to an MQTT topic.
Transactional Semantics¶
An essential aspect of any sink is its transactional semantics. Sinks like Kafka and VoltDB track which messages have been committed and report this information back to the pipeline source. For example, when using a Kafka source, VoltSP guarantees that no messages are committed at the source until they have been persisted by the sink. In this case, the pipeline as a whole ensures no data loss and achieves at-least-once processing semantics.
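For illustration, here is a minimal sketch of such a pipeline, wiring a Kafka source to the Kafka sink documented below. The VoltPipeline and VoltStreamBuilder types follow the general VoltSP pipeline API; the source configurator methods, topic names, and group id are illustrative assumptions rather than verbatim API:

public class KafkaPassThroughPipeline implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder stream) {
        stream
            .withName("kafka-pass-through")
            // Source: offsets are committed back to Kafka only after the
            // sink reports the corresponding records as persisted.
            .consumeFromSource(
                KafkaStreamSourceConfigurator.aConsumer() // assumed source API
                    .withBootstrapServers("${bootstrap.servers}")
                    .withTopicNames("input-topic")        // illustrative topic name
                    .withGroupId("voltsp-example")        // illustrative group id
                    .withValueDeserializer(StringDeserializer.class))
            // Sink: the configurator documented in this section.
            .terminateWithSink(
                KafkaStreamSinkConfigurator.accepting(String.class)
                    .withBootstrapServers("${bootstrap.servers}")
                    .withTopicName("output-topic")
                    .withValueSerializer(StringSerializer.class));
    }
}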
Configuring Sinks¶
There are three approaches to configuring sinks, as described in the following sections:
Java Code Configuration¶
Sinks can be configured directly in Java code. For example, the following snippet hard-codes the address of the Kafka bootstrap server:
KafkaStreamSinkConfigurator.accepting(String.class)
    .withBootstrapServers("statically.configured.address")
    .withTopicName("topicName")
    .withValueSerializer(StringSerializer.class);
Environment Variable Interpolation¶
To avoid hardcoding parameters, configuration values can be provided externally and interpolated using environment variables. For example:
KafkaStreamSinkConfigurator.accepting(String.class)
    .withBootstrapServers("${bootstrap.servers}");
Helm-Based Auto-Configuration¶
Most sinks also support automatic configuration when running in Kubernetes. Providing Helm values under streaming.pipeline.configuration that match the documented format injects those values into the pipeline and reduces the amount of code needed to set up sinks.
Taking the Kafka sink as an example, some parameters can be provided by Helm and therefore omitted in the code:
KafkaStreamSinkConfigurator.accepting(ByteBuffer.class)
    // Omitting .withBootstrapServers(...) and other Helm-provided parameters
    .withValueSerializer(ByteBufferSerializer.class);
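The omitted parameters are then supplied through the corresponding Helm values: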
streaming:
  pipeline:
    configuration:
      sink:
        kafka:
          topicName: topicA
          bootstrapServers:
            - serverA
            - serverB
This approach allows for a flexible combination of code-based, environment-based, and Helm-based configuration to suit different deployment scenarios and requirements. See the section on Helm Configuration Options for alternative ways to configure sink properties in Helm.
Security¶
| Connector | Credentials | TLS | mTLS | OAuth2 * |
|---|---|---|---|---|
| VoltDB | ✓ | ✓ | | |
| Elasticsearch | ✓ | ✓ | ✓ | |
| Kafka | ✓ | ✓ | ✓ | |
| MQTT | ✓ | ✓ | ✓ | ✓ |

* VoltSP supports the OAuth 2.0 Client Credentials grant type with JWT (JSON Web Token) access tokens.
Example of Kafka mTLS configuration:
// The truststore validates the broker's certificate; the keystore supplies
// the client certificate for mutual authentication.
KafkaStreamSslConfiguration sslConfiguration = KafkaStreamSslConfiguration.builder()
    .withTruststore("/temp/truststore.jks", "pass1")
    .withKeystore("/temp/keystore.jks", "pass2", "keypass3")
    .build();

KafkaStreamSinkConfigurator.accepting(ByteBuffer.class)
    .withBootstrapServers(kafka.getBootstrapServers())
    .withTopicName(topicName)
    .withValueSerializer(ByteBufferSerializer.class)
    .withSSL(sslConfiguration);
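The truststore and keystore paths and passwords above are placeholders; in a Kubernetes deployment they would typically reference files mounted from a secret.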
Example of MQTT OAuth 2.0 configuration:
Java:
OAuthConfigurator oauth =
    new OAuthConfigurator("http://localhost:8080/token", "test_client", "test_secret")
        .withScopes("test_client_access");

MqttSinkConfigurator.aSink()
    .withAddress("mqtt.example.com")
    .withOAuth(oauth);
Config file:
streaming:
  pipeline:
    configuration:
      sink:
        mqtt:
          address: mqtt.example.com
          oauth:
            token-url: http://localhost:8080/token
            client-id: test_client
            client-secret: test_secret
            scopes: test_client_access
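As with the Kafka example earlier, parameters supplied through Helm values can be omitted from the Java configurator.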