Http¶
The http
sink connects and sends http(s) requests to configured downstream HTTP(s) server over single connection per worker thread.
If server doesn't close the connection, it will be reused for subsequent requests.
If server is not available, sink will keep on trying to re-connect each time new event arrives. In other words, if there are not events in the pipeline, this sink will not try to connect.
This sink can execute only POST, PUT, PATCH or DELETE requests.
.consumeFromSource(...)
.terminateWithSink(HttpSinkConfigBuilder.builder()
.withAddress(builder -> builder
.withHost(value)
.withPort(value)
.withHasBracketlessColons(value)
)
.withHeaders(value)
.withConnectTimeout(value)
.withRetryConfiguration(builder -> builder
.withRetries(value)
.withBackoffDelay(value)
.withMaxBackoffDelay(value)
)
.withRequestTimeout(value)
.withSsl(builder -> builder
.withTrustStoreFile(value)
.withTrustStorePassword(value)
.withKeyStoreFile(value)
.withKeyStorePassword(value)
.withKeyPassword(value)
.withInsecure(value)
.withHostnameVerifier(value)
)
.withExceptionHandler(value)
.withSocketOptions(value)
)
sink:
http:
address:
host: value
port: value
hasBracketlessColons: value
headers: value
connectTimeout: value
retryConfiguration:
retries: value
backoffDelay: value
maxBackoffDelay: value
requestTimeout: value
ssl:
trustStoreFile: value
trustStorePassword: value
keyStoreFile: value
keyStorePassword: value
keyPassword: value
insecure: value
hostnameVerifier: value
exceptionHandler: value
socketOptions: value
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-http-api</artifactId>
<version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-http-api', version: '1.0-20250910-124207-release-1.5.3'
Properties¶
address
¶
Downstream server's address to connect to.
Required.
Type: object
Fields of address
:
address.host
¶
Type: string
address.port
¶
Type: number
address.hasBracketlessColons
¶
Type: boolean
headers
¶
A set of headers to add to the HTTP request.
Type: object
connectTimeout
¶
Connection timeout. Sink will retry if the connection could not be established.
Type: object
Default value: PT20S
retryConfiguration
¶
Retry configuration of failed request.
Type: object
Fields of retryConfiguration
:
retryConfiguration.retries
¶
Number of retry attempts after a request failure.
Type: number
Default value: 3
retryConfiguration.backoffDelay
¶
Initial delay before the first retry attempt.
Type: object
Default value: PT0.2S
retryConfiguration.maxBackoffDelay
¶
Maximum delay between consecutive retry attempts.
Type: object
Default value: PT3S
requestTimeout
¶
Max timeout to wait before marking the request as unsuccessful.
Type: object
Default value: PT1S
ssl
¶
Secure transport configuration.
Type: object
Fields of ssl
:
ssl.trustStoreFile
¶
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.trustStorePassword
¶
Truststore password.
Type: string
ssl.keyStoreFile
¶
Keystore file; supported formats include JKS, PKCS#12, or PEM
Type: string
ssl.keyStorePassword
¶
Keystore password.
Type: string
ssl.keyPassword
¶
Private key password. Optional — if not set, the key store password will be used.
Type: string
ssl.insecure
¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
ssl.hostnameVerifier
¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
exceptionHandler
¶
Custom exception handler enabling interception of all errors related to this sink.
Type: object
socketOptions
¶
Configures operating system options to be applied to the server socket.
Supported values are:
- SO_SNDBUF
,
- SO_RCVBUF
,
- SO_TIMEOUT
,
- SO_KEEPALIVE
,
- SO_LINGER
,
- SO_BACKLOG
,
- TCP_NODELAY
.
Type: object
Usage Examples¶
stream
.withName("Read data from HTTP server and print to stdout")
.consumeFromSource(...)
.processWith(event -> new HttpRequest(...))
.terminateWithSink(HttpSinkConfigBuilder.builder()
.withAddress(HostAndPort.fromParts("localhost", 9090))
.withHeaders(Map.of("Content-Type", "application/json"))
);