How to create AWS SQS queue with Python

AWS SQS is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications. Using SQS, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available. This post explains how to create AWS Standard and FIFO using Python Boto3 API.

Import required module and instantiate SQS Client
   
import logging

import time
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
sqs = boto3.resource('sqs')
 
Write below function to create SQS queues
   
def create_sample_queue(name, queue_attributes=None):
    if not queue_attributes:
        queue_attributes = {}
    try:
        queue = sqs.create_queue(
            QueueName=name,
            Attributes=queue_attributes
        )
        logger.info("Created queue '%s' with URL=%s", name, queue.url)
    except ClientError as error:
        logger.exception("Couldn't create queue named '%s'.", name)
        raise error
    else:
        return queue
 
Create Standard and FIFO queue
   
if __name__=="__main__":

    # create standard queue
    standard_queue_attributes = {
            'MaximumMessageSize': str(1024),
            'ReceiveMessageWaitTimeSeconds': str(20)
        }    
    standard_queue_name = "demo-standard-queue"
    standard_queue = create_sample_queue(standard_queue_name, standard_queue_attributes)
    print(f"Created queue with URL: {standard_queue.url}.")

    # create fifo queue    
    time.sleep(70) # wait before creating fifo queue
    fifo_queue_attributes =  {
            'MaximumMessageSize': str(1024),
            'ReceiveMessageWaitTimeSeconds': str(10),
            'VisibilityTimeout': str(200),
            'FifoQueue': str(True),            
        }
    fifo_queue_name = "demo-fifo-queue.fifo" 
    fifo_queue = create_sample_queue(fifo_queue_name, fifo_queue_attributes)
    print(f"Created queue with URL: {fifo_queue.url}.")

 
Complete code for creating Standard and FIFO SQS queues with Python
   
import logging

import time
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
sqs = boto3.resource('sqs')

def create_sample_queue(name, queue_attributes=None):
    if not queue_attributes:
        queue_attributes = {}
    try:
        queue = sqs.create_queue(
            QueueName=name,
            Attributes=queue_attributes
        )
        logger.info("Created queue '%s' with URL=%s", name, queue.url)
    except ClientError as error:
        logger.exception("Couldn't create queue named '%s'.", name)
        raise error
    else:
        return queue
    
if __name__=="__main__":
    # create standard queue
    standard_queue_attributes = {
            'MaximumMessageSize': str(1024),
            'ReceiveMessageWaitTimeSeconds': str(20)
        }    
    standard_queue_name = "demo-standard-queue"
    standard_queue = create_sample_queue(standard_queue_name, standard_queue_attributes)
    print(f"Created queue with URL: {standard_queue.url}.")

    # create fifo queue    
    time.sleep(70) # wait before creating fifo queue
    fifo_queue_attributes =  {
            'MaximumMessageSize': str(1024),
            'ReceiveMessageWaitTimeSeconds': str(10),
            'VisibilityTimeout': str(200),
            'FifoQueue': str(True),            
        }
    fifo_queue_name = "demo-fifo-queue.fifo"
    fifo_queue = create_sample_queue(fifo_queue_name, fifo_queue_attributes)
    print(f"Created queue with URL: {fifo_queue.url}.")
    
    
 

Categories: AWS

Similar Articles