Helm Configuration Options

VoltSP uses Helm to manage pods in Kubernetes. For example, the basic YAML for configuring a VoltSP pipeline looks something like this:

replicaCount: 1

resources:
  limits:
    cpu: 2
    memory: 2G
  requests:
    cpu: 2
    memory: 2G

streaming:
  pipeline:
    className: org.acme.KafkaToVoltPipeline

In this example, the Helm configuration creates one instance of the pipeline, sets the CPU and memory requests and limits for the pipeline's pod, and identifies the pipeline definition to use by specifying its Java class.

By default, the Java Virtual Machine (JVM) uses 80% of the container's RAM. You can override this by passing JVM options to the pipeline: either explicit heap bounds such as -Xms2g and -Xmx2g, or the percentage-based options -XX:InitialRAMPercentage, -XX:MinRAMPercentage, and -XX:MaxRAMPercentage.
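
For example, a minimal sketch that caps the heap at half of the container's memory using the percentage-based options, passed in through streaming.javaProperties (described below); the 50% figure is purely illustrative:

streaming:
  ### illustrative values; tune the percentages for your workload
  javaProperties: >
    -XX:InitialRAMPercentage=50.0
    -XX:MaxRAMPercentage=50.0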

The Helm configuration is also responsible for setting any pipeline properties that your streaming application needs. There are two ways to pass in pipeline properties from Helm:

- As Java system properties, using streaming.javaProperties
- As custom configuration properties, declared under streaming.pipeline

The method you use to pass the properties affects how the Java code for your pipeline retrieves them. The following sections explain how to declare pipeline properties using the two methods and how to retrieve those values in the Java code.

streaming.javaProperties

One alternative for assigning custom configuration properties to your pipeline is to specify Java properties in the Helm configuration using streaming.javaProperties. For example:

replicaCount: 1

resources:
  limits:
    cpu: 2
    memory: 2G
  requests:
    cpu: 2
    memory: 2G

streaming:
  javaProperties: >
    -Xms1g
    -Xmx1g
    -Dproducer.msg.count=5000
    -Dproducer.msg.bytes=1024
    -Dpipeline=org.acme.KafkaToVoltPipeline
    -Dkafka.consumer.group=1
    -Dkafka.topic=greetings
    -Dkafka.bootstrap.servers=kafka.my.corp.com
    -Dkafka.schema.registry.url=http://registry.my.corp.com

All of the properties you declare this way are available at runtime as standard Java system properties. They can be read directly with the System.getProperty() method, or through the Property.extractSafe() helper used in the following example:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.voltdb.stream.api.Sinks;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltPipeline;
import org.voltdb.stream.execution.Property;
import org.voltdb.stream.function.CancelingFunction;

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        // kafka.consumer.group is read directly as a standard Java system property
        var group = System.getProperty("kafka.consumer.group");
        // Property.extractSafe() resolves a ${...} reference to the named Java property
        var topic = Property.extractSafe("${kafka.topic}");
        int messageCount = Integer.parseInt(Property.extractSafe("${producer.msg.count}"));
        int messageBytesLength = Integer.parseInt(Property.extractSafe("${producer.msg.bytes}"));

        stream
                .withName("event producer")
                .consumeFromSource(new EventsSource(messageBytesLength))
                // Stop the pipeline after messageCount events have been processed
                .processWith(new CancelingFunction<>(0, messageCount, (acc, event) -> acc + 1))
                .terminateWithSink(Sinks.kafka().accepting(EventMessage.class)
                        // ${...} placeholders are resolved from the Java properties at runtime
                        .withBootstrapServers("${kafka.bootstrap.servers}")
                        .withTopicName("${kafka.topic}")
                        .withSchemaRegistry("${kafka.schema.registry.url}")
                        .withKeyExtractor(EventMessage::getSessionId)
                        .withValueSerializer(KafkaAvroSerializer.class));
    }
}
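
System.getProperty() returns null when a property has not been set, so for optional properties it can be safer to use the two-argument overload and supply a fallback. A minimal sketch (the default values here are purely illustrative):

// Fall back to illustrative defaults when the Java properties are not set
String topic = System.getProperty("kafka.topic", "greetings");
int messageCount = Integer.parseInt(System.getProperty("producer.msg.count", "1000"));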

streaming.pipeline

The other alternative is to declare your custom pipeline properties under the configuration key of the Helm streaming.pipeline property. For example:

replicaCount: 1

resources:
  limits:
    cpu: 2
    memory: 2G
  requests:
    cpu: 2
    memory: 2G

streaming:
  pipeline:
    className: org.acme.KafkaToVoltPipeline
    configuration:
      sink:
        kafka:
          topicName: greetings
          bootstrapServers: kafka.my.corp.com
          ### optional parameters
          schemaRegistry: http://registry.my.corp.com
          properties:
            key1: value1
            key2: value2
      producer:
        message:
          count: 5000
          bytes: 1024
  javaProperties: >
    -Xms1g
    -Xmx1g

In addition to any custom properties you define, the Kafka source and sink define a default structure for their own configuration options. For more information on the default structure of Kafka properties, see org.voltdb.stream.api.kafka.KafkaStreamSourceConfigurator and org.voltdb.stream.api.kafka.KafkaStreamSinkConfigurator.
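
For example, a pipeline that consumes from Kafka would declare its source options under a matching source.kafka section. The sketch below assumes field names analogous to the sink's; check KafkaStreamSourceConfigurator for the authoritative structure:

streaming:
  pipeline:
    className: org.acme.KafkaToVoltPipeline
    configuration:
      source:
        kafka:
          ### field names here are assumed by analogy with the sink
          topicName: greetings
          bootstrapServers: kafka.my.corp.com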

Helm saves the custom configuration as a Kubernetes secret under /etc/voltsp/configuration.yaml. A third alternative for setting custom properties is to mount a custom configuration file and override the configuration location by setting the org.voltsp.configuration.path property.
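
For example, if you mount your own configuration file into the pipeline container, you can point VoltSP at it through streaming.javaProperties. The mount path below is illustrative and must match your actual volume mount:

streaming:
  ### the path must match where your configuration file is mounted
  javaProperties: >
    -Dorg.voltsp.configuration.path=/etc/custom/configuration.yaml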

At runtime, the configuration options are available to the Java code through the ConfigurationContext object returned by stream.getExecutionContext().configurator(). For example:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.voltdb.stream.api.ExecutionContext.ConfigurationContext;
import org.voltdb.stream.api.Sinks;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltPipeline;
import org.voltdb.stream.function.CancelingFunction;

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        // The ConfigurationContext exposes the values declared under
        // streaming.pipeline.configuration in the Helm configuration
        ConfigurationContext configurator = stream.getExecutionContext().configurator();
        int messageCount = configurator.findByPath("producer.message.count").asInt();
        int messageBytesLength = configurator.findByPath("producer.message.bytes").asInt();
        String topicName = configurator.findByPath("sink.kafka.topicName").asString();
        System.out.println("Producing events to " + topicName);

        stream
                .withName("event producer")
                .consumeFromSource(new EventsSource(messageBytesLength))
                .processWith(new CancelingFunction<>(0, messageCount, (acc, event) -> acc + 1))
                // Topic name, bootstrap servers, and schema registry are supplied by
                // the sink.kafka section of the configuration rather than in code
                .terminateWithSink(Sinks.kafka().accepting(EventMessage.class)
                        .withKeyExtractor(EventMessage::getSessionId)
                        .withValueSerializer(KafkaAvroSerializer.class));
    }
}