Source code for cgl.core.cloud.s3

import logging
import os

import boto3
from botocore.exceptions import ClientError


class S3Bucket:
    def __init__(self, region: str, bucket_name: str, access_key: str = None,
                 secret_key: str = None, session_token: str = None):
        # Build the client from explicit credentials when given; otherwise
        # boto3 resolves credentials from its default chain.
        if access_key:
            self.s3_client = boto3.client(
                "s3",
                region_name=region,
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                aws_session_token=session_token,
            )
        else:
            self.s3_client = boto3.client("s3", region_name=region)
        self.bucket_name = bucket_name
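    # Usage sketch (hypothetical region/bucket/key values): with explicit keys
    # the client is built from them; otherwise boto3 falls back to its default
    # credential chain (environment variables, shared config, instance role).
    #
    #   bucket = S3Bucket("us-east-1", "my-bucket")                   # default chain
    #   bucket = S3Bucket("us-east-1", "my-bucket",
    #                     access_key="AKIA...", secret_key="...")     # explicit keys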
    def upload_data(self, file_path: str, object_name: str = None, prefix="default"):
        if object_name is None:
            object_name = file_path
        # Keys are always namespaced under the prefix.
        object_name = f"{prefix}/{object_name}"
        try:
            self.s3_client.upload_file(file_path, self.bucket_name, object_name)
            logging.info(f"{file_path} uploaded to {self.bucket_name}/{object_name}")
            return True
        except ClientError as e:
            logging.error(f"Error uploading {file_path} to {self.bucket_name}/{object_name}: {e}")
            return False
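    # Example (hypothetical paths): with no explicit object_name the local
    # path is reused as the key, prefixed as above.
    #
    #   ok = bucket.upload_data("reports/q1.pdf", object_name="q1.pdf", prefix="2024")
    #   # -> uploads to s3://my-bucket/2024/q1.pdf, returns True on success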
    def download_data(self, object_name, file_path=None):
        if file_path is None:
            file_path = object_name
        try:
            self.s3_client.download_file(self.bucket_name, object_name, file_path)
            logging.info(f"{self.bucket_name}/{object_name} downloaded to {file_path}")
            return True
        except ClientError as e:
            logging.error(f"Error downloading {self.bucket_name}/{object_name} to {file_path}: {e}")
            return False
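    # Example (hypothetical key): download_data fetches a single object; when
    # file_path is omitted, the object key doubles as the local path.
    #
    #   bucket.download_data("2024/q1.pdf", "downloads/q1.pdf")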
    def upload_folder(self, folder_path, prefix):
        if os.path.isdir(folder_path):
            folder_path = os.path.abspath(folder_path)
        else:
            logging.error(f"Folder path {folder_path} is not a directory")
            return
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                # Key each file by its path relative to the folder root.
                object_name = os.path.relpath(file_path, folder_path)
                self.upload_data(file_path, object_name, prefix=prefix)
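    # Example (hypothetical paths): upload_folder walks the directory tree and
    # uploads every file under the given prefix.
    #
    #   bucket.upload_folder("./renders", "shots/sh010")
    #   # ./renders/v001/frame.exr -> s3://my-bucket/shots/sh010/v001/frame.exr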
    def download_folder(self, prefix, local_path):
        paginator = self.s3_client.get_paginator("list_objects_v2")
        for result in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
            for content in result.get("Contents", []):
                object_name = content["Key"]
                # Strip the prefix from the key before joining it onto local_path.
                file_path = os.path.join(local_path, object_name[len(prefix):].lstrip("/"))
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                logging.info(f"downloading {object_name} -> {file_path}")
                self.download_data(object_name, file_path)
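    # Example (hypothetical names): download_folder mirrors a key prefix into
    # a local directory, creating intermediate directories as needed.
    #
    #   bucket.download_folder("2024/reports", "/tmp/reports")
    #   # s3://my-bucket/2024/reports/a/b.pdf -> /tmp/reports/a/b.pdf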
    def delete_folder(self, prefix):
        paginator = self.s3_client.get_paginator("list_objects_v2")
        for result in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
            objects = [{"Key": content["Key"]} for content in result.get("Contents", [])]
            if objects:
                self.s3_client.delete_objects(Bucket=self.bucket_name, Delete={"Objects": objects})
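    # Caution (hypothetical prefix): delete_folder removes every object under
    # the prefix, page by page. delete_objects accepts at most 1000 keys per
    # call, which matches the default page size of list_objects_v2.
    #
    #   bucket.delete_folder("shots/sh010")  # irreversibly deletes all matching keys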
    def list_folder_contents(self, folder_path):
        folder_contents = []
        paginator = self.s3_client.get_paginator("list_objects_v2")
        page_iterator = paginator.paginate(Bucket=self.bucket_name, Prefix=folder_path)
        # JMESPath filter: keep entries with a non-null Key that contains folder_path.
        filtered_iterator = page_iterator.search(
            f"Contents[?Key != null] | [?contains(Key, '{folder_path}')]"
        )
        for result_dict in filtered_iterator:
            if result_dict:
                key = result_dict["Key"]
                # Skip "directory" placeholder keys that end with a slash.
                if key.split("/")[-1]:
                    folder_contents.append(key.split("/")[-1])
        return folder_contents
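    # Example (hypothetical keys): only the final path component of each
    # matching key is returned, so "2024/reports/q1.pdf" contributes "q1.pdf".
    #
    #   names = bucket.list_folder_contents("2024/reports")
    #   # -> ["q1.pdf", "q2.pdf", ...]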
def main():
    region = "us-east-1"  # placeholder region
    bucket_name = "premise-remote-publish-test"
    s3_handler = S3Bucket(region, bucket_name)

    # Upload a folder to S3
    folder_to_upload = "somepath"
    s3_handler.upload_folder(folder_to_upload, "besttest1")

    # Download the folder back from S3
    local_download_path = "some path"
    s3_handler.download_folder("besttest1", local_download_path)
if __name__ == "__main__":
    main()