Commit adab3096 authored by marina.kiweler01's avatar marina.kiweler01

logging, get_session, download_file

parent f7882dab
@@ -9,72 +9,165 @@ import botocore
import hashlib
import logging
import os
import progressbar
import random
import string
import sys
import time
from .Utils import Utils
from botocore.client import Config
class Loosolab_s3:
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
#--------------------------------------------------------------------------------------------------------#
def __init__(self, credentials, multipart_upload=True):
#--------------Init -------------------------------------------------------------------------------------#
def __init__(self, credentials, multipart_upload=True, silent=False, log_file=None):
if log_file is None:
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
else:
    # logging.basicConfig returns None, so configure first and fetch the logger afterwards
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filename=log_file, level=logging.DEBUG)
self.my_logger = logging.getLogger('spasti')
# having a global session (needed for all boto action)
self.session = create_s3_session(credentials)
self.create_s3_session(credentials)
if not multipart_upload:
self.transfer = create_s3_transfer(credentials)
self.create_s3_transfer(credentials)
self.multipart_upload = multipart_upload
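# Usage sketch (illustration only, not part of this commit): constructing the wrapper with the
# credential keys that create_s3_session() expects; every value below is a placeholder.
example_credentials = {
    "s3signature": "s3v4",
    "s3key": "MY-ACCESS-KEY",
    "s3secret": "MY-SECRET-KEY",
    "s3endpoint": "https://s3.example.com"
}
s3 = Loosolab_s3(example_credentials, multipart_upload=False, log_file="loosolab_s3.log")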
#--------------------------------------------------------------------------------------------------------#
#------------- Resource / Client & Transfer -------------------------------------------------------------#
def create_s3_session(self, credentials, resource=True):
""" Creating s3 session with boto3 - needed for any use of the s3 storage
Parameter:
----------
credentials : dictionary
contains : "signature", "key", "secret" and "endpoint"
resource : boolean
session as resource or as client
"""
# Store configuration once?
if resource:
session = boto3.resource(
's3',
config=Config(signature_version=credentials["s3signature"]),
aws_access_key_id=credentials["s3key"],
aws_secret_access_key=credentials["s3secret"],
endpoint_url=credentials["s3endpoint"]
)
else: # client:
session = boto3.client(
's3',
config=Config(signature_version=credentials["s3signature"]),
aws_access_key_id=credentials["s3key"],
aws_secret_access_key=credentials["s3secret"],
endpoint_url=credentials["s3endpoint"]
)
self.session = session
#self.check_s3_credentials() # list-bucket not allowed -> skip
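# Sketch (assumes standard boto3 behaviour): resource=True exposes the object-oriented API,
# resource=False the low-level client API, so the same wrapper call switches interfaces.
s3.create_s3_session(example_credentials, resource=True)
for bucket in s3.get_session().buckets.all(): # resource API
    print(bucket.name)
s3.create_s3_session(example_credentials, resource=False)
print(s3.get_session().list_buckets()) # client API returns a plain dict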
#--------------------------------------------------------------------------------------------------------#
def create_s3_transfer(self, credentials):
""" Creating s3 transfer with boto3 - needed for upload on the s3 storage non_multipart
Parameter:
----------
credentials : dictionary
"""
try:
    myconfig = boto3.s3.transfer.TransferConfig(
        multipart_threshold=9999999999999999, # workaround to 'disable' automatic multipart upload
        max_concurrency=10,
        num_download_attempts=10,
    )
    # reuse the client behind the existing resource session instead of overwriting self.session
    transfer = boto3.s3.transfer.S3Transfer(self.session.meta.client, myconfig)
self.my_logger.info('S3 transfer created!')
self.transfer = transfer
except Exception as e:
self.my_logger.error('S3 transfer could not be created!' + str(e))
#--------------------------------------------------------------------------------------------------------#
#--------- Getter ---------------------------------------------------------------------------------------#
def get_session(self):
return self.session
#--------------------------------------------------------------------------------------------------------#
def get_transfer(self):
if self.multipart_upload:
print("ERROR: Transfer is only created when multipart_upload = False !")
return
else:
return self.transfer
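# Sketch with placeholder names: when the object was built with multipart_upload=False,
# the S3Transfer instance can be used directly for single-part transfers.
transfer = s3.get_transfer()
if transfer is not None:
    transfer.upload_file("/tmp/example.txt", "my-bucket", "example.txt")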
#--------------------------------------------------------------------------------------------------------#
#------------- Checks -----------------------------------------------------------------------------------#
def check_s3_credentials(self):
""" Checking if credentials are correct by calling 'list_bucket'
""" Checking if credentials are correct by calling 'buckets_all'
"""
try:
response = self.session.list_buckets()
logging.info('S3 session created!')
response = self.get_bucket_names()
self.my_logger.info('S3 session created!')
except Exception as e:
logging.error(str(e) + ' Wrong credentials, could not connect to S3!')
sys.exit(1)
self.my_logger.error(str(e) + ' Wrong credentials, could not connect to S3!')
#--------------------------------------------------------------------------------------------------------#
def check_s3_file_ex(self, bucket_name, file_name):
""" check if file on s3 storage exists in named bucket
Parameter:
----------
bucket_name : string
bucket name as string
file_name : string
Name of file on s3 storage
Returns:
-------
Boolean
does file exist?
def check_s3_bucket_ex(self, bucket_name):
""" check if bucket exists
Parameter:
----------
bucket_name : string
bucket name as string
Returns:
-------
Boolean
does bucket exist?
"""
try:
self.session.Object(bucket_name, file_name).load()
self.session.meta.client.head_bucket(Bucket=bucket_name)
except botocore.exceptions.ClientError as e:
if not e.response['Error']['Code'] == "404":
logging.error(str(e) + "Something went wrong checking for " + file_name " in " + bucket_name)
self.my_logger.error("Something went wrong checking for " + bucket_name + str(e))
return False
# no except -> the object does exist.
return True
#--------------------------------------------------------------------------------------------------------#
def check_s3_etag(self, bucket_name, file_name, local_file_name):
def check_s3_file_ex(self, bucket_name, file_name):
""" check if file on s3 storage exists in named bucket
Parameter:
----------
bucket_name : string
bucket name as string
file_name : string
Name of file on s3 storage
Returns:
-------
Boolean
does file exist?
"""
if self.check_s3_bucket_ex(bucket_name):
try:
self.session.Object(bucket_name, file_name).load()
except botocore.exceptions.ClientError as e:
if not e.response['Error']['Code'] == "404":
self.my_logger.error(str(e) + "Something went wrong checking for " + file_name + " in " + bucket_name)
return False
# no except -> the object does exist.
return True
else:
return False
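# Sketch with placeholder names: the bucket check runs first, so a missing bucket and a
# missing object both yield False instead of raising.
if s3.check_s3_file_ex("my-bucket", "example.txt"):
    print("object present")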
#--------------------------------------------------------------------------------------------------------#
def compare_s3_etag(self, bucket_name, file_name, local_file_name):
""" compare local and s3 files
Parameter:
----------
bucket_name : string
bucket name as string
file_name : string
Name of file on s3 storage
local_file_name : string / path
Path to local file
Returns:
-------
modBool : Boolean
has file changed?
Parameter:
----------
bucket_name : string
bucket name as string
file_name : string
Name of file on s3 storage
local_file_name : string / path
Path to local file
Returns:
-------
modBool : Boolean
has file changed?
"""
try:
## MD5 Checksum for comparison -> Only works for non Multipart Upload!!
@@ -85,31 +178,50 @@ class Loosolab_s3:
s3_e_tag = '0' # if the object does not yet exist (string, so the '-' check below works)
if '-' in s3_e_tag:
local_tag = etag_checksum(local_file_name)
local_tag = self.etag_checksum(local_file_name)
else:
local_tag = hashlib.md5(open(local_file_name, 'rb').read()).hexdigest()
logging.info('local e-tag of ' + local_file_name +' is : ' + local_tag)
logging.info('s3 e-tag of ' + file_name +' is : ' + s3_e_tag)
self.my_logger.info('local e-tag of ' + local_file_name +' is : ' + local_tag)
self.my_logger.info('s3 e-tag of ' + file_name +' is : ' + s3_e_tag)
modBool = local_tag != s3_e_tag
return modBool
except Exception as e:
logging.error(str(e) + "File comparison failed!" + file_name)
self.my_logger.error(str(e) + "File comparison failed!" + file_name)
#--------------------------------------------------------
def etag_checksum(file_name, chunk_size=8 * 1024 * 1024): # https://zihao.me/post/calculating-etag-for-aws-s3-objects/
#--------------------------------------------------------------------------------------------------------#
def confirm_bucket_name(self, bucket_name): # todo adjust the length automatically?
""" checks if entered bucket name is valid and alters '_' or '.'.
Parameter:
------
bucket_name : string
Returns:
--------
bucket_name : string
"""
if '_' in bucket_name or '.' in bucket_name: # bucket must not contain '_' or '.'
    bucket_name = bucket_name.replace('_','-').replace('.','')
    self.my_logger.warning('Your bucket name contains unsupported characters (\".\" or \"_\"); they were replaced. New name is: ' + bucket_name)
name_len = len(bucket_name)
if name_len > 63 or name_len < 3: # bucket name length must be between 3 and 63
self.my_logger.error('The bucket name must consist of 3 to 63 characters, the entered name has a length of '+ str(name_len))
return bucket_name
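# Sketch with a hypothetical name: '_' becomes '-' and '.' is dropped.
assert s3.confirm_bucket_name("my_project.v2") == "my-projectv2"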
#--------------------------------------------------------------------------------------------------------#
#------------- Non-S3 Utils -----------------------------------------------------------------------------#
def etag_checksum(self, file_name, chunk_size=8 * 1024 * 1024): # https://zihao.me/post/calculating-etag-for-aws-s3-objects/
""" calculates the etag for Multi-uploaded files
Parameter:
----------
file_name : string
path to local file
chunk_size : int
size of upload chunks (8MB as default)
Returns:
--------
Parameter:
----------
file_name : string
path to local file
chunk_size : int
size of upload chunks (8MB as default)
Returns:
--------
Recalculated e-Tag of local file
"""
md5s = []
@@ -119,292 +231,248 @@ class Loosolab_s3:
m = hashlib.md5(b"".join(md5s)) # part digests are bytes, so join them as bytes
return '{}-{}'.format(m.hexdigest(), len(md5s))
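# Worked note (values illustrative): a file uploaded in 8 MB parts gets an ETag of the form
# "<md5 of the concatenated part digests>-<part count>", e.g. "<md5hex>-3" for a ~20 MB file;
# that '-' suffix is what compare_s3_etag() keys on when it picks the local checksum method.
local_tag = s3.etag_checksum("/tmp/large_example.bin") # placeholder path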
#--------------------------------------------------------------------------------------------------------#
def confirm_bucket_name(bucket_name): # todo adjust the length automatically?
""" checks if entered bucket name is valid and alters '_' or '.'.
Params:
------
bucket_name : string
Returns:
--------
bucket_name : string
#------------------------------------------
def _create_random(self, charamount):
""" create random string
Parameter:
----------
charamount : int
length of the random string
Returns:
--------
string
"""
if '_' in bucket_name or '.' in bucket_name: # bucket must not contain '_' or '.'
    bucket_name = bucket_name.replace('_','-').replace('.','')
    logging.warning('WARNING There are unsupported characters in your bucket name (\".\" or \"_\"); they were replaced. New name is: ' + bucket_name)
if len(bucket_name) > 63 or len(bucket_name) < 3: # bucket name length must be between 3 and 63
    logging.error('ERROR The bucket name must consist of 3 to 63 characters, the entered name has a length of ' + str(len(bucket_name)))
    sys.exit(1)
return bucket_name
random_str = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(charamount))
return random_str
#--------------------------------------------------------------------------------------------------------#
#------------- Resource / Client & Transfer -------------------------------------------------------------#
def create_s3_session(self, credentials, resource=False):
""" Creating s3 session with boto3 - needed for any use of the s3 storage
Parameter:
----------
credentials : dictionary
contains : "signature", "key", "secret" and "endpoint"
resource : boolean
session as resource or as client
Returns: s3
client or resource object
"""
session = boto3.session.Session()
configuration = dict(
    config=Config(signature_version=credentials["signature"]),
    aws_access_key_id=credentials["key"],
    aws_secret_access_key=credentials["secret"],
    endpoint_url=credentials["endpoint"]
)
if resource:
    self.s3 = session.resource('s3', **configuration)
else: # client:
    self.s3 = session.client('s3', **configuration)
#self.check_s3_credentials() # list-bucket not allowed -> skip
return self.s3
#---------------------------------------------
def create_s3_transfer(self, credentials):
""" Creating s3 transfer with boto3 - needed for upload on the s3 storage non_multipart
Parameter:
----------
credentials : dictionary
Returns:
--------
transfer
#------------- Bucket Management ------------------------------------------------------------------------#
def get_bucket_names(self):
""" Get a list of all buckets
Returns:
--------
bucket_name_list : list
contains names of all buckets
"""
bucket_name_list = []
try:
session = self.create_s3_session(credentials, resource=False)
myconfig = boto3.s3.transfer.TransferConfig(
multipart_threshold=9999999999999999, # workaround for 'disable' auto multipart upload
max_concurrency=10,
num_download_attempts=10,
)
transfer=boto3.s3.transfer.S3Transfer(session, myconfig)
logging.info('S3 transfer created!')
return transfer
except Exception as e:
logging.error(str(e) + ' S3 transfer could not be created!')
sys.exit(1)
#--------------------------------------------------------------------------------------------------------#
#-------------- get by pattern --------------------------------------------------------------------------#
def get_s3_files_by_pattern(self, pattern, bucket_name):
""" return list of files with matching pattern from bucket
Parameter:
----------
bucket_name : string
pattern : string
Return:
-------
list with matches
"""
try:#list all objects of a bucket
object_list = self.session.list_objects_v2(Bucket=bucket_name)
except Exception as e:
logging.error('ERROR listing objects of Bucket: %s ' % bucket_name + str(e))
match_list = [obj['Key'] for obj in object_list['Contents'] if pattern in obj['Key']]
logging.info('Found the following files: ' + str(match_list))
if len(match_list) == 0:
logging.error('No matching files for {0} were found in bucket {1}.'.format(pattern, bucket_name))
for bucket in self.session.buckets.all():
bucket_name_list.append(bucket.name)
return bucket_name_list
except Exception as e:
self.my_logger.error('Buckets could not be listed. ' + str(e))
return match_list
#--------------------------------------------------------------------------------------------------------#
def get_s3_buckets_by_pattern(self, pattern):
def get_s3_buckets_by_pattern(self, pattern):
""" return list of buckets with matching pattern
Parameter:
----------
pattern : string
Parameter:
----------
pattern : string
Return:
-------
Return:
-------
list with matches
"""
try:
bucket_list = self.session.list_buckets() #list all buckets
except Exception as e:
logging.error('ERROR listing all Buckets' + str(e))
bucket_list = self.get_bucket_names() #list all buckets
match_list = [bucket_name['Name'] for bucket_name in bucket_list['Buckets'] if pattern in bucket_name]
logging.info('Found the following buckets: ' + str(match_list))
match_list = [bucket_name for bucket_name in bucket_list if pattern in bucket_name]
self.my_logger.info('Found the following buckets: ' + str(match_list))
if len(match_list) == 0:
logging.error('No matching buckets for {0} were found.'.format(pattern))
self.my_logger.error('No matching buckets for {0} were found.'.format(pattern))
return match_list
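# Sketch with a placeholder pattern: plain substring matching, no globbing or regex.
project_buckets = s3.get_s3_buckets_by_pattern("project-")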
#--------------------------------------------------------------------------------------------------------#
#------------- Bucket Management ------------------------------------------------------------------------#
def create_s3_bucket(self, bucket_name, name_addition=True):
#--------------------------------------------------------------------------------------------------------#
def create_s3_bucket(self, bucket_name, name_addition=True):
"""Creating an s3 bucket.
Parameter:
----------
bucket_name : string
name_addition : Boolean
if bucket name should be altered when occupied
Returns:
--------
string bucket_name
Parameter:
----------
bucket_name : string
name_addition : Boolean
if bucket name should be altered when occupied
Returns:
--------
bucket_name : string
"""
bucket_name = confirm_bucket_name(bucket_name)
bucket_name = self.confirm_bucket_name(bucket_name)
name_occupied = True
i = 3 # counter to exit the loop if 3 name additions do not work
while name_occupied:
try:
bucket = session.create_bucket(Bucket=bucket_name)
bucket = self.session.create_bucket(Bucket=bucket_name)
name_occupied = False
logging.info('Bucket ' + bucket_name +' was created!')
self.my_logger.info('Bucket ' + bucket_name +' was created!')
# If bucket already exist but is not owned by you, add random string:
except session.meta.client.exceptions.BucketAlreadyExists as e:
except self.session.meta.client.exceptions.BucketAlreadyExists as e:
if (name_addition and i > 0 and len(bucket_name) < 53): # 3 attempts, keep below max name length
addition = create_random(10)
addition = self._create_random(10)
bucket_name = bucket_name + "-" + addition # '-' because '_' is not allowed in bucket names
i = i-1
else:
logging.error(str(e) + ' Bucket name is already occupied by another user!')
self.my_logger.error(str(e) + ' Bucket name is already occupied by another user!')
return
# If bucket is already owned by you go on as if it was created
except session.meta.client.exceptions.BucketAlreadyOwnedByYou as e:
bucket = session.Bucket(bucket_name)
logging.info('Bucket ' + bucket_name +' is already owned by you!')
except self.session.meta.client.exceptions.BucketAlreadyOwnedByYou as e:
bucket = self.session.Bucket(bucket_name)
self.my_logger.info('Bucket ' + bucket_name +' is already owned by you!')
name_occupied = False
except Exception as e:
logging.error(str(e) + ' Bucket could not be created!')
self.my_logger.error(str(e) + ' Bucket could not be created!')
return
return bucket_name
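# Sketch with a placeholder name: on a name clash a random 10-character suffix is appended
# (up to three times), so keep the returned name rather than the requested one.
actual_name = s3.create_s3_bucket("my-results", name_addition=True)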
#--------------------------------------------------------------------------------------------------------#
def delete_s3_bucket(self, bucket_name, only_delete_objects = False):
""" Delete and emptie bucket
Parameter:
----------
bucket_name : string
only_delete_objects : Bool
if True the bucket will be emptied but not deleted
Parameter:
----------
bucket_name : string
only_delete_objects : Bool
if True the bucket will be emptied but not deleted
"""
try:
bucket = self.session.Bucket(bucket_name)
except Exception as e:
logging.error('ERROR Bucket does not exist.' + str(e))
self.my_logger.error('Bucket does not exist.' + str(e))
return
try:
bucket.objects.all().delete()
if only_delete_objects:
logging.info("Bucket " + bucket_name + " emptied, NOT deleted.")
self.my_logger.info("Bucket " + bucket_name + " emptied, NOT deleted.")
return
except Exception as e: # What if there are no objects?
logging.error('ERROR Bucket Objects could not be deleted.' + str(e))
except Exception as e: # todo what if there are no objects?
self.my_logger.error('Bucket Objects could not be deleted.' + str(e))
return
try:
bucket.delete()
logging.info("Bucket " + bucket_name + " deleted.")
self.my_logger.info("Bucket " + bucket_name + " deleted.")
except Exception as e:
logging.error('ERROR Bucket could not be deleted.' + str(e))
self.my_logger.error('Bucket could not be deleted.' + str(e))
#--------------------------------------------------------------------------------------------------------#
def delete_s3_buckets(self, bucket_list, only_delete_objects = False):
""" Delete and emptie multiple buckets
Parameter:
----------
bucket_list : list of bucket names
only_delete_objects : Bool
if True buckets will be emptied but not deleted
Parameter:
----------
bucket_list : list of bucket names
only_delete_objects : Bool
if True buckets will be emptied but not deleted
"""
try:
for bucket_name in bucket_list:
delete_s3_bucket(bucket_name, only_delete_objects)
self.delete_s3_bucket(bucket_name, only_delete_objects)
except Exception as e:
logging.error('ERROR Problem with list of Bucket names.' + str(e))
self.my_logger.error('Problem with list of Bucket names.' + str(e))
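# Sketch with placeholder names: only_delete_objects=True empties the buckets but keeps them.
s3.delete_s3_buckets(["bucket-a", "bucket-b"], only_delete_objects=True)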
#--------------------------------------------------------------------------------------------------------#
#-------------- File Management -------------------------------------------------------------------------#
def upload_s3_files(self, bucket_name, file_list):
"""Creating an s3 bucket.
def get_object_names(self, bucket_name):
""" Get a list of all objects in bucket
Parameter:
----------
bucket_name : string
Returns:
--------
object_name_list : list
contains names of all objects in bucket
"""
object_name_list = []
try:
bucket = self.session.Bucket(bucket_name)
except Exception as e:
self.my_logger.error('Bucket does not exist. ' + str(e))
return
try:
for obj in bucket.objects.all():
object_name_list.append(obj.key)
return object_name_list
except Exception as e:
self.my_logger.error('Objects in Bucket ' + bucket_name + ' could not be listed! ' + str(e))
Parameter:
----------
bucket_name : string
file_list : list
list of local filepaths
#--------------------------------------------------------------------------------------------------------#
def get_s3_files_by_pattern(self, pattern, bucket_name):
""" return list of files with matching pattern from bucket
Parameter:
----------
bucket_name : string
pattern : string
Returns:
--------
Return:
-------
list with matches
"""
object_list = self.get_object_names(bucket_name)
match_list = [obj for obj in object_list if pattern in obj]
self.my_logger.info('Found the following files: ' + str(match_list))
if len(match_list) == 0:
self.my_logger.error('No matching files for {0} were found in bucket {1}.'.format(pattern, bucket_name))
return match_list
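# Sketch with placeholder names: a plain substring match over all object keys in the bucket.
fastq_files = s3.get_s3_files_by_pattern(".fastq", "my-bucket")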
#--------------------------------------------------------------------------------------------------------#
def upload_s3_files(self, bucket_name, file_list):
"""Creating an s3 bucket.
Parameter:
----------
bucket_name : string
file_list : list
list of local filepaths
Returns:
--------
Boolean
"""
for local_file_name in file_list:
try:
file_name = os.path.basename(local_file_name)
modBool = check_s3_etag(bucket_name, file_name, local_file_name)
modBool = self.compare_s3_etag(bucket_name, file_name, local_file_name)
if modBool:
if self.multipart_upload:
self.session.Bucket(bucket_name).upload_file(local_file_name, file_name)
else:
self.transfer.upload_file(local_file_name, bucket_name, file_name)
#self.session.Bucket(bucket_name).upload_file(local_file_name, file_name, Config=self.transfer)
except Exception as e:
logging.error(str(e) + "S3: Uploading files failed!")
self.my_logger.error("S3: Uploading files failed!" + str(e))
return
self.my_logger.info("All files are uploaded!" + file_list)
return True
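# Sketch with placeholder paths: files whose s3 ETag already matches the local checksum are
# skipped, so re-running the call only uploads changed files.
uploaded = s3.upload_s3_files("my-bucket", ["/data/sample1.fastq", "/data/sample2.fastq"])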
#--------------------------------------------------------------------------------------------------------#
def download_s3_files(self, bucket_name, download_list, waiting=False): # todo set the download location @Philipp: what is the waiting about?
def download_s3_files(self, bucket_name, file_list, destination='.', timeout=10):
""" Download files from bucket
Parameter:
----------
bucket_name : string
download_list : list
Files to download
waiting : boolean
whether to wait until the files are ready for download
"""
self.session.head_bucket(bucket_name)
logging.info('Bucket ' + bucket_name + ' exists, starting download...')
for file_path in download_list:
file_name = os.path.basename(file_path)
Parameter:
----------
bucket_name : string
file_list : list
Files to download
destination : string
local directory to download into
timeout : int
how many minutes to wait for a file to appear
"""
bucket = self.session.Bucket(bucket_name)
# check if exists
for local_file in file_list:
downloaded = False
file_name = os.path.basename(local_file)
file_path = os.path.join(destination, local_file)
# check once a second if file is present, check for x minutes (->*60)
total_checks = int(60 * timeout)
for checks in range(total_checks - 1):
if self.check_s3_file_ex(bucket_name, file_name):
#if waiting is True -> the downloader waits until another process has uploaded the file to the bucket
if waiting:
while True:
try:
self.session.head_object(Bucket=bucket_name, Key=file_name)
logging.info("File is ready to start the download: "+ file_name)
break
except botocore.exceptions.ClientError as e: # todo hier noch andere fehler abfangen?
self.waiting_points("Waiting for file: " + file_name)
while True: #check if file is ready for download
try:
self.session.Bucket(bucket_name).download_file(file_name, file_name)
except Exception as e:
logging.error('ERROR Could not download file.\n' + str(e))
sys.exit(1)
# it is possible that the file is available for download but has not yet been written to the s3 disk; in that case the downloaded file has a size of 0
if (os.path.getsize(file_name) != 0):
break
self.waiting_points('Waiting for file: ' + str(self.session.head_object(Bucket=bucket_name, Key=file_name)['ContentLength']) + ' vs ' + str(os.path.getsize(file_path)) + ' ' + file_path)
logging.info(file_name + " Has been downloaded.")
else:
logging.error('ERROR File did not exist: ' + file_name)
else:
logging.error('ERROR File or Bucket did not exist.')
sys.exit(1)
\ No newline at end of file
if self.multipart_upload:
bucket.download_file(file_name, file_path)
else:
self.transfer.download_file(bucket_name, file_name, file_path)
downloaded = True
break
time.sleep(1)
if not downloaded:
self.my_logger.error("Could not download file " + file_name)
\ No newline at end of file
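# Sketch with placeholder names: the method polls once per second for up to `timeout` minutes,
# so it also works while another process is still uploading the requested files.
s3.download_s3_files("my-bucket", ["results.csv"], destination="/tmp", timeout=5)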