S3

The S3 source monitors an Amazon S3 bucket, or a compatible object storage service, for objects. It reads objects from the specified bucket and prefix and emits them as S3ObjectMessage objects that carry the metadata of each S3 object. It can also be configured to download the content of each object.

Objects can optionally be deleted after they are processed.

Note: Do not parallelize this source. Each worker reads the same files from the same bucket, so parallel workers emit duplicates. If a worker is restarted, it re-reads all files from the bucket and emits them again. Compare eTag values to deduplicate events.
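
The eTag comparison mentioned in the note can be sketched as follows. This is a minimal illustration, not part of the plugin: `EtagDeduplicator` and `firstTimeSeen` are hypothetical names, and only the `bucketName`, `key`, and `eTag` values come from the S3ObjectMessage. It assumes an in-memory set is acceptable. Since that set is lost on restart, surviving a worker restart would need a durable store instead.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the plugin: remembers which object versions were already handled.
final class EtagDeduplicator {
    // Each entry is the triple (bucketName, key, eTag); List gives value-based equality.
    private final Set<List<String>> seen = ConcurrentHashMap.newKeySet();

    // Returns true the first time a (bucket, key, eTag) combination is offered, false for repeats.
    boolean firstTimeSeen(String bucketName, String key, String eTag) {
        return seen.add(List.of(bucketName, key, eTag));
    }
}
```

A processing step could call `firstTimeSeen(message.bucketName(), message.key(), message.eTag())` and skip the message when it returns false, assuming the message exposes accessors with those names. An object that is overwritten with new content gets a different eTag and is treated as new.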

The S3ObjectMessage includes the following metadata:

- bucketName: The name of the S3 bucket
- key: The key (path) of the S3 object
- content: The actual content of the S3 object as a ByteBuffer, can be null
- contentType: The content type of the S3 object
- contentLength: The size of the S3 object in bytes
- lastModified: The timestamp when the S3 object was last modified
- eTag: The entity tag of the S3 object
- metadata: A map of user-defined metadata associated with the S3 object
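
Because `content` can be null (presumably when the content was not downloaded), code that reads it needs a null check. The sketch below shows one null-safe way to turn the ByteBuffer into text. `ContentDecoder` is a hypothetical helper and not the plugin's own converter, and it assumes the object holds UTF-8 text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper, not part of the plugin.
final class ContentDecoder {
    // Decodes the buffer's remaining bytes as UTF-8; returns empty when there is no content.
    static Optional<String> decodeUtf8(ByteBuffer content) {
        if (content == null) {
            return Optional.empty();
        }
        // duplicate() shares the bytes but has its own position, so the caller's buffer is not consumed.
        return Optional.of(StandardCharsets.UTF_8.decode(content.duplicate()).toString());
    }
}
```

Returning an `Optional` forces the caller to decide what a missing body means, for example forwarding metadata only.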

.consumeFromSource(S3SourceConfigBuilder.builder()
    .withBucketName(value)
    .withPrefix(value)
    .withDeleteOnCommit(value)
    .withDownloadContent(value)
    .withS3Resource(value)
    .withInitialCheckTime(value)
)
.terminateWithSink(...);

source:
  s3:
    bucketName: value
    prefix: value
    deleteOnCommit: value
    downloadContent: value
    s3Resource: value
    initialCheckTime: value

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-aws-api</artifactId>
    <version>1.8.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-aws-api', version: '1.8.0'

Properties

bucketName

The name of the S3 bucket to read objects from. Required.

Type: string

prefix

The prefix to filter objects by. Only objects with keys starting with this prefix will be processed.

Type: string
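
The prefix rule is a plain string comparison on the object key, which the following sketch illustrates. `PrefixMatch` is a hypothetical helper, not plugin code, and treating a missing or empty prefix as "match everything" is an assumption.

```java
// Hypothetical helper, not part of the plugin.
final class PrefixMatch {
    // Documented rule: an object qualifies when its key starts with the prefix.
    // Assumption: a null or empty prefix matches every key.
    static boolean matches(String key, String prefix) {
        return prefix == null || prefix.isEmpty() || key.startsWith(prefix);
    }
}
```

Under this rule the prefix `input` also matches a key such as `input-archive/old.csv`, while `input/` does not, so a trailing slash is worth including when the intent is a single "folder".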

deleteOnCommit

Whether to delete objects from the bucket after they are read.

Type: boolean

Default value: false

downloadContent

Whether to download the object's content. This can be an expensive operation for large files.

Type: boolean

Default value: false

s3Resource

Reference to the S3 client resource. Required.

Type: object

initialCheckTime

The instant compared against each file's modification time. Files whose modification time is before this timestamp are ignored. The default is the epoch, meaning all files are processed.

Type: string
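
The initialCheckTime comparison can be sketched as below. `InitialCheckTimeFilter` is a hypothetical helper and not the plugin's implementation. The description only says that files modified before the timestamp are ignored, so processing a file modified exactly at the timestamp is an assumption.

```java
import java.time.Instant;

// Hypothetical helper, not part of the plugin.
final class InitialCheckTimeFilter {
    // Documented rule: objects modified before initialCheckTime are ignored.
    // Assumption: an object modified exactly at initialCheckTime is still processed.
    static boolean shouldProcess(Instant lastModified, Instant initialCheckTime) {
        return !lastModified.isBefore(initialCheckTime);
    }
}
```

With the default of `Instant.EPOCH`, any object with a modification time at or after 1970-01-01T00:00:00Z passes the check, which matches the "all files are processed" behavior described above.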

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

stream
       .withName("List files from S3")
       .configureResource("s3-client", S3ResourceConfigBuilder.class, resourceBuilder -> {})
       .consumeFromSource(
               S3SourceConfigBuilder.builder()
                       .withBucketName("my-bucket")
                       .withPrefix("input/")
                       .withS3ResourceName("s3-client")
                       .withDeleteOnCommit(true)
                       .withDownloadContent(true)
                       .withPollingInterval(Duration.ofSeconds(30))
                       .withInitialCheckTime(Instant.now()))
       .processWith(message -> {
           // Access S3 object metadata
           String key = message.key();
           String contentType = message.contentType();
           long contentLength = message.contentLength();

           // Convert content to string
           byte[] content = new S3ObjectMessageConverter().extractContent(message);
           return new String(content, StandardCharsets.UTF_8);
       })
       .terminateWithSink(sink);

This configuration reads objects under the "input/" prefix in the "my-bucket" bucket every 30 seconds, extracts metadata and content from each S3ObjectMessage, converts the content to a string, and sends it to a sink. Each file is deleted on source commit. Files last modified before Instant.now() are skipped, so the source processes only files created or modified after the pipeline starts.

Metrics

S3 metrics

Metric enum: org.voltdb.stream.plugin.s3.S3Metric

| Prometheus name | Type | Description |
|---|---|---|
| voltsp_s3_create_event_total | counter | Number of S3 object create events observed. |
| voltsp_s3_delete_error_total | counter | Number of failed S3 delete requests. |
| voltsp_s3_delete_event_total | counter | Number of S3 object delete events observed. |
| voltsp_s3_delete_total | counter | Number of successful S3 delete requests. |
| voltsp_s3_get_error_total | counter | Number of failed S3 get requests. |
| voltsp_s3_get_total | counter | Number of successful S3 get requests. |
| voltsp_s3_list_error_total | counter | Number of failed S3 list requests. |
| voltsp_s3_list_total | counter | Number of successful S3 list requests. |
| voltsp_s3_put_error_total | counter | Number of failed S3 put requests. |
| voltsp_s3_put_size_bytes | counter | S3 put object size. |
| voltsp_s3_put_total | counter | Number of successful S3 put requests. |
| voltsp_s3_update_event_total | counter | Number of S3 object update events observed. |
| voltsp_s3_delete_time_seconds | histogram | S3 delete request duration. |
| voltsp_s3_get_size_bytes | histogram | S3 get object size. |
| voltsp_s3_get_time_seconds | histogram | S3 get request duration statistics. |
| voltsp_s3_list_time_seconds | histogram | S3 list request duration. |
| voltsp_s3_put_time_seconds | histogram | S3 put request duration statistics. |

S3 tags

Tag enum: org.voltdb.stream.plugin.s3.S3Tag

Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.

| Tag | Description |
|---|---|
| region | AWS region used by the S3 or SQS client. |