S3
The S3 source monitors an Amazon S3 bucket, or an S3-compatible object storage service, for objects.
It reads objects from the specified bucket and prefix and emits each one as an S3ObjectMessage that carries all of the object's metadata. The source can also be configured to download each object's content.
Objects can optionally be deleted after they are processed.
Note: This source should not be parallelized, because every worker reads the same files from the same bucket. If a worker is restarted, it re-reads all files from the bucket and emits duplicates. Compare eTag values to deduplicate events.
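A minimal sketch of the eTag comparison the note recommends, assuming deduplication is done in downstream Java code. The class name EtagDeduplicator and the choice to key on bucket, key and eTag together (so an object that was modified and has a new eTag still counts as new) are illustrative assumptions, not plugin API. The seen set lives in memory, so by itself it does not survive the worker restarts described in the note; a real pipeline would need to persist it.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/**
 * Hypothetical helper (not part of the plugin API) that drops repeated
 * deliveries of the same object version, identified by bucket, key and eTag.
 */
public class EtagDeduplicator {

    // One version of one object: the same bucket, key and eTag means the
    // source has re-emitted an object that was already delivered.
    record ObjectId(String bucketName, String key, String eTag) {}

    // In-memory only: this set is lost when the JVM restarts.
    private final Set<ObjectId> seen = new HashSet<>();

    /** Returns true the first time an object version is seen, false for duplicates. */
    public boolean firstSeen(String bucketName, String key, String eTag) {
        Objects.requireNonNull(eTag, "eTag");
        return seen.add(new ObjectId(bucketName, key, eTag));
    }

    public static void main(String[] args) {
        EtagDeduplicator dedup = new EtagDeduplicator();
        System.out.println(dedup.firstSeen("my-bucket", "input/a.csv", "\"9b2cf535f27731c974343645a3985328\"")); // true
        System.out.println(dedup.firstSeen("my-bucket", "input/a.csv", "\"9b2cf535f27731c974343645a3985328\"")); // false: re-read
        System.out.println(dedup.firstSeen("my-bucket", "input/a.csv", "\"d41d8cd98f00b204e9800998ecf8427e\"")); // true: object changed
    }
}
```

Including the key in the identity avoids treating two different objects with identical content (and therefore possibly identical eTags) as duplicates.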
The S3ObjectMessage includes the following metadata:
- bucketName: The name of the S3 bucket
- key: The key (path) of the S3 object
- content: The actual content of the S3 object as a ByteBuffer (can be null)
- contentType: The content type of the S3 object
- contentLength: The size of the S3 object in bytes
- lastModified: The timestamp when the S3 object was last modified
- eTag: The entity tag of the S3 object
- metadata: A map of user-defined metadata associated with the S3 object
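Because content is a ByteBuffer that can be null, code consuming it needs a null check before decoding. The sketch below shows one way to do that in plain Java; the helper name ContentDecoder and the assumption that the objects hold UTF-8 text are illustrative, not part of the plugin.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/**
 * Hypothetical helper (not part of the plugin API) that turns the
 * content ByteBuffer into text, assuming the object is UTF-8 encoded.
 */
public final class ContentDecoder {

    private ContentDecoder() {}

    /** Decodes content as UTF-8, or returns an empty Optional when content is null. */
    public static Optional<String> decodeUtf8(ByteBuffer content) {
        if (content == null) {
            return Optional.empty();
        }
        // duplicate() shares the bytes but keeps the caller's position unchanged.
        return Optional.of(StandardCharsets.UTF_8.decode(content.duplicate()).toString());
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("id,name\n1,alice\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeUtf8(buf).orElse("<no content>"));
        System.out.println(decodeUtf8(null).orElse("<no content>")); // prints <no content>
    }
}
```

Decoding a duplicate rather than the original buffer means other code can still read the same content afterward.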
.consumeFromSource(S3SourceConfigBuilder.builder()
    .withBucketName(value)
    .withPrefix(value)
    .withDeleteOnCommit(value)
    .withDownloadContent(value)
    .withS3ResourceName(value)
    .withInitialCheckTime(value)
)
.terminateWithSink(...);
source:
  s3:
    bucketName: value
    prefix: value
    deleteOnCommit: value
    downloadContent: value
    s3ResourceName: value
    initialCheckTime: value
Java dependency management
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
Maven:
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-aws-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
Gradle:
implementation group: 'org.voltdb', name: 'volt-stream-plugin-aws-api', version: '1.0-20250910-124207-release-1.5.3'
Properties
bucketName
The name of the S3 bucket to read objects from. Required.
Type: string
prefix
The prefix to filter objects by. Only objects with keys starting with this prefix will be processed.
Type: string
deleteOnCommit
Whether to delete each object from the bucket after it has been processed and the source commits.
Type: boolean
Default value: false
downloadContent
Whether to download each object's content. This can be an expensive operation for large files.
Type: boolean
Default value: false
s3ResourceName
The name of the S3 client resource, as registered with configureResource in the pipeline. Required.
Type: string
initialCheckTime
The instant compared against each file's last-modified time. Files modified before this timestamp are ignored. The default is the Unix epoch, so all files are processed.
Type: string
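The prefix and initialCheckTime descriptions together define which objects the source picks up. This sketch restates that rule in plain Java so it can be checked against examples; it does not reflect how or where the plugin actually applies the filter. The class ObjectFilter is hypothetical, and parsing initialCheckTime with Instant.parse (an ISO-8601 string such as 2025-01-01T00:00:00Z) is an assumption about the accepted string format.

```java
import java.time.Instant;

/**
 * Hypothetical restatement of the selection rule: keep an object if its key
 * starts with the prefix and it was not modified before initialCheckTime.
 */
public final class ObjectFilter {

    private final String prefix;            // null or empty means no prefix filtering
    private final Instant initialCheckTime; // objects modified earlier are ignored

    public ObjectFilter(String prefix, String initialCheckTime) {
        this.prefix = prefix == null ? "" : prefix;
        // Assumed ISO-8601 input; the documented default is the epoch.
        this.initialCheckTime = initialCheckTime == null
                ? Instant.EPOCH
                : Instant.parse(initialCheckTime);
    }

    /** Returns true if an object with this key and modification time would be processed. */
    public boolean accepts(String key, Instant lastModified) {
        return key.startsWith(prefix) && !lastModified.isBefore(initialCheckTime);
    }

    public static void main(String[] args) {
        ObjectFilter filter = new ObjectFilter("input/", "2025-01-01T00:00:00Z");
        System.out.println(filter.accepts("input/a.csv", Instant.parse("2025-03-01T12:00:00Z")));   // true
        System.out.println(filter.accepts("input/old.csv", Instant.parse("2024-12-31T23:59:59Z"))); // false: too old
        System.out.println(filter.accepts("archive/b.csv", Instant.parse("2025-03-01T12:00:00Z"))); // false: wrong prefix
    }
}
```

Only modification times strictly before initialCheckTime are rejected, so an object modified exactly at that instant is kept, matching the wording of the property description.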
Usage Examples
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;

stream
    .withName("List files from S3")
    // Register the S3 client resource that the source refers to by name
    .configureResource("s3-client", S3ResourceConfigBuilder.class, resourceBuilder -> {})
    .consumeFromSource(
        S3SourceConfigBuilder.builder()
            .withBucketName("my-bucket")
            .withPrefix("input/")
            .withS3ResourceName("s3-client")
            .withDeleteOnCommit(true)
            .withDownloadContent(true)
            .withPollingInterval(Duration.ofSeconds(30))
            .withInitialCheckTime(Instant.now())
    )
    .processWith(message -> {
        // Access S3 object metadata
        String key = message.key();
        String contentType = message.contentType();
        long contentLength = message.contentLength();
        // Convert content to a UTF-8 string
        byte[] content = new S3ObjectMessageConverter().extractContent(message);
        return new String(content, StandardCharsets.UTF_8);
    })
    .terminateWithSink(sink);
This configuration reads objects from the "input/" prefix of the "my-bucket" bucket every 30 seconds, extracts metadata and content from each S3ObjectMessage, converts the content to a string, and sends the result to a sink. Each file is deleted when the source commits. Because initialCheckTime is set to Instant.now(), files last modified before the pipeline starts are skipped, so only files created or modified afterward are processed.