# Source code for cgl.apps.remote_downloader.main
import json
import logging
import os
import time
from cgl.core.cloud.constants import cloud_constants
from cgl.core.cloud.s3 import S3Bucket
from cgl.core.cloud.sqs_queue import SQS
from cgl.core.utils.general import Logger
SLEEP_TIME = 5.0
def process_message(message):
    """Decode a queue message body and download the folder it references.

    The body is a JSON-encoded, pipe-delimited string of the form
    ``"<tag>|<prefix>|<original_path>|<rel_path>"``; only the prefix and
    relative path are used for the download.
    """
    decoded = json.loads(message)
    _tag, prefix, _original_path, rel_path = decoded.split("|")
    do_download(prefix, rel_path)
def do_download(prefix, rel_path):
    """Download a published folder from S3 into the local production tree.

    If the local destination already exists, the download is redirected into
    a ``.remote_duplicate/<prefix>`` subtree instead of overwriting it.  The
    remote folder is deleted from the bucket once the download finishes.
    """
    bucket = S3Bucket(
        region=cloud_constants["region"],
        bucket_name=cloud_constants["remote_publish_s3"],
    )
    # NOTE(review): CFG is not imported in this module — confirm it is
    # injected/available at runtime from elsewhere in the package.
    root = CFG.get_prod_root()
    destination = os.path.join(root, rel_path)
    if os.path.exists(destination):
        redirected = os.path.join(root, ".remote_duplicate", prefix, rel_path)
        logging.warning(
            f"the path {destination} is already downloaded redirecting to -> {redirected}"
        )
        destination = redirected
    bucket.download_folder(prefix, destination)
    logging.info(f"deleting folder: {prefix}")
    bucket.delete_folder(prefix)
def connect():
    """Poll the remote-publish SQS queue forever and process each message.

    Each non-empty message body is handed to process_message(); the message
    is then deleted from the queue regardless of whether processing raised,
    and the loop sleeps SLEEP_TIME seconds between polls.
    """
    queue = SQS(region=cloud_constants["region"])
    queue_name = cloud_constants["remote_publish_sqs"]
    while True:
        for msg in queue.recieve_messages(queue_name=queue_name):
            if not msg:
                continue
            try:
                process_message(msg["Body"])
            finally:
                # Delete even when processing fails — presumably so a poison
                # message is dropped rather than redelivered forever; confirm
                # this is the intended retry policy.
                queue.delete_message(queue_name, msg["ReceiptHandle"])
        time.sleep(SLEEP_TIME)
def main():
    """Set up file logging and run the downloader loop indefinitely."""
    log_file = os.path.expanduser("~/.remote/remote_downloader.log")
    # makedirs with exist_ok=True is race-free and creates any missing
    # parents, unlike the isdir-check + os.mkdir it replaces (which could
    # fail between the check and the create, or if a parent was absent).
    os.makedirs(os.path.dirname(log_file), exist_ok=True)
    Logger(log_level=logging.INFO, log_file=log_file)
    logging.info("starting remote file downloader")
    connect()
# Script entry point: run the downloader when executed directly.
if __name__ == "__main__":
    main()