# Source code for cgl.core.cloud.sqs_queue
import json
from typing import Optional

import boto3
# Logical queue key -> queue name.
# NOTE(review): the values are presumably passed as ``queue_name`` to the
# SQS wrapper defined in this module — confirm against callers.
CLOUD_QUEUES = {
    "remote_publish": "remote_publish.fifo",
}
[docs]
class SQSMessage(dict):
    """Dictionary payload for a queue message.

    Adds nothing to ``dict``; it is the type that ``SQS.publish_message``
    annotates for its ``message`` argument, which is serialised with
    ``json.dumps``.
    """
[docs]
class SQS:
    """Small convenience wrapper around a boto3 SQS client.

    Every public method takes a queue *name*, resolves it to a queue URL via
    ``get_queue_url`` and then forwards the call to the underlying client,
    which is stored on ``self.sqs``.
    """

    def __init__(self,
                 region: str,
                 access_key: Optional[str] = None,
                 secret_key: Optional[str] = None,
                 session_token: Optional[str] = None):
        """Create the underlying boto3 SQS client.

        Args:
            region: AWS region, passed to boto3 as ``region_name``.
            access_key: AWS access key id. When falsy, no explicit
                credentials are passed to boto3 at all.
            secret_key: AWS secret access key; only used together with
                ``access_key``.
            session_token: AWS session token; only used together with
                ``access_key``.
        """
        client_kwargs = {"region_name": region}
        if access_key:
            # Explicit credentials are forwarded only when an access key is
            # supplied; otherwise boto3 is left to locate credentials itself.
            client_kwargs.update(
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                aws_session_token=session_token,
            )
        self.sqs = boto3.client("sqs", **client_kwargs)

    def _get_queue_url(self, queue_name: str) -> str:
        """Return the URL of the queue called ``queue_name``."""
        return self.sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]

    def publish_message(self,
                        queue_name: str,
                        message: "SQSMessage",
                        message_id: str,
                        message_group: str = "default"):
        """Serialise ``message`` to JSON and send it to a queue.

        Args:
            queue_name: Name of the target queue.
            message: JSON-serialisable payload.
            message_id: Sent as ``MessageDeduplicationId`` (FIFO queues only).
            message_group: Sent as ``MessageGroupId`` (FIFO queues only).

        Returns:
            The response of the client's ``send_message`` call.

        Raises:
            TypeError: If ``message`` cannot be serialised by ``json.dumps``.
        """
        # Serialise first so a bad payload fails before any network call.
        body = json.dumps(message)
        send_kwargs = {
            "QueueUrl": self._get_queue_url(queue_name),
            "MessageBody": body,
        }
        # FIFO queue names always end in ".fifo". The group / deduplication
        # parameters are FIFO-only, so they are omitted for standard queues
        # instead of making the request fail.
        if queue_name.endswith(".fifo"):
            send_kwargs["MessageGroupId"] = message_group
            send_kwargs["MessageDeduplicationId"] = message_id
        return self.sqs.send_message(**send_kwargs)

    def recieve_messages(self, queue_name: str) -> list:
        """Fetch messages from a queue, requesting at most one per call.

        Args:
            queue_name: Name of the queue to read from.

        Returns:
            The ``"Messages"`` list of the response, or an empty list when
            the response carries no such key.
        """
        response = self.sqs.receive_message(
            QueueUrl=self._get_queue_url(queue_name),
            MaxNumberOfMessages=1,
        )
        return response.get("Messages", [])

    # Correctly spelled alias; the original misspelled name stays available
    # for existing callers.
    receive_messages = recieve_messages

    def delete_message(self, queue_name: str, receipt: str):
        """Delete a message from a queue.

        Args:
            queue_name: Name of the queue holding the message.
            receipt: Passed to the client as ``ReceiptHandle``.

        Returns:
            The response of the client's ``delete_message`` call.
        """
        # Delete the message identified by the receipt handle.
        return self.sqs.delete_message(
            QueueUrl=self._get_queue_url(queue_name),
            ReceiptHandle=receipt,
        )