Table of Contents
- Active(SP) Stream Data Processing
- See Active(SP) in Action
- How VoltSP Stream Processing Works
- Developing VoltSP Pipelines
- Helm Configuration Options
- Running VoltSP Pipelines
- Sinks
- Sources
- Transactional Semantics
- Configuring Sources
- Kafka
- Network
- Iterable Source
- File Source
- Stdin Source
- Custom Sources, Sinks, and Processors
Sources
Sources are key components of a streaming pipeline responsible for ingesting data to be processed. They determine how data is introduced into the pipeline, supporting a wide range of data origins including files, Kafka topics, and standard input. Each source can be configured to meet specific requirements, such as data formats, connection details, and polling intervals.
The following are the sources currently available for VoltSP. (See Custom Sources, Sinks, and Processors for information on defining your own custom source.)
- File Source: Reads data items from a specified file, treating each line as a separate item. This is ideal for streaming data stored in text files.
- Generator Source: Produces data items algorithmically, suitable for synthetic data generation or simulation purposes.
- Iterable Source: Generates a stream of data items from an iterable collection, useful for testing or scenarios involving pre-defined in-memory data.
- Kafka Source: Consumes data from an Apache Kafka topic, enabling seamless integration with real-time data streams.
- Network Source: Receives byte data from a network via the UDP protocol.
- Stdin Source: Reads data items from standard input, making it useful for interactive scenarios or debugging purposes.
Each source type offers flexibility in how data is ingested, with a variety of configuration options to support different data origins and processing needs. The following sections provide detailed documentation for each source, including configuration examples, API details, and usage guidelines.
Transactional Semantics
An essential aspect of any source is its transactional semantics. For example, when using a Kafka source, VoltSP guarantees that no messages will be committed unless they have been processed and sent to a downstream sink. If the sink also supports such guarantees (as the Kafka and VoltDB sinks do), the entire pipeline ensures no data loss and achieves at-least-once processing semantics.
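This ordering can be illustrated with plain Kafka consumer code. The following is a minimal sketch of the commit-after-processing pattern, not VoltSP's actual implementation (the Sink interface is a hypothetical stand-in for any downstream writer):
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical stand-in for a downstream sink.
interface Sink { void send(String value); }

class CommitAfterProcessing {
    static void consumeLoop(KafkaConsumer<String, String> consumer, Sink sink) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(250));
            for (ConsumerRecord<String, String> record : records) {
                sink.send(record.value()); // hand each record downstream first
            }
            consumer.commitSync(); // commit offsets only after the batch was processed
            // A crash between send and commit causes redelivery, never loss:
            // this is the at-least-once guarantee described above.
        }
    }
}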
Configuring Sources
There are three approaches to configuring sources, as described in the following sections:
Java Code Configuration
Sources can be configured directly using Java code. For example, the following Java code hard-codes the address of the Kafka bootstrap server:
KafkaStreamSourceConfigurator.aConsumer()
.withBootstrapServers("statically.configured.address")
.withGroupId(UUID.randomUUID().toString())
.withTopicNames("some-topic-name")
.withKeyDeserializer(ByteBufferDeserializer.class)
.withValueDeserializer(ByteBufferDeserializer.class)
.withStartingOffset(KafkaStartingOffset.EARLIEST);
Environment Variable Interpolation
To avoid hardcoding parameters, configuration values can be provided externally and interpolated using environment variables. For example:
KafkaStreamSourceConfigurator.aConsumer()
.withBootstrapServers("${bootstrap.servers}");
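Alternatively, the same effect can be achieved without the ${...} syntax by reading the environment directly through the JDK. A minimal sketch (the BOOTSTRAP_SERVERS variable name and fallback value are assumptions):
// Assumes BOOTSTRAP_SERVERS is set in the pipeline's environment.
String servers = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");

KafkaStreamSourceConfigurator.aConsumer()
    .withBootstrapServers(servers);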
Helm-Based Auto-Configuration
Most sources also support automatic configuration when running in Kubernetes. Providing Helm values under streaming.pipeline.configuration that match the documented format will inject the provided values and reduce the amount of code needed for setting up sources.
Taking the Kafka source as an example, some parameters can be provided by Helm and therefore omitted in the code:
KafkaStreamSourceConfigurator.aConsumer()
// Omitting .withBootstrapServers(...) and others
.withKeyDeserializer(ByteBufferDeserializer.class)
.withValueDeserializer(ByteBufferDeserializer.class)
.withStartingOffset(KafkaStartingOffset.EARLIEST);
The omitted parameters are then supplied through Helm:
streaming:
  pipeline:
    configuration:
      source:
        kafka:
          groupId: groupA
          topicNames:
            - topicA
            - topicB
          bootstrapServers:
            - serverA
            - serverB
This approach allows for a flexible combination of code-based, environment-based, and Helm-based configuration to suit different deployment scenarios and requirements. See the section on Helm Configuration Options for alternative ways to configure source properties in Helm.
Kafka
Overview
The kafka source consumes data from an Apache Kafka topic. It allows configuration of consumer properties, such as topic names, bootstrap servers, poll timeout, starting offsets, and more. This source is suitable for integrating real-time data from Kafka into your streaming pipeline.
Configuration Options
Helm Configuration (YAML)
source:
  kafka:
    groupId: "my-group"
    topicNames:
      - "topicA"
      - "topicB"
    bootstrapServers:
      - "serverA:9092"
      - "serverB:9092"
    startingOffset: "EARLIEST"
    pollTimeout: "PT0.25S"
    maxCommitTimeout: "PT10S"
    maxCommitRetries: 3
    properties:
      key1: value1
      key2: value2
Java Configuration
KafkaStreamSourceConfigurator.aConsumer()
.withGroupId("my-group")
.withTopicNames("topicA,topicB")
.withBootstrapServers("serverA:9092,serverB:9092")
.withStartingOffset(KafkaStartingOffset.EARLIEST)
.withPollTimeout(Duration.ofMillis(250))
.withMaxCommitRetries(3)
.withMaxCommitTimeout(Duration.ofSeconds(10));
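Arbitrary Kafka client settings from the properties map can likewise be applied in code through the withProperty overloads. A brief sketch (the property names are standard Kafka consumer settings; the values shown are illustrative only):
KafkaStreamSourceConfigurator.aConsumer()
    .withProperty("max.poll.records", 500)
    .withProperty("client.id", "voltsp-example");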
API Details
Methods
KafkaStreamSourceConfiguration:
- withGroupId(String groupId): Sets the consumer group ID for the Kafka source.
- withTopicNames(String topicNames): Specifies the Kafka topics to consume data from.
- withBootstrapServers(String bootstrapServers): Configures the Kafka cluster's bootstrap servers.
- withStartingOffset(KafkaStartingOffset startingOffsets): Sets the starting offset for the consumer (EARLIEST, LATEST, NONE).
- withPollTimeout(Duration pollTimeout): Defines the poll timeout duration for Kafka consumer polling.
- withMaxCommitRetries(int retries): Configures the maximum number of retries for committing offsets.
- withMaxCommitTimeout(Duration timeout): Sets the maximum timeout for commit retries.
- withProperty(String key, String value), withProperty(String key, long value), withProperty(String key, int value), withProperty(String key, boolean value): Adds or overrides a specific Kafka client property, allowing customization for advanced configurations.
- withSSL(KafkaStreamSslConfiguration sslConfigurator): Configures SSL settings for secure communication with Kafka.
- withKeyDeserializer(Class<? extends Deserializer<KEY>> deserializerClass): Sets the deserializer for the Kafka message key.
- withValueDeserializer(Class<? extends Deserializer<VALUE>> deserializerClass), withValueDeserializer(Class<? extends Deserializer<?>> deserializerClass, Class<VALUE> deserializedType): Sets the deserializer for the Kafka message value.
- withSchemaRegistryUrl(String schemaUrl): Configures the schema registry URL for Avro serialization.
- withExceptionHandler(KafkaSourceExceptionHandler exceptionHandler): Sets a custom exception handler for handling Kafka consumer errors.
KafkaStreamSslConfiguration.Builder allows configuring SSL/TLS for secure communication:
- withTruststore(), withTruststore(String truststoreFile, String truststorePassword): Configures the truststore for verifying server certificates. When deployed to a Kubernetes cluster, invoking the no-argument version of this method retrieves the necessary configuration from the environment.
- withKeystore(), withKeystore(String keyStoreFile, String keyStorePassword, String keyPassword): Configures the keystore for client certificate authentication. When deployed to a Kubernetes cluster, invoking the no-argument version of this method retrieves the necessary configuration from the environment.
- withSSLProtocols(TLSProtocols protocol): Sets the SSL protocol version.
- withSSLEnabledProtocols(TLSProtocols... protocols): Configures multiple SSL protocols for enhanced security.
- build(): Creates the KafkaStreamSslConfiguration instance.
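For Kubernetes deployments, the no-argument truststore and keystore variants can be combined so that all certificate material is resolved from the environment. A minimal sketch based on the methods above:
// Assumes the pod environment provides the truststore and keystore
// configuration, as described for the no-argument variants above.
KafkaStreamSourceConfigurator.aConsumer()
    .withSSL(KafkaStreamSslConfiguration.builder()
        .withTruststore()
        .withKeystore()
        .build());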
Features
- Auto-configuration: Automatically detects and applies configuration from source.kafka in the Helm YAML.
- Customizable: Supports a wide range of configuration options, including SSL settings, poll timeout, and starting offsets.
- Error Handling: Allows custom exception handling using KafkaSourceExceptionHandler to manage consumer errors gracefully.
- Flexible Deserialization: Supports custom deserializers for both keys and values, making it adaptable to different Kafka data formats.
Usage Example
KafkaStreamSourceConfigurator.aConsumer()
.withGroupId("my-group")
.withTopicNames("topicA,topicB")
.withBootstrapServers("serverA:9092,serverB:9092")
.withStartingOffset(KafkaStartingOffset.LATEST)
.withPollTimeout(Duration.ofMillis(500))
.withMaxCommitRetries(5)
.withMaxCommitTimeout(Duration.ofSeconds(15))
.withSSL(KafkaStreamSslConfiguration.builder()
.withTruststore("/path/to/truststore.jks", "truststore-password")
.withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
.build())
.withExceptionHandler((kafkaConsumer, context, ex) -> {
System.err.println("Error while consuming data: " + ex.getMessage());
});
This configuration is designed for consuming real-time data from Kafka topics, with support for secure communication and custom error handling.
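For Avro payloads, the two-argument value deserializer overload pairs with withSchemaRegistryUrl. A sketch, assuming Confluent's KafkaAvroDeserializer is on the classpath; the topic name and registry address are placeholders:
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

KafkaStreamSourceConfigurator.aConsumer()
    .withTopicNames("avro-topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(KafkaAvroDeserializer.class, GenericRecord.class)
    .withSchemaRegistryUrl("http://schema-registry:8081");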
Network
Overview
The network source is used to receive byte data from a network, currently supporting only the UDP protocol. It is suitable for use cases that require ingesting data from other networked services or systems, such as log collection or real-time event processing. Configuration options allow setting the network type and listening address.
Configuration Options
Helm Configuration (YAML)
source:
  network:
    type: udp
    port: 34567
Java Configuration
new NetworkStreamSourceConfigurator()
.withType(NetworkType.UDP)
.withPort(34567);
API Details
Methods
- withType(NetworkType type): Specifies the network protocol type (UDP or TCP).
- withPort(int port): Sets the listen port to bind to for receiving data.
- withExceptionHandler(ExceptionHandler exceptionHandler): Adds a custom exception handler for source failures.
Features
- Auto-configuration: Automatically detects and applies configuration from source.network in the Helm YAML.
- Custom Error Handling: Provides an option to specify a custom exception handler to manage errors during data reception.
Usage Example
new NetworkStreamSourceConfigurator()
    .withType(NetworkType.UDP)
.withPort(8080)
.withExceptionHandler((context, exception) -> {
System.err.println("Error while receiving data: " + exception.getMessage());
});
This configuration allows the source to receive data from the network using the specified protocol and port, with support for handling reception errors.
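To exercise a UDP source locally, test packets can be pushed with the JDK's DatagramSocket. A minimal sketch (the host and payload are assumptions; the port matches the configuration example above):
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public class UdpTestSender {
    public static void main(String[] args) throws Exception {
        byte[] payload = "hello-voltsp".getBytes(StandardCharsets.UTF_8);
        InetAddress target = InetAddress.getByName("localhost"); // assumed host
        try (DatagramSocket socket = new DatagramSocket()) {
            // 34567 matches the port used in the Helm example above.
            socket.send(new DatagramPacket(payload, payload.length, target, 34567));
        }
    }
}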
Iterable Source
The iterable source generates a stream of data items from an iterable collection. It is useful for testing or for scenarios where data is pre-defined and available in memory.
Configuration Options
Java Configuration:
IterableSource.from(List.of("item1", "item2", "item3"));
IterableSource.iterate(1, 2, 3);
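For larger test runs, the input collection can be generated programmatically before being wrapped. A small sketch using only the JDK:
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Generate 100 synthetic items, one per element, for the source to emit.
List<String> items = IntStream.range(0, 100)
    .mapToObj(i -> "item-" + i)
    .collect(Collectors.toList());

IterableSource.from(items);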
File Source
The file source reads data items from a specified file, where each line is treated as a new data item. It is useful for streaming data stored in text files. It requires parallelism to be set to 1, as only one thread can read lines from the file.
Configuration Options
Java Configuration:
FileSource.withNewlineDelimiter("/path/to/input.txt");
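Because each line becomes one data item, a suitable input file can be prepared with the JDK alone. A minimal sketch (the path is a placeholder):
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Write three items, one per line, to the file the source will read.
Files.write(Path.of("/path/to/input.txt"), List.of("item1", "item2", "item3"));

FileSource.withNewlineDelimiter("/path/to/input.txt");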
Stdin Source
The stdin source reads data items from standard input (stdin), treating each line as a separate data item. It is useful for interactive scenarios or debugging where input is provided manually.
Configuration Options
Java Configuration:
StdinSource source = new StdinSource();