S3-sqs-event-listener¶
The S3 SQS Event Listener receives Amazon S3 bucket notifications through an SQS queue. This approach is more efficient and closer to real time than polling the bucket for changes.
The listener reads all messages from the SQS queue. It can be configured with a prefix filter: a message whose object key does not match the prefix is not processed, but it is still deleted from the queue along with the processed messages.
The message metadata contains additional information read from SQS: eventTime, eventType, and versionId.
Prerequisites:
1. Configure S3 bucket notifications to send events to an SQS queue
A: Create the SQS queue:
aws sqs create-queue --queue-name s3-events-queue
B: Allow S3 to send messages to your queue (s3-events-queue) by applying the following policy to it:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:123456789012:s3-events-queue",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "arn:aws:s3:::your-bucket-name"
        }
      }
    }
  ]
}
C: Configure the bucket to send notifications to the queue:
aws s3api put-bucket-notification-configuration --bucket your-bucket-name --notification-configuration '{
  "QueueConfigurations": [
    {
      "Id": "s3-sqs-notification",
      "QueueArn": "arn:aws:sqs:us-east-1:123456789012:s3-events-queue",
      "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            {
              "Name": "prefix",
              "Value": "uploads/"
            }
          ]
        }
      }
    }
  ]
}'
The listener automatically handles:
- SNS-wrapped messages (if S3 notifications go through SNS first)
- Message deletion from the queue after successful processing
- Event type mapping (ObjectCreated -> CREATED, ObjectRemoved -> DELETED)
- Bucket and prefix filtering
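The event type mapping described above can be pictured with a minimal sketch. The class and enum names here are hypothetical and not part of the plugin API. The sketch assumes the event name arrives in the form S3 uses in notification records (for example `ObjectCreated:Put`), and it returns an empty result for any other event family, because the behavior for those is not documented here.

```java
import java.util.Optional;

/** Hypothetical helper illustrating the documented mapping; not the plugin's own code. */
final class S3EventTypeMapper {

    /** Illustrative event types, named after the mapping in the text. */
    enum EventType { CREATED, DELETED }

    /**
     * Maps an S3 event name such as "ObjectCreated:Put" to an event type.
     * An optional "s3:" prefix (as used in notification configuration) is tolerated.
     */
    static Optional<EventType> fromEventName(String eventName) {
        if (eventName == null) {
            return Optional.empty();
        }
        String name = eventName.startsWith("s3:") ? eventName.substring(3) : eventName;
        if (name.startsWith("ObjectCreated:")) {
            return Optional.of(EventType.CREATED);
        }
        if (name.startsWith("ObjectRemoved:")) {
            return Optional.of(EventType.DELETED);
        }
        // Assumption: other event families are left unmapped in this sketch.
        return Optional.empty();
    }
}
```

For example, `S3EventTypeMapper.fromEventName("ObjectRemoved:Delete")` yields `DELETED` under these assumptions.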
The S3ObjectMessage includes the following fields:
- bucketName: The name of the S3 bucket
- key: The key (path) of the S3 object
- content: The content of the S3 object as a ByteBuffer; can be null
- contentType: The content type of the S3 object
- contentLength: The size of the S3 object in bytes
- lastModified: The timestamp when the S3 object was last modified
- eTag: The entity tag of the S3 object
- metadata: A map of user-defined metadata associated with the S3 object
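Because content can be null, downstream code should guard against it before decoding. The following is a minimal sketch of such a guard. The helper class is hypothetical, and it works on a plain ByteBuffer because the accessor names of S3ObjectMessage are not shown in this document. It also assumes the object holds UTF-8 text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical helper for reading object content; not part of the plugin API. */
final class S3ContentDecoder {

    /**
     * Decodes the content as UTF-8 text, or returns empty when no content is present.
     * The buffer is duplicated so the caller's position and limit stay untouched.
     */
    static Optional<String> asUtf8(ByteBuffer content) {
        if (content == null) {
            return Optional.empty();
        }
        return Optional.of(StandardCharsets.UTF_8.decode(content.duplicate()).toString());
    }
}
```

A processing step could call `S3ContentDecoder.asUtf8(buffer)` and skip or log the event when the result is empty.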
.consumeFromSource(S3SqsEventListenerConfigBuilder.builder()
    .withS3ResourceName(value)
    .withSqsResourceName(value)
    .withQueueUrl(value)
    .withPrefix(value)
    .withDownloadContent(value)
    .withMaxMessages(value)
    .withMaxWaitTime(value)
)
.terminateWithSink(...);
source:
  s3-sqs-event-listener:
    s3ResourceName: value
    sqsResourceName: value
    queueUrl: value
    prefix: value
    downloadContent: value
    maxMessages: value
    maxWaitTime: value
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-aws-api</artifactId>
<version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-aws-api', version: '1.0-20250910-124207-release-1.5.3'
Properties¶
s3ResourceName¶
The name of the S3 client resource. Required.
Type: string
sqsResourceName¶
The name of the SQS client resource. Required.
Type: string
queueUrl¶
The URL of the SQS queue. Required.
Type: string
prefix¶
Optional prefix filter. If specified, only events for objects whose keys start with this prefix are processed. If null or empty, events are not filtered by prefix.
Type: string
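The prefix rule can be sketched as a small predicate. This is an illustration of the documented semantics under stated assumptions, not the plugin's implementation, and the class and method names are hypothetical.

```java
/** Hypothetical predicate mirroring the documented prefix semantics. */
final class PrefixFilter {

    /**
     * Returns true when the object key should be processed.
     * A null or empty prefix disables filtering, as described for the prefix property.
     */
    static boolean accepts(String key, String prefix) {
        if (prefix == null || prefix.isEmpty()) {
            return true;
        }
        // Assumption: a missing key never matches a non-empty prefix.
        return key != null && key.startsWith(prefix);
    }
}
```

With prefix `uploads/`, the key `uploads/report.csv` is accepted and `logs/app.log` is not.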
downloadContent¶
Whether to download the object's content into the worker's memory. This can be an expensive operation for large files.
Type: boolean
Default value: false
maxMessages¶
Maximum number of messages to receive in a single request (1-10).
Type: number
Default value: 10
maxWaitTime¶
The duration for which the call waits for a message to arrive. Minimum 1 second.
Type: object
Default value: PT1M
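The default PT1M reads as an ISO-8601 duration, which `java.time.Duration.parse` understands as one minute. The sketch below checks the two documented limits (maxMessages between 1 and 10, maxWaitTime of at least 1 second) before values are passed to the builder. The class is hypothetical. Whether the plugin performs the same checks, and in what form it reports violations, is an assumption.

```java
import java.time.Duration;

/** Hypothetical pre-flight check for the documented limits; not part of the plugin API. */
final class ListenerSettingsCheck {

    private static final Duration MIN_WAIT = Duration.ofSeconds(1);

    /** Throws IllegalArgumentException when a value is outside the documented range. */
    static void validate(int maxMessages, Duration maxWaitTime) {
        if (maxMessages < 1 || maxMessages > 10) {
            throw new IllegalArgumentException("maxMessages must be between 1 and 10, got " + maxMessages);
        }
        if (maxWaitTime == null || maxWaitTime.compareTo(MIN_WAIT) < 0) {
            throw new IllegalArgumentException("maxWaitTime must be at least 1 second, got " + maxWaitTime);
        }
    }
}
```

For instance, `ListenerSettingsCheck.validate(10, Duration.parse("PT1M"))` passes, while a maxMessages of 11 or a wait time of 500 milliseconds is rejected.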
Usage Examples¶
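stream
    .withName("Process S3 bucket events via SQS")
    .consumeFromSource(
        S3SqsEventListenerConfigBuilder.builder()
            .withS3ResourceName("s3-client")
            .withSqsResourceName("sqs-client")
            // queueUrl is required; this example URL matches the queue created in the prerequisites
            .withQueueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/s3-events-queue")
            .withPrefix("uploads/") // Optional filter
            .withMaxMessages(10)
            .withMaxWaitTime(Duration.ofSeconds(20))
    )
    .processWith(event -> {
        // Application-specific logic goes here
        return event;
    })
    .terminateWithSink(sink);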