Skip to content

Beats

The beats source receives data from Elastic Beats agents.

BeatsSourceConfigBuilder.<BeatsMessage>builder()
     .withAddress("0.0.0.0", 123)
     .withClientInactivityTimeout(Duration.ofSeconds(42))
     .withExceptionHandler(exceptionHandler)
     .withDecoder(identity)
source:
  beats:
    address: "0.0.0.0:514"
    client_inactivity_timeout: "PT1S"

Properties

address

Sets the listening address in host:port format. Required.

Type: object

Fields of address:

address.host

Type: string

address.port

Type: number

address.hasBracketlessColons

Type: boolean

clientInactivityTimeout

Time after which inactive connections will be closed. Type: object

Default value: 30s

decoder

Decoder to be applied to Beats data payloads received. Examples are decoders that convert incoming data to string or to byte arrays.

Required.

Type: object

exceptionHandler

Custom exception handler enabling interception of all errors related to this source.

Type: object

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-beats-api</artifactId>
    <version>1.4.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-beats-api', version: '1.4.0'

Usage Examples

// Print beats messages to stdout but only if "some.key" is present in message metadata.
stream
     .consumeFromSource(BeatsSourceConfigBuilder.<BeatsMessage>builder()
         .withAddress("0.0.0.0", 5044)
         .withClientInactivityTimeout(100, TimeUnit.SECONDS)
         .withDecoder(Function.identity())
 )
 .processWith((VoltStreamFunction<BeatsMessage, String>) (beatsMessage, consumer, context) -> {
         if (beatsMessage.getMap().containsKey("some.key")) {
             consumer.consume(beatsMessage.getMessage());
         }
 })
 .terminateWithSink(Sinks.stdout());