From 68b0e69de5b9f4602b6afc5e7c99fd961481f6ba Mon Sep 17 00:00:00 2001 From: Stephen Crowe <6042774+crowecawcaw@users.noreply.github.com> Date: Fri, 21 Nov 2025 10:31:59 -0800 Subject: [PATCH 1/4] Avoid HEAD requests for downloads --- s3transfer/download.py | 302 +++++++++++++++++------------- tests/functional/test_download.py | 170 +++++++++-------- tests/unit/test_download.py | 24 ++- 3 files changed, 283 insertions(+), 213 deletions(-) diff --git a/s3transfer/download.py b/s3transfer/download.py index f6646cd8..14db7a72 100644 --- a/s3transfer/download.py +++ b/s3transfer/download.py @@ -352,54 +352,22 @@ def _submit( :param bandwidth_limiter: The bandwidth limiter to use when downloading streams """ - if ( - transfer_future.meta.size is None - or transfer_future.meta.etag is None - ): - response = client.head_object( - Bucket=transfer_future.meta.call_args.bucket, - Key=transfer_future.meta.call_args.key, - **transfer_future.meta.call_args.extra_args, - ) - # If a size was not provided figure out the size for the - # user. - transfer_future.meta.provide_transfer_size( - response['ContentLength'] - ) - # Provide an etag to ensure a stored object is not modified - # during a multipart download. - transfer_future.meta.provide_object_etag(response.get('ETag')) - download_output_manager = self._get_download_output_manager_cls( transfer_future, osutil )(osutil, self._transfer_coordinator, io_executor) - # If it is greater than threshold do a ranged download, otherwise - # do a regular GetObject download. - if transfer_future.meta.size < config.multipart_threshold: - self._submit_download_request( - client, - config, - osutil, - request_executor, - io_executor, - download_output_manager, - transfer_future, - bandwidth_limiter, - ) - else: - self._submit_ranged_download_request( - client, - config, - osutil, - request_executor, - io_executor, - download_output_manager, - transfer_future, - bandwidth_limiter, - ) + self._submit_first_chunk_request( + client, + config, + osutil, + request_executor, + io_executor, + download_output_manager, + transfer_future, + bandwidth_limiter, + ) - def _submit_download_request( + def _submit_first_chunk_request( self, client, config, @@ -424,123 +392,182 @@ def _submit_download_request( # Get any associated tags for the get object task. get_object_tag = download_output_manager.get_download_task_tag() - # Get the final io task to run once the download is complete. - final_task = download_output_manager.get_final_io_task() + # Request first chunk to get object metadata from response headers + chunk_size = config.multipart_chunksize + extra_args = dict(call_args.extra_args) + extra_args['Range'] = f'bytes=0-{chunk_size - 1}' + + # Callback will determine if additional chunks are needed based on + # the Content-Range header in the response + on_done_callback = GetObjectOnDoneCallback( + transfer_future, + download_output_manager, + io_executor, + self._transfer_coordinator, + client, + config, + request_executor, + bandwidth_limiter, + fileobj, + progress_callbacks, + get_object_tag, + ) + + task = GetObjectTask( + transfer_coordinator=self._transfer_coordinator, + main_kwargs={ + 'client': client, + 'bucket': call_args.bucket, + 'key': call_args.key, + 'fileobj': fileobj, + 'extra_args': extra_args, + 'callbacks': progress_callbacks, + 'max_attempts': config.num_download_attempts, + 'start_index': 0, + 'download_output_manager': download_output_manager, + 'io_chunksize': config.io_chunksize, + 'bandwidth_limiter': bandwidth_limiter, + }, + done_callbacks=[on_done_callback], + ) + on_done_callback.set_task(task) - # Submit the task to download the object. self._transfer_coordinator.submit( request_executor, - ImmediatelyWriteIOGetObjectTask( - transfer_coordinator=self._transfer_coordinator, - main_kwargs={ - 'client': client, - 'bucket': call_args.bucket, - 'key': call_args.key, - 'fileobj': fileobj, - 'extra_args': call_args.extra_args, - 'callbacks': progress_callbacks, - 'max_attempts': config.num_download_attempts, - 'download_output_manager': download_output_manager, - 'io_chunksize': config.io_chunksize, - 'bandwidth_limiter': bandwidth_limiter, - }, - done_callbacks=[final_task], - ), + task, tag=get_object_tag, ) - def _submit_ranged_download_request( + +class GetObjectOnDoneCallback: + def __init__( self, + transfer_future, + download_output_manager, + io_executor, + transfer_coordinator, client, config, - osutil, request_executor, - io_executor, - download_output_manager, - transfer_future, bandwidth_limiter, + fileobj, + progress_callbacks, + get_object_tag, ): - call_args = transfer_future.meta.call_args + self._transfer_future = transfer_future + self._download_output_manager = download_output_manager + self._io_executor = io_executor + self._transfer_coordinator = transfer_coordinator + self._client = client + self._config = config + self._request_executor = request_executor + self._bandwidth_limiter = bandwidth_limiter + self._fileobj = fileobj + self._progress_callbacks = progress_callbacks + self._get_object_tag = get_object_tag + self._task = None + + def __call__(self): + # Always check if we have a task and response first + if not self._task: + # No task means we can't proceed, but we still need to signal completion + final_task = self._download_output_manager.get_final_io_task() + self._transfer_coordinator.submit(self._io_executor, final_task) + return - # Get the needed progress callbacks for the task - progress_callbacks = get_callbacks(transfer_future, 'progress') + response = self._task.get_response() + if not response: + # No response means the GET failed or was cancelled + # Still need to submit final task to signal completion + final_task = self._download_output_manager.get_final_io_task() + self._transfer_coordinator.submit(self._io_executor, final_task) + return - # Get a handle to the file that will be used for writing downloaded - # contents - fileobj = download_output_manager.get_fileobj_for_io_writes( - transfer_future - ) + # If transfer is already done (cancelled/failed), don't schedule more work + # but still submit the final task + if self._transfer_coordinator.done(): + final_task = self._download_output_manager.get_final_io_task() + self._transfer_coordinator.submit(self._io_executor, final_task) + return - # Determine the number of parts - part_size = config.multipart_chunksize - num_parts = calculate_num_parts(transfer_future.meta.size, part_size) + size, etag = self._extract_metadata(response) + self._transfer_future.meta.provide_transfer_size(size) + self._transfer_future.meta.provide_object_etag(etag) - # Get any associated tags for the get object task. - get_object_tag = download_output_manager.get_download_task_tag() + chunk_size = self._config.multipart_chunksize + if size > chunk_size: + self._schedule_remaining_chunks(size, etag) + else: + final_task = self._download_output_manager.get_final_io_task() + self._transfer_coordinator.submit(self._io_executor, final_task) + + def set_task(self, task): + self._task = task + + def _extract_metadata(self, response): + content_range = response.get('ContentRange') + if content_range: + # Content-Range format: 'bytes 0-8388607/39542919' + # Extract total size from the part after the slash + size = int(content_range.split('/')[-1]) + else: + size = response['ContentLength'] + etag = response.get('ETag') + return size, etag + + def _schedule_remaining_chunks(self, size, etag): + call_args = self._transfer_future.meta.call_args + part_size = self._config.multipart_chunksize + num_parts = calculate_num_parts(size, part_size) # Callback invoker to submit the final io task once all downloads # are complete. + final_task = self._download_output_manager.get_final_io_task() finalize_download_invoker = CountCallbackInvoker( - self._get_final_io_task_submission_callback( - download_output_manager, io_executor + FunctionContainer( + self._transfer_coordinator.submit, + self._io_executor, + final_task, ) ) - for i in range(num_parts): - # Calculate the range parameter + + # Start from 1 since chunk 0 was already requested + for i in range(1, num_parts): range_parameter = calculate_range_parameter( part_size, i, num_parts ) - - # Inject extra parameters to be passed in as extra args extra_args = { 'Range': range_parameter, } - if transfer_future.meta.etag is not None: - extra_args['IfMatch'] = transfer_future.meta.etag + # Use IfMatch to ensure object hasn't changed during download + if etag is not None: + extra_args['IfMatch'] = etag extra_args.update(call_args.extra_args) finalize_download_invoker.increment() - # Submit the ranged downloads + self._transfer_coordinator.submit( - request_executor, + self._request_executor, GetObjectTask( transfer_coordinator=self._transfer_coordinator, main_kwargs={ - 'client': client, + 'client': self._client, 'bucket': call_args.bucket, 'key': call_args.key, - 'fileobj': fileobj, + 'fileobj': self._fileobj, 'extra_args': extra_args, - 'callbacks': progress_callbacks, - 'max_attempts': config.num_download_attempts, + 'callbacks': self._progress_callbacks, + 'max_attempts': self._config.num_download_attempts, 'start_index': i * part_size, - 'download_output_manager': download_output_manager, - 'io_chunksize': config.io_chunksize, - 'bandwidth_limiter': bandwidth_limiter, + 'download_output_manager': self._download_output_manager, + 'io_chunksize': self._config.io_chunksize, + 'bandwidth_limiter': self._bandwidth_limiter, }, done_callbacks=[finalize_download_invoker.decrement], ), - tag=get_object_tag, + tag=self._get_object_tag, ) finalize_download_invoker.finalize() - def _get_final_io_task_submission_callback( - self, download_manager, io_executor - ): - final_task = download_manager.get_final_io_task() - return FunctionContainer( - self._transfer_coordinator.submit, io_executor, final_task - ) - - def _calculate_range_param(self, part_size, part_index, num_parts): - # Used to calculate the Range parameter - start_range = part_index * part_size - if part_index == num_parts - 1: - end_range = '' - else: - end_range = start_range + part_size - 1 - range_param = f'bytes={start_range}-{end_range}' - return range_param - class GetObjectTask(Task): def _main( @@ -582,6 +609,9 @@ def _main( response = client.get_object( Bucket=bucket, Key=key, **extra_args ) + # Store response so callback can extract metadata + self._response = response + self._validate_content_range( extra_args.get('Range'), response.get('ContentRange'), @@ -643,6 +673,9 @@ def _main( def _handle_io(self, download_output_manager, fileobj, chunk, index): download_output_manager.queue_file_io_task(fileobj, chunk, index) + def get_response(self): + return getattr(self, '_response', None) + def _validate_content_range(self, requested_range, content_range): if not requested_range or not content_range: return @@ -650,18 +683,33 @@ def _validate_content_range(self, requested_range, content_range): # where `0-8388607` is the fetched range and `39542919` is # the total object size. response_range, total_size = content_range.split('/') - # Subtract `1` because range is 0-indexed. - final_byte = str(int(total_size) - 1) - # If it's the last part, the requested range will not include - # the final byte, eg `bytes=33554432-`. - if requested_range.endswith('-'): - requested_range += final_byte - # Request looks like `bytes=0-8388607`. - # Parsed response looks like `bytes 0-8388607`. - if requested_range[6:] != response_range[6:]: + # Parse requested range: `bytes=0-8388607` -> start=0, end=8388607 + req_range_part = requested_range[6:] # Remove 'bytes=' + if '-' not in req_range_part: + return + req_start, req_end = req_range_part.split('-', 1) + req_start = int(req_start) + # req_end might be empty for open-ended ranges + req_end = int(req_end) if req_end else int(total_size) - 1 + + # Parse response range: `bytes 0-8388607` -> start=0, end=8388607 + resp_range_part = response_range[6:] # Remove 'bytes ' + resp_start, resp_end = resp_range_part.split('-', 1) + resp_start = int(resp_start) + resp_end = int(resp_end) + + # Validate that response starts where we requested + if resp_start != req_start: + raise S3ValidationError( + f"Response range start `{resp_start}` does not match " + f"requested start `{req_start}`" + ) + + # Validate that response doesn't exceed what we requested + if resp_end > req_end: raise S3ValidationError( - f"Requested range: `{requested_range[6:]}` does not match " - f"content range in response: `{response_range[6:]}`" + f"Response range end `{resp_end}` exceeds " + f"requested end `{req_end}`" ) diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py index b8d07427..c637611f 100644 --- a/tests/functional/test_download.py +++ b/tests/functional/test_download.py @@ -89,16 +89,14 @@ def create_stubbed_responses(self): self.stream.seek(0) return [ { - 'method': 'head_object', + 'method': 'get_object', 'service_response': { + 'Body': self.stream, 'ContentLength': len(self.content), + 'ContentRange': f'bytes 0-{len(self.content) - 1}/{len(self.content)}', 'ETag': self.etag, }, }, - { - 'method': 'get_object', - 'service_response': {'Body': self.stream}, - }, ] def create_expected_progress_callback_info(self): @@ -116,20 +114,29 @@ def add_successful_get_object_responses( self, expected_params=None, expected_ranges=None, extras=None ): # Add all get_object responses needed to complete the download. - # Should account for both ranged and nonranged downloads. - for i, stubbed_response in enumerate( - self.create_stubbed_responses()[1:] - ): + for i, stubbed_response in enumerate(self.create_stubbed_responses()): if expected_params: stubbed_response['expected_params'] = copy.deepcopy( expected_params ) - if expected_ranges: - stubbed_response['expected_params']['Range'] = ( - expected_ranges[i] - ) - if extras: - stubbed_response['service_response'].update(extras[i]) + # Remove IfMatch from first chunk since we get ETag from it + if i == 0 and 'IfMatch' in stubbed_response['expected_params']: + del stubbed_response['expected_params']['IfMatch'] + + if expected_ranges: + if 'expected_params' not in stubbed_response: + stubbed_response['expected_params'] = {} + stubbed_response['expected_params']['Range'] = expected_ranges[ + i + ] + + if extras: + for key, value in extras[i].items(): + if value is None: + # Remove the key if value is None + stubbed_response['service_response'].pop(key, None) + else: + stubbed_response['service_response'][key] = value self.stubber.add_response(**stubbed_response) def add_n_retryable_get_object_responses(self, n, num_reads=0): @@ -144,7 +151,6 @@ def add_n_retryable_get_object_responses(self, n, num_reads=0): ) def test_download_temporary_file_does_not_exist(self): - self.add_head_object_response() self.add_successful_get_object_responses() future = self.manager.download(**self.create_call_kwargs()) @@ -156,7 +162,6 @@ def test_download_temporary_file_does_not_exist(self): self.assertEqual(possible_matches, []) def test_download_for_fileobj(self): - self.add_head_object_response() self.add_successful_get_object_responses() with open(self.filename, 'wb') as f: @@ -170,7 +175,6 @@ def test_download_for_fileobj(self): self.assertEqual(self.content, f.read()) def test_download_for_seekable_filelike_obj(self): - self.add_head_object_response() self.add_successful_get_object_responses() # Create a file-like object to test. In this case, it is a BytesIO @@ -187,7 +191,6 @@ def test_download_for_seekable_filelike_obj(self): self.assertEqual(self.content, bytes_io.read()) def test_download_for_nonseekable_filelike_obj(self): - self.add_head_object_response() self.add_successful_get_object_responses() with open(self.filename, 'wb') as f: @@ -201,8 +204,6 @@ def test_download_for_nonseekable_filelike_obj(self): self.assertEqual(self.content, f.read()) def test_download_cleanup_on_failure(self): - self.add_head_object_response() - # Throw an error on the download self.stubber.add_client_error('get_object') @@ -216,7 +217,6 @@ def test_download_cleanup_on_failure(self): self.assertEqual(possible_matches, []) def test_download_with_nonexistent_directory(self): - self.add_head_object_response() self.add_successful_get_object_responses() call_kwargs = self.create_call_kwargs() @@ -228,7 +228,6 @@ def test_download_with_nonexistent_directory(self): future.result() def test_retries_and_succeeds(self): - self.add_head_object_response() # Insert a response that will trigger a retry. self.add_n_retryable_get_object_responses(1) # Add the normal responses to simulate the download proceeding @@ -245,8 +244,6 @@ def test_retries_and_succeeds(self): self.assertEqual(self.content, f.read()) def test_retry_failure(self): - self.add_head_object_response() - max_retries = 3 self.config.num_download_attempts = max_retries self._manager = TransferManager(self.client, self.config) @@ -263,7 +260,6 @@ def test_retry_failure(self): self.stubber.assert_no_pending_responses() def test_retry_rewinds_callbacks(self): - self.add_head_object_response() # Insert a response that will trigger a retry after one read of the # stream has been made. self.add_n_retryable_get_object_responses(1, num_reads=1) @@ -324,7 +320,6 @@ def test_uses_provided_osutil(self): # Use the recording os utility for the transfer manager self._manager = TransferManager(self.client, self.config, osutil) - self.add_head_object_response() self.add_successful_get_object_responses() future = self.manager.download(**self.create_call_kwargs()) @@ -340,7 +335,6 @@ def test_uses_provided_osutil(self): 'A separate thread is needed to read from the fifo' ) def test_download_for_fifo_file(self): - self.add_head_object_response() self.add_successful_get_object_responses() # Create the fifo file @@ -382,8 +376,8 @@ def test_download(self): 'Bucket': self.bucket, 'Key': self.key, 'RequestPayer': 'requester', + 'Range': 'bytes=0-8388607', } - self.add_head_object_response(expected_params) self.add_successful_get_object_responses(expected_params) future = self.manager.download( self.bucket, self.key, self.filename, self.extra_args @@ -400,8 +394,8 @@ def test_download_with_checksum_enabled(self): 'Bucket': self.bucket, 'Key': self.key, 'ChecksumMode': 'ENABLED', + 'Range': 'bytes=0-8388607', } - self.add_head_object_response(expected_params) self.add_successful_get_object_responses(expected_params) future = self.manager.download( self.bucket, self.key, self.filename, self.extra_args @@ -420,7 +414,6 @@ def test_allowed_copy_params_are_valid(self): def test_download_empty_object(self): self.content = b'' self.stream = BytesIO(self.content) - self.add_head_object_response() self.add_successful_get_object_responses() future = self.manager.download( self.bucket, self.key, self.filename, self.extra_args @@ -439,7 +432,6 @@ def test_uses_bandwidth_limiter(self): ) self._manager = TransferManager(self.client, self.config) - self.add_head_object_response() self.add_successful_get_object_responses() start = time.time() @@ -482,23 +474,31 @@ def setUp(self): def create_stubbed_responses(self): return [ { - 'method': 'head_object', + 'method': 'get_object', 'service_response': { - 'ContentLength': len(self.content), + 'Body': BytesIO(self.content[0:4]), + 'ContentLength': 4, + 'ContentRange': 'bytes 0-3/10', 'ETag': self.etag, }, }, { 'method': 'get_object', - 'service_response': {'Body': BytesIO(self.content[0:4])}, - }, - { - 'method': 'get_object', - 'service_response': {'Body': BytesIO(self.content[4:8])}, + 'service_response': { + 'Body': BytesIO(self.content[4:8]), + 'ContentLength': 4, + 'ContentRange': 'bytes 4-7/10', + 'ETag': self.etag, + }, }, { 'method': 'get_object', - 'service_response': {'Body': BytesIO(self.content[8:])}, + 'service_response': { + 'Body': BytesIO(self.content[8:]), + 'ContentLength': 2, + 'ContentRange': 'bytes 8-9/10', + 'ETag': self.etag, + }, }, ] @@ -518,12 +518,22 @@ def test_download(self): } expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-'] stubbed_ranges = ['bytes 0-3/10', 'bytes 4-7/10', 'bytes 8-9/10'] - self.add_head_object_response(expected_params) - self.add_successful_get_object_responses( - {**expected_params, 'IfMatch': self.etag}, - expected_ranges, - [{"ContentRange": r} for r in stubbed_ranges], - ) + + # First chunk doesn't have IfMatch, subsequent chunks do + for i, (range_val, content_range) in enumerate( + zip(expected_ranges, stubbed_ranges) + ): + params = copy.deepcopy(expected_params) + params['Range'] = range_val + if i > 0: + params['IfMatch'] = self.etag + + stubbed_response = self.create_stubbed_responses()[i] + stubbed_response['expected_params'] = params + stubbed_response['service_response']['ContentRange'] = ( + content_range + ) + self.stubber.add_response(**stubbed_response) future = self.manager.download( self.bucket, self.key, self.filename, self.extra_args @@ -542,10 +552,17 @@ def test_download_with_checksum_enabled(self): 'ChecksumMode': 'ENABLED', } expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-'] - self.add_head_object_response(expected_params) - self.add_successful_get_object_responses( - {**expected_params, 'IfMatch': self.etag}, expected_ranges - ) + + # First chunk doesn't have IfMatch, subsequent chunks do + for i, range_val in enumerate(expected_ranges): + params = copy.deepcopy(expected_params) + params['Range'] = range_val + if i > 0: + params['IfMatch'] = self.etag + + stubbed_response = self.create_stubbed_responses()[i] + stubbed_response['expected_params'] = params + self.stubber.add_response(**stubbed_response) future = self.manager.download( self.bucket, self.key, self.filename, self.extra_args @@ -564,19 +581,29 @@ def test_download_raises_if_content_range_mismatch(self): expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-'] # Note that the final retrieved range should be `bytes 8-9/10`. stubbed_ranges = ['bytes 0-3/10', 'bytes 4-7/10', 'bytes 7-8/10'] - self.add_head_object_response(expected_params) - self.add_successful_get_object_responses( - {**expected_params, 'IfMatch': self.etag}, - expected_ranges, - [{"ContentRange": r} for r in stubbed_ranges], - ) + + # First chunk doesn't have IfMatch, subsequent chunks do + for i, (range_val, content_range) in enumerate( + zip(expected_ranges, stubbed_ranges) + ): + params = copy.deepcopy(expected_params) + params['Range'] = range_val + if i > 0: + params['IfMatch'] = self.etag + + stubbed_response = self.create_stubbed_responses()[i] + stubbed_response['expected_params'] = params + stubbed_response['service_response']['ContentRange'] = ( + content_range + ) + self.stubber.add_response(**stubbed_response) future = self.manager.download( self.bucket, self.key, self.filename, self.extra_args ) with self.assertRaises(S3ValidationError) as e: future.result() - self.assertIn('does not match content range', str(e.exception)) + self.assertIn('does not match requested', str(e.exception)) def test_download_raises_if_etag_validation_fails(self): expected_params = { @@ -584,16 +611,16 @@ def test_download_raises_if_etag_validation_fails(self): 'Key': self.key, } expected_ranges = ['bytes=0-3', 'bytes=4-7'] - self.add_head_object_response(expected_params) # Add successful GetObject responses for the first 2 requests. - for i, stubbed_response in enumerate( - self.create_stubbed_responses()[1:3] - ): - stubbed_response['expected_params'] = copy.deepcopy( - {**expected_params, 'IfMatch': self.etag} - ) - stubbed_response['expected_params']['Range'] = expected_ranges[i] + for i, range_val in enumerate(expected_ranges): + params = copy.deepcopy(expected_params) + params['Range'] = range_val + if i > 0: + params['IfMatch'] = self.etag + + stubbed_response = self.create_stubbed_responses()[i] + stubbed_response['expected_params'] = params self.stubber.add_response(**stubbed_response) # Simulate ETag validation failure by adding a @@ -624,19 +651,10 @@ def test_download_without_etag(self): } expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-'] - # Stub HeadObject response with no ETag - head_object_response = { - 'method': 'head_object', - 'service_response': { - 'ContentLength': len(self.content), - }, - 'expected_params': expected_params, - } - self.stubber.add_response(**head_object_response) - + # Stub GET responses with no ETag # This asserts that IfMatch isn't in the GetObject requests. self.add_successful_get_object_responses( - expected_params, expected_ranges + expected_params, expected_ranges, [{'ETag': None}] * 3 ) future = self.manager.download( diff --git a/tests/unit/test_download.py b/tests/unit/test_download.py index f042b677..9ba0c933 100644 --- a/tests/unit/test_download.py +++ b/tests/unit/test_download.py @@ -462,13 +462,23 @@ def add_head_object_response(self): def add_get_responses(self): chunksize = self.config.multipart_chunksize - for i in range(0, len(self.content), chunksize): - if i + chunksize > len(self.content): + total_size = len(self.content) + for i in range(0, total_size, chunksize): + if i + chunksize > total_size: stream = BytesIO(self.content[i:]) - self.stubber.add_response('get_object', {'Body': stream}) + end = total_size - 1 else: stream = BytesIO(self.content[i : i + chunksize]) - self.stubber.add_response('get_object', {'Body': stream}) + end = i + chunksize - 1 + + response = { + 'Body': stream, + 'ContentLength': len(self.content[i : i + chunksize]), + 'ETag': self.etag, + } + if i == 0: + response['ContentRange'] = f'bytes {i}-{end}/{total_size}' + self.stubber.add_response('get_object', response) def configure_for_ranged_get(self): self.config.multipart_threshold = 1 @@ -486,7 +496,6 @@ def wait_and_assert_completed_successfully(self, submission_task): def test_submits_no_tag_for_get_object_filename(self): self.wrap_executor_in_recorder() - self.add_head_object_response() self.add_get_responses() self.submission_task = self.get_download_submission_task() @@ -499,7 +508,6 @@ def test_submits_no_tag_for_get_object_filename(self): def test_submits_no_tag_for_ranged_get_filename(self): self.wrap_executor_in_recorder() self.configure_for_ranged_get() - self.add_head_object_response() self.add_get_responses() self.submission_task = self.get_download_submission_task() @@ -511,7 +519,6 @@ def test_submits_no_tag_for_ranged_get_filename(self): def test_submits_no_tag_for_get_object_fileobj(self): self.wrap_executor_in_recorder() - self.add_head_object_response() self.add_get_responses() with open(self.filename, 'wb') as f: @@ -526,7 +533,6 @@ def test_submits_no_tag_for_get_object_fileobj(self): def test_submits_no_tag_for_ranged_get_object_fileobj(self): self.wrap_executor_in_recorder() self.configure_for_ranged_get() - self.add_head_object_response() self.add_get_responses() with open(self.filename, 'wb') as f: @@ -540,7 +546,6 @@ def test_submits_no_tag_for_ranged_get_object_fileobj(self): def tests_submits_tag_for_get_object_nonseekable_fileobj(self): self.wrap_executor_in_recorder() - self.add_head_object_response() self.add_get_responses() with open(self.filename, 'wb') as f: @@ -555,7 +560,6 @@ def tests_submits_tag_for_get_object_nonseekable_fileobj(self): def tests_submits_tag_for_ranged_get_object_nonseekable_fileobj(self): self.wrap_executor_in_recorder() self.configure_for_ranged_get() - self.add_head_object_response() self.add_get_responses() with open(self.filename, 'wb') as f: From c138cd00320c9525f77fb295e3d2019690346a9c Mon Sep 17 00:00:00 2001 From: Stephen Crowe <6042774+crowecawcaw@users.noreply.github.com> Date: Fri, 21 Nov 2025 13:52:45 -0800 Subject: [PATCH 2/4] Add benchmarking script for small files --- scripts/performance/time-batch-download.py | 60 ++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100755 scripts/performance/time-batch-download.py diff --git a/scripts/performance/time-batch-download.py b/scripts/performance/time-batch-download.py new file mode 100755 index 00000000..93e8a2a4 --- /dev/null +++ b/scripts/performance/time-batch-download.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +"""Direct timing of batch downloads without shell wrapper.""" + +import argparse +import tempfile +import shutil +import time +from botocore.session import get_session +from s3transfer.manager import TransferManager + + +def create_file(filename, file_size): + with open(filename, 'wb') as f: + f.write(b'a' * file_size) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--file-count', type=int, required=True) + parser.add_argument('--file-size', type=int, required=True) + parser.add_argument('--s3-bucket', required=True) + args = parser.parse_args() + + session = get_session() + client = session.create_client('s3') + + tempdir = tempfile.mkdtemp() + s3_keys = [] + + try: + # Upload files + print(f"Uploading {args.file_count} files...") + with TransferManager(client) as manager: + for i in range(args.file_count): + file_path = f"{tempdir}/upload_{i}" + create_file(file_path, args.file_size) + s3_key = f"perf_test_{i}" + manager.upload(file_path, args.s3_bucket, s3_key) + s3_keys.append(s3_key) + + # Download files + print(f"Downloading {args.file_count} files...") + start_time = time.time() + with TransferManager(client) as manager: + for i, s3_key in enumerate(s3_keys): + download_path = f"{tempdir}/download_{i}" + manager.download(args.s3_bucket, s3_key, download_path) + duration = time.time() - start_time + + print(f"Download duration: {duration:.2f} seconds") + + # Cleanup + for s3_key in s3_keys: + client.delete_object(Bucket=args.s3_bucket, Key=s3_key) + finally: + shutil.rmtree(tempdir) + + +if __name__ == '__main__': + main() From fa197033254647d68c68c07ac09ace4b45cfe32c Mon Sep 17 00:00:00 2001 From: Stephen Crowe <6042774+crowecawcaw@users.noreply.github.com> Date: Fri, 6 Mar 2026 08:01:28 -0800 Subject: [PATCH 3/4] address PR notes Signed-off-by: Stephen Crowe <6042774+crowecawcaw@users.noreply.github.com> --- s3transfer/download.py | 7 +++-- tests/functional/test_download.py | 47 +++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/s3transfer/download.py b/s3transfer/download.py index 14db7a72..b5d4356c 100644 --- a/s3transfer/download.py +++ b/s3transfer/download.py @@ -397,9 +397,12 @@ def _submit_first_chunk_request( extra_args = dict(call_args.extra_args) extra_args['Range'] = f'bytes=0-{chunk_size - 1}' + if transfer_future.meta.etag is not None: + extra_args['IfMatch'] = transfer_future.meta.etag + # Callback will determine if additional chunks are needed based on # the Content-Range header in the response - on_done_callback = GetObjectOnDoneCallback( + on_done_callback = GetObjectFirstChunkOnDoneCallback( transfer_future, download_output_manager, io_executor, @@ -439,7 +442,7 @@ def _submit_first_chunk_request( ) -class GetObjectOnDoneCallback: +class GetObjectFirstChunkOnDoneCallback: def __init__( self, transfer_future, diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py index c637611f..538dfcc4 100644 --- a/tests/functional/test_download.py +++ b/tests/functional/test_download.py @@ -411,6 +411,31 @@ def test_allowed_copy_params_are_valid(self): for allowed_upload_arg in self._manager.ALLOWED_DOWNLOAD_ARGS: self.assertIn(allowed_upload_arg, op_model.input_shape.members) + def test_first_chunk_includes_ifmatch_when_etag_provided(self): + self.stubber.add_response( + method='get_object', + service_response={ + 'Body': self.stream, + 'ContentLength': len(self.content), + 'ContentRange': f'bytes 0-{len(self.content) - 1}/{len(self.content)}', + 'ETag': self.etag, + }, + expected_params={ + 'Bucket': self.bucket, + 'Key': self.key, + 'Range': 'bytes=0-8388607', + 'IfMatch': self.etag, + }, + ) + + call_kwargs = self.create_call_kwargs() + call_kwargs['subscribers'] = [ETagProvider(self.etag)] + + future = self.manager.download(**call_kwargs) + future.result() + + self.stubber.assert_no_pending_responses() + def test_download_empty_object(self): self.content = b'' self.stream = BytesIO(self.content) @@ -665,3 +690,25 @@ def test_download_without_etag(self): # Ensure that the contents are correct with open(self.filename, 'rb') as f: self.assertEqual(self.content, f.read()) + + def test_first_chunk_includes_ifmatch_when_etag_provided(self): + expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-'] + + for i, range_val in enumerate(expected_ranges): + params = { + 'Bucket': self.bucket, + 'Key': self.key, + 'Range': range_val, + 'IfMatch': self.etag, + } + stubbed_response = self.create_stubbed_responses()[i] + stubbed_response['expected_params'] = params + self.stubber.add_response(**stubbed_response) + + call_kwargs = self.create_call_kwargs() + call_kwargs['subscribers'] = [ETagProvider(self.etag)] + + future = self.manager.download(**call_kwargs) + future.result() + + self.stubber.assert_no_pending_responses() From e4c77d5efae08205b6b02898d60aff975fc7e94c Mon Sep 17 00:00:00 2001 From: Stephen Crowe <6042774+crowecawcaw@users.noreply.github.com> Date: Fri, 6 Mar 2026 08:18:14 -0800 Subject: [PATCH 4/4] handle 0 range files Signed-off-by: Stephen Crowe <6042774+crowecawcaw@users.noreply.github.com> --- s3transfer/download.py | 13 +++++++++++++ tests/functional/test_download.py | 12 +++++++++--- tests/integration/test_download.py | 19 +++++++++++++++++++ 3 files changed, 41 insertions(+), 3 deletions(-) diff --git a/s3transfer/download.py b/s3transfer/download.py index b5d4356c..a313feae 100644 --- a/s3transfer/download.py +++ b/s3transfer/download.py @@ -497,6 +497,12 @@ def __call__(self): self._transfer_future.meta.provide_transfer_size(size) self._transfer_future.meta.provide_object_etag(etag) + if size == 0: + # Force-open the DeferredOpenFile so the temp file exists + # on disk for IORenameFileTask. Without this, the deferred + # file is never opened since no bytes are written. + self._fileobj.write(b'') + chunk_size = self._config.multipart_chunksize if size > chunk_size: self._schedule_remaining_chunks(size, etag) @@ -652,6 +658,13 @@ def _main( f'Contents of stored object "{key}" in bucket ' f'"{bucket}" did not match expected ETag.' ) + elif error_code == "InvalidRange": + self._response = { + 'ContentLength': 0, + 'ContentRange': None, + 'ETag': None, + } + return else: raise except S3_RETRYABLE_DOWNLOAD_ERRORS as e: diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py index 538dfcc4..dba12a25 100644 --- a/tests/functional/test_download.py +++ b/tests/functional/test_download.py @@ -437,9 +437,14 @@ def test_first_chunk_includes_ifmatch_when_etag_provided(self): self.stubber.assert_no_pending_responses() def test_download_empty_object(self): - self.content = b'' - self.stream = BytesIO(self.content) - self.add_successful_get_object_responses() + # Real S3 returns InvalidRange when a ranged GET is made on a + # 0-byte object since no byte range can be satisfied. + self.stubber.add_client_error( + method='get_object', + service_error_code='InvalidRange', + service_message='The requested range is not satisfiable', + http_status_code=416, + ) future = self.manager.download( self.bucket, self.key, self.filename, self.extra_args ) @@ -448,6 +453,7 @@ def test_download_empty_object(self): # Ensure that the empty file exists with open(self.filename, 'rb') as f: self.assertEqual(b'', f.read()) + self.assertEqual(future.meta.size, 0) def test_uses_bandwidth_limiter(self): self.content = b'a' * 1024 * 1024 diff --git a/tests/integration/test_download.py b/tests/integration/test_download.py index da08514e..2b338f23 100644 --- a/tests/integration/test_download.py +++ b/tests/integration/test_download.py @@ -38,6 +38,25 @@ def setUp(self): multipart_threshold=self.multipart_threshold ) + def test_download_empty_object(self): + transfer_manager = self.create_transfer_manager(self.config) + + # Upload a 0-byte object + self.client.put_object( + Bucket=self.bucket_name, Key='empty.txt', Body=b'' + ) + self.addCleanup(self.delete_object, 'empty.txt') + + download_path = os.path.join(self.files.rootdir, 'empty.txt') + future = transfer_manager.download( + self.bucket_name, 'empty.txt', download_path + ) + future.result() + + with open(download_path, 'rb') as f: + self.assertEqual(b'', f.read()) + self.assertEqual(future.meta.size, 0) + def test_below_threshold(self): transfer_manager = self.create_transfer_manager(self.config)