Helm Configuration Options
VoltSP uses Helm to manage pods in Kubernetes. For example, the basic YAML for configuring a VoltSP pipeline looks something like this:
replicaCount: 1
resources:
  limits:
    cpu: 2
    memory: 2G
  requests:
    cpu: 2
    memory: 2G
streaming:
  pipeline:
    className: org.acme.KafkaToVoltPipeline
In this example, the Helm configuration creates one instance of the pipeline, sets the CPU and memory requests and limits for each pod, and identifies the pipeline definition to use by specifying the Java class.
By default, the Java Virtual Machine (JVM) uses 80% of the container's RAM.
You can override this default by setting either the JVM options -Xms2g -Xmx2g or the options -XX:InitialRAMPercentage, -XX:MinRAMPercentage, and -XX:MaxRAMPercentage.
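To check which heap size the JVM actually ended up with, a small diagnostic along these lines can help. This is a minimal sketch and not part of VoltSP: the class name HeapReport is hypothetical, and the hard-coded container size of 2,000,000,000 bytes (the decimal meaning of the Kubernetes quantity 2G) is an assumption taken from the example above.

```java
// Hypothetical diagnostic, not part of VoltSP: reports the JVM's maximum heap
// relative to an assumed container memory limit.
final class HeapReport {

    // Convert a byte count to whole mebibytes (rounded down).
    static long toMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    // Describe the heap size and its share of the container memory.
    static String describe(long maxHeapBytes, long containerBytes) {
        long percent = Math.round(100.0 * maxHeapBytes / containerBytes);
        return "max heap " + toMiB(maxHeapBytes) + " MiB (" + percent + "% of container)";
    }

    public static void main(String[] args) {
        // Upper bound the JVM will attempt to use for the heap.
        long maxHeap = Runtime.getRuntime().maxMemory();
        // Assumption: the container limit is "memory: 2G", that is 2 * 10^9 bytes.
        long containerLimit = 2_000_000_000L;
        System.out.println(describe(maxHeap, containerLimit));
    }
}
```

The value reported by Runtime.getRuntime().maxMemory() may differ slightly from the -Xmx setting depending on the garbage collector, so treat the percentage as approximate.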
The Helm configuration is also responsible for setting any pipeline properties that your streaming application needs. There are two ways to pass in pipeline properties from Helm: as Java properties in streaming.javaProperties, or as child properties of streaming.pipeline.
The method you use to pass the properties affects how the Java code for your pipeline retrieves them. The following sections explain how to declare pipeline properties using the two methods and how to retrieve those values in the Java code.
streaming.javaProperties
One alternative for assigning custom configuration properties to your pipeline is to specify Java properties in the Helm configuration using streaming.javaProperties. For example:
replicaCount: 1
resources:
  limits:
    cpu: 2
    memory: 2G
  requests:
    cpu: 2
    memory: 2G
streaming:
  javaProperties: >
    -Xms1g
    -Xmx1g
    -Dproducer.msg.count=5000
    -Dproducer.msg.bytes=1024
    -Dpipeline=org.acme.KafkaToVoltPipeline
    -Dkafka.consumer.group=1
    -Dkafka.topic=greetings
    -Dkafka.bootstrap.servers=kafka.my.corp.com
    -Dkafka.schema.registry.url=http://registry.my.corp.com
All of the properties you declare this way are available at runtime as standard Java properties and can be read using the System.getProperty() method:
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.voltdb.stream.api.Sinks;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltPipeline;
import org.voltdb.stream.execution.Property;
import org.voltdb.stream.function.CancelingFunction;

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        var group = System.getProperty("kafka.consumer.group");
        var topic = Property.extractSafe("${kafka.topic}");
        int messageCount = Integer.parseInt(Property.extractSafe("${producer.msg.count}"));
        int messageBytesLength = Integer.parseInt(Property.extractSafe("${producer.msg.bytes}"));

        stream
            .withName("event producer")
            .consumeFromSource(new EventsSource(messageBytesLength))
            .processWith(new CancelingFunction<>(0, messageCount, (acc, event) -> acc + 1))
            .terminateWithSink(Sinks.kafka().accepting(EventMessage.class)
                .withBootstrapServers("${kafka.bootstrap.servers}")
                .withTopicName("${kafka.topic}")
                .withSchemaRegistry("${kafka.schema.registry.url}")
                .withKeyExtractor(EventMessage::getSessionId)
                .withValueSerializer(KafkaAvroSerializer.class));
    }
}
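Because System.getProperty() returns null for a property that was never declared, it can be useful to wrap the lookups with defaults and validation. The following is a minimal sketch using only the standard Java library; the class name PipelineProps and its methods are hypothetical helpers and are not part of the VoltSP API.

```java
// Hypothetical helper, not part of VoltSP: reads -D properties with fallbacks.
final class PipelineProps {

    // Return the property value, or the fallback if the property is not set.
    static String get(String key, String fallback) {
        return System.getProperty(key, fallback);
    }

    // Return the property parsed as an int, or the fallback if it is unset or empty.
    // A value that is present but not numeric is reported as an error.
    static int getInt(String key, int fallback) {
        String raw = System.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return fallback;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Property " + key + " must be an integer, got: " + raw, e);
        }
    }

    public static void main(String[] args) {
        // Run with, for example: java -Dproducer.msg.count=5000 PipelineProps.java
        // The fallback values below are arbitrary and chosen for illustration only.
        int messageCount = getInt("producer.msg.count", 100);
        String topic = get("kafka.topic", "greetings");
        System.out.println("Sending " + messageCount + " messages to " + topic);
    }
}
```

Failing fast on a malformed number is a design choice here; the standard Integer.getInteger(name, default) method would instead fall back silently.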
streaming.pipeline
Another alternative is to declare your custom pipeline properties as child properties of the Helm streaming.pipeline property. For example:
replicaCount: 1
resources:
  limits:
    cpu: 2
    memory: 2G
  requests:
    cpu: 2
    memory: 2G
streaming:
  pipeline:
    className: org.acme.KafkaToVoltPipeline
    configuration:
      sink:
        kafka:
          topicName: greetings
          bootstrapServers: kafka.my.corp.com
          ### optional parameters
          schemaRegistry: http://registry.my.corp.com
          properties:
            key1: value1
            key2: value2
      producer:
        message:
          count: 5000
          bytes: 1024
  javaProperties: >
    -Xms1g
    -Xmx1g
In addition to any custom properties you define, the Kafka source and sink define a default structure for their own configuration options. For more information on the default structure of Kafka properties, see org.voltdb.stream.api.kafka.KafkaStreamSourceConfigurator and org.voltdb.stream.api.kafka.KafkaStreamSinkConfigurator.
Helm saves the custom configuration as a Kubernetes secret under /etc/voltsp/configuration.yaml.
A third alternative for setting custom properties is to mount a custom configuration file and override the configuration location by setting the org.voltsp.configuration.path property.
At runtime, the configuration options are available to the Java code through the object returned by the stream.getExecutionContext().configurator() method. For example:
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.voltdb.stream.api.ExecutionContext.ConfigurationContext;
import org.voltdb.stream.api.Sinks;
import org.voltdb.stream.api.pipeline.VoltStreamBuilder;
import org.voltdb.stream.api.pipeline.VoltPipeline;
import org.voltdb.stream.function.CancelingFunction;

public class ProducerPipeline implements VoltPipeline {

    @Override
    public void define(VoltStreamBuilder stream) {
        ConfigurationContext configurator = stream.getExecutionContext().configurator();

        // Paths mirror the nesting under streaming.pipeline.configuration in the Helm YAML.
        int messageCount = configurator.findByPath("producer.message.count").asInt();
        int messageBytesLength = configurator.findByPath("producer.message.bytes").asInt();
        String topicName = configurator.findByPath("sink.kafka.topicName").asString();
        System.out.println("Producing events to " + topicName);

        stream
            .withName("event producer")
            .consumeFromSource(new EventsSource(messageBytesLength))
            .processWith(new CancelingFunction<>(0, messageCount, (acc, event) -> acc + 1))
            .terminateWithSink(Sinks.kafka().accepting(EventMessage.class)
                .withKeyExtractor(EventMessage::getSessionId)
                .withValueSerializer(KafkaAvroSerializer.class));
    }
}
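The dotted paths passed to findByPath() follow the nesting of the YAML keys. To illustrate the idea without a running VoltSP cluster, the sketch below resolves dotted paths against a nested map that stands in for the configuration. It is not VoltSP's implementation: the class DottedPath, its find() method, and the sample() data are all hypothetical and use only the standard Java library.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in, not VoltSP code: resolves "a.b.c" paths in nested maps.
final class DottedPath {

    // Walk the nested maps one path segment at a time.
    // Returns empty if any segment is missing or a non-map value is reached early.
    static Optional<Object> find(Map<String, Object> root, String path) {
        Object current = root;
        for (String key : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return Optional.empty();
            }
            current = ((Map<?, ?>) current).get(key);
            if (current == null) {
                return Optional.empty();
            }
        }
        return Optional.of(current);
    }

    // Sample data shaped like the "configuration" block in the Helm example.
    static Map<String, Object> sample() {
        return Map.of(
            "sink", Map.of(
                "kafka", Map.of(
                    "topicName", "greetings",
                    "bootstrapServers", "kafka.my.corp.com")),
            "producer", Map.of(
                "message", Map.of(
                    "count", 5000,
                    "bytes", 1024)));
    }

    public static void main(String[] args) {
        Map<String, Object> config = sample();
        System.out.println(find(config, "sink.kafka.topicName").orElse("<missing>"));
        System.out.println(find(config, "producer.message.count").orElse("<missing>"));
        // A path with a segment that does not exist resolves to empty.
        System.out.println(find(config, "sink.kafka.topic").orElse("<missing>"));
    }
}
```

In this stand-in, a path resolves only when every segment matches a key at the corresponding level, which is why the path strings in the Java code need to match the YAML key names exactly.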