diff --git a/awscli/s3transfer/__init__.py b/awscli/s3transfer/__init__.py index 95b8c34f1434..08bd030874ec 100644 --- a/awscli/s3transfer/__init__.py +++ b/awscli/s3transfer/__init__.py @@ -123,6 +123,7 @@ def __call__(self, bytes_amount): """ + import concurrent.futures import functools import logging @@ -134,13 +135,12 @@ def __call__(self, bytes_amount): import string import threading +import s3transfer.compat from botocore.compat import six # noqa: F401 from botocore.exceptions import IncompleteReadError from botocore.vendored.requests.packages.urllib3.exceptions import ( ReadTimeoutError, ) - -import s3transfer.compat from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError __author__ = 'Amazon Web Services' @@ -681,7 +681,6 @@ def __init__( class S3Transfer: - ALLOWED_DOWNLOAD_ARGS = [ 'VersionId', 'SSECustomerAlgorithm', diff --git a/awscli/s3transfer/bandwidth.py b/awscli/s3transfer/bandwidth.py index 9bac5885e1a2..9301c8e3b9d1 100644 --- a/awscli/s3transfer/bandwidth.py +++ b/awscli/s3transfer/bandwidth.py @@ -30,9 +30,7 @@ def __init__(self, requested_amt, retry_time): """ self.requested_amt = requested_amt self.retry_time = retry_time - msg = 'Request amount {} exceeded the amount available. Retry in {}'.format( - requested_amt, retry_time - ) + msg = f'Request amount {requested_amt} exceeded the amount available. Retry in {retry_time}' super().__init__(msg) diff --git a/awscli/s3transfer/constants.py b/awscli/s3transfer/constants.py index e716a5e6214d..56d03bf4c913 100644 --- a/awscli/s3transfer/constants.py +++ b/awscli/s3transfer/constants.py @@ -27,11 +27,11 @@ ] FULL_OBJECT_CHECKSUM_ARGS = [ - 'ChecksumCRC32', - 'ChecksumCRC32C', - 'ChecksumCRC64NVME', - 'ChecksumSHA1', - 'ChecksumSHA256', + 'ChecksumCRC32', + 'ChecksumCRC32C', + 'ChecksumCRC64NVME', + 'ChecksumSHA1', + 'ChecksumSHA256', ] USER_AGENT = 's3transfer/%s' % s3transfer.__version__ diff --git a/awscli/s3transfer/crt.py b/awscli/s3transfer/crt.py index 8ccfa07b3653..c110e29fdcbd 100644 --- a/awscli/s3transfer/crt.py +++ b/awscli/s3transfer/crt.py @@ -44,7 +44,6 @@ from botocore.exceptions import NoCredentialsError from botocore.useragent import register_feature_id from botocore.utils import ArnParser, InvalidArnException, is_s3express_bucket - from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS, MB from s3transfer.exceptions import TransferNotDoneError from s3transfer.futures import BaseTransferFuture, BaseTransferMeta @@ -628,7 +627,7 @@ class CRTTransferCoordinator: """A helper class for managing CRTTransferFuture""" def __init__( - self, transfer_id=None, s3_request=None, exception_translator=None + self, transfer_id=None, s3_request=None, exception_translator=None ): self.transfer_id = transfer_id self._exception_translator = exception_translator @@ -858,9 +857,8 @@ def _default_get_make_request_args( arn_handler = _S3ArnParamHandler() if ( - (accesspoint_arn_details := arn_handler.handle_arn(call_args.bucket)) - and accesspoint_arn_details['region'] == "" - ): + accesspoint_arn_details := arn_handler.handle_arn(call_args.bucket) + ) and accesspoint_arn_details['region'] == "": # Configure our region to `*` to propogate in `x-amz-region-set` # for multi-region support in MRAP accesspoints. # use_double_uri_encode and should_normalize_uri_path are defaulted to be True @@ -928,6 +926,7 @@ class _S3ArnParamHandler: purposes. This should be safe to remove once we properly integrate auth resolution from Botocore into the CRT transfer integration. """ + _RESOURCE_REGEX = re.compile( r'^(?Paccesspoint|outpost)[/:](?P.+)$' ) @@ -948,9 +947,9 @@ def _get_arn_details_from_bucket(self, bucket): self._add_resource_type_and_name(arn_details) return arn_details except InvalidArnException: - pass + pass return None - + def _add_resource_type_and_name(self, arn_details): match = self._RESOURCE_REGEX.match(arn_details['resource']) if match: diff --git a/awscli/s3transfer/download.py b/awscli/s3transfer/download.py index dc8980d4ed12..e1a9cf0e9af9 100644 --- a/awscli/s3transfer/download.py +++ b/awscli/s3transfer/download.py @@ -307,9 +307,7 @@ def _get_download_output_manager_cls(self, transfer_future, osutil): if download_manager_cls.is_compatible(fileobj, osutil): return download_manager_cls raise RuntimeError( - 'Output {} of type: {} is not supported.'.format( - fileobj, type(fileobj) - ) + f'Output {fileobj} of type: {type(fileobj)} is not supported.' ) def _submit( diff --git a/awscli/s3transfer/futures.py b/awscli/s3transfer/futures.py index 9b70b48efea4..bdeef056de03 100644 --- a/awscli/s3transfer/futures.py +++ b/awscli/s3transfer/futures.py @@ -18,7 +18,6 @@ from concurrent import futures from botocore.context import get_context - from s3transfer.compat import MAXINT from s3transfer.exceptions import CancelledError, TransferNotDoneError from s3transfer.utils import FunctionContainer, TaskSemaphore @@ -177,9 +176,7 @@ def __init__(self, transfer_id=None): self._failure_cleanups_lock = threading.Lock() def __repr__(self): - return '{}(transfer_id={})'.format( - self.__class__.__name__, self.transfer_id - ) + return f'{self.__class__.__name__}(transfer_id={self.transfer_id})' @property def exception(self): @@ -318,9 +315,7 @@ def submit(self, executor, task, tag=None): :returns: A future representing the submitted task """ logger.debug( - "Submitting task {} to executor {} for transfer request: {}.".format( - task, executor, self.transfer_id - ) + f"Submitting task {task} to executor {executor} for transfer request: {self.transfer_id}." ) future = executor.submit(task, tag=tag) # Add this created future to the list of associated future just @@ -509,6 +504,7 @@ def add_done_callback(self, fn): than concurrent.futures.Future.add_done_callback that requires a single argument for the future. """ + # The done callback for concurrent.futures.Future will always pass a # the future in as the only argument. So we need to create the # proper signature wrapper that will invoke the callback provided. diff --git a/awscli/s3transfer/manager.py b/awscli/s3transfer/manager.py index 9ccf932d89a8..b97d6efac6f1 100644 --- a/awscli/s3transfer/manager.py +++ b/awscli/s3transfer/manager.py @@ -16,7 +16,6 @@ import threading from botocore.useragent import register_feature_id - from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket from s3transfer.constants import ( ALLOWED_DOWNLOAD_ARGS, @@ -192,12 +191,12 @@ class TransferManager: ] ALLOWED_UPLOAD_ARGS = ( - _ALLOWED_SHARED_ARGS - + [ - 'ChecksumType', - 'MpuObjectSize', - ] - + FULL_OBJECT_CHECKSUM_ARGS + _ALLOWED_SHARED_ARGS + + [ + 'ChecksumType', + 'MpuObjectSize', + ] + + FULL_OBJECT_CHECKSUM_ARGS ) ALLOWED_COPY_ARGS = _ALLOWED_SHARED_ARGS + [ diff --git a/awscli/s3transfer/tasks.py b/awscli/s3transfer/tasks.py index b97566a1050e..7da35a6d9359 100644 --- a/awscli/s3transfer/tasks.py +++ b/awscli/s3transfer/tasks.py @@ -14,7 +14,6 @@ import logging from botocore.context import start_as_current_context - from s3transfer.utils import get_callbacks logger = logging.getLogger(__name__) @@ -98,11 +97,7 @@ def __repr__(self): main_kwargs_to_display = self._get_kwargs_with_params_to_include( self._main_kwargs, params_to_display ) - return '{}(transfer_id={}, {})'.format( - self.__class__.__name__, - self._transfer_coordinator.transfer_id, - main_kwargs_to_display, - ) + return f'{self.__class__.__name__}(transfer_id={self._transfer_coordinator.transfer_id}, {main_kwargs_to_display})' @property def transfer_id(self): diff --git a/awscli/s3transfer/upload.py b/awscli/s3transfer/upload.py index bec990bad616..a3db8f899e2b 100644 --- a/awscli/s3transfer/upload.py +++ b/awscli/s3transfer/upload.py @@ -557,9 +557,7 @@ def _get_upload_input_manager_cls(self, transfer_future): if upload_manager_cls.is_compatible(fileobj): return upload_manager_cls raise RuntimeError( - 'Input {} of type: {} is not supported.'.format( - fileobj, type(fileobj) - ) + f'Input {fileobj} of type: {type(fileobj)} is not supported.' ) def _submit( @@ -691,7 +689,7 @@ def _submit_multipart_request( 'client': client, 'bucket': call_args.bucket, 'key': call_args.key, - 'extra_args':create_multipart_extra_args, + 'extra_args': create_multipart_extra_args, }, ), ) @@ -830,7 +828,7 @@ def _main( UploadId=upload_id, PartNumber=part_number, Body=body, - **extra_args + **extra_args, ) etag = response['ETag'] part_metadata = {'ETag': etag, 'PartNumber': part_number} diff --git a/awscli/s3transfer/utils.py b/awscli/s3transfer/utils.py index ca0f18c19724..e2a2f1d3b442 100644 --- a/awscli/s3transfer/utils.py +++ b/awscli/s3transfer/utils.py @@ -24,7 +24,6 @@ from botocore.exceptions import IncompleteReadError, ReadTimeoutError from botocore.httpchecksum import DEFAULT_CHECKSUM_ALGORITHM, AwsChunkedWrapper from botocore.utils import is_s3express_bucket - from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS @@ -32,8 +31,8 @@ # The maximum file size you can upload via S3 per request. # See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html # and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html -MAX_SINGLE_UPLOAD_SIZE = 5 * (1024 ** 3) -MIN_UPLOAD_CHUNKSIZE = 5 * (1024 ** 2) +MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3) +MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2) logger = logging.getLogger(__name__) @@ -194,9 +193,7 @@ def __init__(self, func, *args, **kwargs): self._kwargs = kwargs def __repr__(self): - return 'Function: {} with args {} and kwargs {}'.format( - self._func, self._args, self._kwargs - ) + return f'Function: {self._func} with args {self._args} and kwargs {self._kwargs}' def __call__(self): return self._func(*self._args, **self._kwargs) @@ -825,7 +822,7 @@ def add_s3express_defaults(bucket, extra_args): def set_default_checksum_algorithm(extra_args): - """Set the default algorithm if not specified by the user.""" - if any(checksum in extra_args for checksum in FULL_OBJECT_CHECKSUM_ARGS): - return - extra_args.setdefault("ChecksumAlgorithm", DEFAULT_CHECKSUM_ALGORITHM) + """Set the default algorithm if not specified by the user.""" + if any(checksum in extra_args for checksum in FULL_OBJECT_CHECKSUM_ARGS): + return + extra_args.setdefault("ChecksumAlgorithm", DEFAULT_CHECKSUM_ALGORITHM) diff --git a/tests/functional/s3transfer/test_copy.py b/tests/functional/s3transfer/test_copy.py index d6533346ab28..996e5b3d98cd 100644 --- a/tests/functional/s3transfer/test_copy.py +++ b/tests/functional/s3transfer/test_copy.py @@ -12,9 +12,9 @@ # language governing permissions and limitations under the License. from botocore.exceptions import ClientError from botocore.stub import Stubber - from s3transfer.manager import TransferConfig, TransferManager from s3transfer.utils import MIN_UPLOAD_CHUNKSIZE + from tests import BaseGeneralInterfaceTest, FileSizeProvider @@ -84,7 +84,6 @@ def add_successful_copy_responses( expected_create_mpu_params=None, expected_complete_mpu_params=None, ): - # Add all responses needed to do the copy of the object. # Should account for both ranged and nonranged downloads. stubbed_responses = self.create_stubbed_responses()[1:] @@ -97,9 +96,9 @@ def add_successful_copy_responses( # Add the expected create multipart upload params. if expected_create_mpu_params: - stubbed_responses[0][ - 'expected_params' - ] = expected_create_mpu_params + stubbed_responses[0]['expected_params'] = ( + expected_create_mpu_params + ) # Add any expected copy parameters. if expected_copy_params: @@ -111,9 +110,9 @@ def add_successful_copy_responses( # Add the expected complete multipart upload params. if expected_complete_mpu_params: - stubbed_responses[-1][ - 'expected_params' - ] = expected_complete_mpu_params + stubbed_responses[-1]['expected_params'] = ( + expected_complete_mpu_params + ) # Add the responses to the stubber. for stubbed_response in stubbed_responses: @@ -353,7 +352,7 @@ def create_stubbed_responses(self): ] def add_get_head_response_with_default_expected_params( - self, extra_expected_params=None + self, extra_expected_params=None ): expected_params = { 'Bucket': 'mysourcebucket', @@ -366,7 +365,7 @@ def add_get_head_response_with_default_expected_params( self.stubber.add_response(**response) def add_create_multipart_response_with_default_expected_params( - self, extra_expected_params=None + self, extra_expected_params=None ): expected_params = {'Bucket': self.bucket, 'Key': self.key} if extra_expected_params: @@ -376,7 +375,7 @@ def add_create_multipart_response_with_default_expected_params( self.stubber.add_response(**response) def add_upload_part_copy_responses_with_default_expected_params( - self, extra_expected_params=None + self, extra_expected_params=None ): ranges = [ 'bytes=0-5242879', @@ -400,7 +399,7 @@ def add_upload_part_copy_responses_with_default_expected_params( checksum_member = 'Checksum%s' % name.upper() response = upload_part_response['service_response'] response['CopyPartResult'][checksum_member] = 'sum%s==' % ( - i + 1 + i + 1 ) else: expected_params.update(extra_expected_params) @@ -409,7 +408,7 @@ def add_upload_part_copy_responses_with_default_expected_params( self.stubber.add_response(**upload_part_response) def add_complete_multipart_response_with_default_expected_params( - self, extra_expected_params=None + self, extra_expected_params=None ): expected_params = { 'Bucket': self.bucket, @@ -497,7 +496,6 @@ def _get_expected_params(self): def _add_params_to_expected_params( self, add_copy_kwargs, operation_types, new_params ): - expected_params_to_update = [] for operation_type in operation_types: add_copy_kwargs_key = 'expected_' + operation_type + '_params' diff --git a/tests/functional/s3transfer/test_crt.py b/tests/functional/s3transfer/test_crt.py index f65722d74fe4..c07ca7d020b3 100644 --- a/tests/functional/s3transfer/test_crt.py +++ b/tests/functional/s3transfer/test_crt.py @@ -17,8 +17,8 @@ from concurrent.futures import Future from botocore.session import Session - from s3transfer.subscribers import BaseSubscriber + from tests import ( HAS_CRT, FileCreator, @@ -31,7 +31,6 @@ if HAS_CRT: import awscrt - import s3transfer.crt @@ -69,7 +68,9 @@ def setUp(self): self.region = 'us-west-2' self.bucket = "test_bucket" self.s3express_bucket = 's3expressbucket--usw2-az5--x-s3' - self.mrap_accesspoint = 'arn:aws:s3::123456789012:accesspoint/mfzwi23gnjvgw.mrap' + self.mrap_accesspoint = ( + 'arn:aws:s3::123456789012:accesspoint/mfzwi23gnjvgw.mrap' + ) self.mrap_bucket = 'mfzwi23gnjvgw.mrap' self.key = "test_key" self.expected_content = b'my content' @@ -80,11 +81,11 @@ def setUp(self): ) self.expected_path = "/" + self.bucket + "/" + self.key self.expected_host = "s3.%s.amazonaws.com" % (self.region) - self.expected_s3express_host = ( - f'{self.s3express_bucket}.s3express-usw2-az5.us-west-2.amazonaws.com' - ) + self.expected_s3express_host = f'{self.s3express_bucket}.s3express-usw2-az5.us-west-2.amazonaws.com' self.expected_s3express_path = f'/{self.key}' - self.expected_mrap_host = f'{self.mrap_bucket}.accesspoint.s3-global.amazonaws.com' + self.expected_mrap_host = ( + f'{self.mrap_bucket}.accesspoint.s3-global.amazonaws.com' + ) self.expected_mrap_path = f"/{self.key}" self.s3_request = mock.Mock(awscrt.s3.S3Request) self.s3_crt_client = mock.Mock(awscrt.s3.S3Client) @@ -145,9 +146,7 @@ def _assert_expected_crt_http_request( self.assertNotIn(expected_missing_header.lower(), header_names) def _assert_expected_s3express_request( - self, - make_request_kwargs, - expected_http_method='GET' + self, make_request_kwargs, expected_http_method='GET' ): self._assert_expected_crt_http_request( make_request_kwargs["request"], @@ -158,7 +157,7 @@ def _assert_expected_s3express_request( self.assertIn('signing_config', make_request_kwargs) self.assertEqual( make_request_kwargs['signing_config'].algorithm, - awscrt.auth.AwsSigningAlgorithm.V4_S3EXPRESS + awscrt.auth.AwsSigningAlgorithm.V4_S3EXPRESS, ) self.assertFalse( make_request_kwargs['signing_config'].use_double_uri_encode, @@ -168,9 +167,7 @@ def _assert_expected_s3express_request( ) def _assert_expected_mrap_request( - self, - make_request_kwargs, - expected_http_method='GET' + self, make_request_kwargs, expected_http_method='GET' ): self._assert_expected_crt_http_request( make_request_kwargs["request"], @@ -181,11 +178,9 @@ def _assert_expected_mrap_request( self.assertIn('signing_config', make_request_kwargs) self.assertEqual( make_request_kwargs['signing_config'].algorithm, - awscrt.auth.AwsSigningAlgorithm.V4_ASYMMETRIC - ) - self.assertEqual( - make_request_kwargs['signing_config'].region, "*" + awscrt.auth.AwsSigningAlgorithm.V4_ASYMMETRIC, ) + self.assertEqual(make_request_kwargs['signing_config'].region, "*") self.assertFalse( make_request_kwargs['signing_config'].use_double_uri_encode, ) @@ -416,24 +411,30 @@ def test_upload_throws_error_for_unsupported_checksum(self): def test_upload_with_s3express(self): future = self.transfer_manager.upload( - self.filename, self.s3express_bucket, self.key, {}, - [self.record_subscriber] + self.filename, + self.s3express_bucket, + self.key, + {}, + [self.record_subscriber], ) future.result() self._assert_expected_s3express_request( self.s3_crt_client.make_request.call_args[1], - expected_http_method='PUT' + expected_http_method='PUT', ) def test_upload_with_mrap(self): future = self.transfer_manager.upload( - self.filename, self.mrap_accesspoint, self.key, {}, - [self.record_subscriber] + self.filename, + self.mrap_accesspoint, + self.key, + {}, + [self.record_subscriber], ) future.result() self._assert_expected_mrap_request( self.s3_crt_client.make_request.call_args[1], - expected_http_method='PUT' + expected_http_method='PUT', ) def test_upload_with_full_checksum(self): @@ -571,24 +572,30 @@ def test_download_to_nonseekable_stream(self): def test_download_with_s3express(self): future = self.transfer_manager.download( - self.s3express_bucket, self.key, self.filename, {}, - [self.record_subscriber] + self.s3express_bucket, + self.key, + self.filename, + {}, + [self.record_subscriber], ) future.result() self._assert_expected_s3express_request( self.s3_crt_client.make_request.call_args[1], - expected_http_method='GET' + expected_http_method='GET', ) def test_download_with_mrap(self): future = self.transfer_manager.download( - self.mrap_accesspoint, self.key, self.filename, {}, - [self.record_subscriber] + self.mrap_accesspoint, + self.key, + self.filename, + {}, + [self.record_subscriber], ) future.result() self._assert_expected_mrap_request( self.s3_crt_client.make_request.call_args[1], - expected_http_method='GET' + expected_http_method='GET', ) def test_delete(self): @@ -622,7 +629,7 @@ def test_delete_with_s3express(self): future.result() self._assert_expected_s3express_request( self.s3_crt_client.make_request.call_args[1], - expected_http_method='DELETE' + expected_http_method='DELETE', ) def test_blocks_when_max_requests_processes_reached(self): diff --git a/tests/functional/s3transfer/test_delete.py b/tests/functional/s3transfer/test_delete.py index 5e6383532cd0..d8816d1bb47b 100644 --- a/tests/functional/s3transfer/test_delete.py +++ b/tests/functional/s3transfer/test_delete.py @@ -11,11 +11,11 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. from s3transfer.manager import TransferManager + from tests import BaseGeneralInterfaceTest class TestDeleteObject(BaseGeneralInterfaceTest): - __test__ = True def setUp(self): diff --git a/tests/functional/s3transfer/test_download.py b/tests/functional/s3transfer/test_download.py index f458721d4b14..1166b755abf0 100644 --- a/tests/functional/s3transfer/test_download.py +++ b/tests/functional/s3transfer/test_download.py @@ -19,10 +19,10 @@ from io import BytesIO from botocore.exceptions import ClientError - from s3transfer.compat import SOCKET_ERROR from s3transfer.exceptions import RetriesExceededError from s3transfer.manager import TransferConfig, TransferManager + from tests import ( BaseGeneralInterfaceTest, FileSizeProvider, @@ -116,9 +116,9 @@ def add_successful_get_object_responses( expected_params ) if expected_ranges: - stubbed_response['expected_params'][ - 'Range' - ] = expected_ranges[i] + stubbed_response['expected_params']['Range'] = ( + expected_ranges[i] + ) self.stubber.add_response(**stubbed_response) def add_n_retryable_get_object_responses(self, n, num_reads=0): diff --git a/tests/functional/s3transfer/test_manager.py b/tests/functional/s3transfer/test_manager.py index 4041c3d03897..ebd9335bc361 100644 --- a/tests/functional/s3transfer/test_manager.py +++ b/tests/functional/s3transfer/test_manager.py @@ -13,10 +13,10 @@ from io import BytesIO from botocore.awsrequest import create_request_object - from s3transfer.exceptions import CancelledError, FatalError from s3transfer.futures import BaseExecutor from s3transfer.manager import TransferConfig, TransferManager + from tests import StubbedClientTest, mock, skip_if_using_serial_implementation diff --git a/tests/functional/s3transfer/test_upload.py b/tests/functional/s3transfer/test_upload.py index b0fe39862122..be2be47b0762 100644 --- a/tests/functional/s3transfer/test_upload.py +++ b/tests/functional/s3transfer/test_upload.py @@ -21,10 +21,10 @@ from botocore.exceptions import ClientError from botocore.httpchecksum import DEFAULT_CHECKSUM_ALGORITHM from botocore.stub import ANY - from s3transfer.manager import TransferConfig, TransferManager from s3transfer.upload import UploadSubmissionTask from s3transfer.utils import ChunksizeAdjuster + from tests import ( BaseGeneralInterfaceTest, NonSeekableReader, @@ -468,7 +468,7 @@ def add_complete_multipart_response_with_default_expected_params( ): if bucket is None: bucket = self.bucket - + num_parts = 3 parts = [] for part_num in range(1, num_parts + 1): diff --git a/tests/functional/s3transfer/test_utils.py b/tests/functional/s3transfer/test_utils.py index a033322fd8b9..32eadf8df145 100644 --- a/tests/functional/s3transfer/test_utils.py +++ b/tests/functional/s3transfer/test_utils.py @@ -16,6 +16,7 @@ import tempfile from s3transfer.utils import OSUtils + from tests import skip_if_windows, unittest diff --git a/tests/integration/s3transfer/__init__.py b/tests/integration/s3transfer/__init__.py index 8fcb3e0be58d..d43fc30f3bcf 100644 --- a/tests/integration/s3transfer/__init__.py +++ b/tests/integration/s3transfer/__init__.py @@ -13,9 +13,9 @@ import botocore import botocore.session from botocore.exceptions import WaiterError - from s3transfer.manager import TransferManager from s3transfer.subscribers import BaseSubscriber + from tests import FileCreator, S3Utils, random_bucket_name, unittest diff --git a/tests/integration/s3transfer/test_copy.py b/tests/integration/s3transfer/test_copy.py index c132dae3af84..138cf3dbd478 100644 --- a/tests/integration/s3transfer/test_copy.py +++ b/tests/integration/s3transfer/test_copy.py @@ -11,6 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. from s3transfer.manager import TransferConfig + from tests import RecordingSubscriber from tests.integration.s3transfer import BaseTransferManagerIntegTest diff --git a/tests/integration/s3transfer/test_crt.py b/tests/integration/s3transfer/test_crt.py index 460f84b7fe4a..7280cd943d3a 100644 --- a/tests/integration/s3transfer/test_crt.py +++ b/tests/integration/s3transfer/test_crt.py @@ -16,9 +16,9 @@ from uuid import uuid4 from botocore.exceptions import ClientError - from s3transfer.subscribers import BaseSubscriber from s3transfer.utils import OSUtils + from tests import ( HAS_CRT, NonSeekableReader, @@ -29,9 +29,8 @@ from tests.integration.s3transfer import BaseTransferManagerIntegTest if HAS_CRT: - from awscrt.exceptions import AwsCrtError - import s3transfer.crt + from awscrt.exceptions import AwsCrtError class RecordingSubscriber(BaseSubscriber): diff --git a/tests/integration/s3transfer/test_download.py b/tests/integration/s3transfer/test_download.py index 7348dd8b12b3..02a53f376795 100644 --- a/tests/integration/s3transfer/test_download.py +++ b/tests/integration/s3transfer/test_download.py @@ -17,6 +17,7 @@ from concurrent.futures import CancelledError from s3transfer.manager import TransferConfig + from tests import ( NonSeekableWriter, RecordingSubscriber, @@ -115,9 +116,7 @@ def test_large_download_exits_quicky_on_exception(self): self.assertLess( actual_time_to_exit, max_allowed_exit_time, - "Failed to exit under {}. Instead exited in {}.".format( - max_allowed_exit_time, actual_time_to_exit - ), + f"Failed to exit under {max_allowed_exit_time}. Instead exited in {actual_time_to_exit}.", ) # Make sure the future was cancelled because of the KeyboardInterrupt @@ -174,9 +173,7 @@ def test_many_files_exits_quicky_on_exception(self): self.assertLess( end_time - start_time, max_allowed_exit_time, - "Failed to exit under {}. Instead exited in {}.".format( - max_allowed_exit_time, end_time - start_time - ), + f"Failed to exit under {max_allowed_exit_time}. Instead exited in {end_time - start_time}.", ) # Make sure at least one of the futures got cancelled diff --git a/tests/integration/s3transfer/test_s3transfer.py b/tests/integration/s3transfer/test_s3transfer.py index de5cd25a5d0b..c4e299761d91 100644 --- a/tests/integration/s3transfer/test_s3transfer.py +++ b/tests/integration/s3transfer/test_s3transfer.py @@ -18,9 +18,9 @@ import tempfile import threading +import s3transfer from botocore.client import Config -import s3transfer from tests.integration.s3transfer import BaseTransferManagerIntegTest @@ -31,9 +31,7 @@ def assert_files_equal(first, second): second_md5 = md5_checksum(second) if first_md5 != second_md5: raise AssertionError( - "Files are not equal: {}(md5={}) != {}(md5={})".format( - first, first_md5, second, second_md5 - ) + f"Files are not equal: {first}(md5={first_md5}) != {second}(md5={second_md5})" ) diff --git a/tests/integration/s3transfer/test_upload.py b/tests/integration/s3transfer/test_upload.py index 2800002c686d..e39a6b800d49 100644 --- a/tests/integration/s3transfer/test_upload.py +++ b/tests/integration/s3transfer/test_upload.py @@ -16,8 +16,8 @@ from io import BytesIO from botocore.config import Config - from s3transfer.manager import TransferConfig + from tests import ( NonSeekableReader, RecordingSubscriber, @@ -111,9 +111,7 @@ def test_large_upload_exits_quicky_on_exception(self): self.assertLess( actual_time_to_exit, max_allowed_exit_time, - "Failed to exit under {}. Instead exited in {}.".format( - max_allowed_exit_time, actual_time_to_exit - ), + f"Failed to exit under {max_allowed_exit_time}. Instead exited in {actual_time_to_exit}.", ) try: @@ -175,9 +173,7 @@ def test_many_files_exits_quicky_on_exception(self): self.assertLess( end_time - start_time, max_allowed_exit_time, - "Failed to exit under {}. Instead exited in {}.".format( - max_allowed_exit_time, end_time - start_time - ), + f"Failed to exit under {max_allowed_exit_time}. Instead exited in {end_time - start_time}.", ) # Make sure at least one of the futures got cancelled diff --git a/tests/unit/s3transfer/test_bandwidth.py b/tests/unit/s3transfer/test_bandwidth.py index a54d45a50ad4..85b7ce27c8f6 100644 --- a/tests/unit/s3transfer/test_bandwidth.py +++ b/tests/unit/s3transfer/test_bandwidth.py @@ -25,6 +25,7 @@ TimeUtils, ) from s3transfer.futures import TransferCoordinator + from tests import mock, unittest diff --git a/tests/unit/s3transfer/test_compat.py b/tests/unit/s3transfer/test_compat.py index 2081e72eb6dd..cf4a5165b0ee 100644 --- a/tests/unit/s3transfer/test_compat.py +++ b/tests/unit/s3transfer/test_compat.py @@ -16,6 +16,7 @@ from io import BytesIO from s3transfer.compat import readable, seekable + from tests import unittest @@ -56,7 +57,7 @@ def test_non_file_like_obj(self): def test_non_seekable_ioerror(self): # Should return False if IOError is thrown. with open(self.filename, 'w') as f: - self.assertFalse(seekable(ErrorRaisingSeekWrapper(f, IOError()))) + self.assertFalse(seekable(ErrorRaisingSeekWrapper(f, OSError()))) def test_non_seekable_oserror(self): # Should return False if OSError is thrown. diff --git a/tests/unit/s3transfer/test_copies.py b/tests/unit/s3transfer/test_copies.py index 8768db2c0cad..6a03e49afeab 100644 --- a/tests/unit/s3transfer/test_copies.py +++ b/tests/unit/s3transfer/test_copies.py @@ -11,6 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. from s3transfer.copies import CopyObjectTask, CopyPartTask + from tests import BaseTaskTest, RecordingSubscriber diff --git a/tests/unit/s3transfer/test_crt.py b/tests/unit/s3transfer/test_crt.py index 0946fabacd37..5c0714be3514 100644 --- a/tests/unit/s3transfer/test_crt.py +++ b/tests/unit/s3transfer/test_crt.py @@ -16,15 +16,14 @@ from botocore.credentials import Credentials, ReadOnlyCredentials from botocore.exceptions import ClientError, NoCredentialsError from botocore.session import Session - from s3transfer.exceptions import TransferNotDoneError from s3transfer.utils import CallArgs + from tests import HAS_CRT, FileCreator, mock, requires_crt, unittest if HAS_CRT: import awscrt.auth import awscrt.s3 - import s3transfer.crt @@ -165,7 +164,7 @@ def test_delete_request(self): self.assertIsNone(crt_request.headers.get("Authorization")) def _create_crt_response_error( - self, status_code, body, operation_name=None + self, status_code, body, operation_name=None ): return awscrt.s3.S3ResponseError( code=14343, diff --git a/tests/unit/s3transfer/test_delete.py b/tests/unit/s3transfer/test_delete.py index 33f093bc5c95..fd08de275df9 100644 --- a/tests/unit/s3transfer/test_delete.py +++ b/tests/unit/s3transfer/test_delete.py @@ -11,6 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. from s3transfer.delete import DeleteObjectTask + from tests import BaseTaskTest diff --git a/tests/unit/s3transfer/test_download.py b/tests/unit/s3transfer/test_download.py index 35a6cb56b28f..25b94b5b7574 100644 --- a/tests/unit/s3transfer/test_download.py +++ b/tests/unit/s3transfer/test_download.py @@ -37,6 +37,7 @@ from s3transfer.exceptions import RetriesExceededError from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG, BoundedExecutor from s3transfer.utils import CallArgs, OSUtils + from tests import ( BaseSubmissionTaskTest, BaseTaskTest, @@ -576,7 +577,7 @@ def setUp(self): self.stream = BytesIO(self.content) self.fileobj = WriteCollector() self.osutil = OSUtils() - self.io_chunksize = 64 * (1024 ** 2) + self.io_chunksize = 64 * (1024**2) self.task_cls = GetObjectTask self.download_output_manager = DownloadSeekableOutputManager( self.osutil, self.transfer_coordinator, self.io_executor diff --git a/tests/unit/s3transfer/test_futures.py b/tests/unit/s3transfer/test_futures.py index ed196a3a0d9d..a4c015b10cae 100644 --- a/tests/unit/s3transfer/test_futures.py +++ b/tests/unit/s3transfer/test_futures.py @@ -37,6 +37,7 @@ NoResourcesAvailable, TaskSemaphore, ) + from tests import ( RecordingExecutor, TransferCoordinatorWithInterrupt, @@ -489,9 +490,7 @@ def assert_submit_would_not_block(self, task, tag=None, **kwargs): self.executor.submit(task, tag=tag, block=False) except NoResourcesAvailable: self.fail( - 'Task {} should not have been blocked. Caused by:\n{}'.format( - task, traceback.format_exc() - ) + f'Task {task} should not have been blocked. Caused by:\n{traceback.format_exc()}' ) def add_done_callback_to_future(self, future, fn, *args, **kwargs): diff --git a/tests/unit/s3transfer/test_manager.py b/tests/unit/s3transfer/test_manager.py index b377b200429a..a404af30c54a 100644 --- a/tests/unit/s3transfer/test_manager.py +++ b/tests/unit/s3transfer/test_manager.py @@ -16,6 +16,7 @@ from s3transfer.exceptions import CancelledError, FatalError from s3transfer.futures import TransferCoordinator from s3transfer.manager import TransferConfig, TransferCoordinatorController + from tests import TransferCoordinatorWithInterrupt, unittest diff --git a/tests/unit/s3transfer/test_s3transfer.py b/tests/unit/s3transfer/test_s3transfer.py index a2f46a131282..a042d963e2bd 100644 --- a/tests/unit/s3transfer/test_s3transfer.py +++ b/tests/unit/s3transfer/test_s3transfer.py @@ -33,6 +33,7 @@ random_file_extension, ) from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError + from tests import mock, unittest @@ -377,7 +378,6 @@ def test_multipart_upload_is_aborted_on_error(self): class TestMultipartDownloader(unittest.TestCase): - maxDiff = None def test_multipart_download_uses_correct_client_calls(self): @@ -435,7 +435,7 @@ def test_retry_on_failures_from_stream_reads(self): response_body = b'foobarbaz' stream_with_errors = mock.Mock() stream_with_errors.read.side_effect = [ - socket.error("fake error"), + OSError("fake error"), response_body, ] client.get_object.return_value = {'Body': stream_with_errors} @@ -470,7 +470,7 @@ def test_exception_raised_on_exceeded_retries(self): client = mock.Mock() response_body = b'foobarbaz' stream_with_errors = mock.Mock() - stream_with_errors.read.side_effect = socket.error("fake error") + stream_with_errors.read.side_effect = OSError("fake error") client.get_object.return_value = {'Body': stream_with_errors} config = TransferConfig(multipart_threshold=4, multipart_chunksize=4) @@ -679,7 +679,7 @@ def test_get_object_stream_is_retried_and_succeeds(self): } self.client.get_object.side_effect = [ # First request fails. - socket.error("fake error"), + OSError("fake error"), # Second succeeds. {'Body': BytesIO(b'foobar')}, ] @@ -697,7 +697,7 @@ def test_get_object_stream_uses_all_retries_and_errors_out(self): # Here we're raising an exception every single time, which # will exhaust our retry count and propagate a # RetriesExceededError. - self.client.get_object.side_effect = socket.error("fake error") + self.client.get_object.side_effect = OSError("fake error") with self.assertRaises(RetriesExceededError): transfer.download_file('bucket', 'key', 'smallfile') diff --git a/tests/unit/s3transfer/test_subscribers.py b/tests/unit/s3transfer/test_subscribers.py index a3d5a435283d..1ef766fda253 100644 --- a/tests/unit/s3transfer/test_subscribers.py +++ b/tests/unit/s3transfer/test_subscribers.py @@ -12,6 +12,7 @@ # language governing permissions and limitations under the License. from s3transfer.exceptions import InvalidSubscriberMethodError from s3transfer.subscribers import BaseSubscriber + from tests import unittest diff --git a/tests/unit/s3transfer/test_tasks.py b/tests/unit/s3transfer/test_tasks.py index b35015055e1b..a9b1e80b2285 100644 --- a/tests/unit/s3transfer/test_tasks.py +++ b/tests/unit/s3transfer/test_tasks.py @@ -15,7 +15,6 @@ from threading import Event from botocore.context import ClientContext, get_context - from s3transfer.futures import BoundedExecutor, TransferCoordinator from s3transfer.subscribers import BaseSubscriber from s3transfer.tasks import ( @@ -25,6 +24,7 @@ Task, ) from s3transfer.utils import CallArgs, FunctionContainer, get_callbacks + from tests import ( BaseSubmissionTaskTest, BaseTaskTest, diff --git a/tests/unit/s3transfer/test_upload.py b/tests/unit/s3transfer/test_upload.py index 4c5230111a80..6bf5a2d7d964 100644 --- a/tests/unit/s3transfer/test_upload.py +++ b/tests/unit/s3transfer/test_upload.py @@ -18,7 +18,6 @@ from io import BytesIO from botocore.stub import ANY - from s3transfer.futures import IN_MEMORY_UPLOAD_TAG from s3transfer.manager import TransferConfig from s3transfer.upload import ( @@ -32,6 +31,7 @@ UploadSubmissionTask, ) from s3transfer.utils import MIN_UPLOAD_CHUNKSIZE, CallArgs, OSUtils + from tests import ( BaseSubmissionTaskTest, BaseTaskTest, diff --git a/tests/unit/s3transfer/test_utils.py b/tests/unit/s3transfer/test_utils.py index 9a366d17255d..32257ec97864 100644 --- a/tests/unit/s3transfer/test_utils.py +++ b/tests/unit/s3transfer/test_utils.py @@ -21,7 +21,6 @@ from io import BytesIO, StringIO import pytest - from s3transfer.futures import TransferFuture, TransferMeta from s3transfer.utils import ( MAX_PARTS, @@ -46,6 +45,7 @@ random_file_extension, set_default_checksum_algorithm, ) + from tests import NonSeekableWriter, RecordingSubscriber, mock, unittest @@ -329,7 +329,7 @@ def test_allocate(self): @mock.patch('s3transfer.utils.fallocate') def test_allocate_with_io_error(self, mock_fallocate): - mock_fallocate.side_effect = IOError() + mock_fallocate.side_effect = OSError() with self.assertRaises(IOError): OSUtils().allocate(self.filename, 1) self.assertFalse(os.path.exists(self.filename)) @@ -1149,8 +1149,8 @@ def setUp(self): self.adjuster = ChunksizeAdjuster() def test_valid_chunksize(self): - chunksize = 7 * (1024 ** 2) - file_size = 8 * (1024 ** 2) + chunksize = 7 * (1024**2) + file_size = 8 * (1024**2) new_size = self.adjuster.adjust_chunksize(chunksize, file_size) self.assertEqual(new_size, chunksize) @@ -1167,17 +1167,17 @@ def test_chunksize_above_maximum(self): self.assertEqual(new_size, MAX_SINGLE_UPLOAD_SIZE) def test_chunksize_too_small(self): - chunksize = 7 * (1024 ** 2) - file_size = 5 * (1024 ** 4) + chunksize = 7 * (1024**2) + file_size = 5 * (1024**4) # If we try to upload a 5TB file, we'll need to use 896MB part # sizes. new_size = self.adjuster.adjust_chunksize(chunksize, file_size) - self.assertEqual(new_size, 896 * (1024 ** 2)) + self.assertEqual(new_size, 896 * (1024**2)) num_parts = file_size / new_size self.assertLessEqual(num_parts, MAX_PARTS) def test_unknown_file_size_with_valid_chunksize(self): - chunksize = 7 * (1024 ** 2) + chunksize = 7 * (1024**2) new_size = self.adjuster.adjust_chunksize(chunksize) self.assertEqual(new_size, chunksize) @@ -1218,10 +1218,10 @@ class TestS3Defaults: ), ( "mytestbucket", - {"Some": "Setting"}, - {"ChecksumAlgorithm": "CRC64NVME", "Some": "Setting"}, - ), - ( + {"Some": "Setting"}, + {"ChecksumAlgorithm": "CRC64NVME", "Some": "Setting"}, + ), + ( "mytestbucket", {"ChecksumAlgorithm": "sha256"}, {"ChecksumAlgorithm": "sha256"}, diff --git a/tests/utils/s3transfer/__init__.py b/tests/utils/s3transfer/__init__.py index 102f8e0d0120..feffba9371a6 100644 --- a/tests/utils/s3transfer/__init__.py +++ b/tests/utils/s3transfer/__init__.py @@ -10,8 +10,8 @@ # distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. -import io import hashlib +import io import math import os import platform @@ -23,26 +23,29 @@ import botocore.session from botocore.stub import Stubber - +from s3transfer.futures import ( + IN_MEMORY_DOWNLOAD_TAG, + IN_MEMORY_UPLOAD_TAG, + BoundedExecutor, + NonThreadedExecutor, + TransferCoordinator, + TransferFuture, + TransferMeta, +) from s3transfer.manager import TransferConfig -from s3transfer.futures import IN_MEMORY_UPLOAD_TAG -from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG -from s3transfer.futures import TransferCoordinator -from s3transfer.futures import TransferMeta -from s3transfer.futures import TransferFuture -from s3transfer.futures import BoundedExecutor -from s3transfer.futures import NonThreadedExecutor from s3transfer.subscribers import BaseSubscriber -from s3transfer.utils import OSUtils -from s3transfer.utils import CallArgs -from s3transfer.utils import TaskSemaphore -from s3transfer.utils import SlidingWindowSemaphore - +from s3transfer.utils import ( + CallArgs, + OSUtils, + SlidingWindowSemaphore, + TaskSemaphore, +) ORIGINAL_EXECUTOR_CLS = BoundedExecutor.EXECUTOR_CLS # Detect if CRT is available for use try: import awscrt.s3 # noqa: F401 + HAS_CRT = True except ImportError: HAS_CRT = False @@ -68,8 +71,9 @@ def assert_files_equal(first, second): second_md5 = md5_checksum(second) if first_md5 != second_md5: raise AssertionError( - "Files are not equal: %s(md5=%s) != %s(md5=%s)" % ( - first, first_md5, second, second_md5)) + "Files are not equal: %s(md5=%s) != %s(md5=%s)" + % (first, first_md5, second, second_md5) + ) def md5_checksum(filename): @@ -96,17 +100,21 @@ def test_some_non_windows_stuff(self): self.assertEqual(...) """ + def decorator(func): return unittest.skipIf( - platform.system() not in ['Darwin', 'Linux'], reason)(func) + platform.system() not in ['Darwin', 'Linux'], reason + )(func) + return decorator def skip_if_using_serial_implementation(reason): """Decorator to skip tests when running as the serial implementation""" + def decorator(func): - return unittest.skipIf( - is_serial_implementation(), reason)(func) + return unittest.skipIf(is_serial_implementation(), reason)(func) + return decorator @@ -116,7 +124,7 @@ def requires_crt(cls, reason=None): return unittest.skipIf(not HAS_CRT, reason)(cls) -class StreamWithError(object): +class StreamWithError: """A wrapper to simulate errors while reading from a stream :param stream: The underlying stream to read from @@ -139,7 +147,7 @@ def read(self, n=-1): return self._stream.read(n) -class FileSizeProvider(object): +class FileSizeProvider: def __init__(self, file_size): self.file_size = file_size @@ -147,7 +155,7 @@ def on_queued(self, future, **kwargs): future.meta.provide_transfer_size(self.file_size) -class FileCreator(object): +class FileCreator: def __init__(self): self.rootdir = tempfile.mkdtemp() @@ -212,7 +220,8 @@ def open(self, filename, mode): def rename_file(self, current_filename, new_filename): self.rename_records.append((current_filename, new_filename)) super(RecordingOSUtils, self).rename_file( - current_filename, new_filename) + current_filename, new_filename + ) class RecordingSubscriber(BaseSubscriber): @@ -244,7 +253,7 @@ def result(self): raise KeyboardInterrupt() -class RecordingExecutor(object): +class RecordingExecutor: """A wrapper on an executor to record calls made to submit() You can access the submissions property to receive a list of dictionaries @@ -263,13 +272,7 @@ def __init__(self, executor): def submit(self, task, tag=None, block=True): future = self._executor.submit(task, tag, block) - self.submissions.append( - { - 'task': task, - 'tag': tag, - 'block': block - } - ) + self.submissions.append({'task': task, 'tag': tag, 'block': block}) return future def shutdown(self): @@ -281,8 +284,11 @@ def setUp(self): self.session = botocore.session.get_session() self.region = 'us-west-2' self.client = self.session.create_client( - 's3', self.region, aws_access_key_id='foo', - aws_secret_access_key='bar') + 's3', + self.region, + aws_access_key_id='foo', + aws_secret_access_key='bar', + ) self.stubber = Stubber(self.client) self.stubber.activate() @@ -294,7 +300,7 @@ def reset_stubber_with_new_client(self, override_client_kwargs): 'service_name': 's3', 'region_name': self.region, 'aws_access_key_id': 'foo', - 'aws_secret_access_key': 'bar' + 'aws_secret_access_key': 'bar', } client_kwargs.update(override_client_kwargs) self.client = self.session.create_client(**client_kwargs) @@ -314,8 +320,7 @@ def get_task(self, task_cls, **kwargs): def get_transfer_future(self, call_args=None): return TransferFuture( - meta=TransferMeta(call_args), - coordinator=self.transfer_coordinator + meta=TransferMeta(call_args), coordinator=self.transfer_coordinator ) @@ -329,8 +334,8 @@ def setUp(self): 1, { IN_MEMORY_UPLOAD_TAG: TaskSemaphore(10), - IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(10) - } + IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(10), + }, ) def tearDown(self): @@ -345,6 +350,7 @@ class BaseGeneralInterfaceTest(StubbedClientTest): the various tests that all TransferManager method must pass from a functionality standpoint. """ + __test__ = False def manager(self): @@ -363,7 +369,8 @@ def create_call_kwargs(self): def create_invalid_extra_args(self): """A value for extra_args that will cause validation errors""" raise NotImplementedError( - 'create_invalid_extra_args is not implemented') + 'create_invalid_extra_args is not implemented' + ) def create_stubbed_responses(self): """A list of stubbed responses that will cause the request to succeed @@ -374,7 +381,8 @@ def create_stubbed_responses(self): [{'method': 'put_object', 'service_response': {}}] """ raise NotImplementedError( - 'create_stubbed_responses is not implemented') + 'create_stubbed_responses is not implemented' + ) def create_expected_progress_callback_info(self): """A list of kwargs expected to be passed to each progress callback @@ -394,7 +402,8 @@ def create_expected_progress_callback_info(self): values. """ raise NotImplementedError( - 'create_expected_progress_callback_info is not implemented') + 'create_expected_progress_callback_info is not implemented' + ) def _setup_default_stubbed_responses(self): for stubbed_response in self.create_stubbed_responses(): @@ -451,7 +460,7 @@ def test_invalid_extra_args(self): with self.assertRaisesRegex(ValueError, 'Invalid extra_args'): self.method( extra_args=self.create_invalid_extra_args(), - **self.create_call_kwargs() + **self.create_call_kwargs(), ) def test_for_callback_kwargs_correctness(self): @@ -460,7 +469,8 @@ def test_for_callback_kwargs_correctness(self): subscriber = RecordingSubscriber() future = self.method( - subscribers=[subscriber], **self.create_call_kwargs()) + subscribers=[subscriber], **self.create_call_kwargs() + ) # We call shutdown instead of result on future because the future # could be finished but the done callback could still be going. # The manager's shutdown method ensures everything completes.