S3-sqs-event-listener¶
The S3 SQS Event Listener receives Amazon S3 bucket notifications through an SQS queue. Because S3 pushes events to the queue as they occur, this approach is more efficient and closer to real time than polling the bucket.
The listener reads all messages from the SQS queue. It can be configured with a filtering prefix: if an object's key does not match the prefix, the message is not processed, but it is still deleted from the queue along with the processed messages.
The metadata contains additional information read from the SQS message: eventTime, eventType, and versionId.
Prerequisites:
1. Configure S3 bucket notifications to send events to an SQS queue
A: Create SQS Queue: aws sqs create-queue --queue-name s3-events-queue
B: Allow S3 to send messages to your queue (s3-events-queue) by applying the following policy to the queue:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:123456789012:s3-events-queue",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "arn:aws:s3:::your-bucket-name"
        }
      }
    }
  ]
}
C: Configure the bucket to send notifications to the queue:
aws s3api put-bucket-notification-configuration --bucket your-bucket-name --notification-configuration '{
  "QueueConfigurations": [
    {
      "Id": "s3-sqs-notification",
      "QueueArn": "arn:aws:sqs:us-east-1:123456789012:s3-events-queue",
      "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            {
              "Name": "prefix",
              "Value": "uploads/"
            }
          ]
        }
      }
    }
  ]
}'
The listener automatically handles:
- SNS-wrapped messages (if S3 notifications go through SNS first)
- Message deletion from the queue after successful processing
- Event type mapping (ObjectCreated -> CREATED, ObjectRemoved -> DELETED)
- Bucket and prefix filtering
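The event type mapping listed above can be pictured with a small sketch. This is not the plugin's code: the class `EventTypeMapping`, the enum `ChangeType`, and the `UNKNOWN` fallback are hypothetical names chosen for illustration. Only the two pairs ObjectCreated -> CREATED and ObjectRemoved -> DELETED come from this page. The sketch assumes event names of the form `ObjectCreated:Put`, optionally carrying the `s3:` prefix used in the notification configuration.

```java
import java.util.List;

final class EventTypeMapping {

    // Hypothetical enum; the plugin's real type names are not shown on this page.
    enum ChangeType { CREATED, DELETED, UNKNOWN }

    // Maps an S3 event name such as "ObjectCreated:Put" to a coarse change type.
    static ChangeType map(String eventName) {
        if (eventName == null) {
            return ChangeType.UNKNOWN;
        }
        // Accept names with or without the "s3:" prefix.
        String name = eventName.startsWith("s3:") ? eventName.substring(3) : eventName;
        if (name.startsWith("ObjectCreated:")) {
            return ChangeType.CREATED;
        }
        if (name.startsWith("ObjectRemoved:")) {
            return ChangeType.DELETED;
        }
        // Assumption: anything else is reported as UNKNOWN in this sketch.
        return ChangeType.UNKNOWN;
    }

    public static void main(String[] args) {
        for (String name : List.of("ObjectCreated:Put", "s3:ObjectRemoved:Delete", "ObjectRestore:Post")) {
            System.out.println(name + " -> " + map(name));
        }
    }
}
```

How the listener treats event types outside these two families is not stated here, so the fallback branch is purely illustrative.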
The S3ObjectMessage includes the following metadata:
- bucketName: The name of the S3 bucket
- key: The key (path) of the S3 object
- content: The content of the S3 object as a ByteBuffer; may be null
- contentType: The content type of the S3 object
- contentLength: The size of the S3 object in bytes
- lastModified: The timestamp when the S3 object was last modified
- eTag: The entity tag of the S3 object
- metadata: A map of user-defined metadata associated with the S3 object
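Because content may be null, downstream code should check for it before decoding. The following is a minimal sketch of such a check, assuming the object holds UTF-8 text. `ContentText` and `asUtf8` are hypothetical helper names, and the sketch takes the ByteBuffer directly rather than guessing at the accessor that S3ObjectMessage exposes for it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

final class ContentText {

    // Decodes the content as UTF-8 text, or returns empty when no content is present.
    static Optional<String> asUtf8(ByteBuffer content) {
        if (content == null) {
            return Optional.empty();
        }
        // duplicate() leaves the caller's buffer position and limit untouched.
        return Optional.of(StandardCharsets.UTF_8.decode(content.duplicate()).toString());
    }
}
```

Decoding a duplicate keeps the original buffer readable for later processing steps.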
.consumeFromSource(S3SqsEventListenerSourceConfigBuilder.builder()
    .withS3Resource(value)
    .withSqsResource(value)
    .withQueueUrl(value)
    .withPrefix(value)
    .withDownloadContent(value)
    .withMaxMessages(value)
    .withMaxWaitTime(value)
)
.terminateWithSink(...);
source:
  s3-sqs-event-listener:
    s3Resource: value
    sqsResource: value
    queueUrl: value
    prefix: value
    downloadContent: value
    maxMessages: value
    maxWaitTime: value
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-aws-api</artifactId>
    <version>1.8.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-aws-api', version: '1.8.0'
Properties¶
s3Resource¶
Reference to the S3 client resource. Required.
Type: object
sqsResource¶
Reference to the SQS client resource. Required.
Type: object
queueUrl¶
The URL of the SQS queue to read from. Required.
Type: string
prefix¶
Optional prefix filter. If specified, only events for objects with keys starting with this prefix will be processed. If null or empty, objects will not be filtered by prefix.
Type: string
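The prefix rule described above can be expressed as a short predicate. This is a sketch of the documented behavior, not the plugin's implementation: `PrefixFilter` and `matches` are hypothetical names, and the comparison is assumed to be case-sensitive, as S3 keys are.

```java
final class PrefixFilter {

    // Returns true when an object key should be processed for the configured prefix.
    static boolean matches(String prefix, String key) {
        if (prefix == null || prefix.isEmpty()) {
            return true; // No prefix configured: nothing is filtered out.
        }
        return key != null && key.startsWith(prefix);
    }
}
```

With the prefix `uploads/`, the key `uploads/report.csv` would be processed and `logs/app.log` would be skipped.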
downloadContent¶
Whether to load the object's content into the worker's memory. This can be an expensive operation for large files.
Type: boolean
Default value: false
maxMessages¶
Maximum number of messages to receive in a single request (1-10).
Type: number
Default value: 10
maxWaitTime¶
The duration for which the call waits for a message to arrive. Minimum 1 second.
Type: object
Default value: PT1M
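The default PT1M is an ISO-8601 duration meaning one minute, which `java.time.Duration` can parse directly. The sketch below checks the documented limits for maxMessages (1-10) and maxWaitTime (at least 1 second) before a configuration is built. `PollingSettings` and its methods are hypothetical names; whether the plugin validates these values itself, and with which error, is not stated on this page.

```java
import java.time.Duration;

final class PollingSettings {

    // Checks the documented range for maxMessages (1-10).
    static int checkMaxMessages(int maxMessages) {
        if (maxMessages < 1 || maxMessages > 10) {
            throw new IllegalArgumentException("maxMessages must be between 1 and 10: " + maxMessages);
        }
        return maxMessages;
    }

    // Parses an ISO-8601 duration such as "PT1M" and enforces the documented 1-second minimum.
    static Duration parseMaxWaitTime(String text) {
        Duration wait = Duration.parse(text);
        if (wait.compareTo(Duration.ofSeconds(1)) < 0) {
            throw new IllegalArgumentException("maxWaitTime must be at least 1 second: " + wait);
        }
        return wait;
    }
}
```

For example, `PollingSettings.parseMaxWaitTime("PT20S")` yields the same value as `Duration.ofSeconds(20)`.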
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.
Usage Examples¶
stream
    .withName("Process S3 bucket events via SQS")
    .consumeFromSource(
        S3SqsEventListenerSourceConfigBuilder.builder()
            .withS3ResourceName("s3-client")
            .withSqsResourceName("sqs-client")
            // Required; placeholder URL for the queue created in the prerequisites
            .withQueueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/s3-events-queue")
            .withPrefix("uploads/") // Optional filter
            .withMaxMessages(10)
            .withMaxWaitTime(Duration.ofSeconds(20))
    )
    .processWith(event -> {
        // logic
        return event;
    })
    .terminateWithSink(sink);
Metrics¶
S3 metrics¶
Metric enum: org.voltdb.stream.plugin.s3.S3Metric
| Prometheus name | Type | Description |
|---|---|---|
| voltsp_s3_create_event_total | counter | Number of S3 object create events observed. |
| voltsp_s3_delete_error_total | counter | Number of failed S3 delete requests. |
| voltsp_s3_delete_event_total | counter | Number of S3 object delete events observed. |
| voltsp_s3_delete_total | counter | Number of successful S3 delete requests. |
| voltsp_s3_get_error_total | counter | Number of failed S3 get requests. |
| voltsp_s3_get_total | counter | Number of successful S3 get requests. |
| voltsp_s3_list_error_total | counter | Number of failed S3 list requests. |
| voltsp_s3_list_total | counter | Number of successful S3 list requests. |
| voltsp_s3_put_error_total | counter | Number of failed S3 put requests. |
| voltsp_s3_put_size_bytes | counter | S3 put object size. |
| voltsp_s3_put_total | counter | Number of successful S3 put requests. |
| voltsp_s3_update_event_total | counter | Number of S3 object update events observed. |
| voltsp_s3_delete_time_seconds | histogram | S3 delete request duration. |
| voltsp_s3_get_size_bytes | histogram | S3 get object size. |
| voltsp_s3_get_time_seconds | histogram | S3 get request duration statistics. |
| voltsp_s3_list_time_seconds | histogram | S3 list request duration. |
| voltsp_s3_put_time_seconds | histogram | S3 put request duration statistics. |
SQS metrics¶
Metric enum: org.voltdb.stream.plugin.sqs.SqsMetric
| Prometheus name | Type | Description |
|---|---|---|
| voltsp_sqs_delete_error_total | counter | Number of SQS delete errors. |
| voltsp_sqs_deleted_total | counter | Number of SQS messages deleted. |
| voltsp_sqs_read_error_total | counter | Number of SQS read errors. |
| voltsp_sqs_read_size_bytes | counter | Number of bytes read from SQS messages. |
| voltsp_sqs_read_timeout_total | counter | Number of SQS read timeouts. |
| voltsp_sqs_delete_time_seconds | histogram | SQS delete request duration. |
| voltsp_sqs_read_time_seconds | histogram | SQS read request duration. |
S3 tags¶
Tag enum: org.voltdb.stream.plugin.s3.S3Tag
Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.
| Tag | Description |
|---|---|
| region | AWS region used by the S3 or SQS client. |