The first thing the pipeline needs is a data source, which you define as the argument to the .consumeFromSource() method. Active(SP) supports Apache Kafka as a data source out of the box. You specify the type of source with the Sources.kafka() method, which returns a builder whose methods set the associated properties. For example, the following code sample creates a data source from a Kafka topic:
.consumeFromSource(
    Sources.kafka()
        .withBootstrapServers("${kafka.bootstrap.servers}")
        .withTopicNames("${kafka.topic}")
        .withGroupId("${kafka.consumer.group}")
        .withStartingOffset(KafkaStartingOffset.EARLIEST)
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
)
Note that the source definition uses placeholders (property names enclosed in braces and prefixed with a dollar sign) in place of actual server addresses, topic names, and group IDs. This way the pipeline acts as a template, defining the actions to take, while the actual sources and destinations can be defined at runtime by setting property values in the Helm YAML files (see Section 4.5, “Running Your Pipeline”).
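For example, the placeholders above could be resolved at deployment time by entries in the Helm values file. The following fragment is purely illustrative: it assumes that nested YAML keys map onto the dotted property names used by the placeholders, and the actual location of these keys within the chart's values hierarchy is covered in Section 4.5. The broker addresses, topic, and group name shown are hypothetical:

    # Illustrative values fragment; nested keys are assumed to map to
    # dotted property names such as kafka.bootstrap.servers
    kafka:
      bootstrap:
        servers: "broker-0.kafka:9092,broker-1.kafka:9092"
      topic: "sessions"
      consumer:
        group: "session-pipeline"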
Similarly, Active(SP) supports two standard data destinations, or sinks, out of the box: Kafka and Volt Active Data. You define the sink in much the same way you define the source. The following code samples define a Kafka topic and a Volt stored procedure, respectively, as the final destination for the stream:
.terminateWithSink(
    Sinks.kafka()
        .withBootstrapServers("${kafka.bootstrap.servers}")
        .withTopicName("${kafka.topic}")
        .withValueSerializer(StringSerializer.class)
        .withKeyExtractor(String::hashCode, IntegerSerializer.class)
)
.terminateWithSink(
    Sinks.volt().procedureCall()
        .withHostAndStandardPort("${voltdb.host}")
        .withProcedureName("${voltdb.procedure}")
)
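Putting the pieces together, the source and sink definitions are chained onto the same stream builder inside the pipeline's define() method. The following sketch shows one way the fragments above might combine into a complete Kafka-to-Kafka pipeline. It assumes the VoltPipeline interface and VoltStreamBuilder class introduced with the pipeline skeleton earlier in this chapter; the Volt-specific import statements are omitted rather than guessed at, and the input/output property names are hypothetical:

    // Minimal sketch only. VoltPipeline, VoltStreamBuilder, Sources,
    // Sinks, and KafkaStartingOffset come from the Active(SP) SDK;
    // their import statements are omitted here.
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KafkaPassthroughPipeline implements VoltPipeline {
        @Override
        public void define(VoltStreamBuilder stream) {
            stream
                .consumeFromSource(
                    Sources.kafka()
                        .withBootstrapServers("${kafka.bootstrap.servers}")
                        .withTopicNames("${kafka.input.topic}")    // hypothetical property
                        .withGroupId("${kafka.consumer.group}")
                        .withStartingOffset(KafkaStartingOffset.EARLIEST)
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class))
                .terminateWithSink(
                    Sinks.kafka()
                        .withBootstrapServers("${kafka.bootstrap.servers}")
                        .withTopicName("${kafka.output.topic}")    // hypothetical property
                        .withValueSerializer(StringSerializer.class));
        }
    }

Because every concrete value is a placeholder, the same pipeline class can be deployed against different brokers and topics simply by changing the property values in the Helm YAML files.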