Commit ff428a3a authored by marina.kiweler01

package

parent b6d8c1be
#!/usr/bin/env python
"""
Class providing S3 convenience functions based on boto3.
Initialize an object of this class with the parameter:
    credentials : dictionary containing "signature", "key", "secret" and "endpoint"
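
Example (placeholder values, not working credentials):
    credentials = {
        "endpoint": "https://s3.example.com",
        "key": "MY_ACCESS_KEY",
        "secret": "MY_SECRET_KEY",
        "signature": "s3v4"
    }
    s3 = Loosolab_s3(credentials)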
"""
import boto3
import botocore
import hashlib
import logging
import os
import random
import re
import string
from boto3.s3.transfer import S3Transfer, TransferConfig
from botocore.client import Config
class Loosolab_s3:
    #--------------Init -------------------------------------------------------------------------------------#
    def __init__(self, credentials, multipart_upload=True, log_file=None):
        if log_file is not None:
            logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filename=log_file, level=logging.DEBUG)
        else:
            logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
        self.my_logger = logging.getLogger('Loosolab_s3_logger')
        # a global session is needed for all boto actions
        self.create_s3_session(credentials)
        self.multipart_upload = multipart_upload
        if not multipart_upload:
            self.create_s3_transfer(credentials)
    #--------------------------------------------------------------------------------------------------------#
    #------------- Resource / Client & Transfer -------------------------------------------------------------#
    def create_s3_session(self, credentials, resource=True):
        """ Create an s3 session with boto3 - needed for any use of the s3 storage
        Parameter:
        ----------
        credentials : dictionary
            contains : "signature", "key", "secret" and "endpoint"
        resource : boolean
            create the session as resource (True) or as client (False)
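        Example (placeholder values):
            self.create_s3_session({"key": "MY_KEY", "secret": "MY_SECRET",
                                    "endpoint": "https://s3.example.com"})  # 'signature' defaults to 's3'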
"""
# Default value for signature
if not 'signature' in credentials:
credentials['signature'] = 's3'
# Store configuration once?
if resource:
session = boto3.resource(
's3',
config=Config(signature_version=credentials["signature"]),
aws_access_key_id=credentials["key"],
aws_secret_access_key=credentials["secret"],
endpoint_url=credentials["endpoint"]
)
else: # client:
session = boto3.client(
's3',
config=Config(signature_version=credentials["signature"]),
aws_access_key_id=credentials["key"],
aws_secret_access_key=credentials["secret"],
endpoint_url=credentials["endpoint"]
)
self.session = session
self.check_s3_credentials()
#--------------------------------------------------------------------------------------------------------#
def create_s3_transfer(self, credentials):
""" Creating s3 transfer with boto3 - needed for upload on the s3 storage non_multipart
Parameter:
----------
credentials : dictionary
"""
try:
session = create_s3_session(credentials, resource=False)
myconfig = boto3.s3.transfer.TransferConfig(
multipart_threshold=9999999999999999, # workaround for 'disable' auto multipart upload
max_concurrency=10,
num_download_attempts=10,
)
transfer=boto3.s3.transfer.S3Transfer(session, myconfig)
self.my_logger.info('S3 transfer created!')
self.transfer = transfer
except Exception as e:
self.my_logger.error('S3 transfer could not be created!' + str(e))
#--------------------------------------------------------------------------------------------------------#
#--------- Getter ---------------------------------------------------------------------------------------#
def get_session(self):
return self.session
#--------------------------------------------------------------------------------------------------------#
def get_transfer(self):
if self.multipart_upload:
print("ERROR: Transfer is only created when multipart_upload = False !")
return
else:
return self.transfer
#--------------------------------------------------------------------------------------------------------#
#------------- Checks -----------------------------------------------------------------------------------#
def check_s3_credentials(self):
""" Checking if credentials are correct by calling 'buckets.all'
"""
try:
response = self.get_bucket_names()
self.my_logger.info('S3 session created!')
except Exception as e:
self.my_logger.error(str(e) + ' Used wrong credentials could not conect to S3!')
#--------------------------------------------------------------------------------------------------------#
def check_s3_bucket_ex(self, bucket_name):
""" check if bucket exists
Parameter:
----------
bucket_name : string
bucket name as string
Returns:
-------
Boolean
does bucket exist?
"""
try:
self.session.meta.client.head_bucket(Bucket=bucket_name)
except botocore.exceptions.ClientError as e:
if not e.response['Error']['Code'] == "404":
self.my_logger.error("Something went wrong checking for " + bucket_name + " " + str(e))
return False
return True
#--------------------------------------------------------------------------------------------------------#
def check_s3_object_ex(self, bucket_name, file_name):
""" check if file on s3 storage exists in named bucket
Parameter:
----------
bucket_name : string
bucket name as string
file_name : string
Name of file on s3 storage
Returns:
-------
Boolean
does file exist?
"""
if self.check_s3_bucket_ex(bucket_name):
try:
self.session.Object(bucket_name, file_name).load()
except botocore.exceptions.ClientError as e:
if not e.response['Error']['Code'] == "404":
self.my_logger.error(str(e) + "Something went wrong checking for " + file_name + " in " + bucket_name)
return False
# no except -> the object does exist.
return True
else:
return False
#--------------------------------------------------------------------------------------------------------#
def compare_s3_etag(self, bucket_name, file_name, local_file_name):
""" compare local and s3 files
Parameter:
----------
bucket_name : string
bucket name as string
file_name : string
Name of file on s3 storage
local_file_name : string / path
Path to local file
Returns:
-------
modBool : Boolean
has file changed?
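        Note: for single-part uploads the s3 e-tag is the plain MD5 hex digest of the
        file; for multipart uploads it has the form '<md5-of-part-md5s>-<part count>',
        which etag_checksum() recalculates locally.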
"""
try:
s3_object = self.session.Object(bucket_name, file_name)
try:
s3_e_tag = s3_object.e_tag[1:-1] # etag without quotes
except: # object does not exist
return False
if '-' in s3_e_tag:
local_tag = self.etag_checksum(local_file_name)
else:
print("easy")
local_tag = hashlib.md5(open(local_file_name).read().encode('utf-8')).hexdigest()
self.my_logger.info('local e-tag of ' + local_file_name +' is : ' + local_tag)
self.my_logger.info('s3 e-tag of ' + file_name +' is : ' + s3_e_tag)
modBool = local_tag == s3_e_tag
print(modBool)
if modBool:
self.my_logger.info("Files are not changed!" + str(file_name))
return modBool
except Exception as e:
return False
#--------------------------------------------------------------------------------------------------------#
def confirm_bucket_name(self, bucket_name): # todo laenge automatisch anpassen?
""" checks if entered bucket name is valid and alters '_' or '.'.
Parameter:
------
bucket_name : string
Returns:
--------
bucket_name : string
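        Example (hypothetical input):
            confirm_bucket_name('My_Data.Set')  ->  'my-dataset'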
"""
bucket_name = bucket_name.lower() #only lowercase is allowed
if ('_' or '.') in bucket_name: #bucket must not contain '_' or '.'
bucket_name = bucket_name.replace('_','-').replace('.','')
self.my_logger.warning('There are not supported characters in your bucket name (\".\" or \"_\") they are replaced. New name is: ' + bucket_name)
name_len = len(bucket_name)
if name_len > 63 or name_len < 3:#bucket name length must be between 63 and 3
self.my_logger.error('The bucket name must consist of 3 to 63 characters, the entered name has a length of '+ str(name_len))
return bucket_name
#--------------------------------------------------------------------------------------------------------#
#------------- Non-S3 Utils -----------------------------------------------------------------------------#
def etag_checksum(self, file_name, chunk_size=8 * 1024 * 1024): # https://zihao.me/post/calculating-etag-for-aws-s3-objects/
""" calculates the etag for Multi-uploaded files
Parameter:
----------
file_name : string
path to local file
chunk_size : int
size of upload chunks (8MB as default)
Returns:
--------
Recalculated e-Tag of local file
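        Example (hypothetical result):
            etag_checksum('local/file.h5ad')  ->  '0f343b0931126a20f133d67c2b018a3b-3'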
"""
# zu langsam!
md5s = []
with open(file_name, 'rb') as f:
for data in iter(lambda: f.read(chunk_size), b''):
md5s.append(hashlib.md5(data).digest())
m = hashlib.md5(b"".join(md5s))
return '{}-{}'.format(m.hexdigest(), len(md5s))
#--------------------------------------------------------------------------------------------------------#
def _create_random(charamount):
""" create random string
Parameter:
----------
charamount : int
lenght of random string
Returns:
--------
string
"""
empty_string = ''
random_str = empty_string.join(random.choice(string.ascii_lowercase + string.digits) for char in range(charamount))
return(random_str)
#--------------------------------------------------------------------------------------------------------#
#------------- Bucket Management ------------------------------------------------------------------------#
def get_bucket_names(self, pattern=""):
""" Get a list of all buckets
Parameter:
----------
pattern : string
Returns:
--------
bucket_name_list : list
contains names of all buckets
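        Example (hypothetical names):
            get_bucket_names()           ->  ['project-a', 'project-b', 'scratch']
            get_bucket_names('project')  ->  ['project-a', 'project-b']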
"""
bucket_name_list = []
try:
for bucket in self.session.buckets.all():
bucket_name_list.append(bucket.name)
if not pattern: # retun ALL bucket names
return bucket_name_list
else: # with pattern matching
match_list = [bucket_name for bucket_name in bucket_name_list if re.search(pattern, bucket_name)]
self.my_logger.info('Found the following buckets: ' + str(match_list))
if len(match_list) == 0:
self.my_logger.error('No matching buckets for {0} were found.'.format(pattern))
return match_list
except Exception as e:
self.my_logger.error('Buckets could not be listed. ' + str(e))
#--------------------------------------------------------------------------------------------------------#
def create_s3_bucket(self, bucket_name, name_addition=True):
"""Creating an s3 bucket.
Parameter:
----------
bucket_name : string
name_addition : Boolean
if bucket name should be altered when occupied
Returns:
--------
bucket_name : string
"""
bucket_name = self.confirm_bucket_name(bucket_name)
name_occupied = True
i = 3 # counter to exit the loop if 3 name aditions do not work
while name_occupied:
try:
bucket = self.session.create_bucket(Bucket=bucket_name)
name_occupied = False
self.my_logger.info('Bucket ' + bucket_name +' was created!')
# If bucket already exist but is not owned by you, add random string:
except self.session.meta.client.exceptions.BucketAlreadyExists as e:
if (name_addition and i > 0 and len(bucket) < 53): # 3 versuche, max length
addition = self._create_random(10)
bucket_name = bucket_name + "_" + addition
i = i-1
else:
self.my_logger.error(str(e) + ' Bucket name is already occupied by another user!')
return
# If bucket is already owned by you go on as if it was created
except self.session.meta.client.exceptions.BucketAlreadyOwnedByYou as e:
bucket = self.session.Bucket(bucket_name)
self.my_logger.info('Bucket ' + bucket_name +' is already owned by you!')
name_occupied = False
except Exception as e:
self.my_logger.error(str(e) + ' Bucket could not be created!')
return
return bucket_name
#--------------------------------------------------------------------------------------------------------#
def emptie_s3_buckets(self, bucket_names, delete_bucket = False):
""" Delete and emptie bucket
Parameter:
----------
bucket_names : string or list of bucket names
delete_bucket : Bool
if True bucket will be deleted
"""
if isinstance(bucket_names, str):
bucket_list = [bucket_names]
else:
bucket_list = bucket_names
for bucket_name in bucket_list:
try:
bucket = self.session.Bucket(bucket_name)
except Exception as e:
self.my_logger.error('Bucket does not exist.' + str(e))
continue
try:
bucket.objects.all().delete()
if not delete_bucket:
self.my_logger.info("Bucket " + bucket_name + " emptied.")
continue
except Exception as e: # todo Was ist wenn keine Objects da sind?
self.my_logger.error('Bucket Objects could not be deleted.' + str(e))
continue
try:
bucket.delete()
self.my_logger.info("Bucket " + bucket_name + " deleted.")
except Exception as e:
self.my_logger.error('Bucket could not be deleted.' + str(e))
#--------------------------------------------------------------------------------------------------------#
#-------------- File Management -------------------------------------------------------------------------#
def get_object_names(self, bucket_name, pattern=""):
""" Get a list of all objects in bucket
Parameter:
----------
bucket_name : string
pattern : string
Returns:
--------
object_name_list : list
contains names of all objects in bucket
"""
object_name_list = []
if not self.check_s3_bucket_ex(bucket_name):
self.my_logger.error('Bucket does not exist!')
return []
try:
bucket = self.session.Bucket(bucket_name)
except Exception as e:
self.my_logger.error('Problem calling bucket. ' + str(e))
return
try:
for obj in bucket.objects.all():
object_name_list.append(obj.key)
if not pattern:
return object_name_list
else:
match_list = [obj for obj in object_name_list if re.search(pattern, obj)]
self.my_logger.info('Found the following files: ' + str(match_list))
if len(match_list) == 0:
self.my_logger.error('No matching files for {0} were found in bucket {1}.'.format(pattern,bucket_name))
return match_list
except Exception as e:
self.my_logger.error('Objects in Bucket ' + bucket_name + 'could not be listed! ' + str(e))
#--------------------------------------------------------------------------------------------------------#
def upload_s3_objects(self, bucket_name, file_list, compare=True):
"""Creating an s3 bucket.
Parameter:
----------
bucket_name : string
file_list : list
list of local filepaths
compare : boolean
should local and s3 file be compared?
Returns:
--------
Boolean
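        Example (hypothetical files):
            upload_s3_objects('my-bucket', ['data/sample1.h5ad', 'data/sample2.h5ad'])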
"""
modBool = False
if not self.check_s3_bucket_ex(bucket_name):
self.my_logger.error("Bucket for upload does not exist!" + str(e))
return
for local_file_name in file_list:
try:
file_name = os.path.basename(local_file_name)
if compare:
modBool = self.compare_s3_etag(bucket_name, file_name, local_file_name)
if not modBool:
if self.multipart_upload:
self.session.Bucket(bucket_name).upload_file(local_file_name, file_name)
else:
self.transfer.upload_file(local_file_name, bucket_name, file_name)
#self.session.Bucket(bucket_name).upload_file(local_file_name, file_name, Config=self.transfer)
self.my_logger.info("All files are uploaded!" + str(file_list))
except Exception as e:
self.my_logger.error("S3: Uploading files failed!" + str(e))
return
return True
#--------------------------------------------------------------------------------------------------------#
def download_s3_objects(self, bucket_name, file_list, destination='.', compare=True):
""" Download files from bucket
Parameter:
----------
bucket_name : string
file_list : list
Files to download
destination : string
timeout : float
how long to watch for file in minutes
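        Example (hypothetical files):
            download_s3_objects('my-bucket', ['sample1.h5ad'], destination='results')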
"""
bucket = self.session.Bucket(bucket_name)
modBool = False
# check if exists
try:
for local_file in file_list:
file_name = os.path.basename(local_file)
file_path = os.path.join(destination, local_file)
if compare:
modBool = self.compare_s3_etag(bucket_name, file_name, file_path)
if self.check_s3_object_ex(bucket_name, file_name) and not modBool:
if not os.path.exists(file_path):
os.makedirs(os.path.dirname(file_path))
self.my_logger.info('Created directory: ' + os.path.dirname(file_path))
if self.multipart_upload:
bucket.download_file(file_name, file_path)
else:
bucket.download_file(file_name, file_path, Config=self.transfer)
except Exception as e:
self.my_logger.error(str(e) + "Could not download file " + file_name)
setup.py 0 → 100644
from setuptools import setup

setup(name='loosolab_s3',
      version='0.0.1',
      description='',
      author='Marina Kiweler',
      author_email='marina.kiweler@mpi-bn.mpg.de',
      license='MIT',
      packages=['loosolab_s3'],
      install_requires=[
          'boto3',
          'botocore'
      ],
      classifiers=[
          'Programming Language :: Python :: 3'
      ],
      zip_safe=False,
      include_package_data=True)
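# Local development install (run from the repository root):
#   pip install -e .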
#!/usr/bin/env python
import importlib
import s3_functions
import boto3
import botocore
#importlib.reload(s3_functions)

credentials = {
    "endpoint": "https://s3.mpi-bn.mpg.de",
    "key": "mampok-service",
    "secret": "pESK7pxvLz4LhHLa",
    "signature": "s3v4"
}

def print_green(string):
    string = str(string)
    print("\33[38;5;41m" + string + "\33[0m\n")

print_green('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!START!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
my = s3_functions.Loosolab_s3(credentials)
print_green('checked : create_s3_session , check_s3_credentials')
print_green('get session :')
my_sess = my.get_session()
for bucket in my_sess.buckets.all():
    print_green(bucket.name)
print_green('checked when bucket names are printed in green')
print_green('get transfer :')
my_trans = my.get_transfer()
print_green('checked')
print_green('get_bucket_names :')
bucket_names = my.get_bucket_names()
print_green(bucket_names)
print_green('get_bucket_names_by_pattern :')
bucket_names_pat = my.get_bucket_names("zebra")
print_green(bucket_names_pat)
print_green('checked if matching buckets were printed')
bucket_names_pat_w = my.get_bucket_names("passtnicht")
print_green(bucket_names_pat_w)
print_green('checked if no match')
print_green('check bucket ex :')
b_ex = my.check_s3_bucket_ex(bucket_names[0])
print(b_ex)
print_green('^ true?')
b_nonex = my.check_s3_bucket_ex('non-existent')
print(b_nonex)
print_green('^ false?')
print_green('get object names :')
obj_names = my.get_object_names(bucket_names[0])
print_green(obj_names)
my.check_s3_object_ex(bucket_names[0], obj_names[0])
print_green('get object names - wrong bucket:')
obj_names_w = my.get_object_names('non-existent')
print_green(obj_names_w)
print_green('get object names by pattern:')
obj_names_pat = my.get_object_names(bucket_names[0], 'h5ad')
print_green('checked if matching files were printed')
obj_names_pat_w = my.get_object_names(bucket_names[0], 'nonexistent')
print_green('checked if no files were printed')
print_green('create_bucket')
new_bucket = my.create_s3_bucket('mampok_my_new_bucket')
print_green(new_bucket)
buck_ex = my.check_s3_bucket_ex(new_bucket)
print_green(str(buck_ex) + ' <- true?')
print_green('upload object')
my.upload_s3_objects(new_bucket, ['tabula-muris.h5ad'])
ob_ex = my.check_s3_object_ex(new_bucket, 'tabula-muris.h5ad')
print_green(str(ob_ex) + ' <- true?')
print_green('download object')
my.download_s3_objects(new_bucket, ['../tabula-muris.h5ad'])
print_green('downloaded - check')
#print_green('delete bucket')
#my.emptie_s3_buckets(new_bucket, delete_bucket=True)
print_green('FINISHED')