S3-sqs-event-listener

The S3 SQS Event Listener receives Amazon S3 bucket notifications via an SQS queue. This approach is more efficient and closer to real time than polling-based solutions.

The listener reads every message from the SQS queue. It can be configured with a key prefix filter: a message whose object key does not match the prefix is not processed, but it is still deleted from the queue along with the processed messages.

The message metadata also carries the eventTime, eventType, and versionId values read from the SQS message.

Prerequisites:

1. Configure S3 bucket notifications to send events to an SQS queue.

A: Create the SQS queue:

aws sqs create-queue --queue-name s3-events-queue

B: Allow S3 to send messages to your queue (the s3-events-queue) by applying the policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:123456789012:s3-events-queue",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "arn:aws:s3:::your-bucket-name"
        }
      }
    }
  ]
}

C: Configure S3 bucket notifications:

aws s3api put-bucket-notification-configuration \
    --bucket your-bucket-name \
    --notification-configuration '{
      "QueueConfigurations": [
        {
          "Id": "s3-sqs-notification",
          "QueueArn": "arn:aws:sqs:us-east-1:123456789012:s3-events-queue",
          "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"],
          "Filter": {
            "Key": {
              "FilterRules": [
                {
                  "Name": "prefix",
                  "Value": "uploads/"
                }
              ]
            }
          }
        }
      ]
    }'

2. Ensure the SQS queue has appropriate permissions to receive S3 notifications.

3. Configure IAM permissions for the application to read from the SQS queue.

The listener automatically handles:

- SNS-wrapped messages (if S3 notifications go through SNS first)
- Message deletion from the queue after successful processing
- Event type mapping (ObjectCreated -> CREATED, ObjectRemoved -> DELETED)
- Bucket and prefix filtering
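The event type mapping and prefix filtering described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the plugin's implementation: the class S3EventMapping, the enum EventType, and both method names are hypothetical. The eventName values follow the S3 notification record format (for example "ObjectCreated:Put"), and the prefix rule follows the description of the prefix property (a null or empty prefix disables filtering).

```java
import java.util.Optional;

/** Hypothetical illustration; these names are not part of the plugin's API. */
final class S3EventMapping {

    /** The two event types named in the listener description. */
    enum EventType { CREATED, DELETED }

    /** Maps an S3 eventName such as "ObjectCreated:Put" to an event type. */
    static Optional<EventType> mapEventName(String eventName) {
        if (eventName == null) {
            return Optional.empty();
        }
        // Assumption: also tolerate the "s3:" prefix used in notification configuration.
        String name = eventName.startsWith("s3:") ? eventName.substring(3) : eventName;
        if (name.startsWith("ObjectCreated:")) {
            return Optional.of(EventType.CREATED);
        }
        if (name.startsWith("ObjectRemoved:")) {
            return Optional.of(EventType.DELETED);
        }
        // Any other event family is left unmapped in this sketch.
        return Optional.empty();
    }

    /** Returns true when the object key passes the optional prefix filter. */
    static boolean matchesPrefix(String key, String prefix) {
        if (prefix == null || prefix.isEmpty()) {
            return true; // no filter configured
        }
        return key != null && key.startsWith(prefix);
    }

    public static void main(String[] args) {
        System.out.println(mapEventName("ObjectCreated:Put"));
        System.out.println(mapEventName("ObjectRemoved:Delete"));
        System.out.println(matchesPrefix("uploads/report.csv", "uploads/"));
        System.out.println(matchesPrefix("logs/app.log", "uploads/"));
    }
}
```

In this sketch a key that fails matchesPrefix would simply be skipped by the caller; as described earlier, the listener still deletes such messages from the queue.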

The S3ObjectMessage includes the following metadata:

- bucketName: The name of the S3 bucket
- key: The key (path) of the S3 object
- content: The actual content of the S3 object as a ByteBuffer, can be null
- contentType: The content type of the S3 object
- contentLength: The size of the S3 object in bytes
- lastModified: The timestamp when the S3 object was last modified
- eTag: The entity tag of the S3 object
- metadata: A map of user-defined metadata associated with the S3 object
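Because content is a ByteBuffer that can be null, downstream code needs a null-safe way to read it. The following is a minimal sketch using only the JDK; the class ContentDecoding and the method asUtf8 are hypothetical helpers, and it assumes the object holds UTF-8 text, which will not be true for binary objects.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical helper; not part of the plugin. */
final class ContentDecoding {

    /** Decodes the content as UTF-8 text, or returns empty when no content was provided. */
    static Optional<String> asUtf8(ByteBuffer content) {
        if (content == null) {
            return Optional.empty();
        }
        // duplicate() leaves the caller's buffer position and limit untouched.
        return Optional.of(StandardCharsets.UTF_8.decode(content.duplicate()).toString());
    }

    public static void main(String[] args) {
        ByteBuffer sample = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(asUtf8(sample).orElse("<no content>"));
        System.out.println(asUtf8(null).orElse("<no content>"));
    }
}
```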

.consumeFromSource(S3SqsEventListenerSourceConfigBuilder.builder()
    .withS3Resource(value)
    .withSqsResource(value)
    .withQueueUrl(value)
    .withPrefix(value)
    .withDownloadContent(value)
    .withMaxMessages(value)
    .withMaxWaitTime(value)
)
.terminateWithSink(...);

source:
  s3-sqs-event-listener:
    s3Resource: value
    sqsResource: value
    queueUrl: value
    prefix: value
    downloadContent: value
    maxMessages: value
    maxWaitTime: value

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-aws-api</artifactId>
    <version>1.8.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-aws-api', version: '1.8.0'

Properties

s3Resource

Reference to the S3 client resource. Required.

Type: object

sqsResource

Reference to the SQS client resource. Required.

Type: object

queueUrl

The name of the SQS queue. Required.

Type: string

prefix

Optional prefix filter. If specified, only events for objects with keys starting with this prefix will be processed. If null or empty, objects will not be filtered by prefix.

Type: string

downloadContent

Whether to download the object's content into the worker's memory. This can be an expensive operation for large files.

Type: boolean

Default value: false

maxMessages

Maximum number of messages to receive in a single request (1-10).

Type: number

Default value: 10

maxWaitTime

The duration for which the call waits for a message to arrive. Minimum 1 second.

Type: object

Default value: PT1M
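The default PT1M is an ISO-8601 duration, which java.time.Duration parses directly. The sketch below shows that parsing together with the limits stated for maxMessages (1-10) and maxWaitTime (at least 1 second). The class ListenerSettingsCheck and its validate method are hypothetical and only illustrate the documented bounds; how the plugin itself validates these values is not shown here.

```java
import java.time.Duration;

/** Hypothetical illustration of the documented limits; not the plugin's validation code. */
final class ListenerSettingsCheck {

    /** Throws IllegalArgumentException when a value is outside the documented range. */
    static void validate(int maxMessages, Duration maxWaitTime) {
        if (maxMessages < 1 || maxMessages > 10) {
            throw new IllegalArgumentException("maxMessages must be between 1 and 10");
        }
        if (maxWaitTime == null || maxWaitTime.compareTo(Duration.ofSeconds(1)) < 0) {
            throw new IllegalArgumentException("maxWaitTime must be at least 1 second");
        }
    }

    public static void main(String[] args) {
        // "PT1M" is the documented default: one minute.
        Duration defaultWait = Duration.parse("PT1M");
        System.out.println(defaultWait.getSeconds());

        validate(10, defaultWait);            // documented defaults pass
        validate(5, Duration.ofSeconds(20));  // values from the usage example pass
    }
}
```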

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

stream
       .withName("Process S3 bucket events via SQS")
       .consumeFromSource(
               S3SqsEventListenerSourceConfigBuilder.builder()
                       .withS3ResourceName("s3-client")
                       .withSqsResourceName("sqs-client")
                       .withPrefix("uploads/")       // Optional filter
                       .withMaxMessages(10)
                       .withMaxWaitTime(Duration.ofSeconds(20))
       )
       .processWith(event -> {
           // logic
           return event;
       })
       .terminateWithSink(sink);

Metrics

S3 metrics

Metric enum: org.voltdb.stream.plugin.s3.S3Metric

| Prometheus name | Type | Description |
| --- | --- | --- |
| voltsp_s3_create_event_total | counter | Number of S3 object create events observed. |
| voltsp_s3_delete_error_total | counter | Number of failed S3 delete requests. |
| voltsp_s3_delete_event_total | counter | Number of S3 object delete events observed. |
| voltsp_s3_delete_total | counter | Number of successful S3 delete requests. |
| voltsp_s3_get_error_total | counter | Number of failed S3 get requests. |
| voltsp_s3_get_total | counter | Number of successful S3 get requests. |
| voltsp_s3_list_error_total | counter | Number of failed S3 list requests. |
| voltsp_s3_list_total | counter | Number of successful S3 list requests. |
| voltsp_s3_put_error_total | counter | Number of failed S3 put requests. |
| voltsp_s3_put_size_bytes | counter | S3 put object size. |
| voltsp_s3_put_total | counter | Number of successful S3 put requests. |
| voltsp_s3_update_event_total | counter | Number of S3 object update events observed. |
| voltsp_s3_delete_time_seconds | histogram | S3 delete request duration. |
| voltsp_s3_get_size_bytes | histogram | S3 get object size. |
| voltsp_s3_get_time_seconds | histogram | S3 get request duration statistics. |
| voltsp_s3_list_time_seconds | histogram | S3 list request duration. |
| voltsp_s3_put_time_seconds | histogram | S3 put request duration statistics. |

SQS metrics

Metric enum: org.voltdb.stream.plugin.sqs.SqsMetric

| Prometheus name | Type | Description |
| --- | --- | --- |
| voltsp_sqs_delete_error_total | counter | Number of SQS delete errors. |
| voltsp_sqs_deleted_total | counter | Number of SQS messages deleted. |
| voltsp_sqs_read_error_total | counter | Number of SQS read errors. |
| voltsp_sqs_read_size_bytes | counter | Number of bytes read from SQS messages. |
| voltsp_sqs_read_timeout_total | counter | Number of SQS read timeouts. |
| voltsp_sqs_delete_time_seconds | histogram | SQS delete request duration. |
| voltsp_sqs_read_time_seconds | histogram | SQS read request duration. |

S3 tags

Tag enum: org.voltdb.stream.plugin.s3.S3Tag

Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.

| Tag | Description |
| --- | --- |
| region | AWS region used by the S3 or SQS client. |