S3-sqs-event-listener

The S3 SQS Event Listener receives Amazon S3 bucket notifications via an SQS queue. Because S3 pushes events to the queue as they occur, this approach is more efficient and closer to real time than polling the bucket for changes.

The listener reads every message from the SQS queue. It can be configured with a key prefix filter: a message whose object key does not match the prefix is not processed, but it is still deleted from the queue along with the processed messages.

The message metadata also carries the eventTime, eventType, and versionId read from the SQS message.

Prerequisites:

1. Configure S3 bucket notifications to send events to an SQS queue.

A: Create the SQS queue:

aws sqs create-queue --queue-name s3-events-queue

B: Allow S3 to send messages to your queue (s3-events-queue) by applying the following policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:123456789012:s3-events-queue",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "arn:aws:s3:::your-bucket-name"
        }
      }
    }
  ]
}

C: Configure S3 bucket notifications:

aws s3api put-bucket-notification-configuration \
    --bucket your-bucket-name \
    --notification-configuration '{
      "QueueConfigurations": [
        {
          "Id": "s3-sqs-notification",
          "QueueArn": "arn:aws:sqs:us-east-1:123456789012:s3-events-queue",
          "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"],
          "Filter": {
            "Key": {
              "FilterRules": [
                {
                  "Name": "prefix",
                  "Value": "uploads/"
                }
              ]
            }
          }
        }
      ]
    }'

2. Ensure the SQS queue has appropriate permissions to receive S3 notifications.

3. Configure IAM permissions for the application to read from the SQS queue.

The listener automatically handles:
- SNS-wrapped messages (if S3 notifications go through SNS first)
- Message deletion from the queue after successful processing
- Event type mapping (ObjectCreated -> CREATED, ObjectRemoved -> DELETED)
- Bucket and prefix filtering
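
The event type mapping listed above can be pictured with a minimal Java sketch. The `EventType` enum and the `fromEventName` method are hypothetical names chosen for illustration, not the plugin's actual API. The sketch assumes only that S3 event names start with `ObjectCreated:` or `ObjectRemoved:`, optionally preceded by `s3:`.

```java
import java.util.Optional;

public class S3EventTypeMapping {

    /** Hypothetical enum; the plugin's real type names may differ. */
    public enum EventType { CREATED, DELETED }

    /**
     * Maps an S3 event name such as "ObjectCreated:Put" or
     * "s3:ObjectRemoved:Delete" to a coarse event type.
     * Returns an empty Optional for event names this sketch does not know.
     */
    public static Optional<EventType> fromEventName(String eventName) {
        if (eventName == null) {
            return Optional.empty();
        }
        // Tolerate the "s3:" prefix used in notification configurations.
        String name = eventName.startsWith("s3:") ? eventName.substring(3) : eventName;
        if (name.startsWith("ObjectCreated:")) {
            return Optional.of(EventType.CREATED);
        }
        if (name.startsWith("ObjectRemoved:")) {
            return Optional.of(EventType.DELETED);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(fromEventName("ObjectCreated:Put"));
        System.out.println(fromEventName("s3:ObjectRemoved:Delete"));
        System.out.println(fromEventName("ObjectRestore:Post"));
    }
}
```

Returning an `Optional` rather than throwing keeps unknown event names from failing the whole batch; how the listener itself treats such events is not stated here.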

The S3ObjectMessage includes the following metadata:
- bucketName: The name of the S3 bucket
- key: The key (path) of the S3 object
- content: The actual content of the S3 object as a ByteBuffer, can be null
- contentType: The content type of the S3 object
- contentLength: The size of the S3 object in bytes
- lastModified: The timestamp when the S3 object was last modified
- eTag: The entity tag of the S3 object
- metadata: A map of user-defined metadata associated with the S3 object
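
Since content is a ByteBuffer that can be null (presumably when downloadContent is false), downstream code should check for null before reading it. The following is a minimal sketch, assuming the object holds UTF-8 text; `ContentDecoding` and `contentAsText` are hypothetical helper names, not part of the plugin.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ContentDecoding {

    /**
     * Decodes the object's content as UTF-8 text, or returns null
     * when no content was provided.
     */
    public static String contentAsText(ByteBuffer content) {
        if (content == null) {
            return null;
        }
        // Decode a read-only view so the caller's buffer position is untouched.
        return StandardCharsets.UTF_8.decode(content.asReadOnlyBuffer()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer sample = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(contentAsText(sample));
        System.out.println(contentAsText(null));
    }
}
```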

.consumeFromSource(S3SqsEventListenerConfigBuilder.builder()
    .withS3ResourceName(value)
    .withSqsResourceName(value)
    .withQueueUrl(value)
    .withPrefix(value)
    .withDownloadContent(value)
    .withMaxMessages(value)
    .withMaxWaitTime(value)
)
.terminateWithSink(...);

source:
  s3-sqs-event-listener:
    s3ResourceName: value
    sqsResourceName: value
    queueUrl: value
    prefix: value
    downloadContent: value
    maxMessages: value
    maxWaitTime: value

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-aws-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-aws-api', version: '1.0-20250910-124207-release-1.5.3'

Properties

s3ResourceName

The name of the S3 client resource. Required.

Type: string

sqsResourceName

The name of the SQS client resource. Required.

Type: string

queueUrl

The URL of the SQS queue. Required.

Type: string

prefix

Optional prefix filter. If specified, only events for objects with keys starting with this prefix will be processed. If null or empty, objects will not be filtered by prefix.

Type: string
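
The prefix rule described above can be sketched in a few lines of Java. This is an illustration of the documented behavior, not the plugin's code; `PrefixFilter` and `shouldProcess` are hypothetical names.

```java
public class PrefixFilter {

    /**
     * Returns true when an object key should be processed:
     * a null or empty prefix disables filtering, otherwise the
     * key must start with the prefix.
     */
    public static boolean shouldProcess(String key, String prefix) {
        if (prefix == null || prefix.isEmpty()) {
            return true;
        }
        return key != null && key.startsWith(prefix);
    }

    public static void main(String[] args) {
        System.out.println(shouldProcess("uploads/report.csv", "uploads/")); // true
        System.out.println(shouldProcess("archive/report.csv", "uploads/")); // false
        System.out.println(shouldProcess("archive/report.csv", ""));         // true
    }
}
```

Messages that fail this check are still removed from the queue, so a prefix that is too narrow silently discards events.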

downloadContent

Whether to load the object's content into the worker's memory. This can be an expensive operation for large files.

Type: boolean

Default value: false

maxMessages

Maximum number of messages to receive in a single request (1-10).

Type: number

Default value: 10

maxWaitTime

The duration for which the call waits for a message to arrive. Minimum 1 second.

Type: object

Default value: PT1M
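
The default PT1M is an ISO-8601 duration, the format that `java.time.Duration` parses and prints. The sketch below shows how such a value reads and how the documented one-second minimum could be checked; `WaitTimeExample` and `parseWaitTime` are hypothetical names, and the check is an assumption about how the rule might be enforced rather than the plugin's own validation.

```java
import java.time.Duration;

public class WaitTimeExample {

    /**
     * Parses an ISO-8601 duration such as "PT1M" or "PT20S" and
     * rejects values shorter than one second.
     */
    public static Duration parseWaitTime(String iso) {
        Duration waitTime = Duration.parse(iso);
        if (waitTime.compareTo(Duration.ofSeconds(1)) < 0) {
            throw new IllegalArgumentException("maxWaitTime must be at least 1 second: " + iso);
        }
        return waitTime;
    }

    public static void main(String[] args) {
        System.out.println(parseWaitTime("PT1M").getSeconds()); // 60
        System.out.println(Duration.ofSeconds(20));             // PT20S
    }
}
```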

Usage Examples

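stream
       .withName("Process S3 bucket events via SQS")
       .consumeFromSource(
               S3SqsEventListenerConfigBuilder.builder()
                       .withS3ResourceName("s3-client")
                       .withSqsResourceName("sqs-client")
                       // Required; example URL for the queue created in the prerequisites
                       .withQueueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/s3-events-queue")
                       .withPrefix("uploads/")       // Optional filter
                       .withMaxMessages(10)
                       .withMaxWaitTime(Duration.ofSeconds(20))   // java.time.Duration
       )
       .processWith(event -> {
           // Application logic for each S3 event goes here
           return event;
       })
       .terminateWithSink(sink);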