From 66c478e9feced6772eec0d2afd1910b73bce59ae Mon Sep 17 00:00:00 2001 From: Aleksandr Goncharov Date: Fri, 6 May 2022 22:10:07 +0300 Subject: [PATCH 1/4] Refactor integration tests --- test/integration/bucket_cleaner.py | 66 +++- test/integration/conftest.py | 167 +++++++- test/integration/test_raw_api.py | 610 +++++++++++------------------ 3 files changed, 452 insertions(+), 391 deletions(-) diff --git a/test/integration/bucket_cleaner.py b/test/integration/bucket_cleaner.py index e1b1e5783..b863c4678 100644 --- a/test/integration/bucket_cleaner.py +++ b/test/integration/bucket_cleaner.py @@ -8,11 +8,17 @@ # ###################################################################### +import re +import time + from typing import Optional +from b2sdk.bucket import Bucket +from b2sdk.file_lock import NO_RETENTION_FILE_SETTING, LegalHold, RetentionMode +from b2sdk.utils import current_time_millis from b2sdk.v2 import * -from .helpers import GENERAL_BUCKET_NAME_PREFIX, BUCKET_CREATED_AT_MILLIS, authorize +from .helpers import BUCKET_CREATED_AT_MILLIS, GENERAL_BUCKET_NAME_PREFIX, authorize ONE_HOUR_MILLIS = 60 * 60 * 1000 @@ -88,3 +94,61 @@ def cleanup_buckets(self): else: print('Removing bucket:', bucket.name) b2_api.delete_bucket(bucket) + + +def _cleanup_old_buckets(raw_api, api_url, account_auth_token, account_id, bucket_list_dict): + for bucket_dict in bucket_list_dict['buckets']: + bucket_id = bucket_dict['bucketId'] + bucket_name = bucket_dict['bucketName'] + if _should_delete_bucket(bucket_name): + print('cleaning up old bucket: ' + bucket_name) + _clean_and_delete_bucket( + raw_api, + api_url, + account_auth_token, + account_id, + bucket_id, + ) + + +def _clean_and_delete_bucket(raw_api, api_url, account_auth_token, account_id, bucket_id): + # Delete the files. This test never creates more than a few files, + # so one call to list_file_versions should get them all. + versions_dict = raw_api.list_file_versions(api_url, account_auth_token, bucket_id) + for version_dict in versions_dict['files']: + file_id = version_dict['fileId'] + file_name = version_dict['fileName'] + action = version_dict['action'] + if action in ['hide', 'upload']: + print('b2_delete_file', file_name, action) + if action == 'upload' and version_dict[ + 'fileRetention'] and version_dict['fileRetention']['value']['mode'] is not None: + raw_api.update_file_retention( + api_url, + account_auth_token, + file_id, + file_name, + NO_RETENTION_FILE_SETTING, + bypass_governance=True + ) + raw_api.delete_file_version(api_url, account_auth_token, file_id, file_name) + else: + print('b2_cancel_large_file', file_name) + raw_api.cancel_large_file(api_url, account_auth_token, file_id) + + # Delete the bucket + print('b2_delete_bucket', bucket_id) + raw_api.delete_bucket(api_url, account_auth_token, account_id, bucket_id) + + +def _should_delete_bucket(bucket_name): + # Bucket names for this test look like: c7b22d0b0ad7-1460060364-5670 + # Other buckets should not be deleted. + match = re.match(r'^test-raw-api-[a-f0-9]+-([0-9]+)-([0-9]+)', bucket_name) + if match is None: + return False + + # Is it more than an hour old? + bucket_time = int(match.group(1)) + now = time.time() + return bucket_time + 3600 <= now diff --git a/test/integration/conftest.py b/test/integration/conftest.py index 7b2c2d316..1e7524ab9 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -8,8 +8,20 @@ # ###################################################################### +from io import BytesIO +from os import environ +from random import randint +from time import time + import pytest +from b2sdk.b2http import B2Http +from b2sdk.encryption.setting import EncryptionAlgorithm, EncryptionMode, EncryptionSetting +from b2sdk.raw_api import ALL_CAPABILITIES, REALM_URLS, B2RawHTTPApi +from b2sdk.utils import hex_sha1_of_stream + +from .bucket_cleaner import _clean_and_delete_bucket + def pytest_addoption(parser): """Add a flag for not cleaning up old buckets""" @@ -20,6 +32,159 @@ def pytest_addoption(parser): ) -@pytest.fixture +@pytest.fixture(scope='session') def dont_cleanup_old_buckets(request): return request.config.getoption("--dont-cleanup-old-buckets") + + +@pytest.fixture(scope='session') +def application_key_id() -> str: + key_id = environ.get('B2_TEST_APPLICATION_KEY_ID') + assert key_id + return key_id + + +@pytest.fixture(scope='session') +def application_key() -> str: + key = environ.get('B2_TEST_APPLICATION_KEY') + assert key + return key + + +@pytest.fixture(scope='module') +def raw_api() -> B2RawHTTPApi: + return B2RawHTTPApi(B2Http()) + + +@pytest.fixture(scope='session') +def realm_url() -> str: + realm = environ.get('B2_TEST_ENVIRONMENT', 'production') + return REALM_URLS.get(realm, realm) + + +@pytest.fixture(scope='module') +def auth_dict(raw_api, application_key, application_key_id, realm_url) -> dict: + result = raw_api.authorize_account(realm_url, application_key_id, application_key) + + missing_capabilities = set(ALL_CAPABILITIES) - \ + {'readBuckets', 'listAllBucketNames'} - set(result['allowed']['capabilities']) + assert not missing_capabilities, \ + f'Seems like a non-full key. Missing capabilities: {missing_capabilities}' + + return result + + +@pytest.fixture(scope='module') +def account_id(auth_dict) -> str: + return auth_dict['accountId'] + + +@pytest.fixture(scope='module') +def account_auth_token(auth_dict) -> str: + return auth_dict['authorizationToken'] + + +@pytest.fixture(scope='module') +def api_url(auth_dict) -> str: + return auth_dict['apiUrl'] + + +@pytest.fixture(scope='module') +def download_url(auth_dict) -> str: + return auth_dict['downloadUrl'] + + +@pytest.fixture(scope='module') +def bucket_dict(raw_api, api_url, account_auth_token, account_id) -> dict: + # include the account ID in the bucket name to be + # sure it doesn't collide with bucket names from + # other accounts. + bucket_name = f'test-raw-api-{account_id}-{time():.0f}-{randint(1000, 9999)}' + + bucket = raw_api.create_bucket( + api_url, + account_auth_token, + account_id, + bucket_name, + 'allPublic', + is_file_lock_enabled=True, + ) + yield bucket + + _clean_and_delete_bucket( + raw_api, + api_url, + account_auth_token, + account_id, + bucket['bucketId'], + ) + + +@pytest.fixture(scope='module') +def bucket_id(bucket_dict) -> str: + return bucket_dict['bucketId'] + + +@pytest.fixture(scope='module') +def bucket_name(bucket_dict) -> str: + return bucket_dict['bucketName'] + + +@pytest.fixture(scope='module') +def upload_url_dict(raw_api, api_url, account_auth_token, bucket_id) -> dict: + return raw_api.get_upload_url(api_url, account_auth_token, bucket_id) + + +@pytest.fixture(scope='module') +def file_name() -> str: + return 'test.txt' + + +@pytest.fixture(scope='module') +def file_contents() -> bytes: + return b'hello world' + + +@pytest.fixture(scope='session') +def sse_none() -> EncryptionSetting: + return EncryptionSetting(mode=EncryptionMode.NONE) + + +@pytest.fixture(scope='session') +def sse_b2_aes() -> EncryptionSetting: + return EncryptionSetting( + mode=EncryptionMode.SSE_B2, + algorithm=EncryptionAlgorithm.AES256, + ) + + +@pytest.fixture(scope='module') +def file_dict(raw_api, upload_url_dict, file_name, file_contents, sse_b2_aes) -> dict: + return raw_api.upload_file( + upload_url_dict['uploadUrl'], + upload_url_dict['authorizationToken'], + file_name, + len(file_contents), + 'text/plain', + hex_sha1_of_stream(BytesIO(file_contents), len(file_contents)), + {'color': 'blue'}, + BytesIO(file_contents), + server_side_encryption=sse_b2_aes, + ) + + +@pytest.fixture(scope='module') +def file_id(file_dict) -> str: + return file_dict['fileId'] + + +@pytest.fixture(scope='module') +def download_auth_dict(raw_api, api_url, account_auth_token, bucket_id, file_name) -> dict: + return raw_api.get_download_authorization( + api_url, account_auth_token, bucket_id, file_name[:-2], 12345, + ) + + +@pytest.fixture(scope='module') +def download_auth_token(download_auth_dict) -> str: + return download_auth_dict['authorizationToken'] diff --git a/test/integration/test_raw_api.py b/test/integration/test_raw_api.py index 798a62fbf..3f22a4888 100644 --- a/test/integration/test_raw_api.py +++ b/test/integration/test_raw_api.py @@ -8,101 +8,49 @@ # ###################################################################### import io -import os -import random -import re -import sys -import time -import traceback + +from random import randint +from time import time import pytest -from b2sdk.b2http import B2Http -from b2sdk.encryption.setting import EncryptionAlgorithm, EncryptionMode, EncryptionSetting -from b2sdk.replication.setting import ReplicationConfiguration, ReplicationSourceConfiguration, ReplicationRule, ReplicationDestinationConfiguration +from b2sdk.encryption.setting import EncryptionMode, EncryptionSetting +from b2sdk.file_lock import BucketRetentionSetting, RetentionMode, RetentionPeriod +from b2sdk.replication.setting import ReplicationConfiguration, ReplicationDestinationConfiguration, ReplicationRule, ReplicationSourceConfiguration from b2sdk.replication.types import ReplicationStatus -from b2sdk.file_lock import BucketRetentionSetting, NO_RETENTION_FILE_SETTING, RetentionMode, RetentionPeriod -from b2sdk.raw_api import ALL_CAPABILITIES, B2RawHTTPApi, REALM_URLS from b2sdk.utils import hex_sha1_of_stream +from .bucket_cleaner import _clean_and_delete_bucket, _cleanup_old_buckets -# TODO: rewrite to separate test cases -def test_raw_api(dont_cleanup_old_buckets): - """ - Exercise the code in B2RawHTTPApi by making each call once, just - to make sure the parameters are passed in, and the result is - passed back. - - The goal is to be a complete test of B2RawHTTPApi, so the tests for - the rest of the code can use the simulator. - - Prints to stdout if things go wrong. - - :return: 0 on success, non-zero on failure - """ - application_key_id = os.environ.get('B2_TEST_APPLICATION_KEY_ID') - if application_key_id is None: - pytest.fail('B2_TEST_APPLICATION_KEY_ID is not set.') - - application_key = os.environ.get('B2_TEST_APPLICATION_KEY') - if application_key is None: - pytest.fail('B2_TEST_APPLICATION_KEY is not set.') - - print() - - try: - raw_api = B2RawHTTPApi(B2Http()) - raw_api_test_helper(raw_api, not dont_cleanup_old_buckets) - except Exception: - traceback.print_exc(file=sys.stdout) - pytest.fail('test_raw_api failed') - - -def authorize_raw_api(raw_api): - application_key_id = os.environ.get('B2_TEST_APPLICATION_KEY_ID') - if application_key_id is None: - print('B2_TEST_APPLICATION_KEY_ID is not set.', file=sys.stderr) - sys.exit(1) - - application_key = os.environ.get('B2_TEST_APPLICATION_KEY') - if application_key is None: - print('B2_TEST_APPLICATION_KEY is not set.', file=sys.stderr) - sys.exit(1) - - realm = os.environ.get('B2_TEST_ENVIRONMENT', 'production') - realm_url = REALM_URLS.get(realm, realm) - auth_dict = raw_api.authorize_account(realm_url, application_key_id, application_key) - return auth_dict - - -def raw_api_test_helper(raw_api, should_cleanup_old_buckets): - """ - Try each of the calls to the raw api. Raise an - exception if anything goes wrong. - - This uses a Backblaze account that is just for this test. - The account uses the free level of service, which should - be enough to run this test a reasonable number of times - each day. If somebody abuses the account for other things, - this test will break and we'll have to do something about - it. - """ - # b2_authorize_account - print('b2_authorize_account') - auth_dict = authorize_raw_api(raw_api) - missing_capabilities = set(ALL_CAPABILITIES) - set(['readBuckets'] - ) - set(auth_dict['allowed']['capabilities']) - assert not missing_capabilities, 'it appears that the raw_api integration test is being run with a non-full key. Missing capabilities: %s' % ( - missing_capabilities, - ) +""" +Try each of the calls to the raw api. + +This uses a Backblaze account that is just for this test. +The account uses the free level of service, which should +be enough to run this test a reasonable number of times +each day. If somebody abuses the account for other things, +this test will break and we'll have to do something about +it. +""" + + +@pytest.fixture(scope='module', autouse=True) +def cleanup_buckets(raw_api, account_id, account_auth_token, api_url, dont_cleanup_old_buckets): + """ Remove all "stale" test buckets """ + + if dont_cleanup_old_buckets: + return + + bucket_list_dict = raw_api.list_buckets(api_url, account_auth_token, account_id) + + _cleanup_old_buckets(raw_api, api_url, account_auth_token, account_id, bucket_list_dict) - account_id = auth_dict['accountId'] - account_auth_token = auth_dict['authorizationToken'] - api_url = auth_dict['apiUrl'] - download_url = auth_dict['downloadUrl'] - # b2_create_key - print('b2_create_key') +def test_auth(auth_dict): + pass # `auth_dict` implicitly invokes testing + + +def test_keys(raw_api, api_url, account_auth_token, account_id): key_dict = raw_api.create_key( api_url, account_auth_token, @@ -113,65 +61,46 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): None, None, ) - - # b2_list_keys - print('b2_list_keys') raw_api.list_keys(api_url, account_auth_token, account_id, 10) - - # b2_delete_key - print('b2_delete_key') raw_api.delete_key(api_url, account_auth_token, key_dict['applicationKeyId']) - # b2_create_bucket, with a unique bucket name - # Include the account ID in the bucket name to be - # sure it doesn't collide with bucket names from - # other accounts. - print('b2_create_bucket') - bucket_name = 'test-raw-api-%s-%d-%d' % ( - account_id, int(time.time()), random.randint(1000, 9999) - ) - # very verbose http debug - #import http.client; http.client.HTTPConnection.debuglevel = 1 +def test_bucket_creation(bucket_dict): + pass # `bucket_dict` implicitly invokes testing - bucket_dict = raw_api.create_bucket( - api_url, - account_auth_token, - account_id, - bucket_name, - 'allPublic', - is_file_lock_enabled=True, - ) - bucket_id = bucket_dict['bucketId'] - first_bucket_revision = bucket_dict['revision'] - - ################################# - print('b2 / replication') - - # 1) create source key (read permissions) - replication_source_key_dict = raw_api.create_key( - api_url, - account_auth_token, - account_id, - ['listBuckets', 'listFiles', 'readFiles'], - 'testReplicationSourceKey', - None, - None, - None, - ) - replication_source_key = replication_source_key_dict['applicationKeyId'] - # 2) create source bucket with replication to destination - existing bucket - try: - # in order to test replication, we need to create a second bucket - replication_source_bucket_name = 'test-raw-api-%s-%d-%d' % ( - account_id, int(time.time()), random.randint(1000, 9999) +class TestReplication: + @pytest.fixture(scope='class') + def source_key_id(self, raw_api, api_url, account_auth_token, account_id): + key_dict = raw_api.create_key( + api_url, + account_auth_token, + account_id, + ['listBuckets', 'listFiles', 'readFiles'], + 'testReplicationSourceKey', + None, + None, + None, + ) + key_id = key_dict['applicationKeyId'] + yield key_id + raw_api.delete_key( + api_url, + account_auth_token, + key_id, ) - replication_source_bucket_dict = raw_api.create_bucket( + + @pytest.fixture(scope='class') + def source_bucket_dict( + self, raw_api, api_url, account_auth_token, account_id, source_key_id, bucket_id, + bucket_name, + ): + bucket_name = f'test-raw-api-{account_id}-{time():.0f}-{randint(1000, 9999)}' + bucket = raw_api.create_bucket( api_url, account_auth_token, account_id, - replication_source_bucket_name, + bucket_name, 'allPublic', is_file_lock_enabled=True, replication=ReplicationConfiguration( @@ -183,12 +112,45 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): name='test-rule', ), ], - source_application_key_id=replication_source_key, + source_application_key_id=source_key_id, ), ), ) - assert 'replicationConfiguration' in replication_source_bucket_dict - assert replication_source_bucket_dict['replicationConfiguration'] == { + yield bucket + _clean_and_delete_bucket( + raw_api, + api_url, + account_auth_token, + account_id, + bucket['bucketId'], + ) + + @pytest.fixture(scope='class') + def destination_key_id(self, raw_api, api_url, account_auth_token, account_id): + key_dict = raw_api.create_key( + api_url, + account_auth_token, + account_id, + ['listBuckets', 'listFiles', 'writeFiles'], + 'testReplicationDestinationKey', + None, + None, + None, + ) + key_id = key_dict['applicationKeyId'] + yield key_id + raw_api.delete_key( + api_url, + account_auth_token, + key_id, + ) + + def test_replication( + self, raw_api, api_url, account_auth_token, account_id, source_key_id, source_bucket_dict, + bucket_id, destination_key_id, + ): + assert 'replicationConfiguration' in source_bucket_dict + assert source_bucket_dict['replicationConfiguration'] == { 'isClientAuthorizedToRead': True, 'value': { @@ -205,17 +167,17 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): "replicationRuleName": "test-rule" }, ], - "sourceApplicationKeyId": replication_source_key, + "sourceApplicationKeyId": source_key_id, }, "asReplicationDestination": None, }, } - # 3) upload test file and check replication status + # upload test file and check replication status upload_url_dict = raw_api.get_upload_url( api_url, account_auth_token, - replication_source_bucket_dict['bucketId'], + source_bucket_dict['bucketId'], ) file_contents = b'hello world' file_dict = raw_api.upload_file( @@ -229,27 +191,11 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): io.BytesIO(file_contents), ) - assert ReplicationStatus[file_dict['replicationStatus'].upper() - ] == ReplicationStatus.PENDING - - finally: - raw_api.delete_key(api_url, account_auth_token, replication_source_key) - - # 4) create destination key (write permissions) - replication_destination_key_dict = raw_api.create_key( - api_url, - account_auth_token, - account_id, - ['listBuckets', 'listFiles', 'writeFiles'], - 'testReplicationDestinationKey', - None, - None, - None, - ) - replication_destination_key = replication_destination_key_dict['applicationKeyId'] + assert ReplicationStatus[ + file_dict['replicationStatus'].upper() + ] == ReplicationStatus.PENDING - # 5) update destination bucket to receive updates - try: + # update destination bucket to receive updates bucket_dict = raw_api.update_bucket( api_url, account_auth_token, @@ -258,9 +204,7 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): 'allPublic', replication=ReplicationConfiguration( as_replication_destination=ReplicationDestinationConfiguration( - source_to_destination_key_mapping={ - replication_source_key: replication_destination_key, - }, + source_to_destination_key_mapping={source_key_id: destination_key_id}, ), ), ) @@ -271,49 +215,27 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): 'asReplicationDestination': { 'sourceToDestinationKeyMapping': - { - replication_source_key: replication_destination_key, - }, + {source_key_id: destination_key_id}, }, 'asReplicationSource': None, }, } - finally: - raw_api.delete_key( + + bucket_dict = raw_api.update_bucket( api_url, account_auth_token, - replication_destination_key_dict['applicationKeyId'], + account_id, + bucket_id, + 'allPublic', + replication=ReplicationConfiguration(), ) + assert bucket_dict['replicationConfiguration'] == { + 'isClientAuthorizedToRead': True, + 'value': None, + } - # 6) cleanup: disable replication for destination and remove source - bucket_dict = raw_api.update_bucket( - api_url, - account_auth_token, - account_id, - bucket_id, - 'allPublic', - replication=ReplicationConfiguration(), - ) - assert bucket_dict['replicationConfiguration'] == { - 'isClientAuthorizedToRead': True, - 'value': None, - } - _clean_and_delete_bucket( - raw_api, - api_url, - account_auth_token, - account_id, - replication_source_bucket_dict['bucketId'], - ) - - ################# - print('b2_update_bucket') - sse_b2_aes = EncryptionSetting( - mode=EncryptionMode.SSE_B2, - algorithm=EncryptionAlgorithm.AES256, - ) - sse_none = EncryptionSetting(mode=EncryptionMode.NONE) +def test_update_bucket(raw_api, api_url, account_auth_token, account_id, bucket_id, sse_none, sse_b2_aes): for encryption_setting, default_retention in [ ( sse_none, @@ -322,7 +244,7 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): (sse_b2_aes, None), (sse_b2_aes, BucketRetentionSetting(RetentionMode.NONE)), ]: - bucket_dict = raw_api.update_bucket( + raw_api.update_bucket( api_url, account_auth_token, account_id, @@ -332,182 +254,166 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): default_retention=default_retention, ) - # b2_list_buckets - print('b2_list_buckets') - bucket_list_dict = raw_api.list_buckets(api_url, account_auth_token, account_id) - #print(bucket_list_dict) - - # b2_get_upload_url - print('b2_get_upload_url') - upload_url_dict = raw_api.get_upload_url(api_url, account_auth_token, bucket_id) - upload_url = upload_url_dict['uploadUrl'] - upload_auth_token = upload_url_dict['authorizationToken'] - - # b2_upload_file - print('b2_upload_file') - file_name = 'test.txt' - file_contents = b'hello world' - file_sha1 = hex_sha1_of_stream(io.BytesIO(file_contents), len(file_contents)) - file_dict = raw_api.upload_file( - upload_url, - upload_auth_token, - file_name, - len(file_contents), - 'text/plain', - file_sha1, - {'color': 'blue'}, - io.BytesIO(file_contents), - server_side_encryption=sse_b2_aes, - ) - file_id = file_dict['fileId'] +def test_list_buckets(raw_api, api_url, account_auth_token, account_id): + raw_api.list_buckets(api_url, account_auth_token, account_id) - # b2_list_file_versions - print('b2_list_file_versions') + +def test_upload_url(upload_url_dict): + pass + + +def test_file_upload(file_dict): + pass + + +def test_list_file_versions(raw_api, api_url, account_auth_token, bucket_id, file_dict, file_name): list_versions_dict = raw_api.list_file_versions(api_url, account_auth_token, bucket_id) assert [file_name] == [f_dict['fileName'] for f_dict in list_versions_dict['files']] - # b2_download_file_by_id with auth - print('b2_download_file_by_id (auth)') - url = raw_api.get_download_url_by_id(download_url, file_id) + +def test_download_file_by_id_with_auth(raw_api, download_url, file_dict, account_auth_token, file_contents): + url = raw_api.get_download_url_by_id(download_url, file_dict['fileId']) with raw_api.download_file_from_url(account_auth_token, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data - # b2_download_file_by_id no auth - print('b2_download_file_by_id (no auth)') - url = raw_api.get_download_url_by_id(download_url, file_id) + +def test_download_file_by_id_no_auth(raw_api, download_url, file_dict, file_contents): + url = raw_api.get_download_url_by_id(download_url, file_dict['fileId']) with raw_api.download_file_from_url(None, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data - # b2_download_file_by_name with auth - print('b2_download_file_by_name (auth)') + +def test_download_file_by_name_with_auth(raw_api, download_url, file_name, bucket_name, account_auth_token, file_contents): url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) with raw_api.download_file_from_url(account_auth_token, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data - # b2_download_file_by_name no auth - print('b2_download_file_by_name (no auth)') + +def test_download_file_by_name_no_auth(raw_api, download_url, file_name, bucket_name, file_contents): url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) with raw_api.download_file_from_url(None, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data - # b2_get_download_authorization - print('b2_get_download_authorization') - download_auth = raw_api.get_download_authorization( - api_url, account_auth_token, bucket_id, file_name[:-2], 12345 - ) - download_auth_token = download_auth['authorizationToken'] - # b2_download_file_by_name with download auth - print('b2_download_file_by_name (download auth)') +def test_get_download_authorization(download_auth_dict): + pass + + +def test_download_file_by_name_download_auth(download_auth_dict, raw_api, download_url, bucket_name, file_name, file_contents): + download_auth_token = download_auth_dict['authorizationToken'] url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) with raw_api.download_file_from_url(download_auth_token, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data - # b2_list_file_names - print('b2_list_file_names') + +def test_list_file_names(raw_api, api_url, account_auth_token, bucket_id, file_name): list_names_dict = raw_api.list_file_names(api_url, account_auth_token, bucket_id) assert [file_name] == [f_dict['fileName'] for f_dict in list_names_dict['files']] - # b2_list_file_names (start, count) - print('b2_list_file_names (start, count)') + +def test_list_file_names_start_count(raw_api, api_url, account_auth_token, bucket_id, file_name): list_names_dict = raw_api.list_file_names( api_url, account_auth_token, bucket_id, start_file_name=file_name, max_file_count=5 ) assert [file_name] == [f_dict['fileName'] for f_dict in list_names_dict['files']] - # b2_copy_file - print('b2_copy_file') + +def test_copy_file(raw_api, api_url, account_auth_token, file_id): copy_file_name = 'test_copy.txt' raw_api.copy_file(api_url, account_auth_token, file_id, copy_file_name) - # b2_get_file_info_by_id - print('b2_get_file_info_by_id') + +def test_get_file_info_by_id(raw_api, api_url, account_auth_token, file_id, file_name): file_info_dict = raw_api.get_file_info_by_id(api_url, account_auth_token, file_id) assert file_info_dict['fileName'] == file_name - # b2_get_file_info_by_name - print('b2_get_file_info_by_name (no auth)') + +def test_get_file_info_by_name_no_auth(raw_api, api_url, account_id, bucket_name, file_name, file_id, download_url): info_headers = raw_api.get_file_info_by_name(download_url, None, bucket_name, file_name) assert info_headers['x-bz-file-id'] == file_id - # b2_get_file_info_by_name - print('b2_get_file_info_by_name (auth)') + +def test_get_file_info_by_name_with_auth(raw_api, download_url, account_auth_token, bucket_name, file_name, file_id): info_headers = raw_api.get_file_info_by_name( download_url, account_auth_token, bucket_name, file_name ) assert info_headers['x-bz-file-id'] == file_id - # b2_get_file_info_by_name - print('b2_get_file_info_by_name (download auth)') + +def test_get_file_info_by_name_download_auth(raw_api, download_url, download_auth_token, bucket_name, file_name, file_id): info_headers = raw_api.get_file_info_by_name( download_url, download_auth_token, bucket_name, file_name ) assert info_headers['x-bz-file-id'] == file_id - # b2_hide_file - print('b2_hide_file') + +def test_hide_file(raw_api, api_url, account_auth_token, bucket_id, file_name): raw_api.hide_file(api_url, account_auth_token, bucket_id, file_name) - # b2_start_large_file - print('b2_start_large_file') - file_info = {'color': 'red'} - large_info = raw_api.start_large_file( - api_url, - account_auth_token, - bucket_id, - file_name, - 'text/plain', - file_info, - server_side_encryption=sse_b2_aes, - ) - large_file_id = large_info['fileId'] - - # b2_get_upload_part_url - print('b2_get_upload_part_url') - upload_part_dict = raw_api.get_upload_part_url(api_url, account_auth_token, large_file_id) - upload_part_url = upload_part_dict['uploadUrl'] - upload_path_auth = upload_part_dict['authorizationToken'] - - # b2_upload_part - print('b2_upload_part') - part_contents = b'hello part' - part_sha1 = hex_sha1_of_stream(io.BytesIO(part_contents), len(part_contents)) - raw_api.upload_part( - upload_part_url, upload_path_auth, 1, len(part_contents), part_sha1, - io.BytesIO(part_contents) - ) - # b2_copy_part - print('b2_copy_part') - raw_api.copy_part(api_url, account_auth_token, file_id, large_file_id, 2, (0, 5)) - - # b2_list_parts - print('b2_list_parts') - parts_response = raw_api.list_parts(api_url, account_auth_token, large_file_id, 1, 100) - assert [1, 2] == [part['partNumber'] for part in parts_response['parts']] - - # b2_list_unfinished_large_files - unfinished_list = raw_api.list_unfinished_large_files(api_url, account_auth_token, bucket_id) - assert [file_name] == [f_dict['fileName'] for f_dict in unfinished_list['files']] - assert file_info == unfinished_list['files'][0]['fileInfo'] - - # b2_finish_large_file - print('b2_finish_large_file') - try: - raw_api.finish_large_file(api_url, account_auth_token, large_file_id, [part_sha1]) - raise Exception('finish should have failed') - except Exception as e: - assert 'large files must have at least 2 parts' in str(e) +class TestLargeFile: + + @pytest.fixture(scope='class') + def file_info(self) -> dict: + return {'color': 'red'} + + @pytest.fixture(scope='class') + def large_file_id(self, raw_api, api_url, account_auth_token, bucket_id, file_name, sse_b2_aes, file_info) -> str: + large_info = raw_api.start_large_file( + api_url, + account_auth_token, + bucket_id, + file_name, + 'text/plain', + file_info, + server_side_encryption=sse_b2_aes, + ) + return large_info['fileId'] + + @pytest.fixture(scope='class') + def part_contents(self) -> bytes: + return b'hello part' + + @pytest.fixture(scope='class') + def part_sha1(self, part_contents) -> bytes: + return hex_sha1_of_stream(io.BytesIO(part_contents), len(part_contents)) + + def test_upload_part(self, raw_api, api_url, account_auth_token, large_file_id, file_contents, part_contents, part_sha1): + upload_part_dict = raw_api.get_upload_part_url(api_url, account_auth_token, large_file_id) + upload_part_url = upload_part_dict['uploadUrl'] + upload_path_auth = upload_part_dict['authorizationToken'] + + raw_api.upload_part( + upload_part_url, upload_path_auth, 1, len(part_contents), part_sha1, + io.BytesIO(part_contents) + ) + + def test_copy_part(self, raw_api, api_url, account_auth_token, file_id, large_file_id): + raw_api.copy_part(api_url, account_auth_token, file_id, large_file_id, 2, (0, 5)) + + def test_list_parts(self, raw_api, api_url, account_auth_token, large_file_id): + parts_response = raw_api.list_parts(api_url, account_auth_token, large_file_id, 1, 100) + assert [1, 2] == [part['partNumber'] for part in parts_response['parts']] + + def test_list_unfinished_large_files(self, raw_api, api_url, account_auth_token, bucket_id, file_name, file_info): + unfinished_list = raw_api.list_unfinished_large_files(api_url, account_auth_token, bucket_id) + assert [file_name] == [f_dict['fileName'] for f_dict in unfinished_list['files']] + assert file_info == unfinished_list['files'][0]['fileInfo'] + + def test_finish_large_file(self, raw_api, api_url, account_auth_token, large_file_id, part_sha1): + with pytest.raises(Exception, match='large files must have at least 2 parts'): + raw_api.finish_large_file(api_url, account_auth_token, large_file_id, [part_sha1]) + # TODO: make another attempt to finish but this time successfully - # b2_update_bucket - print('b2_update_bucket') + +def test_update_bucket_revision(raw_api, api_url, account_auth_token, bucket_dict, bucket_id, account_id): updated_bucket = raw_api.update_bucket( api_url, account_auth_token, @@ -519,78 +425,4 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): mode=RetentionMode.GOVERNANCE, period=RetentionPeriod(days=1) ), ) - assert first_bucket_revision < updated_bucket['revision'] - - # Clean up this test. - _clean_and_delete_bucket(raw_api, api_url, account_auth_token, account_id, bucket_id) - - if should_cleanup_old_buckets: - # Clean up from old tests. Empty and delete any buckets more than an hour old. - _cleanup_old_buckets(raw_api, auth_dict, bucket_list_dict) - - -def cleanup_old_buckets(): - raw_api = B2RawHTTPApi(B2Http()) - auth_dict = authorize_raw_api(raw_api) - bucket_list_dict = raw_api.list_buckets( - auth_dict['apiUrl'], auth_dict['authorizationToken'], auth_dict['accountId'] - ) - _cleanup_old_buckets(raw_api, auth_dict, bucket_list_dict) - - -def _cleanup_old_buckets(raw_api, auth_dict, bucket_list_dict): - for bucket_dict in bucket_list_dict['buckets']: - bucket_id = bucket_dict['bucketId'] - bucket_name = bucket_dict['bucketName'] - if _should_delete_bucket(bucket_name): - print('cleaning up old bucket: ' + bucket_name) - _clean_and_delete_bucket( - raw_api, - auth_dict['apiUrl'], - auth_dict['authorizationToken'], - auth_dict['accountId'], - bucket_id, - ) - - -def _clean_and_delete_bucket(raw_api, api_url, account_auth_token, account_id, bucket_id): - # Delete the files. This test never creates more than a few files, - # so one call to list_file_versions should get them all. - versions_dict = raw_api.list_file_versions(api_url, account_auth_token, bucket_id) - for version_dict in versions_dict['files']: - file_id = version_dict['fileId'] - file_name = version_dict['fileName'] - action = version_dict['action'] - if action in ['hide', 'upload']: - print('b2_delete_file', file_name, action) - if action == 'upload' and version_dict[ - 'fileRetention'] and version_dict['fileRetention']['value']['mode'] is not None: - raw_api.update_file_retention( - api_url, - account_auth_token, - file_id, - file_name, - NO_RETENTION_FILE_SETTING, - bypass_governance=True - ) - raw_api.delete_file_version(api_url, account_auth_token, file_id, file_name) - else: - print('b2_cancel_large_file', file_name) - raw_api.cancel_large_file(api_url, account_auth_token, file_id) - - # Delete the bucket - print('b2_delete_bucket', bucket_id) - raw_api.delete_bucket(api_url, account_auth_token, account_id, bucket_id) - - -def _should_delete_bucket(bucket_name): - # Bucket names for this test look like: c7b22d0b0ad7-1460060364-5670 - # Other buckets should not be deleted. - match = re.match(r'^test-raw-api-[a-f0-9]+-([0-9]+)-([0-9]+)', bucket_name) - if match is None: - return False - - # Is it more than an hour old? - bucket_time = int(match.group(1)) - now = time.time() - return bucket_time + 3600 <= now + assert bucket_dict['revision'] < updated_bucket['revision'] From cc87ad4b70650207a9f329d7954f26233b7e0b2b Mon Sep 17 00:00:00 2001 From: Aleksandr Goncharov Date: Fri, 6 May 2022 22:14:11 +0300 Subject: [PATCH 2/4] Yapf fix --- test/integration/bucket_cleaner.py | 4 +- test/integration/conftest.py | 6 +- test/integration/test_raw_api.py | 88 ++++++++++++++++++++++-------- 3 files changed, 71 insertions(+), 27 deletions(-) diff --git a/test/integration/bucket_cleaner.py b/test/integration/bucket_cleaner.py index b863c4678..d54f2156b 100644 --- a/test/integration/bucket_cleaner.py +++ b/test/integration/bucket_cleaner.py @@ -96,7 +96,7 @@ def cleanup_buckets(self): b2_api.delete_bucket(bucket) -def _cleanup_old_buckets(raw_api, api_url, account_auth_token, account_id, bucket_list_dict): +def _cleanup_old_buckets(raw_api, api_url, account_auth_token, account_id, bucket_list_dict): for bucket_dict in bucket_list_dict['buckets']: bucket_id = bucket_dict['bucketId'] bucket_name = bucket_dict['bucketName'] @@ -122,7 +122,7 @@ def _clean_and_delete_bucket(raw_api, api_url, account_auth_token, account_id, b if action in ['hide', 'upload']: print('b2_delete_file', file_name, action) if action == 'upload' and version_dict[ - 'fileRetention'] and version_dict['fileRetention']['value']['mode'] is not None: + 'fileRetention'] and version_dict['fileRetention']['value']['mode'] is not None: raw_api.update_file_retention( api_url, account_auth_token, diff --git a/test/integration/conftest.py b/test/integration/conftest.py index 1e7524ab9..9c27f81f1 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -181,7 +181,11 @@ def file_id(file_dict) -> str: @pytest.fixture(scope='module') def download_auth_dict(raw_api, api_url, account_auth_token, bucket_id, file_name) -> dict: return raw_api.get_download_authorization( - api_url, account_auth_token, bucket_id, file_name[:-2], 12345, + api_url, + account_auth_token, + bucket_id, + file_name[:-2], + 12345, ) diff --git a/test/integration/test_raw_api.py b/test/integration/test_raw_api.py index 3f22a4888..9416ef4dc 100644 --- a/test/integration/test_raw_api.py +++ b/test/integration/test_raw_api.py @@ -21,7 +21,6 @@ from b2sdk.utils import hex_sha1_of_stream from .bucket_cleaner import _clean_and_delete_bucket, _cleanup_old_buckets - """ Try each of the calls to the raw api. @@ -92,7 +91,13 @@ def source_key_id(self, raw_api, api_url, account_auth_token, account_id): @pytest.fixture(scope='class') def source_bucket_dict( - self, raw_api, api_url, account_auth_token, account_id, source_key_id, bucket_id, + self, + raw_api, + api_url, + account_auth_token, + account_id, + source_key_id, + bucket_id, bucket_name, ): bucket_name = f'test-raw-api-{account_id}-{time():.0f}-{randint(1000, 9999)}' @@ -146,8 +151,15 @@ def destination_key_id(self, raw_api, api_url, account_auth_token, account_id): ) def test_replication( - self, raw_api, api_url, account_auth_token, account_id, source_key_id, source_bucket_dict, - bucket_id, destination_key_id, + self, + raw_api, + api_url, + account_auth_token, + account_id, + source_key_id, + source_bucket_dict, + bucket_id, + destination_key_id, ): assert 'replicationConfiguration' in source_bucket_dict assert source_bucket_dict['replicationConfiguration'] == { @@ -191,9 +203,8 @@ def test_replication( io.BytesIO(file_contents), ) - assert ReplicationStatus[ - file_dict['replicationStatus'].upper() - ] == ReplicationStatus.PENDING + assert ReplicationStatus[file_dict['replicationStatus'].upper() + ] == ReplicationStatus.PENDING # update destination bucket to receive updates bucket_dict = raw_api.update_bucket( @@ -214,8 +225,9 @@ def test_replication( { 'asReplicationDestination': { - 'sourceToDestinationKeyMapping': - {source_key_id: destination_key_id}, + 'sourceToDestinationKeyMapping': { + source_key_id: destination_key_id + }, }, 'asReplicationSource': None, }, @@ -235,7 +247,9 @@ def test_replication( } -def test_update_bucket(raw_api, api_url, account_auth_token, account_id, bucket_id, sse_none, sse_b2_aes): +def test_update_bucket( + raw_api, api_url, account_auth_token, account_id, bucket_id, sse_none, sse_b2_aes +): for encryption_setting, default_retention in [ ( sse_none, @@ -272,7 +286,9 @@ def test_list_file_versions(raw_api, api_url, account_auth_token, bucket_id, fil assert [file_name] == [f_dict['fileName'] for f_dict in list_versions_dict['files']] -def test_download_file_by_id_with_auth(raw_api, download_url, file_dict, account_auth_token, file_contents): +def test_download_file_by_id_with_auth( + raw_api, download_url, file_dict, account_auth_token, file_contents +): url = raw_api.get_download_url_by_id(download_url, file_dict['fileId']) with raw_api.download_file_from_url(account_auth_token, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) @@ -286,14 +302,18 @@ def test_download_file_by_id_no_auth(raw_api, download_url, file_dict, file_cont assert data == file_contents, data -def test_download_file_by_name_with_auth(raw_api, download_url, file_name, bucket_name, account_auth_token, file_contents): +def test_download_file_by_name_with_auth( + raw_api, download_url, file_name, bucket_name, account_auth_token, file_contents +): url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) with raw_api.download_file_from_url(account_auth_token, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) assert data == file_contents, data -def test_download_file_by_name_no_auth(raw_api, download_url, file_name, bucket_name, file_contents): +def test_download_file_by_name_no_auth( + raw_api, download_url, file_name, bucket_name, file_contents +): url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) with raw_api.download_file_from_url(None, url) as response: data = next(response.iter_content(chunk_size=len(file_contents))) @@ -304,7 +324,9 @@ def test_get_download_authorization(download_auth_dict): pass -def test_download_file_by_name_download_auth(download_auth_dict, raw_api, download_url, bucket_name, file_name, file_contents): +def test_download_file_by_name_download_auth( + download_auth_dict, raw_api, download_url, bucket_name, file_name, file_contents +): download_auth_token = download_auth_dict['authorizationToken'] url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) with raw_api.download_file_from_url(download_auth_token, url) as response: @@ -334,19 +356,25 @@ def test_get_file_info_by_id(raw_api, api_url, account_auth_token, file_id, file assert file_info_dict['fileName'] == file_name -def test_get_file_info_by_name_no_auth(raw_api, api_url, account_id, bucket_name, file_name, file_id, download_url): +def test_get_file_info_by_name_no_auth( + raw_api, api_url, account_id, bucket_name, file_name, file_id, download_url +): info_headers = raw_api.get_file_info_by_name(download_url, None, bucket_name, file_name) assert info_headers['x-bz-file-id'] == file_id -def test_get_file_info_by_name_with_auth(raw_api, download_url, account_auth_token, bucket_name, file_name, file_id): +def test_get_file_info_by_name_with_auth( + raw_api, download_url, account_auth_token, bucket_name, file_name, file_id +): info_headers = raw_api.get_file_info_by_name( download_url, account_auth_token, bucket_name, file_name ) assert info_headers['x-bz-file-id'] == file_id -def test_get_file_info_by_name_download_auth(raw_api, download_url, download_auth_token, bucket_name, file_name, file_id): +def test_get_file_info_by_name_download_auth( + raw_api, download_url, download_auth_token, bucket_name, file_name, file_id +): info_headers = raw_api.get_file_info_by_name( download_url, download_auth_token, bucket_name, file_name ) @@ -358,13 +386,14 @@ def test_hide_file(raw_api, api_url, account_auth_token, bucket_id, file_name): class TestLargeFile: - @pytest.fixture(scope='class') def file_info(self) -> dict: return {'color': 'red'} @pytest.fixture(scope='class') - def large_file_id(self, raw_api, api_url, account_auth_token, bucket_id, file_name, sse_b2_aes, file_info) -> str: + def large_file_id( + self, raw_api, api_url, account_auth_token, bucket_id, file_name, sse_b2_aes, file_info + ) -> str: large_info = raw_api.start_large_file( api_url, account_auth_token, @@ -384,7 +413,10 @@ def part_contents(self) -> bytes: def part_sha1(self, part_contents) -> bytes: return hex_sha1_of_stream(io.BytesIO(part_contents), len(part_contents)) - def test_upload_part(self, raw_api, api_url, account_auth_token, large_file_id, file_contents, part_contents, part_sha1): + def test_upload_part( + self, raw_api, api_url, account_auth_token, large_file_id, file_contents, part_contents, + part_sha1 + ): upload_part_dict = raw_api.get_upload_part_url(api_url, account_auth_token, large_file_id) upload_part_url = upload_part_dict['uploadUrl'] upload_path_auth = upload_part_dict['authorizationToken'] @@ -401,19 +433,27 @@ def test_list_parts(self, raw_api, api_url, account_auth_token, large_file_id): parts_response = raw_api.list_parts(api_url, account_auth_token, large_file_id, 1, 100) assert [1, 2] == [part['partNumber'] for part in parts_response['parts']] - def test_list_unfinished_large_files(self, raw_api, api_url, account_auth_token, bucket_id, file_name, file_info): - unfinished_list = raw_api.list_unfinished_large_files(api_url, account_auth_token, bucket_id) + def test_list_unfinished_large_files( + self, raw_api, api_url, account_auth_token, bucket_id, file_name, file_info + ): + unfinished_list = raw_api.list_unfinished_large_files( + api_url, account_auth_token, bucket_id + ) assert [file_name] == [f_dict['fileName'] for f_dict in unfinished_list['files']] assert file_info == unfinished_list['files'][0]['fileInfo'] - def test_finish_large_file(self, raw_api, api_url, account_auth_token, large_file_id, part_sha1): + def test_finish_large_file( + self, raw_api, api_url, account_auth_token, large_file_id, part_sha1 + ): with pytest.raises(Exception, match='large files must have at least 2 parts'): raw_api.finish_large_file(api_url, account_auth_token, large_file_id, [part_sha1]) # TODO: make another attempt to finish but this time successfully -def test_update_bucket_revision(raw_api, api_url, account_auth_token, bucket_dict, bucket_id, account_id): +def test_update_bucket_revision( + raw_api, api_url, account_auth_token, bucket_dict, bucket_id, account_id +): updated_bucket = raw_api.update_bucket( api_url, account_auth_token, From 4b7b73d71a6a7f19caf9d604380d824d34ba2f60 Mon Sep 17 00:00:00 2001 From: Aleksandr Goncharov Date: Tue, 17 May 2022 14:19:40 +0300 Subject: [PATCH 3/4] Replace small fixtures with constants --- test/integration/conftest.py | 37 +++-------- test/integration/test_raw_api.py | 102 ++++++++++++++++--------------- 2 files changed, 61 insertions(+), 78 deletions(-) diff --git a/test/integration/conftest.py b/test/integration/conftest.py index 9c27f81f1..0cd9a99b4 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -135,40 +135,21 @@ def upload_url_dict(raw_api, api_url, account_auth_token, bucket_id) -> dict: return raw_api.get_upload_url(api_url, account_auth_token, bucket_id) -@pytest.fixture(scope='module') -def file_name() -> str: - return 'test.txt' - - -@pytest.fixture(scope='module') -def file_contents() -> bytes: - return b'hello world' - - -@pytest.fixture(scope='session') -def sse_none() -> EncryptionSetting: - return EncryptionSetting(mode=EncryptionMode.NONE) - - -@pytest.fixture(scope='session') -def sse_b2_aes() -> EncryptionSetting: - return EncryptionSetting( - mode=EncryptionMode.SSE_B2, - algorithm=EncryptionAlgorithm.AES256, - ) +FILE_NAME = 'test.txt' +FILE_CONTENTS = b'hello world' @pytest.fixture(scope='module') -def file_dict(raw_api, upload_url_dict, file_name, file_contents, sse_b2_aes) -> dict: +def file_dict(raw_api, upload_url_dict, sse_b2_aes) -> dict: return raw_api.upload_file( upload_url_dict['uploadUrl'], upload_url_dict['authorizationToken'], - file_name, - len(file_contents), + FILE_NAME, + len(FILE_CONTENTS), 'text/plain', - hex_sha1_of_stream(BytesIO(file_contents), len(file_contents)), + hex_sha1_of_stream(BytesIO(FILE_CONTENTS), len(FILE_CONTENTS)), {'color': 'blue'}, - BytesIO(file_contents), + BytesIO(FILE_CONTENTS), server_side_encryption=sse_b2_aes, ) @@ -179,12 +160,12 @@ def file_id(file_dict) -> str: @pytest.fixture(scope='module') -def download_auth_dict(raw_api, api_url, account_auth_token, bucket_id, file_name) -> dict: +def download_auth_dict(raw_api, api_url, account_auth_token, bucket_id) -> dict: return raw_api.get_download_authorization( api_url, account_auth_token, bucket_id, - file_name[:-2], + FILE_NAME[:-2], 12345, ) diff --git a/test/integration/test_raw_api.py b/test/integration/test_raw_api.py index 9416ef4dc..93a47715f 100644 --- a/test/integration/test_raw_api.py +++ b/test/integration/test_raw_api.py @@ -14,13 +14,16 @@ import pytest -from b2sdk.encryption.setting import EncryptionMode, EncryptionSetting +from b2sdk.encryption.setting import SSE_B2_AES, SSE_NONE from b2sdk.file_lock import BucketRetentionSetting, RetentionMode, RetentionPeriod from b2sdk.replication.setting import ReplicationConfiguration, ReplicationDestinationConfiguration, ReplicationRule, ReplicationSourceConfiguration from b2sdk.replication.types import ReplicationStatus from b2sdk.utils import hex_sha1_of_stream from .bucket_cleaner import _clean_and_delete_bucket, _cleanup_old_buckets +from .conftest import FILE_CONTENTS, FILE_NAME + + """ Try each of the calls to the raw api. @@ -191,16 +194,15 @@ def test_replication( account_auth_token, source_bucket_dict['bucketId'], ) - file_contents = b'hello world' file_dict = raw_api.upload_file( upload_url_dict['uploadUrl'], upload_url_dict['authorizationToken'], 'test.txt', - len(file_contents), + len(FILE_CONTENTS), 'text/plain', - hex_sha1_of_stream(io.BytesIO(file_contents), len(file_contents)), + hex_sha1_of_stream(io.BytesIO(FILE_CONTENTS), len(FILE_CONTENTS)), {'color': 'blue'}, - io.BytesIO(file_contents), + io.BytesIO(FILE_CONTENTS), ) assert ReplicationStatus[file_dict['replicationStatus'].upper() @@ -248,15 +250,15 @@ def test_replication( def test_update_bucket( - raw_api, api_url, account_auth_token, account_id, bucket_id, sse_none, sse_b2_aes + raw_api, api_url, account_auth_token, account_id, bucket_id, ): for encryption_setting, default_retention in [ ( - sse_none, + SSE_NONE, BucketRetentionSetting(mode=RetentionMode.GOVERNANCE, period=RetentionPeriod(days=1)) ), - (sse_b2_aes, None), - (sse_b2_aes, BucketRetentionSetting(RetentionMode.NONE)), + (SSE_B2_AES, None), + (SSE_B2_AES, BucketRetentionSetting(RetentionMode.NONE)), ]: raw_api.update_bucket( api_url, @@ -281,43 +283,43 @@ def test_file_upload(file_dict): pass -def test_list_file_versions(raw_api, api_url, account_auth_token, bucket_id, file_dict, file_name): +def test_list_file_versions(raw_api, api_url, account_auth_token, bucket_id, file_dict): list_versions_dict = raw_api.list_file_versions(api_url, account_auth_token, bucket_id) - assert [file_name] == [f_dict['fileName'] for f_dict in list_versions_dict['files']] + assert [FILE_NAME] == [f_dict['fileName'] for f_dict in list_versions_dict['files']] def test_download_file_by_id_with_auth( - raw_api, download_url, file_dict, account_auth_token, file_contents + raw_api, download_url, file_dict, account_auth_token, ): url = raw_api.get_download_url_by_id(download_url, file_dict['fileId']) with raw_api.download_file_from_url(account_auth_token, url) as response: - data = next(response.iter_content(chunk_size=len(file_contents))) - assert data == file_contents, data + data = next(response.iter_content(chunk_size=len(FILE_CONTENTS))) + assert data == FILE_CONTENTS, data -def test_download_file_by_id_no_auth(raw_api, download_url, file_dict, file_contents): +def test_download_file_by_id_no_auth(raw_api, download_url, file_dict): url = raw_api.get_download_url_by_id(download_url, file_dict['fileId']) with raw_api.download_file_from_url(None, url) as response: - data = next(response.iter_content(chunk_size=len(file_contents))) - assert data == file_contents, data + data = next(response.iter_content(chunk_size=len(FILE_CONTENTS))) + assert data == FILE_CONTENTS, data def test_download_file_by_name_with_auth( - raw_api, download_url, file_name, bucket_name, account_auth_token, file_contents + raw_api, download_url, bucket_name, account_auth_token, file_dict ): - url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) + url = raw_api.get_download_url_by_name(download_url, bucket_name, FILE_NAME) with raw_api.download_file_from_url(account_auth_token, url) as response: - data = next(response.iter_content(chunk_size=len(file_contents))) - assert data == file_contents, data + data = next(response.iter_content(chunk_size=len(FILE_CONTENTS))) + assert data == FILE_CONTENTS, data def test_download_file_by_name_no_auth( - raw_api, download_url, file_name, bucket_name, file_contents + raw_api, download_url, bucket_name, file_dict ): - url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) + url = raw_api.get_download_url_by_name(download_url, bucket_name, FILE_NAME) with raw_api.download_file_from_url(None, url) as response: - data = next(response.iter_content(chunk_size=len(file_contents))) - assert data == file_contents, data + data = next(response.iter_content(chunk_size=len(FILE_CONTENTS))) + assert data == FILE_CONTENTS, data def test_get_download_authorization(download_auth_dict): @@ -325,25 +327,25 @@ def test_get_download_authorization(download_auth_dict): def test_download_file_by_name_download_auth( - download_auth_dict, raw_api, download_url, bucket_name, file_name, file_contents + download_auth_dict, raw_api, download_url, bucket_name, file_dict ): download_auth_token = download_auth_dict['authorizationToken'] - url = raw_api.get_download_url_by_name(download_url, bucket_name, file_name) + url = raw_api.get_download_url_by_name(download_url, bucket_name, FILE_NAME) with raw_api.download_file_from_url(download_auth_token, url) as response: - data = next(response.iter_content(chunk_size=len(file_contents))) - assert data == file_contents, data + data = next(response.iter_content(chunk_size=len(FILE_CONTENTS))) + assert data == FILE_CONTENTS, data -def test_list_file_names(raw_api, api_url, account_auth_token, bucket_id, file_name): +def test_list_file_names(raw_api, api_url, account_auth_token, bucket_id, file_dict): list_names_dict = raw_api.list_file_names(api_url, account_auth_token, bucket_id) - assert [file_name] == [f_dict['fileName'] for f_dict in list_names_dict['files']] + assert [FILE_NAME] == [f_dict['fileName'] for f_dict in list_names_dict['files']] -def test_list_file_names_start_count(raw_api, api_url, account_auth_token, bucket_id, file_name): +def test_list_file_names_start_count(raw_api, api_url, account_auth_token, bucket_id, file_dict): list_names_dict = raw_api.list_file_names( - api_url, account_auth_token, bucket_id, start_file_name=file_name, max_file_count=5 + api_url, account_auth_token, bucket_id, start_file_name=FILE_NAME, max_file_count=5 ) - assert [file_name] == [f_dict['fileName'] for f_dict in list_names_dict['files']] + assert [FILE_NAME] == [f_dict['fileName'] for f_dict in list_names_dict['files']] def test_copy_file(raw_api, api_url, account_auth_token, file_id): @@ -351,38 +353,38 @@ def test_copy_file(raw_api, api_url, account_auth_token, file_id): raw_api.copy_file(api_url, account_auth_token, file_id, copy_file_name) -def test_get_file_info_by_id(raw_api, api_url, account_auth_token, file_id, file_name): +def test_get_file_info_by_id(raw_api, api_url, account_auth_token, file_id): file_info_dict = raw_api.get_file_info_by_id(api_url, account_auth_token, file_id) - assert file_info_dict['fileName'] == file_name + assert file_info_dict['fileName'] == FILE_NAME def test_get_file_info_by_name_no_auth( - raw_api, api_url, account_id, bucket_name, file_name, file_id, download_url + raw_api, api_url, account_id, bucket_name, file_id, download_url ): - info_headers = raw_api.get_file_info_by_name(download_url, None, bucket_name, file_name) + info_headers = raw_api.get_file_info_by_name(download_url, None, bucket_name, FILE_NAME) assert info_headers['x-bz-file-id'] == file_id def test_get_file_info_by_name_with_auth( - raw_api, download_url, account_auth_token, bucket_name, file_name, file_id + raw_api, download_url, account_auth_token, bucket_name, file_id ): info_headers = raw_api.get_file_info_by_name( - download_url, account_auth_token, bucket_name, file_name + download_url, account_auth_token, bucket_name, FILE_NAME ) assert info_headers['x-bz-file-id'] == file_id def test_get_file_info_by_name_download_auth( - raw_api, download_url, download_auth_token, bucket_name, file_name, file_id + raw_api, download_url, download_auth_token, bucket_name, file_id ): info_headers = raw_api.get_file_info_by_name( - download_url, download_auth_token, bucket_name, file_name + download_url, download_auth_token, bucket_name, FILE_NAME ) assert info_headers['x-bz-file-id'] == file_id -def test_hide_file(raw_api, api_url, account_auth_token, bucket_id, file_name): - raw_api.hide_file(api_url, account_auth_token, bucket_id, file_name) +def test_hide_file(raw_api, api_url, account_auth_token, bucket_id, file_dict): + raw_api.hide_file(api_url, account_auth_token, bucket_id, FILE_NAME) class TestLargeFile: @@ -392,16 +394,16 @@ def file_info(self) -> dict: @pytest.fixture(scope='class') def large_file_id( - self, raw_api, api_url, account_auth_token, bucket_id, file_name, sse_b2_aes, file_info + self, raw_api, api_url, account_auth_token, bucket_id, file_info, file_dict ) -> str: large_info = raw_api.start_large_file( api_url, account_auth_token, bucket_id, - file_name, + FILE_NAME, 'text/plain', file_info, - server_side_encryption=sse_b2_aes, + server_side_encryption=SSE_B2_AES, ) return large_info['fileId'] @@ -414,7 +416,7 @@ def part_sha1(self, part_contents) -> bytes: return hex_sha1_of_stream(io.BytesIO(part_contents), len(part_contents)) def test_upload_part( - self, raw_api, api_url, account_auth_token, large_file_id, file_contents, part_contents, + self, raw_api, api_url, account_auth_token, large_file_id, part_contents, part_sha1 ): upload_part_dict = raw_api.get_upload_part_url(api_url, account_auth_token, large_file_id) @@ -434,12 +436,12 @@ def test_list_parts(self, raw_api, api_url, account_auth_token, large_file_id): assert [1, 2] == [part['partNumber'] for part in parts_response['parts']] def test_list_unfinished_large_files( - self, raw_api, api_url, account_auth_token, bucket_id, file_name, file_info + self, raw_api, api_url, account_auth_token, bucket_id, file_info, file_dict ): unfinished_list = raw_api.list_unfinished_large_files( api_url, account_auth_token, bucket_id ) - assert [file_name] == [f_dict['fileName'] for f_dict in unfinished_list['files']] + assert [FILE_NAME] == [f_dict['fileName'] for f_dict in unfinished_list['files']] assert file_info == unfinished_list['files'][0]['fileInfo'] def test_finish_large_file( From 51dda8276f619042aefd1f6ce99aa42769f4f15e Mon Sep 17 00:00:00 2001 From: Aleksandr Goncharov Date: Wed, 18 May 2022 16:27:13 +0300 Subject: [PATCH 4/4] FILE_NAME -> TEST_FILE_NAME, FILE_CONTENTS -> TEST_FILE_CONTENTS --- test/integration/conftest.py | 14 ++++---- test/integration/test_raw_api.py | 56 ++++++++++++++++---------------- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/test/integration/conftest.py b/test/integration/conftest.py index 0cd9a99b4..c5940759e 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -135,8 +135,8 @@ def upload_url_dict(raw_api, api_url, account_auth_token, bucket_id) -> dict: return raw_api.get_upload_url(api_url, account_auth_token, bucket_id) -FILE_NAME = 'test.txt' -FILE_CONTENTS = b'hello world' +TEST_FILE_NAME = 'test.txt' +TEST_FILE_CONTENTS = b'hello world' @pytest.fixture(scope='module') @@ -144,12 +144,12 @@ def file_dict(raw_api, upload_url_dict, sse_b2_aes) -> dict: return raw_api.upload_file( upload_url_dict['uploadUrl'], upload_url_dict['authorizationToken'], - FILE_NAME, - len(FILE_CONTENTS), + TEST_FILE_NAME, + len(TEST_FILE_CONTENTS), 'text/plain', - hex_sha1_of_stream(BytesIO(FILE_CONTENTS), len(FILE_CONTENTS)), + hex_sha1_of_stream(BytesIO(TEST_FILE_CONTENTS), len(TEST_FILE_CONTENTS)), {'color': 'blue'}, - BytesIO(FILE_CONTENTS), + BytesIO(TEST_FILE_CONTENTS), server_side_encryption=sse_b2_aes, ) @@ -165,7 +165,7 @@ def download_auth_dict(raw_api, api_url, account_auth_token, bucket_id) -> dict: api_url, account_auth_token, bucket_id, - FILE_NAME[:-2], + TEST_FILE_NAME[:-2], 12345, ) diff --git a/test/integration/test_raw_api.py b/test/integration/test_raw_api.py index 93a47715f..ecdc0f49c 100644 --- a/test/integration/test_raw_api.py +++ b/test/integration/test_raw_api.py @@ -21,7 +21,7 @@ from b2sdk.utils import hex_sha1_of_stream from .bucket_cleaner import _clean_and_delete_bucket, _cleanup_old_buckets -from .conftest import FILE_CONTENTS, FILE_NAME +from .conftest import TEST_FILE_CONTENTS, TEST_FILE_NAME """ @@ -198,11 +198,11 @@ def test_replication( upload_url_dict['uploadUrl'], upload_url_dict['authorizationToken'], 'test.txt', - len(FILE_CONTENTS), + len(TEST_FILE_CONTENTS), 'text/plain', - hex_sha1_of_stream(io.BytesIO(FILE_CONTENTS), len(FILE_CONTENTS)), + hex_sha1_of_stream(io.BytesIO(TEST_FILE_CONTENTS), len(TEST_FILE_CONTENTS)), {'color': 'blue'}, - io.BytesIO(FILE_CONTENTS), + io.BytesIO(TEST_FILE_CONTENTS), ) assert ReplicationStatus[file_dict['replicationStatus'].upper() @@ -285,7 +285,7 @@ def test_file_upload(file_dict): def test_list_file_versions(raw_api, api_url, account_auth_token, bucket_id, file_dict): list_versions_dict = raw_api.list_file_versions(api_url, account_auth_token, bucket_id) - assert [FILE_NAME] == [f_dict['fileName'] for f_dict in list_versions_dict['files']] + assert [TEST_FILE_NAME] == [f_dict['fileName'] for f_dict in list_versions_dict['files']] def test_download_file_by_id_with_auth( @@ -293,33 +293,33 @@ def test_download_file_by_id_with_auth( ): url = raw_api.get_download_url_by_id(download_url, file_dict['fileId']) with raw_api.download_file_from_url(account_auth_token, url) as response: - data = next(response.iter_content(chunk_size=len(FILE_CONTENTS))) - assert data == FILE_CONTENTS, data + data = next(response.iter_content(chunk_size=len(TEST_FILE_CONTENTS))) + assert data == TEST_FILE_CONTENTS, data def test_download_file_by_id_no_auth(raw_api, download_url, file_dict): url = raw_api.get_download_url_by_id(download_url, file_dict['fileId']) with raw_api.download_file_from_url(None, url) as response: - data = next(response.iter_content(chunk_size=len(FILE_CONTENTS))) - assert data == FILE_CONTENTS, data + data = next(response.iter_content(chunk_size=len(TEST_FILE_CONTENTS))) + assert data == TEST_FILE_CONTENTS, data def test_download_file_by_name_with_auth( raw_api, download_url, bucket_name, account_auth_token, file_dict ): - url = raw_api.get_download_url_by_name(download_url, bucket_name, FILE_NAME) + url = raw_api.get_download_url_by_name(download_url, bucket_name, TEST_FILE_NAME) with raw_api.download_file_from_url(account_auth_token, url) as response: - data = next(response.iter_content(chunk_size=len(FILE_CONTENTS))) - assert data == FILE_CONTENTS, data + data = next(response.iter_content(chunk_size=len(TEST_FILE_CONTENTS))) + assert data == TEST_FILE_CONTENTS, data def test_download_file_by_name_no_auth( raw_api, download_url, bucket_name, file_dict ): - url = raw_api.get_download_url_by_name(download_url, bucket_name, FILE_NAME) + url = raw_api.get_download_url_by_name(download_url, bucket_name, TEST_FILE_NAME) with raw_api.download_file_from_url(None, url) as response: - data = next(response.iter_content(chunk_size=len(FILE_CONTENTS))) - assert data == FILE_CONTENTS, data + data = next(response.iter_content(chunk_size=len(TEST_FILE_CONTENTS))) + assert data == TEST_FILE_CONTENTS, data def test_get_download_authorization(download_auth_dict): @@ -330,22 +330,22 @@ def test_download_file_by_name_download_auth( download_auth_dict, raw_api, download_url, bucket_name, file_dict ): download_auth_token = download_auth_dict['authorizationToken'] - url = raw_api.get_download_url_by_name(download_url, bucket_name, FILE_NAME) + url = raw_api.get_download_url_by_name(download_url, bucket_name, TEST_FILE_NAME) with raw_api.download_file_from_url(download_auth_token, url) as response: - data = next(response.iter_content(chunk_size=len(FILE_CONTENTS))) - assert data == FILE_CONTENTS, data + data = next(response.iter_content(chunk_size=len(TEST_FILE_CONTENTS))) + assert data == TEST_FILE_CONTENTS, data def test_list_file_names(raw_api, api_url, account_auth_token, bucket_id, file_dict): list_names_dict = raw_api.list_file_names(api_url, account_auth_token, bucket_id) - assert [FILE_NAME] == [f_dict['fileName'] for f_dict in list_names_dict['files']] + assert [TEST_FILE_NAME] == [f_dict['fileName'] for f_dict in list_names_dict['files']] def test_list_file_names_start_count(raw_api, api_url, account_auth_token, bucket_id, file_dict): list_names_dict = raw_api.list_file_names( - api_url, account_auth_token, bucket_id, start_file_name=FILE_NAME, max_file_count=5 + api_url, account_auth_token, bucket_id, start_file_name=TEST_FILE_NAME, max_file_count=5 ) - assert [FILE_NAME] == [f_dict['fileName'] for f_dict in list_names_dict['files']] + assert [TEST_FILE_NAME] == [f_dict['fileName'] for f_dict in list_names_dict['files']] def test_copy_file(raw_api, api_url, account_auth_token, file_id): @@ -355,13 +355,13 @@ def test_copy_file(raw_api, api_url, account_auth_token, file_id): def test_get_file_info_by_id(raw_api, api_url, account_auth_token, file_id): file_info_dict = raw_api.get_file_info_by_id(api_url, account_auth_token, file_id) - assert file_info_dict['fileName'] == FILE_NAME + assert file_info_dict['fileName'] == TEST_FILE_NAME def test_get_file_info_by_name_no_auth( raw_api, api_url, account_id, bucket_name, file_id, download_url ): - info_headers = raw_api.get_file_info_by_name(download_url, None, bucket_name, FILE_NAME) + info_headers = raw_api.get_file_info_by_name(download_url, None, bucket_name, TEST_FILE_NAME) assert info_headers['x-bz-file-id'] == file_id @@ -369,7 +369,7 @@ def test_get_file_info_by_name_with_auth( raw_api, download_url, account_auth_token, bucket_name, file_id ): info_headers = raw_api.get_file_info_by_name( - download_url, account_auth_token, bucket_name, FILE_NAME + download_url, account_auth_token, bucket_name, TEST_FILE_NAME ) assert info_headers['x-bz-file-id'] == file_id @@ -378,13 +378,13 @@ def test_get_file_info_by_name_download_auth( raw_api, download_url, download_auth_token, bucket_name, file_id ): info_headers = raw_api.get_file_info_by_name( - download_url, download_auth_token, bucket_name, FILE_NAME + download_url, download_auth_token, bucket_name, TEST_FILE_NAME ) assert info_headers['x-bz-file-id'] == file_id def test_hide_file(raw_api, api_url, account_auth_token, bucket_id, file_dict): - raw_api.hide_file(api_url, account_auth_token, bucket_id, FILE_NAME) + raw_api.hide_file(api_url, account_auth_token, bucket_id, TEST_FILE_NAME) class TestLargeFile: @@ -400,7 +400,7 @@ def large_file_id( api_url, account_auth_token, bucket_id, - FILE_NAME, + TEST_FILE_NAME, 'text/plain', file_info, server_side_encryption=SSE_B2_AES, @@ -441,7 +441,7 @@ def test_list_unfinished_large_files( unfinished_list = raw_api.list_unfinished_large_files( api_url, account_auth_token, bucket_id ) - assert [FILE_NAME] == [f_dict['fileName'] for f_dict in unfinished_list['files']] + assert [TEST_FILE_NAME] == [f_dict['fileName'] for f_dict in unfinished_list['files']] assert file_info == unfinished_list['files'][0]['fileInfo'] def test_finish_large_file(