SQS S3
Enables seamless ingestion of S3 objects via Amazon Simple Queue Service (SQS) notifications.
Details
The Amazon SQS S3 connector allows you to automatically process files uploaded to one or more S3 buckets by listening for S3 event notifications through a single SQS queue. Whenever files are uploaded to your S3 buckets, S3 publishes event notifications to an SQS queue, which this connector monitors. The connector then automatically downloads and processes the files.
The connector polls an Amazon SQS queue for S3 event notifications, downloads the referenced S3 objects, processes them according to your configured format and compression settings, and extracts records for further processing in your data pipeline.
How It Works
- S3 Event Generation: When objects are created, modified, or deleted in your S3 bucket, S3 generates event notifications
- SQS Message Receipt: These events are sent to your configured SQS queue as JSON messages
- Event Processing: The connector polls the SQS queue, receives S3 event messages, and parses them
- File Download: For each S3 event, the connector downloads the referenced object from S3
- Data Processing: Downloaded files are decompressed (if needed) and parsed according to your format settings
- Record Extraction: Individual records are extracted and sent to your data pipeline
- Cleanup: Successfully processed SQS messages are deleted from the queue
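The flow above can be sketched in Python. This is a minimal illustration, not the connector's actual implementation: it assumes `boto3` is available, and the function names (`is_s3_event`, `run`) are hypothetical. The decompress/parse step is elided.

```python
import json

def is_s3_event(body: str) -> bool:
    """Return True if an SQS message body looks like an S3 event notification."""
    try:
        payload = json.loads(body)
    except ValueError:
        return False
    records = payload.get("Records", [])
    return bool(records) and all(r.get("eventSource") == "aws:s3" for r in records)

def run(queue_url: str) -> None:
    """Polling-loop sketch (requires boto3 and AWS credentials to actually run)."""
    import boto3  # imported here so the parsing helper stays dependency-free
    sqs = boto3.client("sqs")
    s3 = boto3.client("s3")
    while True:
        resp = sqs.receive_message(
            QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20
        )
        for msg in resp.get("Messages", []):
            if is_s3_event(msg["Body"]):
                for rec in json.loads(msg["Body"])["Records"]:
                    bucket = rec["s3"]["bucket"]["name"]
                    key = rec["s3"]["object"]["key"]
                    # Download, then decompress/parse/emit records here
                    s3.download_file(bucket, key, "/tmp/" + key.replace("/", "_"))
            # Delete only after successful processing (or for non-S3 messages)
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
```

Note that long polling (`WaitTimeSeconds=20`) matches the queue configuration recommended in the setup steps below.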
Prerequisites
- An AWS account with existing S3 buckets and SQS queue, or permissions to create them
- An IAM role that your platform can assume, with appropriate permissions to access both SQS and S3, or static AWS credentials with equivalent permissions
- S3 bucket configured to send event notifications to your SQS queue
- Network connectivity between your platform and AWS
Setup Instructions
Step 1: Create IAM Policy
Create an IAM policy that grants the connector access to both S3 and SQS resources:
- The connector authenticates via IAM role assumption or static credentials.
- Example policy to attach to the role or user (in production, scope Resource to your queue and bucket ARNs, as shown under IAM Permissions Required below):
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket",
"sqs:ChangeMessageVisibility",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl",
"sqs:ReceiveMessage",
"sqs:SendMessage"
],
"Resource": ["*", "*"]
}
]
}
Step 2: Create S3 Bucket (if needed)
Create an S3 bucket to store your source data:
- Sign in to the AWS Management Console at https://console.aws.amazon.com/s3.
- Click the AWS region drop-down list next to your name in the upper right and select the desired region.
- Under General Purpose Buckets, click the Create Bucket button.
- Under General Configuration, enter a name for your bucket (e.g., my-data-source-bucket).
- Under Object Ownership, ensure ACLs Disabled is selected.
- Leave the remaining default selections unchanged and click the Create Bucket button.
- Note the bucket name and ARN for later use.
Step 3: Create SQS Queue
Set up an SQS queue to receive S3 bucket event notifications:
- Sign in to the AWS Management Console at https://console.aws.amazon.com/sqs.
- Under Get Started, click the Create Queue button.
- Under Details, enter a name for your queue (e.g., s3-events-queue).
- Under Configuration:
  - For Visibility Timeout, enter 600 seconds (10 minutes)
  - For Message Retention Period, enter 7 days
  - Set Receive Message Wait Time to 20 seconds to enable long polling
- Under Access Policy > Choose Method, select the Advanced radio button.
- Delete the entire policy JSON and copy/paste the following policy:
{
"Version":"2012-10-17",
"Statement": [
{
"Sid": "example-statement-ID",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": [
"SQS:SendMessage"
],
"Resource": "arn:aws:sqs:{AWS_REGION}:{AWS_ACCOUNT_ID}:{QUEUE_NAME}",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:*:*:{BUCKET_NAME}"
},
"StringEquals": {
"aws:SourceAccount": "{AWS_ACCOUNT_ID}"
}
}
}
]
}
- Replace the placeholders:
  - {AWS_REGION} with your AWS region (e.g., us-east-1)
  - {AWS_ACCOUNT_ID} with your 12-digit AWS account ID
  - {QUEUE_NAME} with your queue name
  - {BUCKET_NAME} with your S3 bucket name
- Click the Create Queue button.
- Note the queue URL and ARN for later use.
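The placeholder substitution above can also be done programmatically. The snippet below is a small helper sketch (the variable names and example values are hypothetical) that fills in the access policy template and verifies the result is valid JSON:

```python
import json

# Example values; substitute your own region, account ID, and resource names.
values = {
    "AWS_REGION": "us-east-1",
    "AWS_ACCOUNT_ID": "123456789012",
    "QUEUE_NAME": "s3-events-queue",
    "BUCKET_NAME": "my-data-source-bucket",
}

template = """{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "example-statement-ID",
    "Effect": "Allow",
    "Principal": {"Service": "s3.amazonaws.com"},
    "Action": ["SQS:SendMessage"],
    "Resource": "arn:aws:sqs:{AWS_REGION}:{AWS_ACCOUNT_ID}:{QUEUE_NAME}",
    "Condition": {
      "ArnLike": {"aws:SourceArn": "arn:aws:s3:*:*:{BUCKET_NAME}"},
      "StringEquals": {"aws:SourceAccount": "{AWS_ACCOUNT_ID}"}
    }
  }]
}"""

policy = template
for key, val in values.items():
    policy = policy.replace("{" + key + "}", val)

# Fails loudly if the substituted policy is not valid JSON
print(json.dumps(json.loads(policy), indent=2))
```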
Step 4: Configure S3 Event Notifications
Configure your S3 bucket to send event notifications to the SQS queue:
- Sign in to the AWS Management Console at https://console.aws.amazon.com/s3.
- Under General Purpose Buckets, click on your source bucket.
- Click the Properties tab at the top.
- Locate the Event Notifications section and click Create Event Notification.
- Configure the event notification:
  - Event Name: Enter a descriptive name (e.g., object-created-notification)
  - Event Types: Select the checkbox for All object create events
  - Prefix/Suffix: Optionally filter by object key prefix or suffix
  - Destination: Select the radio button for SQS queue
  - SQS Queue: Choose your queue from the dropdown or enter the queue ARN
- Click the Save Changes button.
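The same notification can be configured with the AWS CLI instead of the console. The bucket and queue values below are the example names from the earlier steps; substitute your own:

```shell
# Write the notification configuration to a file
# (example bucket/queue names; substitute your own).
cat > notification.json <<'EOF'
{
  "QueueConfigurations": [
    {
      "QueueArn": "arn:aws:sqs:us-east-1:123456789012:s3-events-queue",
      "Events": ["s3:ObjectCreated:*"]
    }
  ]
}
EOF

# Apply it to the bucket (requires AWS credentials; uncomment to run):
# aws s3api put-bucket-notification-configuration \
#   --bucket my-data-source-bucket \
#   --notification-configuration file://notification.json
```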
Step 5: Verify Event Notifications
Amazon S3 sends a test notification to the queue when you create the event notification configuration. To verify:
- Navigate to your SQS queue in the AWS Console.
- Under Details, locate Messages available, which should have a value of 1.
- This confirms that S3 can successfully send notifications to your queue.
Step 6: Test the Configuration
Before configuring the connector in your platform, verify that the AWS setup is working:
- Upload a test file to your S3 bucket to trigger an event notification.
- Check the SQS queue to confirm a message was received:
- Navigate to your SQS queue in the AWS Console
- Under Details, verify Messages available has increased
- Use the IAM Policy Simulator (optional) to test permissions:
- Go to https://policysim.aws.amazon.com/home/index.jsp
- Select your IAM role
- Test the required S3 and SQS actions listed in the permissions section
Note: Before proceeding, purge any test messages from the queue (for example, using the Purge action in the SQS console) so they are not mistaken for real data when processing begins.
Settings
| Setting | Type | Required | Description |
|---|---|---|---|
| Queue URL | string | Yes | The URL of the SQS queue that receives S3 event notifications (e.g., https://sqs.us-east-1.amazonaws.com/123456789012/my-queue) |
| Role ARN | string | Yes | The ARN of the IAM role to assume for accessing SQS and S3 (e.g., arn:aws:iam::123456789012:role/s3-sqs-connector-role) |
| Region | string | Yes | The AWS region where your SQS queue and S3 buckets are located (e.g., us-east-1, us-west-2) |
| Format | string | Yes | File format for processing S3 objects. Supported values: json, csv, wsv |
| Compression | string | No | Compression format of S3 objects. Supported values: gzip, zip, none (default: none) |
| Record Location | string | No | JSONPath expression for extracting records from JSON files (e.g., records extracts the array from {"records": [...]}) |
Files are automatically decompressed before processing based on the compression setting in your connector configuration.
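The decompress-then-parse behavior driven by the Format, Compression, and Record Location settings can be sketched as follows. This is an illustrative helper (the function name is hypothetical, and only the json format with a dot-separated record location is shown), not the connector's actual code:

```python
import gzip
import json

def extract_records(data, fmt, compression="none", record_location=None):
    """Decompress raw bytes, parse them per the format setting, and pull
    out the record array named by a dot-separated record location."""
    if compression == "gzip":
        data = gzip.decompress(data)
    if fmt == "json":
        doc = json.loads(data)
        if record_location:  # e.g. "records" -> {"records": [...]}
            for part in record_location.split("."):
                doc = doc[part]
        return doc if isinstance(doc, list) else [doc]
    raise NotImplementedError("only 'json' is sketched here")

# A gzip-compressed JSON document with records nested under "records"
payload = gzip.compress(json.dumps({"records": [{"id": 1}, {"id": 2}]}).encode())
print(extract_records(payload, "json", "gzip", "records"))  # [{'id': 1}, {'id': 2}]
```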
Example S3 Event Message
When S3 sends an event notification to SQS, the message body contains JSON like this:
{
"Records": [
{
"eventSource": "aws:s3",
"eventName": "ObjectCreated:Put",
"s3": {
"bucket": {
"name": "my-data-bucket"
},
"object": {
"key": "data/2024/01/15/events.json.gz"
}
}
}
]
}
The connector parses these events and downloads the referenced S3 objects for processing.
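Parsing the bucket and key out of such a message is straightforward; the sketch below (function name hypothetical) also URL-decodes the object key, since S3 URL-encodes keys in event notifications:

```python
import json
from urllib.parse import unquote_plus

def s3_objects(message_body):
    """Yield (bucket, key) pairs from an S3 event notification body,
    skipping any records that are not S3 events."""
    for rec in json.loads(message_body).get("Records", []):
        if rec.get("eventSource") != "aws:s3":
            continue
        yield (rec["s3"]["bucket"]["name"],
               unquote_plus(rec["s3"]["object"]["key"]))

body = json.dumps({"Records": [{
    "eventSource": "aws:s3",
    "eventName": "ObjectCreated:Put",
    "s3": {"bucket": {"name": "my-data-bucket"},
           "object": {"key": "data/2024/01/15/events.json.gz"}}}]})
print(list(s3_objects(body)))  # [('my-data-bucket', 'data/2024/01/15/events.json.gz')]
```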
Important Notes
- Event Filtering: Only messages with eventSource of aws:s3 are processed; other messages in the queue are ignored
- File Processing: Each S3 object is downloaded, decompressed, and parsed according to your format settings
- Record Extraction: Individual records are extracted from files and sent downstream
- Error Handling: If file processing fails, the SQS message remains in the queue and will be retried
- Message Deletion: SQS messages are deleted after successful file processing; messages that are not S3 event notifications are also deleted. Monad inspects the message metadata to make this determination.
IAM Permissions Required
The IAM role specified in Role ARN must have permissions for both SQS and S3:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:[REGION]:[ACCOUNT-ID]:[QUEUE-NAME]"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:GetObjectVersion"
],
"Resource": "arn:aws:s3:::[BUCKET-NAME]/*"
}
]
}