
S3

The S3 source monitors a bucket in Amazon S3, or in an S3-compatible object storage service, for objects. It reads objects from the specified bucket and prefix and emits each one as an S3ObjectMessage that carries the object's metadata. It can also be configured to download the content of each object.

Objects can optionally be deleted after they are processed.

Note: This source should not be parallelized, because each worker reads the same files from the same bucket. If a worker is restarted, it re-reads all files in the bucket and emits duplicates. Compare eTag values to deduplicate events.
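One way to act on the eTag recommendation is a check in the processing step, assuming the bucket name, key, and eTag are read from each S3ObjectMessage. ProcessedStore, InMemoryProcessedStore, and EtagDeduplicator are hypothetical names, not part of the plugin API. Because the duplicates described above appear after a worker restart, a real store would need to be durable (for example, a database table keyed by bucket, key, and eTag); the in-memory store is only for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical abstraction over wherever "already processed" markers are kept.
// To survive worker restarts it must be durable, e.g. a table keyed by bucket, key and eTag.
interface ProcessedStore {
    /** Records the id and returns true if it had not been recorded before. */
    boolean markIfAbsent(String id);
}

// Illustration only: the contents of this store are lost when the worker restarts.
class InMemoryProcessedStore implements ProcessedStore {
    private final Set<String> ids = ConcurrentHashMap.newKeySet();

    @Override
    public boolean markIfAbsent(String id) {
        return ids.add(id);
    }
}

// Hypothetical helper, not part of the VoltSP S3 plugin.
class EtagDeduplicator {
    private final ProcessedStore store;

    EtagDeduplicator(ProcessedStore store) {
        this.store = store;
    }

    /** Returns true the first time an object version is seen, false for a re-read duplicate. */
    boolean isNew(String bucketName, String key, String eTag) {
        // The eTag alone is not enough: two keys holding identical content can share an eTag,
        // so the bucket and key are part of the id as well.
        return store.markIfAbsent(String.join("\0", bucketName, key, eTag));
    }
}
```

In a pipeline, a step like this would drop a message whenever isNew returns false.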

The S3ObjectMessage includes the following metadata:

- bucketName: The name of the S3 bucket
- key: The key (path) of the S3 object
- content: The actual content of the S3 object as a ByteBuffer, can be null
- contentType: The content type of the S3 object
- contentLength: The size of the S3 object in bytes
- lastModified: The timestamp when the S3 object was last modified
- eTag: The entity tag of the S3 object
- metadata: A map of user-defined metadata associated with the S3 object
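Because content is a ByteBuffer that can be null, code that reads it should handle both cases. This minimal sketch uses only the JDK: it returns an empty Optional for null content, decodes the bytes as UTF-8 (an assumption that the objects are text), and copies from a duplicate of the buffer so the message's own buffer position is left untouched. S3Content is a hypothetical helper; the usage example on this page uses the plugin's S3ObjectMessageConverter for the same job.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper for the `content` field of an S3ObjectMessage.
// It relies only on that field being a possibly-null java.nio.ByteBuffer.
final class S3Content {
    private S3Content() {}

    /** Copies the buffer's remaining bytes without moving the caller's buffer position. */
    static byte[] toBytes(ByteBuffer content) {
        ByteBuffer view = content.duplicate(); // shares the data, has its own position/limit
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /** Decodes the content as UTF-8, or returns empty when no content is present. */
    static Optional<String> asUtf8(ByteBuffer content) {
        if (content == null) {
            return Optional.empty(); // e.g. the content was not downloaded
        }
        return Optional.of(new String(toBytes(content), StandardCharsets.UTF_8));
    }
}
```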

.consumeFromSource(S3SourceConfigBuilder.builder()
    .withBucketName(value)
    .withPrefix(value)
    .withDeleteOnCommit(value)
    .withDownloadContent(value)
    .withS3ResourceName(value)
    .withInitialCheckTime(value)
)
.terminateWithSink(...);

source:
  s3:
    bucketName: value
    prefix: value
    deleteOnCommit: value
    downloadContent: value
    s3ResourceName: value
    initialCheckTime: value

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-aws-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-aws-api', version: '1.0-20250910-124207-release-1.5.3'

Properties

bucketName

The name of the S3 bucket to read objects from. Required.

Type: string

prefix

The prefix to filter objects by. Only objects with keys starting with this prefix will be processed.

Type: string
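Prefix matching is a plain string comparison, not a directory lookup. The hypothetical PrefixFilter below mimics the stated rule with String.startsWith to show two consequences: keys in nested "subfolders" under the prefix also match, and a prefix without a trailing slash, such as "input", also matches keys like "inputs/c.csv" or "input.txt".

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that mirrors the documented rule:
// an object is processed only if its key starts with the configured prefix.
final class PrefixFilter {
    private PrefixFilter() {}

    static List<String> matching(List<String> keys, String prefix) {
        return keys.stream()
                .filter(key -> key.startsWith(prefix)) // "" matches every key
                .collect(Collectors.toList());
    }
}
```

With the keys "input/a.csv", "input/sub/b.csv", "inputs/c.csv" and "input.txt", the prefix "input/" selects only the first two, while "input" selects all four.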

deleteOnCommit

Whether to delete each object from the bucket once it has been processed and the source commits.

Type: boolean

Default value: false

downloadContent

Whether to download each object's content into the message. This can be an expensive operation for large files.

Type: boolean

Default value: false

s3ResourceName

The name of the S3 client resource to use, matching the name given to configureResource. Required.

Type: string

initialCheckTime

An instant that is compared against each object's last-modified time. Objects last modified before this instant are ignored. The default is the Unix epoch, so all objects are processed.

Type: string
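The initialCheckTime rule reduces to a single comparison. In this sketch, InitialCheckTimeFilter is a hypothetical class: a missing value falls back to the documented epoch default, and an object whose last-modified time equals the check time is processed, because only times strictly before it are ignored. The fromString factory assumes the YAML string is an ISO-8601 instant such as 2025-01-01T00:00:00Z; that format is an assumption, since the accepted string format is not specified above.

```java
import java.time.Instant;

// Hypothetical sketch of the initialCheckTime rule, not the plugin's implementation.
final class InitialCheckTimeFilter {
    private final Instant initialCheckTime;

    InitialCheckTimeFilter(Instant initialCheckTime) {
        // Documented default: the epoch, which lets every object through.
        this.initialCheckTime = initialCheckTime == null ? Instant.EPOCH : initialCheckTime;
    }

    // Assumption: the string form is ISO-8601 instant text, e.g. "2025-01-01T00:00:00Z".
    static InitialCheckTimeFilter fromString(String isoInstant) {
        return new InitialCheckTimeFilter(isoInstant == null ? null : Instant.parse(isoInstant));
    }

    /** Objects modified before the check time are ignored; all others are processed. */
    boolean accepts(Instant lastModified) {
        return !lastModified.isBefore(initialCheckTime);
    }
}
```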

Usage Examples

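import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;

stream
       .withName("List files from S3")
       // Register the S3 client resource referenced by withS3ResourceName below
       .configureResource("s3-client", S3ResourceConfigBuilder.class, resourceBuilder -> {})
       .consumeFromSource(
               S3SourceConfigBuilder.builder()
                       .withBucketName("my-bucket")
                       .withPrefix("input/")
                       .withS3ResourceName("s3-client")
                       .withDeleteOnCommit(true)
                       .withDownloadContent(true)
                       .withPollingInterval(Duration.ofSeconds(30))
                       // Skip objects last modified before the pipeline starts
                       .withInitialCheckTime(Instant.now())
       )
       .processWith(message -> {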
           // Access S3 object metadata
           String key = message.key();
           String contentType = message.contentType();
           long contentLength = message.contentLength();

           // Convert content to string
           byte[] content = new S3ObjectMessageConverter().extractContent(message);
           return new String(content, StandardCharsets.UTF_8);
       })
       .terminateWithSink(sink);

This configuration reads objects under the "input/" prefix of the "my-bucket" bucket every 30 seconds, reads the metadata and content of each S3ObjectMessage, converts the content to a string, and sends it to a sink. Each object is deleted when the source commits. Because initialCheckTime is set to Instant.now(), objects last modified before the pipeline starts are skipped, so only objects created or modified after startup are processed.