Skip to content

Elastic

The elastic sink is used to send data to an Elasticsearch cluster. It supports version 7 and 8 and uses official low-level client with features such as:

  • load balancing across all available nodes
  • failover in case of node failures
  • persistent connections
  • SSL/TLS
  • bulk mode

When the CA certificate is available as a PEM-encoded file, set sink.elastic.ssl.trustStoreFileCa in the configuration. Note that trustStoreFileCa cannot be used together with trustStoreFile. They are mutually exclusive.

ElasticSearchSinkConfiguratorBuilder.builder()
     .withAddress("elasticsearch.example.com", 9200)
     .withIndexName("my-index")
     .withAuthBuilder(builder ->
         builder.withUsername("username")
                .withPassword("password"))
     .withSslBuilder(builder ->
         builder.withTrustStoreFile("/path/to/truststore.jks")
                .withTrustStorePassword("truststore-password")
                .withKeyStoreFile("/path/to/keystore.jks")
                .withKeyStorePassword("keystore-password")
                .withKeyPassword("key-password"))
     .addRequestHeadersEntry("Content-Type", "application/json")
     .addRequestParametersEntry("refresh", "true")
sink:
   elastic:
     address: "elasticsearch.example.com:9200"
     indexName: "my-index"
     auth:
       username: "user"
       password: "password"
     ssl:
       trustStoreFile: "/path/to/truststore.jks"
       trustStorePassword: "truststore-password"
       keyStoreFile: "/path/to/keystore.jks"
       keyStorePassword: "keystore-password"
       keyPassword: "key-password"
     requestHeaders:
       Content-Type: "application/json"
     requestParameters:
       timeout: "1m"

Properties

indexName

Elasticsearch index name. Required.

Type: string

address

Elasticsearch address [host:port]. Required.

Type: object

Default value: localhost:9200

Fields of address:

address.host

Type: string

address.port

Type: number

address.hasBracketlessColons

Type: boolean

cacheSizeBytes

Size of the request content size sent to Elasticsearch for bulk operations. Type: number

Default value: 5242880

dataStream

In data stream mode, documents are indexed into a backing index managed by the data stream. Each document must include a @timestamp field, which is mapped as a date or date_nanos field type. Indexing operations are the primary workload, with only occasional updates or deletions. Documents are typically indexed without specifying an _id; however, if an _id is provided, the system assumes first-write-wins behavior, meaning existing documents with the same ID may be silently overwritten.

Type: boolean

Default value: false

retry

Defines the retry policy, including the number of attempts and backoff delays between retries after request failures. Type: object

Fields of retry:

retry.retries

Number of retry attempts after a request failure. Type: number

Default value: 3

retry.backoffDelay

Initial delay before the first retry attempt. Type: object

Default value: PT0.2S

retry.maxBackoffDelay

Maximum delay between consecutive retry attempts. Type: object

Default value: PT3S

auth

Elasticsearch authentication. Type: object

Fields of auth:

auth.username

Username used for authentication. Type: string

auth.password

Password used for authentication. Type: string

ssl

Secure transport configuration. Type: object

Fields of ssl:

ssl.trustStoreFile

Trust store file. Type: string

ssl.trustStorePassword

Trust store password. Type: string

ssl.trustStoreFileCa

Trust store CA certificate file Type: string

ssl.keyStoreFile

Key store file. Type: string

ssl.keyStorePassword

Key store password. Type: string

ssl.keyPassword

Private key password. Type: string

ssl.hostnameVerifier

SSL hostname verifier. Type: object

ssl.insecure

Ignore SSL/hostname validation Type: boolean

exceptionHandler

Custom exception handler enabling interception of all errors related to this source. Type: object

requestHeaders

Extra Elasticsearch client request headers. Type: object

requestParameters

Elasticsearch client request parameters. Type: object

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-elastic-api</artifactId>
    <version>1.4.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-elastic-api', version: '1.4.0'