"""Source code for cgl.apps.remote_downloader.main."""

import json
import logging
import os
import time

from cgl.core.cloud.constants import cloud_constants
from cgl.core.cloud.s3 import S3Bucket
from cgl.core.cloud.sqs_queue import SQS
from cgl.core.utils.general import Logger

SLEEP_TIME = 5.0


def process_message(message):
    """Decode one SQS message body and kick off the matching download.

    The body is a JSON-encoded, pipe-delimited string of the form
    ``<ignored>|<prefix>|<original_path>|<rel_path>``; only the prefix and
    relative path are used.
    """
    decoded = json.loads(message)
    _, prefix, _original_path, rel_path = decoded.split("|")
    do_download(prefix, rel_path)
def do_download(prefix, rel_path):
    """Download a published folder from S3, then delete it from the bucket.

    The folder lands under the local production root at *rel_path*. If that
    destination already exists, the download is redirected into a
    ``.remote_duplicate/<prefix>`` staging area instead of overwriting it.
    """
    bucket = S3Bucket(
        region=cloud_constants["region"],
        bucket_name=cloud_constants["remote_publish_s3"],
    )
    # NOTE(review): CFG is never imported or defined in this module — confirm
    # it is provided elsewhere, otherwise this line raises NameError.
    root = CFG.get_prod_root()
    local_path = os.path.join(root, rel_path)
    if os.path.exists(local_path):
        duplicate_dir = os.path.join(root, ".remote_duplicate", prefix)
        new_path = os.path.join(duplicate_dir, rel_path)
        logging.warning(
            f"the path {local_path} is already downloaded redirecting to -> {new_path}"
        )
        local_path = new_path
    bucket.download_folder(prefix, local_path)
    # The bucket copy is treated as a transfer queue: remove it once pulled.
    logging.info(f"deleting folder: {prefix}")
    bucket.delete_folder(prefix)
def connect():
    """Poll the remote-publish SQS queue forever, downloading each publish.

    Each received message is processed and then deleted from the queue even
    when processing fails (the ``finally`` delete prevents endless redelivery
    of a poison message). Processing errors are logged and the poll loop
    keeps running.
    """
    q = SQS(region=cloud_constants["region"])
    while True:
        # 'recieve_messages' (sic) is the SQS wrapper's actual method name.
        messages = q.recieve_messages(queue_name=cloud_constants["remote_publish_sqs"])
        for message in messages:
            if message:
                try:
                    process_message(message["Body"])
                except Exception:
                    # Previously any exception here escaped the loop and
                    # killed the whole daemon (after the message was already
                    # deleted by finally). Log it and keep servicing the queue.
                    logging.exception("failed to process message")
                finally:
                    q.delete_message(
                        cloud_constants["remote_publish_sqs"], message["ReceiptHandle"]
                    )
        time.sleep(SLEEP_TIME)
def main():
    """Configure file logging and start the blocking download loop."""
    level = logging.INFO
    log_file = os.path.expanduser("~/.remote/remote_downloader.log")
    # makedirs(exist_ok=True) is race-free and creates intermediate
    # directories, unlike the isdir-check + single-level mkdir it replaces.
    os.makedirs(os.path.dirname(log_file), exist_ok=True)
    Logger(log_level=level, log_file=log_file)
    logging.info("starting remote file downloader")
    connect()
# Run the downloader daemon when executed as a script.
if __name__ == "__main__":
    main()