diff --git a/b2sdk/api.py b/b2sdk/api.py index 400f92a68..8ab26948c 100644 --- a/b2sdk/api.py +++ b/b2sdk/api.py @@ -10,24 +10,20 @@ import six -from .account_info.sqlite_account_info import SqliteAccountInfo -from .account_info.exception import MissingAccountData -from .b2http import B2Http from .bucket import Bucket, BucketFactory -from .cache import AuthInfoCache, DummyCache -from .transferer import Transferer from .exception import NonExistentBucket, RestrictedBucket -from .file_version import FileVersionInfoFactory, FileIdAndName -from .part import PartFactory -from .raw_api import API_VERSION, B2RawApi +from .file_version import FileIdAndName +from .large_file.services import LargeFileServices +from .raw_api import API_VERSION from .session import B2Session +from .transfer import ( + CopyManager, + DownloadManager, + Emerger, + UploadManager, +) from .utils import B2TraceMeta, b2_url_encode, limit_trace_arguments -try: - import concurrent.futures as futures -except ImportError: - import futures - def url_for_api(info, api_name): """ @@ -44,6 +40,11 @@ def url_for_api(info, api_name): return '%s/b2api/%s/%s' % (base, API_VERSION, api_name) +class Services(object): + def __init__(self, session): + self.large_file = LargeFileServices(session) + + @six.add_metaclass(B2TraceMeta) class B2Api(object): """ @@ -68,7 +69,14 @@ class handles several things that simplify the task of uploading BUCKET_FACTORY_CLASS = staticmethod(BucketFactory) BUCKET_CLASS = staticmethod(Bucket) - def __init__(self, account_info=None, cache=None, raw_api=None, max_upload_workers=10): + def __init__( + self, + account_info=None, + cache=None, + raw_api=None, + max_upload_workers=10, + max_copy_workers=10 + ): """ Initialize the API using the given account info. @@ -77,14 +85,12 @@ def __init__(self, account_info=None, cache=None, raw_api=None, max_upload_worke :class:`~b2sdk.v1.AbstractAccountInfo` To learn more about Account Info objects, see here :class:`~b2sdk.v1.SqliteAccountInfo` - :param cache: an instance of the one of the following classes: :class:`~b2sdk.cache.DummyCache`, :class:`~b2sdk.cache.InMemoryCache`, :class:`~b2sdk.cache.AuthInfoCache`, or any custom class derived from :class:`~b2sdk.cache.AbstractCache` It is used by B2Api to cache the mapping between bucket name and bucket ids. default is :class:`~b2sdk.cache.DummyCache` - :param raw_api: an instance of one of the following classes: :class:`~b2sdk.raw_api.B2RawApi`, :class:`~b2sdk.raw_simulator.RawSimulator`, or any custom class derived from :class:`~b2sdk.raw_api.AbstractRawApi` @@ -93,59 +99,40 @@ def __init__(self, account_info=None, cache=None, raw_api=None, max_upload_worke default is :class:`~b2sdk.raw_api.B2RawApi` :param int max_upload_workers: a number of upload threads, default is 10 + :param int max_copy_workers: a number of copy threads, default is 10 """ - self.raw_api = raw_api or B2RawApi(B2Http()) - if account_info is None: - account_info = SqliteAccountInfo() - if cache is None: - cache = AuthInfoCache(account_info) - self.session = B2Session(self, self.raw_api) - self.transferer = Transferer(self.session, account_info) - self.account_info = account_info - if cache is None: - cache = DummyCache() - self.cache = cache - self.upload_executor = None - self.max_workers = max_upload_workers - - def set_thread_pool_size(self, max_workers): - """ - Set the size of the thread pool to use for uploads and downloads. - - Must be called before any work starts, or the thread pool will get - the default size of 1. 
+ self.session = B2Session(account_info=account_info, cache=cache, raw_api=raw_api) + self.services = Services(self.session) + self.download_manager = DownloadManager(self.session) + self.upload_manager = UploadManager( + self.session, self.services, max_upload_workers=max_upload_workers + ) + self.copy_manager = CopyManager( + self.session, self.services, max_copy_workers=max_copy_workers + ) + self.emerger = Emerger( + self.session, self.services, self.download_manager, self.upload_manager, + self.copy_manager + ) - .. todo:: - move set_thread_pool_size and get_thread_pool to transferer + @property + def account_info(self): + return self.session.account_info - :param int max_workers: maximum allowed number of workers in a pool - """ - if self.upload_executor is not None: - raise Exception('thread pool already created') - self.max_workers = max_workers + @property + def cache(self): + return self.session.cache - def get_thread_pool(self): - """ - Return the thread pool executor to use for uploads and downloads. - """ - if self.upload_executor is None: - self.upload_executor = futures.ThreadPoolExecutor(max_workers=self.max_workers) - return self.upload_executor + @property + def raw_api(self): + return self.session.raw_api def authorize_automatically(self): """ Perform automatic account authorization, retrieving all account data from account info object passed during initialization. """ - try: - self.authorize_account( - self.account_info.get_realm(), - self.account_info.get_application_key_id(), - self.account_info.get_application_key(), - ) - except MissingAccountData: - return False - return True + return self.session.authorize_automatically() @limit_trace_arguments(only=('self', 'realm')) def authorize_account(self, realm, application_key_id, application_key): @@ -156,32 +143,7 @@ def authorize_account(self, realm, application_key_id, application_key): :param str application_key_id: :term:`application key ID` :param str application_key: user's :term:`application key` """ - # Clean up any previous account info if it was for a different account. 
- try: - old_account_id = self.account_info.get_account_id() - old_realm = self.account_info.get_realm() - if application_key_id != old_account_id or realm != old_realm: - self.cache.clear() - except MissingAccountData: - self.cache.clear() - - # Authorize - realm_url = self.account_info.REALM_URLS[realm] - response = self.raw_api.authorize_account(realm_url, application_key_id, application_key) - allowed = response['allowed'] - - # Store the auth data - self.account_info.set_auth_data( - response['accountId'], - response['authorizationToken'], - response['apiUrl'], - response['downloadUrl'], - response['recommendedPartSize'], - application_key, - realm, - allowed, - application_key_id, - ) + self.session.authorize_account(realm, application_key_id, application_key) def get_account_id(self): """ @@ -245,11 +207,10 @@ def download_file_by_id(self, file_id, download_dest, progress_listener=None, ra position, and the second one is the end position in the file :return: context manager that returns an object that supports iter_content() """ - url = self.session.get_download_url_by_id( - file_id, - url_factory=self.account_info.get_download_url, + url = self.session.get_download_url_by_id(file_id) + return self.download_manager.download_file_from_url( + url, download_dest, progress_listener, range_ ) - return self.transferer.download_file_from_url(url, download_dest, progress_listener, range_) def get_bucket_by_id(self, bucket_id): """ @@ -340,14 +301,9 @@ def list_parts(self, file_id, start_part_number=None, batch_size=None): :param int batch_size: the number of parts to fetch at a time from the server :rtype: generator """ - batch_size = batch_size or 100 - while True: - response = self.session.list_parts(file_id, start_part_number, batch_size) - for part_dict in response['parts']: - yield PartFactory.from_list_parts_dict(part_dict) - start_part_number = response.get('nextPartNumber') - if start_part_number is None: - break + return self.services.large_file.list_parts( + file_id, start_part_number=start_part_number, batch_size=batch_size + ) # delete/cancel def cancel_large_file(self, file_id): @@ -357,8 +313,7 @@ def cancel_large_file(self, file_id): :param str file_id: a file ID :rtype: None """ - response = self.session.cancel_large_file(file_id) - return FileVersionInfoFactory.from_cancel_large_file_response(response) + return self.services.large_file.cancel_large_file(file_id) def delete_file_version(self, file_id, file_name): """ diff --git a/b2sdk/bucket.py b/b2sdk/bucket.py index 9a2878072..40bb05787 100644 --- a/b2sdk/bucket.py +++ b/b2sdk/bucket.py @@ -10,113 +10,17 @@ import logging import six -import threading -from .exception import ( - AlreadyFailed, B2Error, MaxFileSizeExceeded, MaxRetriesExceeded, UnrecognizedBucketType -) +from .exception import UnrecognizedBucketType from .file_version import FileVersionInfoFactory -from .progress import DoNothingProgressListener, AbstractProgressListener, RangeOfInputStream, ReadingStreamWithProgress, StreamWithHash -from .unfinished_large_file import UnfinishedLargeFile +from .progress import DoNothingProgressListener from .upload_source import UploadSourceBytes, UploadSourceLocalFile -from .utils import b2_url_encode, choose_part_ranges, hex_sha1_of_stream, interruptible_get_result, validate_b2_file_name +from .utils import b2_url_encode, validate_b2_file_name from .utils import B2TraceMeta, disable_trace, limit_trace_arguments -from .raw_api import HEX_DIGITS_AT_END logger = logging.getLogger(__name__) -class LargeFileUploadState(object): 
- """ - Track the status of uploading a large file, accepting updates - from the tasks that upload each of the parts. - - The aggregated progress is passed on to a ProgressListener that - reports the progress for the file as a whole. - - This class is THREAD SAFE. - """ - - def __init__(self, file_progress_listener): - """ - :param b2sdk.v1.AbstractProgressListener file_progress_listener: a progress listener object to use. Use :py:class:`b2sdk.v1.DoNothingProgressListener` to disable. - """ - self.lock = threading.RLock() - self.error_message = None - self.file_progress_listener = file_progress_listener - self.part_number_to_part_state = {} - self.bytes_completed = 0 - - def set_error(self, message): - """ - Set an error message. - - :param str message: an error message - """ - with self.lock: - self.error_message = message - - def has_error(self): - """ - Check whether an error occured. - - :rtype: bool - """ - with self.lock: - return self.error_message is not None - - def get_error_message(self): - """ - Fetche an error message. - - :return: an error message - :rtype: str - """ - with self.lock: - assert self.has_error() - return self.error_message - - def update_part_bytes(self, bytes_delta): - """ - Update listener progress info. - - :param int bytes_delta: number of bytes to increase a progress for - """ - with self.lock: - self.bytes_completed += bytes_delta - self.file_progress_listener.bytes_completed(self.bytes_completed) - - -class PartProgressReporter(AbstractProgressListener): - """ - An adapter that listens to the progress of upload a part and - gives the information to a :py:class:`b2sdk.bucket.LargeFileUploadState`. - - Accepts absolute bytes_completed from the uploader, and reports - deltas to the :py:class:`b2sdk.bucket.LargeFileUploadState`. The bytes_completed for the - part will drop back to 0 on a retry, which will result in a - negative delta. 
- """ - - def __init__(self, large_file_upload_state, *args, **kwargs): - """ - :param b2sdk.bucket.LargeFileUploadState large_file_upload_state: object to relay the progress to - """ - super(PartProgressReporter, self).__init__(*args, **kwargs) - self.large_file_upload_state = large_file_upload_state - self.prev_byte_count = 0 - - def bytes_completed(self, byte_count): - self.large_file_upload_state.update_part_bytes(byte_count - self.prev_byte_count) - self.prev_byte_count = byte_count - - def close(self): - pass - - def set_total_bytes(self, total_byte_count): - pass - - @six.add_metaclass(B2TraceMeta) class Bucket(object): """ @@ -124,8 +28,6 @@ class Bucket(object): """ DEFAULT_CONTENT_TYPE = 'b2/x-auto' - MAX_UPLOAD_ATTEMPTS = 5 - MAX_LARGE_FILE_SIZE = 10 * 1000 * 1000 * 1000 * 1000 # 10 TB def __init__( self, @@ -250,12 +152,8 @@ def download_file_by_name(self, file_name, download_dest, progress_listener=None :param b2sdk.v1.AbstractProgressListener, None progress_listener: a progress listener object to use, or ``None`` to not track progress :param tuple[int, int] range_: two integer values, start and end offsets """ - url = self.api.session.get_download_url_by_name( - self.name, - file_name, - url_factory=self.api.account_info.get_download_url, - ) - return self.api.transferer.download_file_from_url( + url = self.api.session.get_download_url_by_name(self.name, file_name) + return self.api.download_manager.download_file_from_url( url, download_dest, progress_listener, range_ ) @@ -376,23 +274,19 @@ def ls(self, folder_to_list='', show_versions=False, recursive=False, fetch_coun def list_unfinished_large_files(self, start_file_id=None, batch_size=None, prefix=None): """ A generator that yields an :py:class:`b2sdk.v1.UnfinishedLargeFile` for each - unfinished large file in the bucket, starting at the given file, filtering by prefix. + unfinished large file in the bucket, starting at the given file. 
:param str,None start_file_id: a file ID to start from or None to start from the beginning :param int,None batch_size: max file count :param str,None prefix: file name prefix filter :rtype: generator[b2sdk.v1.UnfinishedLargeFile] """ - batch_size = batch_size or 100 - while True: - batch = self.api.session.list_unfinished_large_files( - self.id_, start_file_id, batch_size, prefix - ) - for file_dict in batch['files']: - yield UnfinishedLargeFile(file_dict) - start_file_id = batch.get('nextFileId') - if start_file_id is None: - break + return self.api.services.large_file.list_unfinished_large_files( + self.id_, + start_file_id=start_file_id, + batch_size=batch_size, + prefix=prefix, + ) def start_large_file(self, file_name, content_type=None, file_info=None): """ @@ -402,8 +296,8 @@ def start_large_file(self, file_name, content_type=None, file_info=None): :param str,None content_type: the MIME type, or ``None`` to accept the default based on file extension of the B2 file name :param dict,None file_infos: a file info to store with the file or ``None`` to not store anything """ - return UnfinishedLargeFile( - self.api.session.start_large_file(self.id_, file_name, content_type, file_info) + return self.api.services.large_file.start_large_file( + self.id_, file_name, content_type=content_type, file_info=file_info ) @limit_trace_arguments(skip=('data_bytes',)) @@ -501,190 +395,17 @@ def upload( content_type = content_type or self.DEFAULT_CONTENT_TYPE progress_listener = progress_listener or DoNothingProgressListener() - # We don't upload any large files unless all of the parts can be at least - # the minimum part size. - min_part_size = max(min_part_size or 0, self.api.account_info.get_minimum_part_size()) - min_large_file_size = min_part_size * 2 - if upload_source.get_content_length() < min_large_file_size: - # Run small uploads in the same thread pool as large file uploads, - # so that they share resources during a sync. 
- f = self.api.get_thread_pool().submit( - self._upload_small_file, upload_source, file_name, content_type, file_info, - progress_listener - ) - return f.result() - else: - return self._upload_large_file( - upload_source, file_name, content_type, file_info, progress_listener - ) - - def _upload_small_file( - self, upload_source, file_name, content_type, file_info, progress_listener - ): - content_length = upload_source.get_content_length() - exception_info_list = [] - progress_listener.set_total_bytes(content_length) - with progress_listener: - for _ in six.moves.xrange(self.MAX_UPLOAD_ATTEMPTS): - try: - with upload_source.open() as file: - input_stream = ReadingStreamWithProgress(file, progress_listener) - hashing_stream = StreamWithHash(input_stream) - length_with_hash = content_length + hashing_stream.hash_size() - response = self.api.session.upload_file( - self.id_, None, file_name, length_with_hash, content_type, - HEX_DIGITS_AT_END, file_info, hashing_stream - ) - assert hashing_stream.hash == response['contentSha1'] - return FileVersionInfoFactory.from_api_response(response) - except B2Error as e: - if not e.should_retry_upload(): - raise - exception_info_list.append(e) - self.api.account_info.clear_bucket_upload_data(self.id_) - - raise MaxRetriesExceeded(self.MAX_UPLOAD_ATTEMPTS, exception_info_list) - - def _upload_large_file( - self, upload_source, file_name, content_type, file_info, progress_listener - ): - content_length = upload_source.get_content_length() - if self.MAX_LARGE_FILE_SIZE < content_length: - raise MaxFileSizeExceeded(content_length, self.MAX_LARGE_FILE_SIZE) - minimum_part_size = self.api.account_info.get_minimum_part_size() - - # Set up the progress reporting for the parts - progress_listener.set_total_bytes(content_length) - - # Select the part boundaries - part_ranges = choose_part_ranges(content_length, minimum_part_size) - - # Check for unfinished files with same name - unfinished_file, finished_parts = self._find_unfinished_file_if_possible( + return self.api.upload_manager.upload( + self.id_, upload_source, file_name, + content_type, file_info, - part_ranges, + progress_listener, + # FIXME: Bucket.upload documents wrong logic + min_large_file_size=min_part_size * 2 if min_part_size is not None else None ) - # Tell B2 we're going to upload a file if necessary - if unfinished_file is None: - unfinished_file = self.start_large_file(file_name, content_type, file_info) - file_id = unfinished_file.file_id - - with progress_listener: - large_file_upload_state = LargeFileUploadState(progress_listener) - # Tell the executor to upload each of the parts - part_futures = [ - self.api.get_thread_pool().submit( - self._upload_part, - file_id, - part_index + 1, # part number - part_range, - upload_source, - large_file_upload_state, - finished_parts - ) for (part_index, part_range) in enumerate(part_ranges) - ] - - # Collect the sha1 checksums of the parts as the uploads finish. - # If any of them raised an exception, that same exception will - # be raised here by result() - part_sha1_array = [interruptible_get_result(f)['contentSha1'] for f in part_futures] - - # Finish the large file - response = self.api.session.finish_large_file(file_id, part_sha1_array) - return FileVersionInfoFactory.from_api_response(response) - - def _find_unfinished_file_if_possible(self, upload_source, file_name, file_info, part_ranges): - """ - Find an unfinished file that may be used to resume a large file upload. 
The - file is found using the filename and comparing the uploaded parts against - the local file. - - This is only possible if the application key being used allows ``listFiles`` access. - """ - if 'listFiles' in self.api.account_info.get_allowed()['capabilities']: - for file_ in self.list_unfinished_large_files(): - if file_.file_name == file_name and file_.file_info == file_info: - files_match = True - finished_parts = {} - for part in self.list_parts(file_.file_id): - # Compare part sizes - offset, part_length = part_ranges[part.part_number - 1] - if part_length != part.content_length: - files_match = False - break - - # Compare hash - with upload_source.open() as f: - f.seek(offset) - sha1_sum = hex_sha1_of_stream(f, part_length) - if sha1_sum != part.content_sha1: - files_match = False - break - - # Save part - finished_parts[part.part_number] = part - - # Skip not matching files or unfinished files with no uploaded parts - if not files_match or not finished_parts: - continue - - # Return first matched file - return file_, finished_parts - return None, {} - - def _upload_part( - self, - file_id, - part_number, - part_range, - upload_source, - large_file_upload_state, - finished_parts=None - ): - # Check if this part was uploaded before - if finished_parts is not None and part_number in finished_parts: - # Report this part finished - part = finished_parts[part_number] - large_file_upload_state.update_part_bytes(part.content_length) - - # Return SHA1 hash - return {'contentSha1': part.content_sha1} - - # Set up a progress listener - part_progress_listener = PartProgressReporter(large_file_upload_state) - - # Retry the upload as needed - exception_list = [] - for _ in six.moves.xrange(self.MAX_UPLOAD_ATTEMPTS): - # if another part has already had an error there's no point in - # uploading this part - if large_file_upload_state.has_error(): - raise AlreadyFailed(large_file_upload_state.get_error_message()) - - try: - with upload_source.open() as file: - offset, content_length = part_range - file.seek(offset) - range_stream = RangeOfInputStream(file, offset, content_length) - input_stream = ReadingStreamWithProgress(range_stream, part_progress_listener) - hashing_stream = StreamWithHash(input_stream) - length_with_hash = content_length + hashing_stream.hash_size() - response = self.api.session.upload_part( - self.id_, file_id, part_number, length_with_hash, HEX_DIGITS_AT_END, - hashing_stream - ) - assert hashing_stream.hash == response['contentSha1'] - return response - except B2Error as e: - exception_list.append(e) - self.api.account_info.clear_bucket_upload_data(self.id_) - - large_file_upload_state.set_error(str(exception_list[-1])) - raise MaxRetriesExceeded(self.MAX_UPLOAD_ATTEMPTS, exception_list) - def get_download_url(self, filename): """ Get file download URL. 
@@ -708,6 +429,7 @@ def hide_file(self, file_name): response = self.api.session.hide_file(self.id_, file_name) return FileVersionInfoFactory.from_api_response(response) + # FIXME: this would be replaced by `copy(...)` which would use `self.api.upload_manager.copy` def copy_file( self, file_id, diff --git a/b2sdk/large_file/__init__.py b/b2sdk/large_file/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/b2sdk/part.py b/b2sdk/large_file/part.py similarity index 97% rename from b2sdk/part.py rename to b2sdk/large_file/part.py index 5e4a95474..2fc01473e 100644 --- a/b2sdk/part.py +++ b/b2sdk/large_file/part.py @@ -1,6 +1,6 @@ ###################################################################### # -# File: b2sdk/part.py +# File: b2sdk/large_file/part.py # # Copyright 2019 Backblaze Inc. All Rights Reserved. # diff --git a/b2sdk/large_file/services.py b/b2sdk/large_file/services.py new file mode 100644 index 000000000..451961d19 --- /dev/null +++ b/b2sdk/large_file/services.py @@ -0,0 +1,73 @@ +from b2sdk.file_version import FileVersionInfoFactory +from b2sdk.large_file.part import PartFactory +from b2sdk.large_file.unfinished_large_file import UnfinishedLargeFile + + +class LargeFileServices(object): + def __init__(self, session): + self.session = session + + def list_parts(self, file_id, start_part_number=None, batch_size=None): + """ + Generator that yields a :py:class:`b2sdk.v1.Part` for each of the parts that have been uploaded. + + :param str file_id: the ID of the large file that is not finished + :param int start_part_number: the first part number to return; defaults to the first part + :param int batch_size: the number of parts to fetch at a time from the server + :rtype: generator + """ + batch_size = batch_size or 100 + while True: + response = self.session.list_parts(file_id, start_part_number, batch_size) + for part_dict in response['parts']: + yield PartFactory.from_list_parts_dict(part_dict) + start_part_number = response.get('nextPartNumber') + if start_part_number is None: + break + + def list_unfinished_large_files( + self, bucket_id, start_file_id=None, batch_size=None, prefix=None + ): + """ + A generator that yields an :py:class:`b2sdk.v1.UnfinishedLargeFile` for each + unfinished large file in the bucket, starting at the given file. + + :param str bucket_id: bucket id + :param str,None start_file_id: a file ID to start from or None to start from the beginning + :param int,None batch_size: max file count + :param str,None prefix: file name prefix filter + :rtype: generator[b2sdk.v1.UnfinishedLargeFile] + """ + batch_size = batch_size or 100 + while True: + batch = self.session.list_unfinished_large_files( + bucket_id, start_file_id, batch_size, prefix + ) + for file_dict in batch['files']: + yield UnfinishedLargeFile(file_dict) + start_file_id = batch.get('nextFileId') + if start_file_id is None: + break + + def start_large_file(self, bucket_id, file_name, content_type=None, file_info=None): + """ + Start a large file transfer. + + :param str file_name: a file name + :param str,None content_type: the MIME type, or ``None`` to accept the default based on file extension of the B2 file name + :param dict,None file_infos: a file info to store with the file or ``None`` to not store anything + """ + return UnfinishedLargeFile( + self.session.start_large_file(bucket_id, file_name, content_type, file_info) + ) + + # delete/cancel + def cancel_large_file(self, file_id): + """ + Cancel a large file upload. 
+ + :param str file_id: a file ID + :rtype: None + """ + response = self.session.cancel_large_file(file_id) + return FileVersionInfoFactory.from_cancel_large_file_response(response) diff --git a/b2sdk/unfinished_large_file.py b/b2sdk/large_file/unfinished_large_file.py similarity index 96% rename from b2sdk/unfinished_large_file.py rename to b2sdk/large_file/unfinished_large_file.py index f35c3e812..5e6274e9e 100644 --- a/b2sdk/unfinished_large_file.py +++ b/b2sdk/large_file/unfinished_large_file.py @@ -1,6 +1,6 @@ ###################################################################### # -# File: b2sdk/unfinished_large_file.py +# File: b2sdk/large_file/unfinished_large_file.py # # Copyright 2019 Backblaze Inc. All Rights Reserved. # diff --git a/b2sdk/raw_api.py b/b2sdk/raw_api.py index 98fc7ba24..94682bc13 100644 --- a/b2sdk/raw_api.py +++ b/b2sdk/raw_api.py @@ -51,40 +51,6 @@ API_VERSION = 'v2' -@unique -class TokenType(Enum): - API = 'api' - UPLOAD_PART = 'upload_part' - UPLOAD_SMALL = 'upload_small' - - -def set_token_type(token_type): - """ - This is a decorator to identify the type of token that must be passed into a function. - When the raw_api is used through B2Session, it will be used to identify the type of url and token to be passed. - - :param token_type: TokenType enum - :return: - """ - - def inner(func, *args, **kwargs): - func.token_type = token_type - return func - - return inner - - -def get_token_type(func): - """ - This will return the token type that must be passed into the input raw_api function. - The default value is TokenType.API. - - :param func: raw_api function to be called - :return: token type - """ - return getattr(func, 'token_type', TokenType.API) - - @unique class MetadataDirectiveMode(Enum): """ Mode of handling metadata when copying a file """ @@ -98,10 +64,58 @@ class AbstractRawApi(object): Direct access to the B2 web apis. 
""" + @abstractmethod + def authorize_account(self, realm_url, application_key_id, application_key): + pass + @abstractmethod def cancel_large_file(self, api_url, account_auth_token, file_id): pass + @abstractmethod + def copy_file( + self, + api_url, + account_auth_token, + source_file_id, + new_file_name, + bytes_range=None, + metadata_directive=None, + content_type=None, + file_info=None, + destination_bucket_id=None, + ): + pass + + @abstractmethod + def create_bucket( + self, + api_url, + account_auth_token, + account_id, + bucket_name, + bucket_type, + bucket_info=None, + cors_rules=None, + lifecycle_rules=None + ): + pass + + @abstractmethod + def create_key( + self, api_url, account_auth_token, account_id, capabilities, key_name, + valid_duration_seconds, bucket_id, name_prefix + ): + pass + + @abstractmethod + def download_file_from_url(self, account_auth_token_or_none, url, range_=None): + pass + + @abstractmethod + def delete_key(self, api_url, account_auth_token, application_key_id): + pass + @abstractmethod def delete_bucket(self, api_url, account_auth_token, account_id, bucket_id): pass @@ -114,6 +128,20 @@ def delete_file_version(self, api_url, account_auth_token, file_id, file_name): def finish_large_file(self, api_url, account_auth_token, file_id, part_sha1_array): pass + @abstractmethod + def get_download_authorization( + self, api_url, account_auth_token, bucket_id, file_name_prefix, valid_duration_in_seconds + ): + pass + + @abstractmethod + def get_file_info(self, api_url, account_auth_token, file_id): + pass + + @abstractmethod + def get_upload_url(self, api_url, account_auth_token, bucket_id): + pass + @abstractmethod def get_upload_part_url(self, api_url, account_auth_token, file_id): pass @@ -122,6 +150,53 @@ def get_upload_part_url(self, api_url, account_auth_token, file_id): def hide_file(self, api_url, account_auth_token, bucket_id, file_name): pass + @abstractmethod + def list_buckets( + self, + api_url, + account_auth_token, + account_id, + bucket_id=None, + bucket_name=None, + ): + pass + + @abstractmethod + def list_file_names( + self, + api_url, + account_auth_token, + bucket_id, + start_file_name=None, + max_file_count=None, + prefix=None, + ): + pass + + @abstractmethod + def list_file_versions( + self, + api_url, + account_auth_token, + bucket_id, + start_file_name=None, + start_file_id=None, + max_file_count=None, + prefix=None, + ): + pass + + @abstractmethod + def list_keys( + self, + api_url, + account_auth_token, + account_id, + max_key_count=None, + start_application_key_id=None + ): + pass + @abstractmethod def list_parts(self, api_url, account_auth_token, file_id, start_part_number, max_part_count): pass @@ -159,33 +234,25 @@ def update_bucket( ): pass + @abstractmethod + def upload_file( + self, upload_url, upload_auth_token, file_name, content_length, content_type, content_sha1, + file_infos, data_stream + ): + pass + @abstractmethod def upload_part( self, upload_url, upload_auth_token, part_number, content_length, sha1_sum, input_stream ): pass - def get_download_url_by_id(self, download_url, account_auth_token, file_id): + def get_download_url_by_id(self, download_url, file_id): return '%s/b2api/%s/b2_download_file_by_id?fileId=%s' % (download_url, API_VERSION, file_id) - def get_download_url_by_name(self, download_url, account_auth_token, bucket_name, file_name): + def get_download_url_by_name(self, download_url, bucket_name, file_name): return download_url + '/file/' + bucket_name + '/' + b2_url_encode(file_name) - @abstractmethod - def 
copy_file( - self, - api_url, - account_auth_token, - source_file_id, - new_file_name, - bytes_range=None, - metadata_directive=None, - content_type=None, - file_info=None, - destination_bucket_id=None, - ): - pass - class B2RawApi(AbstractRawApi): """ @@ -202,10 +269,6 @@ class B2RawApi(AbstractRawApi): of the HTTP calls. It can be mocked-out for testing higher layers. And this class can be tested by exercising each call just once, which is relatively quick. - - All public methods of this class except authorize_account shall accept - api_url and account_info as first two positional arguments. This is needed - for B2Session magic. """ def __init__(self, b2_http): @@ -297,11 +360,10 @@ def delete_key(self, api_url, account_auth_token, application_key_id): applicationKeyId=application_key_id, ) - def download_file_from_url(self, _, account_auth_token_or_none, url, range_=None): + def download_file_from_url(self, account_auth_token_or_none, url, range_=None): """ Issue a streaming request for download of a file, potentially authorized. - :param _: unused (caused by B2Session magic) :param account_auth_token_or_none: an optional account auth token to pass in :param url: the full URL to download from :param range: two-element tuple for http Range header @@ -550,7 +612,6 @@ def check_b2_filename(self, filename): if long_segment > 250: raise UnusableFileName("Filename segment too long (maximum 250 bytes in utf-8).") - @set_token_type(TokenType.UPLOAD_SMALL) def upload_file( self, upload_url, upload_auth_token, file_name, content_length, content_type, content_sha1, file_infos, data_stream @@ -582,7 +643,6 @@ def upload_file( return self.b2_http.post_content_return_json(upload_url, headers, data_stream) - @set_token_type(TokenType.UPLOAD_PART) def upload_part( self, upload_url, upload_auth_token, part_number, content_length, content_sha1, data_stream ): @@ -760,29 +820,29 @@ def test_raw_api_helper(raw_api): # b2_download_file_by_id with auth print('b2_download_file_by_id (auth)') - url = raw_api.get_download_url_by_id(download_url, None, file_id) - with raw_api.download_file_from_url(None, account_auth_token, url) as response: + url = raw_api.get_download_url_by_id(download_url, file_id) + with raw_api.download_file_from_url(account_auth_token, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data # b2_download_file_by_id no auth print('b2_download_file_by_id (no auth)') - url = raw_api.get_download_url_by_id(download_url, None, file_id) - with raw_api.download_file_from_url(None, None, url) as response: + url = raw_api.get_download_url_by_id(download_url, file_id) + with raw_api.download_file_from_url(None, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data # b2_download_file_by_name with auth print('b2_download_file_by_name (auth)') - url = raw_api.get_download_url_by_name(download_url, None, bucket_name, file_name) - with raw_api.download_file_from_url(None, account_auth_token, url) as response: + url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) + with raw_api.download_file_from_url(account_auth_token, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data # b2_download_file_by_name no auth print('b2_download_file_by_name (no auth)') - url = raw_api.get_download_url_by_name(download_url, None, bucket_name, file_name) - with raw_api.download_file_from_url(None, None, url) as 
response: + url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) + with raw_api.download_file_from_url(None, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data @@ -795,8 +855,8 @@ def test_raw_api_helper(raw_api): # b2_download_file_by_name with download auth print('b2_download_file_by_name (download auth)') - url = raw_api.get_download_url_by_name(download_url, None, bucket_name, file_name) - with raw_api.download_file_from_url(None, download_auth_token, url) as response: + url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) + with raw_api.download_file_from_url(download_auth_token, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data diff --git a/b2sdk/raw_simulator.py b/b2sdk/raw_simulator.py index c06fffdd0..fd19a8207 100644 --- a/b2sdk/raw_simulator.py +++ b/b2sdk/raw_simulator.py @@ -30,7 +30,7 @@ Unauthorized, UnsatisfiableRange, ) -from .raw_api import AbstractRawApi, HEX_DIGITS_AT_END, MetadataDirectiveMode, set_token_type, TokenType +from .raw_api import AbstractRawApi, HEX_DIGITS_AT_END, MetadataDirectiveMode from .utils import b2_url_decode, b2_url_encode ALL_CAPABILITES = [ @@ -822,7 +822,7 @@ def delete_bucket(self, api_url, account_auth_token, account_id, bucket_id): del self.bucket_id_to_bucket[bucket_id] return bucket.bucket_dict() - def download_file_from_url(self, _, account_auth_token_or_none, url, range_=None): + def download_file_from_url(self, account_auth_token_or_none, url, range_=None): # TODO: check auth token if bucket is not public matcher = self.DOWNLOAD_URL_MATCHER.match(url) assert matcher is not None, url @@ -1074,7 +1074,6 @@ def update_bucket( if_revision_is=if_revision_is ) - @set_token_type(TokenType.UPLOAD_SMALL) def upload_file( self, upload_url, upload_auth_token, file_name, content_length, content_type, content_sha1, file_infos, data_stream @@ -1095,7 +1094,6 @@ def upload_file( self.file_id_to_bucket_id[file_id] = bucket_id return response - @set_token_type(TokenType.UPLOAD_PART) def upload_part( self, upload_url, upload_auth_token, part_number, content_length, sha1_sum, input_stream ): diff --git a/b2sdk/session.py b/b2sdk/session.py index 97f98edd8..29fe4bec9 100644 --- a/b2sdk/session.py +++ b/b2sdk/session.py @@ -8,57 +8,381 @@ # ###################################################################### -import functools +from enum import Enum, unique +from b2sdk.account_info.sqlite_account_info import SqliteAccountInfo +from b2sdk.account_info.exception import MissingAccountData +from b2sdk.b2http import B2Http +from b2sdk.cache import AuthInfoCache, DummyCache from b2sdk.exception import (InvalidAuthToken, Unauthorized) -from b2sdk.raw_api import ALL_CAPABILITIES, get_token_type, TokenType +from b2sdk.raw_api import ALL_CAPABILITIES, B2RawApi + + +@unique +class TokenType(Enum): + API = 'api' + API_TOKEN_ONLY = 'api_token_only' + UPLOAD_PART = 'upload_part' + UPLOAD_SMALL = 'upload_small' class B2Session(object): """ - A *magic* facade that supplies the correct api_url and account_auth_token + A facade that supplies the correct api_url and account_auth_token to methods of underlying raw_api and reauthorizes if necessary. 
""" - def __init__(self, api, raw_api): - self._api = api # for reauthorization - self.raw_api = raw_api - - def __getattr__(self, name): - f = getattr(self.raw_api, name) - - @functools.wraps(f) - def wrapper(*args, **kwargs): - auth_failure_encountered = False - # A *magic* that will identify and generate the correct type of Url and token based on the decorator on the B2RawApi method. - token_type = get_token_type(f) - # download_by_name uses different URLs - url_factory = kwargs.pop('url_factory', self._api.account_info.get_api_url) - while 1: - try: - if token_type == TokenType.API: - api_url = url_factory() - account_auth_token = self._api.account_info.get_account_auth_token() - return f(api_url, account_auth_token, *args, **kwargs) - elif token_type == TokenType.UPLOAD_SMALL: - return self._upload_small(f, *args, **kwargs) - elif token_type == TokenType.UPLOAD_PART: - return self._upload_part(f, *args, **kwargs) - else: - assert False, 'token type is not supported' - except InvalidAuthToken: - if not auth_failure_encountered: - auth_failure_encountered = True - reauthorization_success = self._api.authorize_automatically() - if reauthorization_success: - continue - # TODO: exception chaining could be added here - # to help debug reauthorization failures - raise - except Unauthorized as e: - raise self._add_app_key_info_to_unauthorized(e) - - return wrapper + def __init__(self, account_info=None, cache=None, raw_api=None): + """ + Initialize Session using given account info. + + :param account_info: an instance of :class:`~b2sdk.v1.UrlPoolAccountInfo`, + or any custom class derived from + :class:`~b2sdk.v1.AbstractAccountInfo` + To learn more about Account Info objects, see here + :class:`~b2sdk.v1.SqliteAccountInfo` + + :param cache: an instance of the one of the following classes: + :class:`~b2sdk.cache.DummyCache`, :class:`~b2sdk.cache.InMemoryCache`, + :class:`~b2sdk.cache.AuthInfoCache`, + or any custom class derived from :class:`~b2sdk.cache.AbstractCache` + It is used by B2Api to cache the mapping between bucket name and bucket ids. + default is :class:`~b2sdk.cache.DummyCache` + + :param raw_api: an instance of one of the following classes: + :class:`~b2sdk.raw_api.B2RawApi`, :class:`~b2sdk.raw_simulator.RawSimulator`, + or any custom class derived from :class:`~b2sdk.raw_api.AbstractRawApi` + It makes network-less unit testing simple by using :class:`~b2sdk.raw_simulator.RawSimulator`, + in tests and :class:`~b2sdk.raw_api.B2RawApi` in production. + default is :class:`~b2sdk.raw_api.B2RawApi` + """ + + self.raw_api = raw_api or B2RawApi(B2Http()) + if account_info is None: + account_info = SqliteAccountInfo() + if cache is None: + cache = AuthInfoCache(account_info) + if cache is None: + cache = DummyCache() + + self.account_info = account_info + self.cache = cache + + def authorize_automatically(self): + """ + Perform automatic account authorization, retrieving all account data + from account info object passed during initialization. + """ + try: + self.authorize_account( + self.account_info.get_realm(), + self.account_info.get_application_key_id(), + self.account_info.get_application_key(), + ) + except MissingAccountData: + return False + return True + + def authorize_account(self, realm, application_key_id, application_key): + """ + Perform account authorization. 
+ + :param str realm: a realm to authorize account in (usually just "production") + :param str application_key_id: :term:`application key ID` + :param str application_key: user's :term:`application key` + """ + # Clean up any previous account info if it was for a different account. + try: + old_account_id = self.account_info.get_account_id() + old_realm = self.account_info.get_realm() + if application_key_id != old_account_id or realm != old_realm: + self.cache.clear() + except MissingAccountData: + self.cache.clear() + + # Authorize + realm_url = self.account_info.REALM_URLS[realm] + response = self.raw_api.authorize_account(realm_url, application_key_id, application_key) + allowed = response['allowed'] + + # Store the auth data + self.account_info.set_auth_data( + response['accountId'], + response['authorizationToken'], + response['apiUrl'], + response['downloadUrl'], + response['recommendedPartSize'], + application_key, + realm, + allowed, + application_key_id, + ) + + def cancel_large_file(self, file_id): + return self._wrap_default_token('cancel_large_file', file_id) + + def create_bucket( + self, + account_id, + bucket_name, + bucket_type, + bucket_info=None, + cors_rules=None, + lifecycle_rules=None + ): + return self._wrap_default_token( + 'create_bucket', + account_id, + bucket_name, + bucket_type, + bucket_info=bucket_info, + cors_rules=cors_rules, + lifecycle_rules=lifecycle_rules, + ) + + def create_key( + self, account_id, capabilities, key_name, valid_duration_seconds, bucket_id, name_prefix + ): + return self._wrap_default_token( + 'create_key', + account_id, + capabilities, + key_name, + valid_duration_seconds, + bucket_id, + name_prefix, + ) + + def delete_key(self, application_key_id): + return self._wrap_default_token('delete_key', application_key_id) + + def delete_bucket(self, account_id, bucket_id): + return self._wrap_default_token('delete_bucket', account_id, bucket_id) + + def delete_file_version(self, file_id, file_name): + return self._wrap_default_token('delete_file_version', file_id, file_name) + + def download_file_from_url(self, url, range_=None): + return self._wrap_token( + 'download_file_from_url', TokenType.API_TOKEN_ONLY, url, range_=range_ + ) + + def finish_large_file(self, file_id, part_sha1_array): + return self._wrap_default_token('finish_large_file', file_id, part_sha1_array) + + def get_download_authorization(self, bucket_id, file_name_prefix, valid_duration_in_seconds): + return self._wrap_default_token( + 'get_download_authorization', bucket_id, file_name_prefix, valid_duration_in_seconds + ) + + def get_file_info(self, file_id): + return self._wrap_default_token('get_file_info', file_id) + + def get_upload_url(self, bucket_id): + return self._wrap_default_token('get_upload_url', bucket_id) + + def get_upload_part_url(self, file_id): + return self._wrap_default_token('get_upload_part_url', file_id) + + def hide_file(self, bucket_id, file_name): + return self._wrap_default_token('hide_file', bucket_id, file_name) + + def list_buckets(self, account_id, bucket_id=None, bucket_name=None): + return self._wrap_default_token( + 'list_buckets', + account_id, + bucket_id=bucket_id, + bucket_name=bucket_name, + ) + + def list_file_names( + self, + bucket_id, + start_file_name=None, + max_file_count=None, + prefix=None, + ): + return self._wrap_default_token( + 'list_file_names', + bucket_id, + start_file_name=start_file_name, + max_file_count=max_file_count, + prefix=prefix, + ) + + def list_file_versions( + self, + bucket_id, + start_file_name=None, + 
start_file_id=None, + max_file_count=None, + prefix=None, + ): + return self._wrap_default_token( + 'list_file_versions', + bucket_id, + start_file_name=start_file_name, + start_file_id=start_file_id, + max_file_count=max_file_count, + prefix=prefix, + ) + + def list_keys(self, account_id, max_key_count=None, start_application_key_id=None): + return self._wrap_default_token( + 'list_keys', + account_id, + max_key_count=max_key_count, + start_application_key_id=start_application_key_id, + ) + + def list_parts(self, file_id, start_part_number, max_part_count): + return self._wrap_default_token('list_parts', file_id, start_part_number, max_part_count) + + def list_unfinished_large_files( + self, + bucket_id, + start_file_id=None, + max_file_count=None, + prefix=None, + ): + return self._wrap_default_token( + 'list_unfinished_large_files', + bucket_id, + start_file_id=start_file_id, + max_file_count=max_file_count, + prefix=prefix, + ) + + def start_large_file(self, bucket_id, file_name, content_type, file_info): + return self._wrap_default_token( + 'start_large_file', bucket_id, file_name, content_type, file_info + ) + + def update_bucket( + self, + account_id, + bucket_id, + bucket_type=None, + bucket_info=None, + cors_rules=None, + lifecycle_rules=None, + if_revision_is=None, + ): + return self._wrap_default_token( + 'update_bucket', + account_id, + bucket_id, + bucket_type=bucket_type, + bucket_info=bucket_info, + cors_rules=cors_rules, + lifecycle_rules=lifecycle_rules, + if_revision_is=if_revision_is, + ) + + def upload_file( + self, bucket_id, file_name, content_length, content_type, content_sha1, file_infos, + data_stream + ): + return self._wrap_token( + 'upload_file', + TokenType.UPLOAD_SMALL, + bucket_id, + file_name, + content_length, + content_type, + content_sha1, + file_infos, + data_stream, + ) + + def upload_part(self, file_id, part_number, content_length, sha1_sum, input_stream): + return self._wrap_token( + 'upload_part', + TokenType.UPLOAD_PART, + file_id, + part_number, + content_length, + sha1_sum, + input_stream, + ) + + def get_download_url_by_id(self, file_id): + return self.raw_api.get_download_url_by_id(self.account_info.get_download_url(), file_id) + + def get_download_url_by_name(self, bucket_name, file_name): + return self.raw_api.get_download_url_by_name( + self.account_info.get_download_url(), bucket_name, file_name + ) + + def copy_file( + self, + source_file_id, + new_file_name, + bytes_range=None, + metadata_directive=None, + content_type=None, + file_info=None, + destination_bucket_id=None, + ): + return self._wrap_default_token( + 'copy_file', + source_file_id, + new_file_name, + bytes_range=bytes_range, + metadata_directive=metadata_directive, + content_type=content_type, + file_info=file_info, + destination_bucket_id=destination_bucket_id, + ) + + def _wrap_default_token(self, raw_api_name, *args, **kwargs): + return self._wrap_token(raw_api_name, TokenType.API, *args, **kwargs) + + def _wrap_token(self, raw_api_name, token_type, *args, **kwargs): + raw_api_method = getattr(self.raw_api, raw_api_name) + + def api_token_callback(): + api_url = self.account_info.get_api_url() + account_auth_token = self.account_info.get_account_auth_token() + return raw_api_method(api_url, account_auth_token, *args, **kwargs) + + def api_token_only_callback(): + account_auth_token = self.account_info.get_account_auth_token() + return raw_api_method(account_auth_token, *args, **kwargs) + + def upload_small_token_callback(): + return self._upload_small(raw_api_method, *args, 
**kwargs) + + def upload_part_token_callback(): + return self._upload_part(raw_api_method, *args, **kwargs) + + if token_type == TokenType.API: + callback = api_token_callback + elif token_type == TokenType.API_TOKEN_ONLY: + callback = api_token_only_callback + elif token_type == TokenType.UPLOAD_SMALL: + callback = upload_small_token_callback + elif token_type == TokenType.UPLOAD_PART: + callback = upload_part_token_callback + + return self._reauthorization_loop(callback) + + def _reauthorization_loop(self, callback): + auth_failure_encountered = False + while 1: + try: + return callback() + except InvalidAuthToken: + if not auth_failure_encountered: + auth_failure_encountered = True + reauthorization_success = self.authorize_automatically() + if reauthorization_success: + continue + # TODO: exception chaining could be added here + # to help debug reauthorization failures + raise + except Unauthorized as e: + raise self._add_app_key_info_to_unauthorized(e) def _add_app_key_info_to_unauthorized(self, unauthorized): """ @@ -66,7 +390,7 @@ def _add_app_key_info_to_unauthorized(self, unauthorized): about why it might have failed. """ # What's allowed? - allowed = self._api.account_info.get_allowed() + allowed = self.account_info.get_allowed() capabilities = allowed['capabilities'] bucket_name = allowed['bucketName'] name_prefix = allowed['namePrefix'] @@ -95,7 +419,7 @@ def _get_upload_data(self, bucket_id): Take ownership of an upload URL / auth token for the bucket and return it. """ - account_info = self._api.account_info + account_info = self.account_info upload_url, upload_auth_token = account_info.take_bucket_upload_url(bucket_id) if None not in (upload_url, upload_auth_token): return upload_url, upload_auth_token @@ -108,7 +432,7 @@ def _get_upload_part_data(self, file_id): Make sure that we have an upload URL and auth token for the given bucket and return it. 
""" - account_info = self._api.account_info + account_info = self.account_info upload_url, upload_auth_token = account_info.take_large_file_upload_url(file_id) if None not in (upload_url, upload_auth_token): return upload_url, upload_auth_token @@ -116,14 +440,14 @@ def _get_upload_part_data(self, file_id): response = self.get_upload_part_url(file_id) return response['uploadUrl'], response['authorizationToken'] - def _upload_small(self, f, bucket_id, file_id, *args, **kwargs): + def _upload_small(self, f, bucket_id, *args, **kwargs): upload_url, upload_auth_token = self._get_upload_data(bucket_id) response = f(upload_url, upload_auth_token, *args, **kwargs) - self._api.account_info.put_bucket_upload_url(bucket_id, upload_url, upload_auth_token) + self.account_info.put_bucket_upload_url(bucket_id, upload_url, upload_auth_token) return response - def _upload_part(self, f, bucket_id, file_id, *args, **kwargs): + def _upload_part(self, f, file_id, *args, **kwargs): upload_url, upload_auth_token = self._get_upload_part_data(file_id) response = f(upload_url, upload_auth_token, *args, **kwargs) - self._api.account_info.put_large_file_upload_url(file_id, upload_url, upload_auth_token) + self.account_info.put_large_file_upload_url(file_id, upload_url, upload_auth_token) return response diff --git a/b2sdk/transfer/__init__.py b/b2sdk/transfer/__init__.py new file mode 100644 index 000000000..14ebd8426 --- /dev/null +++ b/b2sdk/transfer/__init__.py @@ -0,0 +1,11 @@ +from .inbound.download_manager import DownloadManager +from .outbound.copy_manager import CopyManager +from .outbound.upload_manager import UploadManager +from .emerger import Emerger + +__all__ = [ + 'DownloadManager', + 'CopyManager', + 'UploadManager', + 'Emerger', +] \ No newline at end of file diff --git a/b2sdk/transfer/emerger.py b/b2sdk/transfer/emerger.py new file mode 100644 index 000000000..52e72d970 --- /dev/null +++ b/b2sdk/transfer/emerger.py @@ -0,0 +1,154 @@ +import logging + +import six + +from b2sdk.file_version import FileVersionInfoFactory +from b2sdk.utils import B2TraceMetaAbstract, interruptible_get_result + +from .outbound.large_file_upload_state import LargeFileUploadState + +logger = logging.getLogger(__name__) + + +@six.add_metaclass(B2TraceMetaAbstract) +class Emerger(object): + """ + Handle complex actions around multi source copy/uploads. + + This class can be used to build advanced copy workflows like incremental upload. + """ + + def __init__(self, session, services, download_manager, upload_manager, copy_manager): + """ + Initialize the Emerger using the given session and transfer managers. + + :param session: an instance of :class:`~b2sdk.v1.B2Session`, + or any custom class derived from + :class:`~b2sdk.v1.B2Session` + :param services: an instace of :class:`~b2sdk.v1.Services` + :param b2sdk.v1.DownloadManager download_manager: an instace of :class:`~b2sdk.v1.DownloadManager` + :param b2sdk.v1.UploadManager upload_manager: an instace of :class:`~b2sdk.v1.UploadManager` + :param b2sdk.v1.CopyManager copy_manager: an instace of :class:`~b2sdk.v1.CopyManager` + """ + self.session = session + self.services = services + self.download_manager = download_manager + self.upload_manager = upload_manager + self.copy_manager = copy_manager + + def emerge( + self, bucket_id, emerge_ranges_iterator, file_name, content_type, file_info, + progress_listener + ): + """ + Emerge (store multiple sources) of source range iterator. 
+
+        :param str bucket_id: a bucket ID
+        :param emerge_ranges_iterator: iterator of emerge ranges - range wrappers around emerge sources,
+                      which are (design not determined yet) polymorphic over
+                      :class:`~b2sdk.v1.CopySource` and :class:`~b2sdk.v1.UploadSource`
+        :param str file_name: the file name of the new B2 file
+        :param str content_type: the MIME type
+        :param dict,None file_info: a file info to store with the file or ``None`` to not store anything
+        :param b2sdk.v1.AbstractProgressListener progress_listener: a progress listener object to use
+
+        Right now this is only a draft implementation that supports the concat interface, and only for files larger than 5 MB.
+        """
+        # TODO: this is only a draft implementation that supports the concat interface only for files > 5 MB
+
+        # TODO: we assume that there is more than one emerge range on iterator
+        unfinished_file = self.services.large_file.start_large_file(
+            bucket_id, file_name, content_type, file_info
+        )
+        file_id = unfinished_file.file_id
+
+        total_bytes = 0
+        part_futures = []
+        with progress_listener:
+            large_file_upload_state = LargeFileUploadState(progress_listener)
+            for emerge_range in emerge_ranges_iterator:
+                if emerge_range.destination_offset != total_bytes:
+                    raise NotImplementedError('only contiguous, non-overlapping ranges are supported')
+
+                emerge_source_type = emerge_range.emerge_source.get_type()
+
+                # TODO: remember - this is only a non-working draft
+                if emerge_source_type == 'upload':
+                    # TODO: probably UploadSource and CopySource should inherit from EmergeSource
+                    #       then `get_source()` wouldn't be required here
+                    upload_source = emerge_range.emerge_source.get_source()
+                    total_bytes += upload_source.get_content_length()
+
+                    # FIXME: what we actually should do here???
+                    progress_listener.set_total_bytes(total_bytes)
+
+                    upload_source_parts = self.upload_manager.split_upload_source(upload_source)
+                    part_futures.extend(
+                        self.upload_manager.get_thread_pool().submit(
+                            self.upload_manager.upload_part,
+                            bucket_id,
+                            file_id,
+                            upload_source_part,
+                            large_file_upload_state,
+                        ) for upload_source_part in upload_source_parts
+                    )
+                elif emerge_source_type == 'copy':
+                    copy_source = emerge_range.emerge_source.get_source()
+                    if copy_source.content_length is None:
+                        raise NotImplementedError('unknown length copy sources not supported')
+
+                    # FIXME: what we actually should do here???
+                    total_bytes += copy_source.content_length
+
+                    copy_source_parts = self.copy_manager.split_copy_source(copy_source)
+                    part_futures.extend(
+                        self.copy_manager.get_thread_pool().submit(
+                            self.copy_manager.copy_part,
+                            file_id,
+                            copy_source_part,
+                            large_file_upload_state,
+                        ) for copy_source_part in copy_source_parts
+                    )
+                else:
+                    raise NotImplementedError(
+                        'not implemented emerge source: {0}'.format(emerge_source_type)
+                    )
+
+        part_sha1_array = [interruptible_get_result(f)['contentSha1'] for f in part_futures]
+
+        # Finish the large file
+        response = self.session.finish_large_file(file_id, part_sha1_array)
+        return FileVersionInfoFactory.from_api_response(response)
+
+    def concat(
+        self, bucket_id, emerge_sources_iterator, file_name, content_type, file_info,
+        progress_listener
+    ):
+        """
+        Concatenate multiple copy/upload sources into a single B2 file.
+
+        :param str bucket_id: a bucket ID
+        :param emerge_sources_iterator: iterator of emerge sources, which are (design not determined yet)
+                      polymorphic over :class:`~b2sdk.v1.CopySource` and :class:`~b2sdk.v1.UploadSource`
+        :param str file_name: the file name of the new B2 file
+        :param str content_type: the MIME type
+        :param dict,None file_info: a file info to store with the file or ``None`` to not store anything
+        :param b2sdk.v1.AbstractProgressListener progress_listener: a progress listener object to use
+
+        A thin wrapper around the ``emerge`` function that demonstrates one possible application of ``Emerger``.
+        """
+
+        def emerge_ranges_generator():
+            current_position = 0
+            for emerge_source in emerge_sources_iterator:
+                current_position, upload_range = self._wrap_emerge_range(
+                    current_position, emerge_source
+                )
+                yield upload_range
+
+        return self.emerge(
+            bucket_id, emerge_ranges_generator(), file_name, content_type, file_info,
+            progress_listener
+        )
+
+    def _wrap_emerge_range(self, current_position, emerge_source):
+        pass
diff --git a/b2sdk/transferer/__init__.py b/b2sdk/transfer/inbound/__init__.py
similarity index 84%
rename from b2sdk/transferer/__init__.py
rename to b2sdk/transfer/inbound/__init__.py
index f76f80f46..a0efa3fad 100644
--- a/b2sdk/transferer/__init__.py
+++ b/b2sdk/transfer/inbound/__init__.py
@@ -7,7 +7,3 @@
 #
 # License https://www.backblaze.com/using_b2_code.html
 #
 ######################################################################
-
-from .transferer import Transferer
-
-assert Transferer
diff --git a/b2sdk/transferer/transferer.py b/b2sdk/transfer/inbound/download_manager.py
similarity index 73%
rename from b2sdk/transferer/transferer.py
rename to b2sdk/transfer/inbound/download_manager.py
index 8064b6dd3..dcab62f92 100644
--- a/b2sdk/transferer/transferer.py
+++ b/b2sdk/transfer/inbound/download_manager.py
@@ -1,29 +1,29 @@
-######################################################################
-#
-# File: b2sdk/transferer/transferer.py
-#
-# Copyright 2019 Backblaze Inc. All Rights Reserved.
-#
-# License https://www.backblaze.com/using_b2_code.html
-#
-######################################################################
-
+import logging
 import six
 
-from ..download_dest import DownloadDestProgressWrapper
-from ..exception import ChecksumMismatch, UnexpectedCloudBehaviour, TruncatedOutput, InvalidRange
-from ..progress import DoNothingProgressListener
-from ..raw_api import SRC_LAST_MODIFIED_MILLIS
-from ..utils import B2TraceMetaAbstract
+from b2sdk.download_dest import DownloadDestProgressWrapper
+from b2sdk.progress import DoNothingProgressListener
+
+from b2sdk.exception import (
+    ChecksumMismatch,
+    InvalidRange,
+    TruncatedOutput,
+    UnexpectedCloudBehaviour,
+)
+from b2sdk.raw_api import SRC_LAST_MODIFIED_MILLIS
+from b2sdk.utils import B2TraceMetaAbstract
+
+from .downloader.parallel import ParallelDownloader
+from .downloader.simple import SimpleDownloader
 from .file_metadata import FileMetadata
-from .parallel import ParallelDownloader
-from .simple import SimpleDownloader
+
+logger = logging.getLogger(__name__)
 
 
 @six.add_metaclass(B2TraceMetaAbstract)
-class Transferer(object):
+class DownloadManager(object):
     """
-    Handle complex actions around downloads and uploads to free raw_api from that responsibility.
+    Handle complex actions around downloads to free raw_api from that responsibility.
""" # how many chunks to break a downloaded file into @@ -36,15 +36,16 @@ class Transferer(object): MIN_CHUNK_SIZE = 8192 # ~1MB file will show ~1% progress increment MAX_CHUNK_SIZE = 1024**2 - def __init__(self, session, account_info): + def __init__(self, session): """ - :param max_streams: limit on a number of streams to use when downloading in multiple parts - :param min_part_size: the smallest part size for which a stream will be run - when downloading in multiple parts + Initialize the DownloadManager using the given session. + + :param session: an instance of :class:`~b2sdk.v1.B2Session`, + or any custom class derived from + :class:`~b2sdk.v1.B2Session` """ - self.session = session - self.account_info = account_info + self.session = session self.strategies = [ ParallelDownloader( max_streams=self.DEFAULT_MAX_STREAMS, @@ -52,7 +53,7 @@ def __init__(self, session, account_info): min_chunk_size=self.MIN_CHUNK_SIZE, max_chunk_size=self.MAX_CHUNK_SIZE, ), - #IOTDownloader(), # TODO: curl -s httpbin.org/get | tee /dev/stderr 2>ble | sha1sum | cut -c -40 + # IOTDownloader(), # TODO: curl -s httpbin.org/get | tee /dev/stderr 2>ble | sha1sum | cut -c -40 SimpleDownloader( min_chunk_size=self.MIN_CHUNK_SIZE, max_chunk_size=self.MAX_CHUNK_SIZE, @@ -76,7 +77,6 @@ def download_file_from_url( download_dest = DownloadDestProgressWrapper(download_dest, progress_listener) with self.session.download_file_from_url( url, - url_factory=self.account_info.get_download_url, range_=range_, ) as response: metadata = FileMetadata.from_response(response) @@ -123,8 +123,7 @@ def _validate_download(self, range_, bytes_read, actual_sha1, metadata): if bytes_read != metadata.content_length: raise TruncatedOutput(bytes_read, metadata.content_length) - if metadata.content_sha1 != 'none' and \ - actual_sha1 != metadata.content_sha1: # no yapf + if metadata.content_sha1 != 'none' and actual_sha1 != metadata.content_sha1: raise ChecksumMismatch( checksum_type='sha1', expected=metadata.content_length, diff --git a/b2sdk/transfer/inbound/downloader/__init__.py b/b2sdk/transfer/inbound/downloader/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/b2sdk/transferer/abstract.py b/b2sdk/transfer/inbound/downloader/abstract.py similarity index 98% rename from b2sdk/transferer/abstract.py rename to b2sdk/transfer/inbound/downloader/abstract.py index 0606d788f..707b51ebe 100644 --- a/b2sdk/transferer/abstract.py +++ b/b2sdk/transfer/inbound/downloader/abstract.py @@ -14,7 +14,7 @@ import six -from ..utils import B2TraceMetaAbstract +from b2sdk.utils import B2TraceMetaAbstract from .range import Range diff --git a/b2sdk/transferer/parallel.py b/b2sdk/transfer/inbound/downloader/parallel.py similarity index 100% rename from b2sdk/transferer/parallel.py rename to b2sdk/transfer/inbound/downloader/parallel.py diff --git a/b2sdk/transferer/range.py b/b2sdk/transfer/inbound/downloader/range.py similarity index 100% rename from b2sdk/transferer/range.py rename to b2sdk/transfer/inbound/downloader/range.py diff --git a/b2sdk/transferer/simple.py b/b2sdk/transfer/inbound/downloader/simple.py similarity index 100% rename from b2sdk/transferer/simple.py rename to b2sdk/transfer/inbound/downloader/simple.py diff --git a/b2sdk/transferer/file_metadata.py b/b2sdk/transfer/inbound/file_metadata.py similarity index 100% rename from b2sdk/transferer/file_metadata.py rename to b2sdk/transfer/inbound/file_metadata.py diff --git a/b2sdk/transfer/outbound/__init__.py b/b2sdk/transfer/outbound/__init__.py new file mode 100644 
index 000000000..e69de29bb
diff --git a/b2sdk/transfer/outbound/copy_manager.py b/b2sdk/transfer/outbound/copy_manager.py
new file mode 100644
index 000000000..232c91561
--- /dev/null
+++ b/b2sdk/transfer/outbound/copy_manager.py
@@ -0,0 +1,284 @@
+import logging
+import six
+
+from b2sdk.exception import (
+    AlreadyFailed,
+    MaxFileSizeExceeded,
+)
+
+from b2sdk.file_version import FileVersionInfoFactory
+from b2sdk.raw_api import MetadataDirectiveMode
+from b2sdk.utils import B2TraceMetaAbstract, choose_part_ranges, interruptible_get_result
+
+from .large_file_upload_state import LargeFileUploadState
+
+try:
+    import concurrent.futures as futures
+except ImportError:
+    import futures
+
+from .copy_source import CopySourcePart
+
+logger = logging.getLogger(__name__)
+
+
+@six.add_metaclass(B2TraceMetaAbstract)
+class CopyManager(object):
+    """
+    Handle complex actions around server-side copy to free raw_api from that responsibility.
+    """
+
+    MAX_LARGE_FILE_SIZE = 10 * 1000 * 1000 * 1000 * 1000  # 10 TB
+
+    def __init__(self, session, services, max_copy_workers=10):
+        """
+        Initialize the CopyManager using the given session.
+
+        :param session: an instance of :class:`~b2sdk.v1.B2Session`,
+                        or any custom class derived from
+                        :class:`~b2sdk.v1.B2Session`
+        :param services: an instance of :class:`~b2sdk.v1.Services`
+        :param int max_copy_workers: a number of copy threads, default is 10
+        """
+        self.session = session
+        self.services = services
+
+        self.copy_executor = None
+        self.max_workers = max_copy_workers
+
+    @property
+    def account_info(self):
+        return self.session.account_info
+
+    def set_thread_pool_size(self, max_workers):
+        """
+        Set the size of the thread pool to use for server-side copies.
+
+        Must be called before any work starts, or the thread pool will get
+        the default size of 1.
+
+        :param int max_workers: maximum allowed number of workers in a pool
+        """
+        if self.copy_executor is not None:
+            raise Exception('thread pool already created')
+        self.max_workers = max_workers
+
+    def get_thread_pool(self):
+        """
+        Return the thread pool executor to use for server-side copies.
+        """
+        if self.copy_executor is None:
+            self.copy_executor = futures.ThreadPoolExecutor(max_workers=self.max_workers)
+        return self.copy_executor
+
+    def copy(
+        self,
+        copy_source,
+        file_name,
+        content_type=None,
+        file_info=None,
+        progress_listener=None,
+        destination_bucket_id=None,
+        min_large_file_size=None
+    ):
+        """
+        Server-side copy of a file on B2.
+
+        The source of the copy is a CopySource object that contains all required
+        information about the copy source, such as content length and offset.
+
+        :param b2sdk.v1.CopySource copy_source: an object that describes the remote file to copy from
+        :param str file_name: the file name of the new B2 file
+        :param str,None content_type: the MIME type, or ``None`` to use metadataDirective=COPY if possible
+                      or fetch metadata from the server
+        :param dict,None file_info: a file info to store with the file or ``None`` to not store anything
+                      or copy from the source if metadataDirective=COPY is possible
+        :param b2sdk.v1.AbstractProgressListener,None progress_listener: a progress listener object to use or ``None``
+        :param str,None destination_bucket_id: a destination bucket ID or ``None`` if the destination bucket is the same
+                      as the copy source's bucket
+        :param int,None min_large_file_size: the smallest file size for which a large-file (multipart) copy is used,
+                      or ``None`` to determine it automatically
+
+        This function will work in "auto" mode (not supported yet), which would
+        download the file metadata (size, content type, file info) to decide whether
+        the file can be copied directly or a large-file copy has to be used. For a direct copy
+        it is possible to copy the file metadata without an additional call to the server
+        (metadataDirective=COPY is applied automatically when possible, i.e. when content_type is ``None``)
+        """
+        # We don't copy any large files unless all of the parts can be at least
+        # the minimum part size.
+        if min_large_file_size is None:
+            min_large_file_size = self.account_info.get_minimum_part_size() * 2
+
+        copy_small_file = False
+        if copy_source.content_length is None:
+            if copy_source.small_file_promise:
+                copy_small_file = True
+            else:
+                # TODO: should be supported
+                raise NotImplementedError('auto mode not supported')
+        elif copy_source.content_length < min_large_file_size:
+            copy_small_file = True
+
+        if copy_small_file:
+            # Run small copies in the same thread pool as large file copies,
+            # so that they share resources during a sync.
+            f = self.get_thread_pool().submit(
+                self._copy_small_file,
+                copy_source,
+                file_name,
+                content_type=content_type,
+                file_info=file_info,
+                destination_bucket_id=destination_bucket_id,
+            )
+            # TODO: validate `small_file_promise` with actual copied file size?
+            return f.result()
+        else:
+            if progress_listener is None:
+                # TODO: proper logic or proper error
+                raise RuntimeError('progress listener is required for large file copy')
+            return self._copy_large_file(
+                copy_source,
+                file_name,
+                progress_listener,
+                content_type=content_type,
+                file_info=file_info,
+                destination_bucket_id=destination_bucket_id,
+            )
+
+    def split_copy_source(self, copy_source):
+        """
+        Split a copy source into copy source parts for a large-file copy.
+
+        :param b2sdk.v1.CopySource copy_source: an object that represents the remote file to copy
+        """
+        if copy_source.content_length is None:
+            raise ValueError('cannot split CopySource of unknown length')
+        source_offset = copy_source.offset or 0
+        minimum_part_size = self.account_info.get_minimum_part_size()
+        part_ranges = choose_part_ranges(copy_source.content_length, minimum_part_size)
+        for part_number, (part_offset, part_length) in enumerate(part_ranges, 1):
+            yield CopySourcePart(
+                copy_source,
+                part_number=part_number,
+                source_offset=source_offset + part_offset,
+                part_length=part_length,
+            )
+
+    def copy_part(
+        self, large_file_id, copy_source_part, large_file_upload_state, finished_parts=None
+    ):
+        """
+        Copy a file part into a started large file.
+
+        :param str large_file_id: the ID of the started large file to copy the part into
+        :param b2sdk.v1.CopySourcePart copy_source_part: a wrapper for the copy source that represents the part range
+        :param b2sdk.v1.LargeFileUploadState large_file_upload_state: state object for progress reporting
+                      on large file upload
+        :param dict,None finished_parts: dictionary of known finished parts, keys are part numbers,
+                      values are instances of :class:`~b2sdk.v1.Part`
+        """
+
+        # Check if this part was copied before
+        if finished_parts is not None and copy_source_part.part_number in finished_parts:
+            # Report this part finished
+            part = finished_parts[copy_source_part.part_number]
+            large_file_upload_state.update_part_bytes(part.content_length)
+
+            # Return SHA1 hash
+            return {'contentSha1': part.content_sha1}
+
+        # if another part has already had an error there's no point in
+        # copying this part
+        if large_file_upload_state.has_error():
+            raise AlreadyFailed(large_file_upload_state.get_error_message())
+
+        response = self.session.copy_part(
+            copy_source_part.copy_source.file_id,
+            large_file_id,
+            copy_source_part.part_number,
+            bytes_range=copy_source_part.get_bytes_range(),
+        )
+        # TODO: large_file_upload_state.update_part_bytes
+        return response
+
+    def _copy_small_file(
+        self, copy_source, file_name, content_type=None, file_info=None, destination_bucket_id=None
+    ):
+        # no progress report - because there is nothing to report
+        if copy_source.content_length is None and copy_source.offset is not None:
+            raise NotImplementedError('copying with an offset from a source of unknown length is not supported yet')
+
+        bytes_range = copy_source.get_bytes_range()
+
+        if content_type is None:
+            metadata_directive = MetadataDirectiveMode.COPY
+        else:
+            if file_info is None:
+                raise ValueError('file_info can be None only when content_type is also None')
+            metadata_directive = MetadataDirectiveMode.REPLACE
+
+        response = self.session.copy_file(
+            copy_source.file_id,
+            file_name,
+            bytes_range=bytes_range,
+            metadata_directive=metadata_directive,
+            content_type=content_type,
+            file_info=file_info,
+            destination_bucket_id=destination_bucket_id
+        )
+        return FileVersionInfoFactory.from_api_response(response)
+
+    def _copy_large_file(
+        self,
+        copy_source,
+        file_name,
+        progress_listener,
+        content_type=None,
+        file_info=None,
+        destination_bucket_id=None
+    ):
+        if destination_bucket_id is None:
+            # TODO: should be supported
+            raise NotImplementedError(
+                'resolving the bucket_id of the copy source file_id is not supported'
+            )
+
+        if content_type is None:
+            # TODO: should be supported
+            raise NotImplementedError('metadata copy directive for large files is not supported')
+
+        if self.MAX_LARGE_FILE_SIZE < copy_source.content_length:
+            raise MaxFileSizeExceeded(copy_source.content_length, self.MAX_LARGE_FILE_SIZE)
+
+        # Set up the progress reporting for the parts
+        progress_listener.set_total_bytes(copy_source.content_length)
+
+        # Select the part boundaries
+        copy_source_parts = list(self.split_copy_source(copy_source))
+
+        # TODO: match unfinished large file and match parts - how?
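+        # NOTE: resuming an unfinished large-file copy is not implemented yet, so we
+        # always start a fresh large file and track no previously finished parts.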
+        unfinished_file, finished_parts = None, {}
+
+        if unfinished_file is None:
+            unfinished_file = self.services.large_file.start_large_file(
+                destination_bucket_id, file_name, content_type, file_info
+            )
+        file_id = unfinished_file.file_id
+
+        with progress_listener:
+            large_file_upload_state = LargeFileUploadState(progress_listener)
+            # Tell the executor to copy each of the parts
+            part_futures = [
+                self.get_thread_pool().submit(
+                    self.copy_part,
+                    file_id,
+                    copy_source_part,
+                    large_file_upload_state,
+                    finished_parts=finished_parts,
+                ) for copy_source_part in copy_source_parts
+            ]
+
+            # Collect the sha1 checksums of the parts as the copies finish.
+            # If any of them raised an exception, that same exception will
+            # be raised here by result()
+            part_sha1_array = [interruptible_get_result(f)['contentSha1'] for f in part_futures]
+
+            # Finish the large file
+            response = self.session.finish_large_file(file_id, part_sha1_array)
+            return FileVersionInfoFactory.from_api_response(response)
diff --git a/b2sdk/transfer/outbound/copy_source.py b/b2sdk/transfer/outbound/copy_source.py
new file mode 100644
index 000000000..63bbc3126
--- /dev/null
+++ b/b2sdk/transfer/outbound/copy_source.py
@@ -0,0 +1,38 @@
+class CopySource(object):
+    def __init__(
+        self, file_id, content_length=None, file_info=None, offset=0, small_file_promise=False
+    ):
+        self.file_id = file_id
+        if content_length is not None and small_file_promise:
+            raise ValueError('Cannot promise a small file when the content length is known')
+        self.content_length = content_length
+        self.file_info = file_info
+        if offset > 0 and small_file_promise:
+            raise ValueError('Cannot use an offset with a promised small file')
+        self.offset = offset
+        self.small_file_promise = small_file_promise
+
+    def get_bytes_range(self):
+        if self.content_length is None:
+            if self.offset > 0:
+                # auto mode should get file info and create correct copy source (with content_length)
+                raise ValueError('cannot return a bytes range for a non-zero offset and unknown length')
+            return None
+
+        return (self.offset, self.offset + self.content_length - 1)
+
+
+class CopySourcePart(object):
+    def __init__(self, copy_source, part_number, source_offset=0, part_length=None):
+        self.copy_source = copy_source
+        if source_offset > 0 and part_length is None:
+            raise ValueError('Cannot set offset for unknown length')
+        self.source_offset = source_offset
+        self.part_length = part_length
+        self.part_number = part_number
+
+    def get_bytes_range(self):
+        if self.part_length is None:
+            return None
+
+        return (self.source_offset, self.source_offset + self.part_length - 1)
diff --git a/b2sdk/transfer/outbound/large_file_upload_state.py b/b2sdk/transfer/outbound/large_file_upload_state.py
new file mode 100644
index 000000000..aead2e6fe
--- /dev/null
+++ b/b2sdk/transfer/outbound/large_file_upload_state.py
@@ -0,0 +1,62 @@
+import threading
+
+
+class LargeFileUploadState(object):
+    """
+    Track the status of uploading a large file, accepting updates
+    from the tasks that upload each of the parts.
+
+    The aggregated progress is passed on to a ProgressListener that
+    reports the progress for the file as a whole.
+
+    This class is THREAD SAFE.
+    """
+
+    def __init__(self, file_progress_listener):
+        """
+        :param b2sdk.v1.AbstractProgressListener file_progress_listener: a progress listener object to use. Use :py:class:`b2sdk.v1.DoNothingProgressListener` to disable.
+        """
+        self.lock = threading.RLock()
+        self.error_message = None
+        self.file_progress_listener = file_progress_listener
+        self.part_number_to_part_state = {}
+        self.bytes_completed = 0
+
+    def set_error(self, message):
+        """
+        Set an error message.
+
+        :param str message: an error message
+        """
+        with self.lock:
+            self.error_message = message
+
+    def has_error(self):
+        """
+        Check whether an error occurred.
+
+        :rtype: bool
+        """
+        with self.lock:
+            return self.error_message is not None
+
+    def get_error_message(self):
+        """
+        Fetch the error message.
+
+        :return: an error message
+        :rtype: str
+        """
+        with self.lock:
+            assert self.has_error()
+            return self.error_message
+
+    def update_part_bytes(self, bytes_delta):
+        """
+        Update listener progress info.
+
+        :param int bytes_delta: the number of bytes by which to increase the progress
+        """
+        with self.lock:
+            self.bytes_completed += bytes_delta
+            self.file_progress_listener.bytes_completed(self.bytes_completed)
diff --git a/b2sdk/transfer/outbound/progress_reporter.py b/b2sdk/transfer/outbound/progress_reporter.py
new file mode 100644
index 000000000..59a6403ac
--- /dev/null
+++ b/b2sdk/transfer/outbound/progress_reporter.py
@@ -0,0 +1,31 @@
+from b2sdk.progress import AbstractProgressListener
+
+
+class PartProgressReporter(AbstractProgressListener):
+    """
+    An adapter that listens to the progress of uploading a part and
+    gives the information to a :py:class:`b2sdk.transfer.outbound.large_file_upload_state.LargeFileUploadState`.
+
+    Accepts absolute bytes_completed from the uploader, and reports
+    deltas to the :py:class:`b2sdk.transfer.outbound.large_file_upload_state.LargeFileUploadState`. The bytes_completed for the
+    part will drop back to 0 on a retry, which will result in a
+    negative delta.
+    """
+
+    def __init__(self, large_file_upload_state, *args, **kwargs):
+        """
+        :param b2sdk.transfer.outbound.large_file_upload_state.LargeFileUploadState large_file_upload_state: object to relay the progress to
+        """
+        super(PartProgressReporter, self).__init__(*args, **kwargs)
+        self.large_file_upload_state = large_file_upload_state
+        self.prev_byte_count = 0
+
+    def bytes_completed(self, byte_count):
+        self.large_file_upload_state.update_part_bytes(byte_count - self.prev_byte_count)
+        self.prev_byte_count = byte_count
+
+    def close(self):
+        pass
+
+    def set_total_bytes(self, total_byte_count):
+        pass
diff --git a/b2sdk/transfer/outbound/upload_manager.py b/b2sdk/transfer/outbound/upload_manager.py
new file mode 100644
index 000000000..f709fc03a
--- /dev/null
+++ b/b2sdk/transfer/outbound/upload_manager.py
@@ -0,0 +1,323 @@
+import logging
+import six
+
+from b2sdk.exception import (
+    AlreadyFailed,
+    B2Error,
+    MaxFileSizeExceeded,
+    MaxRetriesExceeded,
+)
+from b2sdk.file_version import FileVersionInfoFactory
+from b2sdk.progress import (ReadingStreamWithProgress, StreamWithHash)
+from b2sdk.raw_api import HEX_DIGITS_AT_END
+from b2sdk.utils import B2TraceMetaAbstract, choose_part_ranges, hex_sha1_of_stream, interruptible_get_result
+
+from .large_file_upload_state import LargeFileUploadState
+from .progress_reporter import PartProgressReporter
+from .upload_source_part import UploadSourcePart
+
+try:
+    import concurrent.futures as futures
+except ImportError:
+    import futures
+
+logger = logging.getLogger(__name__)
+
+
+@six.add_metaclass(B2TraceMetaAbstract)
+class UploadManager(object):
+    """
+    Handle complex actions around uploads to free raw_api from that responsibility.
+    """
+
+    MAX_UPLOAD_ATTEMPTS = 5
+    MAX_LARGE_FILE_SIZE = 10 * 1000 * 1000 * 1000 * 1000  # 10 TB
+
+    def __init__(self, session, services, max_upload_workers=10):
+        """
+        Initialize the UploadManager using the given session.
+
+        :param session: an instance of :class:`~b2sdk.v1.B2Session`,
+                        or any custom class derived from
+                        :class:`~b2sdk.v1.B2Session`
+        :param services: an instance of :class:`~b2sdk.v1.Services`
+        :param int max_upload_workers: a number of upload threads, default is 10
+        """
+        self.session = session
+        self.services = services
+
+        self.upload_executor = None
+        self.max_workers = max_upload_workers
+
+    @property
+    def account_info(self):
+        return self.session.account_info
+
+    def set_thread_pool_size(self, max_workers):
+        """
+        Set the size of the thread pool to use for uploads.
+
+        Must be called before any work starts, or the thread pool will get
+        the default size of 1.
+
+        :param int max_workers: maximum allowed number of workers in a pool
+        """
+        if self.upload_executor is not None:
+            raise Exception('thread pool already created')
+        self.max_workers = max_workers
+
+    def get_thread_pool(self):
+        """
+        Return the thread pool executor to use for uploads.
+        """
+        if self.upload_executor is None:
+            self.upload_executor = futures.ThreadPoolExecutor(max_workers=self.max_workers)
+        return self.upload_executor
+
+    def upload(
+        self,
+        bucket_id,
+        upload_source,
+        file_name,
+        content_type,
+        file_info,
+        progress_listener,
+        min_large_file_size=None
+    ):
+        """
+        Upload a file to B2, retrying as needed.
+
+        The source of the upload is an UploadSource object that can be used to
+        open (and re-open) the file. The result of opening should be a binary
+        file whose read() method returns bytes.
+
+        :param str bucket_id: a bucket ID
+        :param b2sdk.v1.UploadSource upload_source: an object that opens the source of the upload
+        :param str file_name: the file name of the new B2 file
+        :param str,None content_type: the MIME type, or ``None`` to accept the default based on file extension of the B2 file name
+        :param dict,None file_info: a file info to store with the file or ``None`` to not store anything
+        :param b2sdk.v1.AbstractProgressListener progress_listener: a progress listener object to use
+        :param int,None min_large_file_size: the smallest file size for which a large-file (multipart) upload is used,
+                      or ``None`` to determine it automatically
+
+        The ``upload_source.open()`` method should return a file-like object, and it
+        must be possible to call it more than once in case the upload
+        is retried.
+        """
+
+        # We don't upload any large files unless all of the parts can be at least
+        # the minimum part size.
+        if min_large_file_size is None:
+            min_large_file_size = self.account_info.get_minimum_part_size() * 2
+
+        if upload_source.get_content_length() < min_large_file_size:
+            # Run small uploads in the same thread pool as large file uploads,
+            # so that they share resources during a sync.
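+            # The future's result is awaited immediately below, so a small-file upload
+            # remains synchronous from the caller's point of view.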
+            f = self.get_thread_pool().submit(
+                self._upload_small_file,
+                bucket_id,
+                upload_source,
+                file_name,
+                content_type,
+                file_info,
+                progress_listener,
+            )
+            return f.result()
+        else:
+            return self._upload_large_file(
+                bucket_id,
+                upload_source,
+                file_name,
+                content_type,
+                file_info,
+                progress_listener,
+            )
+
+    def split_upload_source(self, upload_source):
+        """
+        Split an upload source into upload source parts for a large-file upload.
+
+        :param b2sdk.v1.UploadSource upload_source: an object that opens the source of the upload
+        """
+        minimum_part_size = self.account_info.get_minimum_part_size()
+        part_ranges = choose_part_ranges(upload_source.get_content_length(), minimum_part_size)
+        for (part_number, (source_offset, part_length)) in enumerate(part_ranges, 1):
+            yield UploadSourcePart(upload_source, source_offset, part_length, part_number)
+
+    def upload_part(
+        self, bucket_id, file_id, upload_source_part, large_file_upload_state, finished_parts=None
+    ):
+        """
+        Upload a file part into a started large file.
+
+        :param str bucket_id: a bucket ID
+        :param file_id: a large file ID
+        :param b2sdk.v1.UploadSourcePart upload_source_part: a wrapper for the upload source that reads only the required range
+        :param b2sdk.v1.LargeFileUploadState large_file_upload_state: state object for progress reporting
+                      on large file upload
+        :param dict,None finished_parts: dictionary of known finished parts, keys are part numbers,
+                      values are instances of :class:`~b2sdk.v1.Part`
+        """
+        # Check if this part was uploaded before
+        if finished_parts is not None and upload_source_part.part_number in finished_parts:
+            # Report this part finished
+            part = finished_parts[upload_source_part.part_number]
+            large_file_upload_state.update_part_bytes(part.content_length)
+
+            # Return SHA1 hash
+            return {'contentSha1': part.content_sha1}
+
+        # Set up a progress listener
+        part_progress_listener = PartProgressReporter(large_file_upload_state)
+
+        # Retry the upload as needed
+        exception_list = []
+        for _ in six.moves.xrange(self.MAX_UPLOAD_ATTEMPTS):
+            # if another part has already had an error there's no point in
+            # uploading this part
+            if large_file_upload_state.has_error():
+                raise AlreadyFailed(large_file_upload_state.get_error_message())
+
+            try:
+                with upload_source_part.range_of_input_stream() as range_stream:
+                    input_stream = ReadingStreamWithProgress(range_stream, part_progress_listener)
+                    hashing_stream = StreamWithHash(input_stream)
+                    length_with_hash = upload_source_part.part_length + hashing_stream.hash_size()
+                    response = self.session.upload_part(
+                        file_id, upload_source_part.part_number, length_with_hash,
+                        HEX_DIGITS_AT_END, hashing_stream
+                    )
+                    assert hashing_stream.hash == response['contentSha1']
+                    return response
+
+            except B2Error as e:
+                if not e.should_retry_upload():
+                    raise
+                exception_list.append(e)
+                self.account_info.clear_bucket_upload_data(bucket_id)
+
+        large_file_upload_state.set_error(str(exception_list[-1]))
+        raise MaxRetriesExceeded(self.MAX_UPLOAD_ATTEMPTS, exception_list)
+
+    def _upload_small_file(
+        self, bucket_id, upload_source, file_name, content_type, file_info, progress_listener
+    ):
+        content_length = upload_source.get_content_length()
+        exception_info_list = []
+        progress_listener.set_total_bytes(content_length)
+        with progress_listener:
+            for _ in six.moves.xrange(self.MAX_UPLOAD_ATTEMPTS):
+                try:
+                    with upload_source.open() as file:
+                        input_stream = ReadingStreamWithProgress(file, progress_listener)
+                        hashing_stream = StreamWithHash(input_stream)
+                        length_with_hash = 
content_length + hashing_stream.hash_size() + response = self.session.upload_file( + bucket_id, file_name, length_with_hash, content_type, HEX_DIGITS_AT_END, + file_info, hashing_stream + ) + assert hashing_stream.hash == response['contentSha1'] + return FileVersionInfoFactory.from_api_response(response) + + except B2Error as e: + if not e.should_retry_upload(): + raise + exception_info_list.append(e) + self.account_info.clear_bucket_upload_data(bucket_id) + + raise MaxRetriesExceeded(self.MAX_UPLOAD_ATTEMPTS, exception_info_list) + + def _upload_large_file( + self, bucket_id, upload_source, file_name, content_type, file_info, progress_listener + ): + content_length = upload_source.get_content_length() + if self.MAX_LARGE_FILE_SIZE < content_length: + raise MaxFileSizeExceeded(content_length, self.MAX_LARGE_FILE_SIZE) + + # Set up the progress reporting for the parts + progress_listener.set_total_bytes(content_length) + + # Select the part boundaries + upload_source_parts = list(self.split_upload_source(upload_source)) + + # Check for unfinished files with same name + unfinished_file, finished_parts = self._find_unfinished_file_if_possible( + bucket_id, + file_name, + file_info, + upload_source_parts, + ) + + # Tell B2 we're going to upload a file if necessary + if unfinished_file is None: + unfinished_file = self.services.large_file.start_large_file( + bucket_id, file_name, content_type, file_info + ) + file_id = unfinished_file.file_id + + with progress_listener: + large_file_upload_state = LargeFileUploadState(progress_listener) + # Tell the executor to upload each of the parts + part_futures = [ + self.get_thread_pool().submit( + self.upload_part, + bucket_id, + file_id, + upload_source_part, + large_file_upload_state, + finished_parts, + ) for upload_source_part in upload_source_parts + ] + + # Collect the sha1 checksums of the parts as the uploads finish. + # If any of them raised an exception, that same exception will + # be raised here by result() + part_sha1_array = [interruptible_get_result(f)['contentSha1'] for f in part_futures] + + # Finish the large file + response = self.session.finish_large_file(file_id, part_sha1_array) + return FileVersionInfoFactory.from_api_response(response) + + def _find_unfinished_file_if_possible( + self, bucket_id, file_name, file_info, upload_source_parts + ): + """ + Find an unfinished file that may be used to resume a large file upload. The + file is found using the filename and comparing the uploaded parts against + the local file. + + This is only possible if the application key being used allows ``listFiles`` access. 
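+
+        Returns a 2-tuple ``(unfinished_file, finished_parts)``, or ``(None, {})`` when no
+        matching unfinished file is found, so the caller starts a fresh large file.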
+ """ + upload_source_parts_dict = { + upload_source_part.part_number: upload_source_part + for upload_source_part in upload_source_parts + } + if 'listFiles' in self.account_info.get_allowed()['capabilities']: + for file_ in self.services.large_file.list_unfinished_large_files(bucket_id): + if file_.file_name == file_name and file_.file_info == file_info: + files_match = True + finished_parts = {} + for part in self.services.large_file.list_parts(file_.file_id): + # Compare part sizes + upload_source_part = upload_source_parts_dict[part.part_number] + if upload_source_part.part_length != part.content_length: + files_match = False + break + + # Compare hash + with upload_source_part.upload_source.open() as f: + f.seek(upload_source_part.source_offset) + sha1_sum = hex_sha1_of_stream(f, upload_source_part.part_length) + if sha1_sum != part.content_sha1: + files_match = False + break + + # Save part + finished_parts[part.part_number] = part + + # Skip not matching files or unfinished files with no uploaded parts + if not files_match or not finished_parts: + continue + + # Return first matched file + return file_, finished_parts + return None, {} diff --git a/b2sdk/transfer/outbound/upload_source_part.py b/b2sdk/transfer/outbound/upload_source_part.py new file mode 100644 index 000000000..2bc599c17 --- /dev/null +++ b/b2sdk/transfer/outbound/upload_source_part.py @@ -0,0 +1,17 @@ +from contextlib import contextmanager + +from b2sdk.progress import RangeOfInputStream + + +class UploadSourcePart(object): + def __init__(self, upload_source, source_offset, part_length, part_number): + self.upload_source = upload_source + self.source_offset = source_offset + self.part_length = part_length + self.part_number = part_number + + @contextmanager + def range_of_input_stream(self): + with self.upload_source.open() as file: + file.seek(self.source_offset) # FIXME: RangeOfInputStream should fo it in __init__ + yield RangeOfInputStream(file, self.source_offset, self.part_length) diff --git a/b2sdk/v1/__init__.py b/b2sdk/v1/__init__.py index 7d1a67323..ed9939acf 100644 --- a/b2sdk/v1/__init__.py +++ b/b2sdk/v1/__init__.py @@ -16,8 +16,6 @@ from b2sdk.api import B2Api from b2sdk.bucket import Bucket from b2sdk.bucket import BucketFactory -from b2sdk.bucket import LargeFileUploadState -from b2sdk.bucket import PartProgressReporter from b2sdk.raw_api import ALL_CAPABILITIES # account info @@ -43,8 +41,8 @@ from b2sdk.file_version import FileIdAndName from b2sdk.file_version import FileVersionInfo -from b2sdk.part import Part -from b2sdk.unfinished_large_file import UnfinishedLargeFile +from b2sdk.large_file.part import Part +from b2sdk.large_file.unfinished_large_file import UnfinishedLargeFile # progress reporting @@ -68,7 +66,6 @@ from b2sdk.raw_api import AbstractRawApi from b2sdk.raw_api import B2RawApi from b2sdk.raw_api import MetadataDirectiveMode -from b2sdk.raw_api import TokenType # progress @@ -90,19 +87,21 @@ from b2sdk.upload_source import UploadSourceBytes from b2sdk.upload_source import UploadSourceLocalFile -# trasferer - -from b2sdk.transferer.abstract import AbstractDownloader -from b2sdk.transferer.file_metadata import FileMetadata -from b2sdk.transferer.parallel import AbstractDownloaderThread -from b2sdk.transferer.parallel import FirstPartDownloaderThread -from b2sdk.transferer.parallel import NonHashingDownloaderThread -from b2sdk.transferer.parallel import ParallelDownloader -from b2sdk.transferer.parallel import PartToDownload -from b2sdk.transferer.parallel import WriterThread -from 
b2sdk.transferer.range import Range -from b2sdk.transferer.simple import SimpleDownloader -from b2sdk.transferer.transferer import Transferer +# trasfer + +from b2sdk.transfer.inbound.downloader.abstract import AbstractDownloader +from b2sdk.transfer.inbound.file_metadata import FileMetadata +from b2sdk.transfer.outbound.large_file_upload_state import LargeFileUploadState +from b2sdk.transfer.inbound.downloader.parallel import AbstractDownloaderThread +from b2sdk.transfer.inbound.downloader.parallel import FirstPartDownloaderThread +from b2sdk.transfer.inbound.downloader.parallel import NonHashingDownloaderThread +from b2sdk.transfer.inbound.downloader.parallel import ParallelDownloader +from b2sdk.transfer.inbound.downloader.parallel import PartToDownload +from b2sdk.transfer.inbound.downloader.parallel import WriterThread +from b2sdk.transfer.outbound.progress_reporter import PartProgressReporter +from b2sdk.transfer.inbound.downloader.range import Range +from b2sdk.transfer.inbound.downloader.simple import SimpleDownloader +from b2sdk.transfer.outbound.upload_source_part import UploadSourcePart # sync diff --git a/doc/source/api/internal/transfer/emerger.rst b/doc/source/api/internal/transfer/emerger.rst new file mode 100644 index 000000000..aca6ed61c --- /dev/null +++ b/doc/source/api/internal/transfer/emerger.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.emerger` -- Manager of multiple source copy/uploads +======================================================================== + +.. automodule:: b2sdk.transfer.emerger + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transfer/inbound/download_manager.rst b/doc/source/api/internal/transfer/inbound/download_manager.rst new file mode 100644 index 000000000..b5ca47bb1 --- /dev/null +++ b/doc/source/api/internal/transfer/inbound/download_manager.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.inbound.download_manager` -- Manager of downloaders +======================================================================== + +.. automodule:: b2sdk.transfer.inbound.download_manager + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transfer/inbound/downloader/abstract.rst b/doc/source/api/internal/transfer/inbound/downloader/abstract.rst new file mode 100644 index 000000000..68ef282b6 --- /dev/null +++ b/doc/source/api/internal/transfer/inbound/downloader/abstract.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.inbound.downloader.abstract` -- Downloader base class +========================================================================== + +.. automodule:: b2sdk.transfer.inbound.downloader.abstract + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transfer/inbound/downloader/parallel.rst b/doc/source/api/internal/transfer/inbound/downloader/parallel.rst new file mode 100644 index 000000000..8e78f3668 --- /dev/null +++ b/doc/source/api/internal/transfer/inbound/downloader/parallel.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.inbound.downloader.parallel` -- ParallelTransferer +======================================================================= + +.. 
automodule:: b2sdk.transfer.inbound.downloader.parallel + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transfer/inbound/downloader/range.rst b/doc/source/api/internal/transfer/inbound/downloader/range.rst new file mode 100644 index 000000000..5e5204411 --- /dev/null +++ b/doc/source/api/internal/transfer/inbound/downloader/range.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.inbound.downloader.range` -- transfer range toolkit +======================================================================== + +.. automodule:: b2sdk.transfer.inbound.downloader.range + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transfer/inbound/downloader/simple.rst b/doc/source/api/internal/transfer/inbound/downloader/simple.rst new file mode 100644 index 000000000..dbddfa9cb --- /dev/null +++ b/doc/source/api/internal/transfer/inbound/downloader/simple.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.inbound.downloader.simple` -- SimpleDownloader +=================================================================== + +.. automodule:: b2sdk.transfer.inbound.downloader.simple + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transfer/inbound/file_metadata.rst b/doc/source/api/internal/transfer/inbound/file_metadata.rst new file mode 100644 index 000000000..fcd64b9d5 --- /dev/null +++ b/doc/source/api/internal/transfer/inbound/file_metadata.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.inbound.file_metadata` +=========================================== + +.. automodule:: b2sdk.transfer.inbound.file_metadata + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transfer/outbound/copy_manager.rst b/doc/source/api/internal/transfer/outbound/copy_manager.rst new file mode 100644 index 000000000..18fe0b427 --- /dev/null +++ b/doc/source/api/internal/transfer/outbound/copy_manager.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.outbound.copy_manager` -- Manager of server side copy +========================================================================== + +.. automodule:: b2sdk.transfer.outbound.copy_manager + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transfer/outbound/copy_source.rst b/doc/source/api/internal/transfer/outbound/copy_source.rst new file mode 100644 index 000000000..5b96f8e1c --- /dev/null +++ b/doc/source/api/internal/transfer/outbound/copy_source.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.outbound.copy_source` +========================================== + +.. automodule:: b2sdk.transfer.outbound.copy_source + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transferer/parallel.rst b/doc/source/api/internal/transfer/outbound/large_file_upload_state.rst similarity index 54% rename from doc/source/api/internal/transferer/parallel.rst rename to doc/source/api/internal/transfer/outbound/large_file_upload_state.rst index 0dfe138a8..2b02d5198 100644 --- a/doc/source/api/internal/transferer/parallel.rst +++ b/doc/source/api/internal/transfer/outbound/large_file_upload_state.rst @@ -1,7 +1,7 @@ -:mod:`b2sdk.transferer.parallel` -- ParallelTransferer +:mod:`b2sdk.transfer.outbound.large_file_upload_state` ====================================================== -.. automodule:: b2sdk.transferer.parallel +.. 
automodule:: b2sdk.transfer.outbound.large_file_upload_state :members: :undoc-members: :show-inheritance: diff --git a/doc/source/api/internal/transfer/outbound/progress_reporter.rst b/doc/source/api/internal/transfer/outbound/progress_reporter.rst new file mode 100644 index 000000000..66cbf2911 --- /dev/null +++ b/doc/source/api/internal/transfer/outbound/progress_reporter.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.outbound.progress_reporter` +================================================ + +.. automodule:: b2sdk.transfer.outbound.progress_reporter + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transfer/outbound/upload_manager.rst b/doc/source/api/internal/transfer/outbound/upload_manager.rst new file mode 100644 index 000000000..630f2b1ca --- /dev/null +++ b/doc/source/api/internal/transfer/outbound/upload_manager.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.outbound.upload_manager` -- Manager of local source uploads +================================================================================= + +.. automodule:: b2sdk.transfer.outbound.upload_manager + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transfer/outbound/upload_source_part.rst b/doc/source/api/internal/transfer/outbound/upload_source_part.rst new file mode 100644 index 000000000..a45b79fca --- /dev/null +++ b/doc/source/api/internal/transfer/outbound/upload_source_part.rst @@ -0,0 +1,8 @@ +:mod:`b2sdk.transfer.outbound.upload_source_part` +================================================= + +.. automodule:: b2sdk.transfer.outbound.upload_source_part + :members: + :undoc-members: + :show-inheritance: + :special-members: __init__ diff --git a/doc/source/api/internal/transferer/abstract.rst b/doc/source/api/internal/transferer/abstract.rst deleted file mode 100644 index 4bf194bee..000000000 --- a/doc/source/api/internal/transferer/abstract.rst +++ /dev/null @@ -1,8 +0,0 @@ -:mod:`b2sdk.transferer.abstract` -- Downloader base class -========================================================= - -.. automodule:: b2sdk.transferer.abstract - :members: - :undoc-members: - :show-inheritance: - :special-members: __init__ diff --git a/doc/source/api/internal/transferer/file_metadata.rst b/doc/source/api/internal/transferer/file_metadata.rst deleted file mode 100644 index 751517c55..000000000 --- a/doc/source/api/internal/transferer/file_metadata.rst +++ /dev/null @@ -1,8 +0,0 @@ -:mod:`b2sdk.transferer.file_metadata` -======================================= - -.. automodule:: b2sdk.transferer.file_metadata - :members: - :undoc-members: - :show-inheritance: - :special-members: __init__ diff --git a/doc/source/api/internal/transferer/range.rst b/doc/source/api/internal/transferer/range.rst deleted file mode 100644 index fda6fe77b..000000000 --- a/doc/source/api/internal/transferer/range.rst +++ /dev/null @@ -1,8 +0,0 @@ -:mod:`b2sdk.transferer.range` -- transfer range toolkit -======================================================= - -.. automodule:: b2sdk.transferer.range - :members: - :undoc-members: - :show-inheritance: - :special-members: __init__ diff --git a/doc/source/api/internal/transferer/simple.rst b/doc/source/api/internal/transferer/simple.rst deleted file mode 100644 index 22321197b..000000000 --- a/doc/source/api/internal/transferer/simple.rst +++ /dev/null @@ -1,8 +0,0 @@ -:mod:`b2sdk.transferer.simple` -- SimpleDownloader -================================================== - -.. 
automodule:: b2sdk.transferer.simple - :members: - :undoc-members: - :show-inheritance: - :special-members: __init__ diff --git a/doc/source/api/internal/transferer/transferer.rst b/doc/source/api/internal/transferer/transferer.rst deleted file mode 100644 index 247fabd7a..000000000 --- a/doc/source/api/internal/transferer/transferer.rst +++ /dev/null @@ -1,8 +0,0 @@ -:mod:`b2sdk.transferer.transferer` -- Manager of downloaders -============================================================ - -.. automodule:: b2sdk.transferer.transferer - :members: - :undoc-members: - :show-inheritance: - :special-members: __init__ diff --git a/doc/source/api_reference.rst b/doc/source/api_reference.rst index 3acd718dd..5a4a34fa5 100644 --- a/doc/source/api_reference.rst +++ b/doc/source/api_reference.rst @@ -58,12 +58,19 @@ Internal API api/internal/sync/policy_manager api/internal/sync/scan_policies api/internal/sync/sync - api/internal/transferer/abstract - api/internal/transferer/file_metadata - api/internal/transferer/parallel - api/internal/transferer/range - api/internal/transferer/simple - api/internal/transferer/transferer + api/internal/transfer/inbound/downloader/abstract + api/internal/transfer/inbound/downloader/parallel + api/internal/transfer/inbound/downloader/range + api/internal/transfer/inbound/downloader/simple + api/internal/transfer/inbound/file_metadata + api/internal/transfer/inbound/download_manager + api/internal/transfer/outbound/copy_manager + api/internal/transfer/outbound/copy_source + api/internal/transfer/outbound/large_file_upload_state + api/internal/transfer/outbound/progress_reporter + api/internal/transfer/outbound/upload_manager + api/internal/transfer/outbound/upload_source_part + api/internal/transfer/emerger api/internal/upload_source api/internal/raw_simulator diff --git a/test/v0/test_bucket.py b/test/v0/test_bucket.py index 7f5804404..0e04e1f1c 100644 --- a/test/v0/test_bucket.py +++ b/test/v0/test_bucket.py @@ -38,7 +38,7 @@ from .deps import StubAccountInfo, RawSimulator, BucketSimulator, FakeResponse from .deps import ParallelDownloader from .deps import SimpleDownloader -from .deps import UploadSourceBytes +from .deps import UploadSourceBytes, UploadSourcePart from .deps import hex_sha1_of_bytes, TempDir try: @@ -156,14 +156,17 @@ def testThree(self): content_sha1 = hex_sha1_of_bytes(content) large_file_upload_state = mock.MagicMock() large_file_upload_state.has_error.return_value = False - self.bucket._upload_part( - file1.file_id, 1, (0, 11), UploadSourceBytes(content), large_file_upload_state + self.api.upload_manager.upload_part( + self.bucket_id, file1.file_id, UploadSourcePart(UploadSourceBytes(content), 0, 11, 1), + large_file_upload_state ) - self.bucket._upload_part( - file1.file_id, 2, (0, 11), UploadSourceBytes(content), large_file_upload_state + self.api.upload_manager.upload_part( + self.bucket_id, file1.file_id, UploadSourcePart(UploadSourceBytes(content), 0, 11, 2), + large_file_upload_state ) - self.bucket._upload_part( - file1.file_id, 3, (0, 11), UploadSourceBytes(content), large_file_upload_state + self.api.upload_manager.upload_part( + self.bucket_id, file1.file_id, UploadSourcePart(UploadSourceBytes(content), 0, 11, 3), + large_file_upload_state ) expected_parts = [ Part('9999', 1, 11, content_sha1), @@ -181,8 +184,9 @@ def test_error_in_state(self): large_file_upload_state = LargeFileUploadState(file_progress_listener) large_file_upload_state.set_error('test error') try: - self.bucket._upload_part( - file1.file_id, 1, (0, 11), 
UploadSourceBytes(content), large_file_upload_state + self.bucket.api.upload_manager.upload_part( + self.bucket.id_, file1.file_id, + UploadSourcePart(UploadSourceBytes(content), 0, 11, 1), large_file_upload_state ) self.fail('should have thrown') except AlreadyFailed: @@ -729,13 +733,13 @@ class TestDownloadDefault(DownloadTests, EmptyFileDownloadScenarioMixin, TestCas class TestDownloadSimple(DownloadTests, EmptyFileDownloadScenarioMixin, TestCaseWithBucket): def setUp(self): super(TestDownloadSimple, self).setUp() - self.bucket.api.transferer.strategies = [SimpleDownloader(force_chunk_size=20,)] + self.bucket.api.download_manager.strategies = [SimpleDownloader(force_chunk_size=20,)] class TestDownloadParallel(DownloadTests, TestCaseWithBucket): def setUp(self): super(TestDownloadParallel, self).setUp() - self.bucket.api.transferer.strategies = [ + self.bucket.api.download_manager.strategies = [ ParallelDownloader( force_chunk_size=2, max_streams=999, @@ -776,13 +780,13 @@ class TestCaseWithTruncatedDownloadBucket(TestCaseWithBucket): class TestTruncatedDownloadSimple(DownloadTests, TestCaseWithTruncatedDownloadBucket): def setUp(self): super(TestTruncatedDownloadSimple, self).setUp() - self.bucket.api.transferer.strategies = [SimpleDownloader(force_chunk_size=20,)] + self.bucket.api.download_manager.strategies = [SimpleDownloader(force_chunk_size=20,)] class TestTruncatedDownloadParallel(DownloadTests, TestCaseWithTruncatedDownloadBucket): def setUp(self): super(TestTruncatedDownloadParallel, self).setUp() - self.bucket.api.transferer.strategies = [ + self.bucket.api.download_manager.strategies = [ ParallelDownloader( force_chunk_size=3, max_streams=2, diff --git a/test/v0/test_session.py b/test/v0/test_session.py index ea68f196c..8ce0f365f 100644 --- a/test/v0/test_session.py +++ b/test/v0/test_session.py @@ -13,7 +13,6 @@ from .deps_exception import InvalidAuthToken, Unauthorized from .deps import ALL_CAPABILITIES from .deps import B2Session -from .deps import TokenType try: import unittest.mock as mock @@ -30,29 +29,28 @@ def setUp(self): self.api.account_info = self.account_info self.raw_api = mock.MagicMock() - self.raw_api.do_it.__name__ = 'do_it' - self.raw_api.do_it.token_type = TokenType.API - self.raw_api.do_it.side_effect = ['ok'] + self.raw_api.get_file_info.__name__ = 'get_file_info' + self.raw_api.get_file_info.side_effect = ['ok'] - self.session = B2Session(self.api, self.raw_api) + self.session = B2Session(self.account_info, raw_api=self.raw_api) def test_works_first_time(self): - self.assertEqual('ok', self.session.do_it()) + self.assertEqual('ok', self.session.get_file_info(None)) def test_works_second_time(self): - self.raw_api.do_it.side_effect = [ + self.raw_api.get_file_info.side_effect = [ InvalidAuthToken('message', 'code'), 'ok', ] - self.assertEqual('ok', self.session.do_it()) + self.assertEqual('ok', self.session.get_file_info(None)) def test_fails_second_time(self): - self.raw_api.do_it.side_effect = [ + self.raw_api.get_file_info.side_effect = [ InvalidAuthToken('message', 'code'), InvalidAuthToken('message', 'code'), ] with self.assertRaises(InvalidAuthToken): - self.session.do_it() + self.session.get_file_info(None) def test_app_key_info_no_info(self): self.account_info.get_allowed.return_value = dict( @@ -61,11 +59,11 @@ def test_app_key_info_no_info(self): capabilities=ALL_CAPABILITIES, namePrefix=None, ) - self.raw_api.do_it.side_effect = Unauthorized('no_go', 'code') + self.raw_api.get_file_info.side_effect = Unauthorized('no_go', 'code') with 
self.assertRaisesRegexp( Unauthorized, r'no_go for application key with no restrictions \(code\)' ): - self.session.do_it() + self.session.get_file_info(None) def test_app_key_info_no_info_no_message(self): self.account_info.get_allowed.return_value = dict( @@ -74,11 +72,11 @@ def test_app_key_info_no_info_no_message(self): capabilities=ALL_CAPABILITIES, namePrefix=None, ) - self.raw_api.do_it.side_effect = Unauthorized('', 'code') + self.raw_api.get_file_info.side_effect = Unauthorized('', 'code') with self.assertRaisesRegexp( Unauthorized, r'unauthorized for application key with no restrictions \(code\)' ): - self.session.do_it() + self.session.get_file_info(None) def test_app_key_info_all_info(self): self.account_info.get_allowed.return_value = dict( @@ -87,9 +85,9 @@ def test_app_key_info_all_info(self): capabilities=['readFiles'], namePrefix='prefix/', ) - self.raw_api.do_it.side_effect = Unauthorized('no_go', 'code') + self.raw_api.get_file_info.side_effect = Unauthorized('no_go', 'code') with self.assertRaisesRegexp( Unauthorized, r"no_go for application key with capabilities 'readFiles', restricted to bucket 'my-bucket', restricted to files that start with 'prefix/' \(code\)" ): - self.session.do_it() + self.session.get_file_info(None) diff --git a/test/v1/test_bucket.py b/test/v1/test_bucket.py index 5f6141ef1..e0d7921f8 100644 --- a/test/v1/test_bucket.py +++ b/test/v1/test_bucket.py @@ -38,7 +38,7 @@ from .deps import StubAccountInfo, RawSimulator, BucketSimulator, FakeResponse from .deps import ParallelDownloader from .deps import SimpleDownloader -from .deps import UploadSourceBytes +from .deps import UploadSourceBytes, UploadSourcePart from .deps import hex_sha1_of_bytes, TempDir try: @@ -156,14 +156,17 @@ def testThree(self): content_sha1 = hex_sha1_of_bytes(content) large_file_upload_state = mock.MagicMock() large_file_upload_state.has_error.return_value = False - self.bucket._upload_part( - file1.file_id, 1, (0, 11), UploadSourceBytes(content), large_file_upload_state + self.api.upload_manager.upload_part( + self.bucket_id, file1.file_id, UploadSourcePart(UploadSourceBytes(content), 0, 11, 1), + large_file_upload_state ) - self.bucket._upload_part( - file1.file_id, 2, (0, 11), UploadSourceBytes(content), large_file_upload_state + self.api.upload_manager.upload_part( + self.bucket_id, file1.file_id, UploadSourcePart(UploadSourceBytes(content), 0, 11, 2), + large_file_upload_state ) - self.bucket._upload_part( - file1.file_id, 3, (0, 11), UploadSourceBytes(content), large_file_upload_state + self.api.upload_manager.upload_part( + self.bucket_id, file1.file_id, UploadSourcePart(UploadSourceBytes(content), 0, 11, 3), + large_file_upload_state ) expected_parts = [ Part('9999', 1, 11, content_sha1), @@ -181,8 +184,9 @@ def test_error_in_state(self): large_file_upload_state = LargeFileUploadState(file_progress_listener) large_file_upload_state.set_error('test error') try: - self.bucket._upload_part( - file1.file_id, 1, (0, 11), UploadSourceBytes(content), large_file_upload_state + self.api.upload_manager.upload_part( + self.bucket_id, file1.file_id, + UploadSourcePart(UploadSourceBytes(content), 0, 11, 1), large_file_upload_state ) self.fail('should have thrown') except AlreadyFailed: @@ -736,13 +740,13 @@ class TestDownloadDefault(DownloadTests, EmptyFileDownloadScenarioMixin, TestCas class TestDownloadSimple(DownloadTests, EmptyFileDownloadScenarioMixin, TestCaseWithBucket): def setUp(self): super(TestDownloadSimple, self).setUp() - self.bucket.api.transferer.strategies = 
[SimpleDownloader(force_chunk_size=20,)] + self.bucket.api.download_manager.strategies = [SimpleDownloader(force_chunk_size=20,)] class TestDownloadParallel(DownloadTests, TestCaseWithBucket): def setUp(self): super(TestDownloadParallel, self).setUp() - self.bucket.api.transferer.strategies = [ + self.bucket.api.download_manager.strategies = [ ParallelDownloader( force_chunk_size=2, max_streams=999, @@ -783,13 +787,13 @@ class TestCaseWithTruncatedDownloadBucket(TestCaseWithBucket): class TestTruncatedDownloadSimple(DownloadTests, TestCaseWithTruncatedDownloadBucket): def setUp(self): super(TestTruncatedDownloadSimple, self).setUp() - self.bucket.api.transferer.strategies = [SimpleDownloader(force_chunk_size=20,)] + self.bucket.api.download_manager.strategies = [SimpleDownloader(force_chunk_size=20,)] class TestTruncatedDownloadParallel(DownloadTests, TestCaseWithTruncatedDownloadBucket): def setUp(self): super(TestTruncatedDownloadParallel, self).setUp() - self.bucket.api.transferer.strategies = [ + self.bucket.api.download_manager.strategies = [ ParallelDownloader( force_chunk_size=3, max_streams=2, diff --git a/test/v1/test_session.py b/test/v1/test_session.py index 3c3ff1f63..26cb8a5f9 100644 --- a/test/v1/test_session.py +++ b/test/v1/test_session.py @@ -13,7 +13,6 @@ from .deps_exception import InvalidAuthToken, Unauthorized from .deps import ALL_CAPABILITIES from .deps import B2Session -from .deps import TokenType try: import unittest.mock as mock @@ -30,29 +29,28 @@ def setUp(self): self.api.account_info = self.account_info self.raw_api = mock.MagicMock() - self.raw_api.do_it.__name__ = 'do_it' - self.raw_api.do_it.token_type = TokenType.API - self.raw_api.do_it.side_effect = ['ok'] + self.raw_api.get_file_info.__name__ = 'get_file_info' + self.raw_api.get_file_info.side_effect = ['ok'] - self.session = B2Session(self.api, self.raw_api) + self.session = B2Session(self.account_info, raw_api=self.raw_api) def test_works_first_time(self): - self.assertEqual('ok', self.session.do_it()) + self.assertEqual('ok', self.session.get_file_info(None)) def test_works_second_time(self): - self.raw_api.do_it.side_effect = [ + self.raw_api.get_file_info.side_effect = [ InvalidAuthToken('message', 'code'), 'ok', ] - self.assertEqual('ok', self.session.do_it()) + self.assertEqual('ok', self.session.get_file_info(None)) def test_fails_second_time(self): - self.raw_api.do_it.side_effect = [ + self.raw_api.get_file_info.side_effect = [ InvalidAuthToken('message', 'code'), InvalidAuthToken('message', 'code'), ] with self.assertRaises(InvalidAuthToken): - self.session.do_it() + self.session.get_file_info(None) def test_app_key_info_no_info(self): self.account_info.get_allowed.return_value = dict( @@ -61,11 +59,11 @@ def test_app_key_info_no_info(self): capabilities=ALL_CAPABILITIES, namePrefix=None, ) - self.raw_api.do_it.side_effect = Unauthorized('no_go', 'code') + self.raw_api.get_file_info.side_effect = Unauthorized('no_go', 'code') with self.assertRaisesRegexp( Unauthorized, r'no_go for application key with no restrictions \(code\)' ): - self.session.do_it() + self.session.get_file_info(None) def test_app_key_info_no_info_no_message(self): self.account_info.get_allowed.return_value = dict( @@ -74,11 +72,11 @@ def test_app_key_info_no_info_no_message(self): capabilities=ALL_CAPABILITIES, namePrefix=None, ) - self.raw_api.do_it.side_effect = Unauthorized('', 'code') + self.raw_api.get_file_info.side_effect = Unauthorized('', 'code') with self.assertRaisesRegexp( Unauthorized, r'unauthorized for 
application key with no restrictions \(code\)' ): - self.session.do_it() + self.session.get_file_info(None) def test_app_key_info_all_info(self): self.account_info.get_allowed.return_value = dict( @@ -87,9 +85,9 @@ def test_app_key_info_all_info(self): capabilities=['readFiles'], namePrefix='prefix/', ) - self.raw_api.do_it.side_effect = Unauthorized('no_go', 'code') + self.raw_api.get_file_info.side_effect = Unauthorized('no_go', 'code') with self.assertRaisesRegexp( Unauthorized, r"no_go for application key with capabilities 'readFiles', restricted to bucket 'my-bucket', restricted to files that start with 'prefix/' \(code\)" ): - self.session.do_it() + self.session.get_file_info(None)