Sources

Sources are key components of a streaming pipeline responsible for ingesting data to be processed. They determine how data is introduced into the pipeline, supporting a wide range of data origins including files, Kafka topics, and standard input. Each source can be configured to meet specific requirements, such as data formats, connection details, and polling intervals.

The following are the sources currently available for VoltSP. (See Custom Sources, Sinks, and Processors for information on defining your own custom source.)

  • File Source: Reads data items from a specified file, treating each line as a separate item. This is ideal for streaming data stored in text files.

  • Generator Source: Produces data items algorithmically, suitable for synthetic data generation or simulation purposes.

  • Iterable Source: Generates a stream of data items from an iterable collection, useful for testing or scenarios involving pre-defined in-memory data.

  • Kafka Source: Consumes data from an Apache Kafka topic, enabling seamless integration with real-time data streams.

  • Network Source: Receives byte data from a network via the UDP protocol.

  • Stdin Source: Reads data items from standard input, making it useful for interactive scenarios or debugging purposes.

Each source type offers flexibility in how data is ingested, with a variety of configuration options to support different data origins and processing needs. The following sections provide detailed documentation for each source, including configuration examples, API details, and usage guidelines.

Transactional Semantics

An essential aspect of any source is its transactional semantics. For example, when using a Kafka source, VoltSP guarantees that no messages will be committed unless they have been processed and sent to a downstream sink. If the sink also supports such guarantees (as the Kafka and VoltDB sinks do), the entire pipeline ensures no data loss and achieves at-least-once processing semantics.
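
For illustration, the following sketch pairs a Kafka source with a Kafka sink in a single pipeline. The source configurator calls match the API documented below; the pipeline classes (VoltPipeline, VoltStreamBuilder) and the sink configurator (KafkaStreamSinkConfigurator and its withTopicName method) are assumptions included for context, and all server and topic names are placeholders:

public class KafkaPassthroughPipeline implements VoltPipeline {
    @Override
    public void define(VoltStreamBuilder stream) {
        stream
            .withName("kafka-passthrough")
            .consumeFromSource(KafkaStreamSourceConfigurator.aConsumer()
                .withBootstrapServers("serverA:9092")
                .withGroupId("my-group")
                .withTopicNames("input-topic")
                .withKeyDeserializer(ByteBufferDeserializer.class)
                .withValueDeserializer(ByteBufferDeserializer.class)
                .withStartingOffset(KafkaStartingOffset.EARLIEST))
            // Offsets are committed only after consumed items have been
            // handed to the sink, giving at-least-once semantics end to end.
            .terminateWithSink(KafkaStreamSinkConfigurator.aProducer() // assumed sink API
                .withBootstrapServers("serverA:9092")
                .withTopicName("output-topic"));
    }
}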

Configuring Sources

There are three approaches to configuring sources, as described in the following sections:

Java Code Configuration

Sources can be configured directly using Java code. For example, the following Java code hardcodes the address of the Kafka bootstrap server:

KafkaStreamSourceConfigurator.aConsumer()
    .withBootstrapServers("statically.configured.address")
    .withGroupId(UUID.randomUUID().toString())
    .withTopicNames("some-topic-name")
    .withKeyDeserializer(ByteBufferDeserializer.class)
    .withValueDeserializer(ByteBufferDeserializer.class)
    .withStartingOffset(KafkaStartingOffset.EARLIEST);

Environment Variable Interpolation

To avoid hardcoding parameters, configuration values can be provided externally and interpolated using environment variables. For example:

KafkaStreamSourceConfigurator.aConsumer()
    .withBootstrapServers("${bootstrap.servers}");

Helm-Based Auto-Configuration

Most sources also support automatic configuration when running in Kubernetes. Helm values provided under streaming.pipeline.configuration that match the documented format are injected automatically, reducing the amount of code needed to set up sources.

Taking the Kafka source as an example, some parameters can be provided by Helm and therefore omitted in the code:

KafkaStreamSourceConfigurator.aConsumer()
    // Omitting .withBootstrapServers(...) and others
    .withKeyDeserializer(ByteBufferDeserializer.class)
    .withValueDeserializer(ByteBufferDeserializer.class)
    .withStartingOffset(KafkaStartingOffset.EARLIEST);

The corresponding Helm values:

streaming:
  pipeline:
    configuration:
      source:
        kafka:
          groupId: groupA
          topicNames:
            - topicA
            - topicB
          bootstrapServers:
            - serverA
            - serverB

This approach allows for a flexible combination of code-based, environment-based, and Helm-based configuration to suit different deployment scenarios and requirements. See the section on Helm Configuration Options for alternative ways to configure source properties in Helm.
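
For example, a single configurator can hard-code the deserializers, interpolate the bootstrap servers from the environment, and leave the group ID and topic names to Helm. A sketch combining the snippets above:

KafkaStreamSourceConfigurator.aConsumer()
    // Bootstrap servers interpolated from the environment at runtime
    .withBootstrapServers("${bootstrap.servers}")
    // groupId and topicNames omitted: injected from the Helm values above
    .withKeyDeserializer(ByteBufferDeserializer.class)
    .withValueDeserializer(ByteBufferDeserializer.class);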


Kafka

Overview

The kafka source consumes data from an Apache Kafka topic. It allows configuration of consumer properties, such as topic names, bootstrap servers, poll timeout, starting offsets, and more. This source is suitable for integrating real-time data from Kafka into your streaming pipeline.

Configuration Options

Helm Configuration (YAML)

source:
  kafka:
    groupId: "my-group"
    topicNames:
    - "topicA"
    - "topicB"
    bootstrapServers:
    - "serverA:9092"
    - "serverB:9092"
    startingOffset: "EARLIEST"
    pollTimeout: "PT0.25S"
    maxCommitTimeout: "PT10S"
    maxCommitRetries: 3
    properties:
      key1: value1
      key2: value2

Java Configuration

KafkaStreamSourceConfigurator.aConsumer()
    .withGroupId("my-group")
    .withTopicNames("topicA,topicB")
    .withBootstrapServers("serverA:9092,serverB:9092")
    .withStartingOffset(KafkaStartingOffset.EARLIEST)
    .withPollTimeout(Duration.ofMillis(250))
    .withMaxCommitRetries(3)
    .withMaxCommitTimeout(Duration.ofSeconds(10));

API Details

Methods

KafkaStreamSourceConfiguration:

  • withGroupId(String groupId)
    Sets the consumer group ID for the Kafka source.

  • withTopicNames(String topicNames)
    Specifies the Kafka topics to consume data from.

  • withBootstrapServers(String bootstrapServers)
    Configures the Kafka cluster's bootstrap servers.

  • withStartingOffset(KafkaStartingOffset startingOffset)
    Sets the starting offset for the consumer (EARLIEST, LATEST, NONE).

  • withPollTimeout(Duration pollTimeout)
    Defines the poll timeout duration for Kafka consumer polling.

  • withMaxCommitRetries(int retries)
    Configures the maximum number of retries for committing offsets.

  • withMaxCommitTimeout(Duration timeout)
    Sets the maximum timeout for commit retries.

  • withProperty(String key, String value)
  • withProperty(String key, long value)
  • withProperty(String key, int value)
  • withProperty(String key, boolean value)
    Adds or overrides a specific Kafka client property, allowing customization for advanced configurations (see the sketch following this list).

  • withSSL(KafkaStreamSslConfiguration sslConfigurator)
    Configures SSL settings for secure communication with Kafka.

  • withKeyDeserializer(Class<? extends Deserializer<KEY>> deserializerClass)
    Sets the deserializer for the Kafka message key.

  • withValueDeserializer(Class<? extends Deserializer<VALUE>> deserializerClass)
  • withValueDeserializer(Class<? extends Deserializer<?>> deserializerClass, Class<VALUE> deserializedType)
    Sets the deserializer for the Kafka message value.

  • withSchemaRegistryUrl(String schemaUrl)
    Configures the schema registry URL for Avro deserialization.

  • withExceptionHandler(KafkaSourceExceptionHandler exceptionHandler)
    Sets a custom exception handler for handling Kafka consumer errors.
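
For example, withProperty can supply standard Kafka consumer settings that have no dedicated builder method. A short sketch (the property keys are standard Kafka client settings; the server, group, and topic names are placeholders):

KafkaStreamSourceConfigurator.aConsumer()
    .withBootstrapServers("serverA:9092")
    .withGroupId("my-group")
    .withTopicNames("topicA")
    // Standard Kafka consumer properties, passed through to the client
    .withProperty("max.poll.records", 500)
    .withProperty("fetch.max.wait.ms", 500);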

KafkaStreamSslConfiguration.Builder allows configuring SSL/TLS for secure communication with Kafka:

  • withTruststore()
  • withTruststore(String truststoreFile, String truststorePassword)
    Configures the truststore for verifying server certificates. When deployed to a Kubernetes cluster, invoking the no-argument version of this method retrieves the necessary configuration from the environment.

  • withKeystore()
  • withKeystore(String keyStoreFile, String keyStorePassword, String keyPassword)
    Configures the keystore for client certificate authentication. When deployed to a Kubernetes cluster, invoking the no-argument version of this method retrieves the necessary configuration from the environment.

  • withSSLProtocols(TLSProtocols protocol)
    Sets the SSL protocol version.

  • withSSLEnabledProtocols(TLSProtocols... protocols)
    Configures multiple SSL protocols for enhanced security.

  • build()
    Creates the KafkaStreamSslConfiguration instance.
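
When running in Kubernetes, the no-argument truststore and keystore variants let the SSL configuration come from the environment, so file paths and passwords can be omitted entirely. A minimal sketch (server, group, and topic names are placeholders):

KafkaStreamSourceConfigurator.aConsumer()
    .withBootstrapServers("serverA:9092")
    .withGroupId("my-group")
    .withTopicNames("topicA")
    .withSSL(KafkaStreamSslConfiguration.builder()
        .withTruststore()   // resolved from the Kubernetes environment
        .withKeystore()     // resolved from the Kubernetes environment
        .build());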

Features

  1. Auto-configuration
    Automatically detects and applies configurations from source.kafka in the Helm YAML.

  2. Customizable
    Supports a wide range of configuration options, including SSL settings, poll timeout, and starting offsets.

  3. Error Handling
    Allows custom exception handling using KafkaSourceExceptionHandler to manage consumer errors gracefully.

  4. Flexible Deserialization
    Supports custom deserializers for both keys and values, making it adaptable to different Kafka data formats.

Usage Example

KafkaStreamSourceConfigurator.aConsumer()
    .withGroupId("my-group")
    .withTopicNames("topicA,topicB")
    .withBootstrapServers("serverA:9092,serverB:9092")
    .withStartingOffset(KafkaStartingOffset.LATEST)
    .withPollTimeout(Duration.ofMillis(500))
    .withMaxCommitRetries(5)
    .withMaxCommitTimeout(Duration.ofSeconds(15))
    .withSSL(KafkaStreamSslConfiguration.builder()
        .withTruststore("/path/to/truststore.jks", "truststore-password")
        .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
        .build())
    .withExceptionHandler((kafkaConsumer, context, ex) -> {
        System.err.println("Error while consuming data: " + ex.getMessage());
    });

This configuration is designed for consuming real-time data from Kafka topics, with support for secure communication and custom error handling.


Network

Overview

The network source receives byte data from the network; currently only the UDP protocol is supported. It is suitable for use cases that require ingesting data from other networked services or systems, such as log collection or real-time event processing. Configuration options allow setting the network type and listening port.

Configuration Options

Helm Configuration (YAML)

source:
  network:
    type: udp
    port: 34567

Java Configuration

new NetworkStreamSourceConfigurator()
    .withType(NetworkType.UDP)
    .withPort(34567);

API Details

Methods

  • withType(NetworkType type)
    Specifies the network protocol type (currently only UDP is supported).

  • withPort(int port)
    Sets the listen port to bind to for receiving data.

  • withExceptionHandler(ExceptionHandler exceptionHandler)
    Adds a custom exception handler for source failures.

Features

  1. Auto-configuration
    Automatically detects and applies configurations from source.network in the Helm YAML.

  2. Custom Error Handling
    Provides an option to specify a custom exception handler to manage errors during data reception.

Usage Example

new NetworkStreamSourceConfigurator()
    .withType(NetworkType.UDP)
    .withPort(8080)
    .withExceptionHandler((context, exception) -> {
        System.err.println("Error while receiving data: " + exception.getMessage());
    });

This configuration allows the source to receive data from the network using the specified protocol and port, with support for handling reception errors.


Iterable Source

The iterable source generates a stream of data items from an iterable collection. It is useful for testing or for scenarios where data is pre-defined and available in memory.

Configuration Options

Java Configuration:

IterableSource.from(List.of("item1", "item2", "item3")); // stream items from an existing collection
IterableSource.iterate(1, 2, 3);                         // stream items supplied directly as arguments

File Source

The file source reads data items from a specified file, where each line is treated as a new data item. It is useful for streaming data stored in text files.

It requires parallelism to be set to 1, since only one thread can read lines from the file.

Configuration Options

Java Configuration:

FileSource.withNewlineDelimiter("/path/to/input.txt");

Stdin Source

The stdin source reads data items from standard input (stdin), treating each line as a separate data item. It is useful for interactive scenarios or debugging where input is provided manually.

Configuration Options

Java Configuration:

StdinSource source = new StdinSource();