Source code for cgl.core.cloud.sqs_queue

import boto3
import json

CLOUD_QUEUES = {"remote_publish": "remote_publish.fifo"}


[docs] class SQSMessage(dict): pass
[docs] class SQS(): def __init__(self, region: str, access_key: str = None, secret_key: str = None, session_token: str= None): if access_key: self.sqs = boto3.client("sqs", region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_key, aws_session_token=session_token ) else: self.sqs = boto3.client("sqs", region_name=region)
[docs] def publish_message(self, queue_name: str, message: SQSMessage, message_id: str, message_group: str = "default"): j_message = json.dumps(message) # Get the URL of the queue response = self.sqs.get_queue_url(QueueName=queue_name) queue_url = response["QueueUrl"] # Publish a message to the queue response = self.sqs.send_message( QueueUrl=queue_url, MessageBody=j_message, MessageGroupId=message_group, MessageDeduplicationId=message_id, ) return response
[docs] def recieve_messages(self, queue_name: str): response = self.sqs.get_queue_url(QueueName=queue_name) queue_url = response["QueueUrl"] # Receive messages from the queue response = self.sqs.receive_message( QueueUrl=queue_url, MaxNumberOfMessages=1, ) # Get the messages from the response messages = response.get("Messages", []) return messages
[docs] def delete_message(self, queue_name: str, receipt: str): response = self.sqs.get_queue_url(QueueName=queue_name) queue_url = response["QueueUrl"] # Receive messages from the queue response = self.sqs.delete_message( QueueUrl=queue_url, ReceiptHandle=receipt, ) return response