Source code for cgl.core.cloud.s3
import logging
import os

import boto3
from botocore.exceptions import ClientError
class S3Bucket:
    """Thin wrapper around a boto3 S3 client scoped to a single bucket."""

    def __init__(self, region: str, bucket_name: str, access_key: str = None, secret_key: str = None,
                 session_token: str = None):
        # Use explicit credentials when provided; otherwise fall back to the
        # default boto3 credential chain (environment, config files, IAM role).
        if access_key:
            self.s3_client = boto3.client("s3",
                                          region_name=region,
                                          aws_access_key_id=access_key,
                                          aws_secret_access_key=secret_key,
                                          aws_session_token=session_token)
        else:
            self.s3_client = boto3.client("s3", region_name=region)
        self.bucket_name = bucket_name
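    # Construction sketch: with no explicit keys, boto3 resolves credentials
    # from its default chain. The region and bucket name below are
    # hypothetical examples, not values from this module.
    #
    #   bucket = S3Bucket("us-east-1", "example-bucket")
    #   temp = S3Bucket("us-east-1", "example-bucket",
    #                   access_key="AKIA...", secret_key="...", session_token="...")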
    def upload_data(self, file_path: str, object_name: str = None, prefix="default"):
        """Upload a single file; returns True on success, False on failure."""
        if object_name is None:
            object_name = file_path
        object_name = f"{prefix}/{object_name}"
        try:
            self.s3_client.upload_file(file_path, self.bucket_name, object_name)
            logging.info(f"{file_path} uploaded to {self.bucket_name}/{object_name}")
            return True
        except ClientError as e:
            logging.error(f"Error uploading {file_path} to {self.bucket_name}/{object_name}: {e}")
            return False
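    # Usage sketch (file name and prefix are hypothetical): the object key
    # becomes "<prefix>/<object_name>", so this uploads to "reports/q1.pdf".
    #
    #   bucket.upload_data("q1.pdf", prefix="reports")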
    def download_data(self, object_name, file_path=None):
        """Download a single object; returns True on success, False on failure."""
        if file_path is None:
            file_path = object_name
        try:
            self.s3_client.download_file(self.bucket_name, object_name, file_path)
            logging.info(f"{self.bucket_name}/{object_name} downloaded to {file_path}")
            return True
        except ClientError as e:
            logging.error(f"Error downloading {self.bucket_name}/{object_name} to {file_path}: {e}")
            return False
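    # Usage sketch (key and path are hypothetical); when file_path is omitted
    # the object key itself is used as the local path.
    #
    #   bucket.download_data("reports/q1.pdf", "downloads/q1.pdf")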
    def upload_folder(self, folder_path, prefix):
        """Recursively upload a local folder, preserving its relative structure."""
        if os.path.isdir(folder_path):
            folder_path = os.path.abspath(folder_path)
        else:
            logging.error(f"Folder path {folder_path} is not a directory")
            return
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                object_name = os.path.relpath(file_path, folder_path)
                self.upload_data(file_path, object_name, prefix=prefix)
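    # Usage sketch (paths hypothetical): a local tree like
    # "renders/shot01/frame.exr" lands at "dailies/shot01/frame.exr", because
    # keys are built from paths relative to the folder root.
    #
    #   bucket.upload_folder("renders", prefix="dailies")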
    def download_folder(self, prefix, local_path):
        """Download every object under a prefix into local_path."""
        paginator = self.s3_client.get_paginator("list_objects_v2")
        for result in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
            for content in result.get("Contents", []):
                object_name = content["Key"]
                if object_name.endswith("/"):
                    # Skip zero-byte "folder" placeholder keys.
                    continue
                file_path = os.path.join(local_path, object_name[len(prefix):].lstrip("/"))
                os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)
                logging.info(f"downloading {object_name} -> {file_path}")
                self.download_data(object_name, file_path)
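    # Usage sketch (names hypothetical): mirrors every key under the prefix
    # into subdirectories of the local path, creating them as needed.
    #
    #   bucket.download_folder("dailies", "restored_renders")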
    def delete_folder(self, prefix):
        """Delete every object under a prefix."""
        paginator = self.s3_client.get_paginator("list_objects_v2")
        for result in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
            objects = [{"Key": content["Key"]} for content in result.get("Contents", [])]
            if objects:
                # Each page holds at most 1000 keys, matching delete_objects' batch limit.
                self.s3_client.delete_objects(Bucket=self.bucket_name, Delete={"Objects": objects})
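    # Usage sketch (prefix hypothetical). Note that S3 prefixes are plain
    # string matches: deleting "dailies" would also remove keys under
    # "dailies2/", so passing a trailing slash is the safer habit.
    #
    #   bucket.delete_folder("dailies/")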
    def list_folder_contents(self, folder_path):
        """List the file names (final path components) of objects under a prefix."""
        folder_contents = []
        paginator = self.s3_client.get_paginator("list_objects_v2")
        page_iterator = paginator.paginate(Bucket=self.bucket_name, Prefix=folder_path)
        filtered_iterator = page_iterator.search(f"Contents[?Key != null] | [?contains(Key, '{folder_path}')]")
        for result_dict in filtered_iterator:
            if result_dict:
                key = result_dict["Key"]
                # Keys ending in "/" are folder placeholders; their final
                # component is empty, so they are skipped here.
                if key.split("/")[-1]:
                    folder_contents.append(key.split("/")[-1])
        return folder_contents
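    # Usage sketch (prefix hypothetical): returns only the final path
    # components, e.g. ["frame.exr", "notes.txt"].
    #
    #   names = bucket.list_folder_contents("dailies/shot01/")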
def main():
    bucket_name = "premise-remote-publish-test"
    region = "us-east-1"  # placeholder region; adjust for the target bucket
    s3_handler = S3Bucket(region, bucket_name)
    # Upload a folder to S3
    folder_to_upload = "somepath"
    s3_handler.upload_folder(folder_to_upload, "besttest1")
    # Download the folder back from S3
    local_download_path = "some path"
    s3_handler.download_folder("besttest1", local_download_path)


if __name__ == "__main__":
    main()