Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 175 additions & 127 deletions s3transfer/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,54 +352,22 @@ def _submit(
:param bandwidth_limiter: The bandwidth limiter to use when
downloading streams
"""
if (
transfer_future.meta.size is None
or transfer_future.meta.etag is None
):
response = client.head_object(
Bucket=transfer_future.meta.call_args.bucket,
Key=transfer_future.meta.call_args.key,
**transfer_future.meta.call_args.extra_args,
)
# If a size was not provided figure out the size for the
# user.
transfer_future.meta.provide_transfer_size(
response['ContentLength']
)
# Provide an etag to ensure a stored object is not modified
# during a multipart download.
transfer_future.meta.provide_object_etag(response.get('ETag'))

download_output_manager = self._get_download_output_manager_cls(
transfer_future, osutil
)(osutil, self._transfer_coordinator, io_executor)

# If it is greater than threshold do a ranged download, otherwise
# do a regular GetObject download.
if transfer_future.meta.size < config.multipart_threshold:
self._submit_download_request(
client,
config,
osutil,
request_executor,
io_executor,
download_output_manager,
transfer_future,
bandwidth_limiter,
)
else:
self._submit_ranged_download_request(
client,
config,
osutil,
request_executor,
io_executor,
download_output_manager,
transfer_future,
bandwidth_limiter,
)
self._submit_first_chunk_request(
client,
config,
osutil,
request_executor,
io_executor,
download_output_manager,
transfer_future,
bandwidth_limiter,
)

def _submit_download_request(
def _submit_first_chunk_request(
self,
client,
config,
Expand All @@ -424,123 +392,182 @@ def _submit_download_request(
# Get any associated tags for the get object task.
get_object_tag = download_output_manager.get_download_task_tag()

# Get the final io task to run once the download is complete.
final_task = download_output_manager.get_final_io_task()
# Request first chunk to get object metadata from response headers
chunk_size = config.multipart_chunksize
extra_args = dict(call_args.extra_args)
extra_args['Range'] = f'bytes=0-{chunk_size - 1}'

# Callback will determine if additional chunks are needed based on
# the Content-Range header in the response
on_done_callback = GetObjectOnDoneCallback(
transfer_future,
download_output_manager,
io_executor,
self._transfer_coordinator,
client,
config,
request_executor,
bandwidth_limiter,
fileobj,
progress_callbacks,
get_object_tag,
)

task = GetObjectTask(
transfer_coordinator=self._transfer_coordinator,
main_kwargs={
'client': client,
'bucket': call_args.bucket,
'key': call_args.key,
'fileobj': fileobj,
'extra_args': extra_args,
'callbacks': progress_callbacks,
'max_attempts': config.num_download_attempts,
'start_index': 0,
'download_output_manager': download_output_manager,
'io_chunksize': config.io_chunksize,
'bandwidth_limiter': bandwidth_limiter,
},
done_callbacks=[on_done_callback],
)
on_done_callback.set_task(task)

# Submit the task to download the object.
self._transfer_coordinator.submit(
request_executor,
ImmediatelyWriteIOGetObjectTask(
transfer_coordinator=self._transfer_coordinator,
main_kwargs={
'client': client,
'bucket': call_args.bucket,
'key': call_args.key,
'fileobj': fileobj,
'extra_args': call_args.extra_args,
'callbacks': progress_callbacks,
'max_attempts': config.num_download_attempts,
'download_output_manager': download_output_manager,
'io_chunksize': config.io_chunksize,
'bandwidth_limiter': bandwidth_limiter,
},
done_callbacks=[final_task],
),
task,
tag=get_object_tag,
)

def _submit_ranged_download_request(

class GetObjectOnDoneCallback:
def __init__(
self,
transfer_future,
download_output_manager,
io_executor,
transfer_coordinator,
client,
config,
osutil,
request_executor,
io_executor,
download_output_manager,
transfer_future,
bandwidth_limiter,
fileobj,
progress_callbacks,
get_object_tag,
):
call_args = transfer_future.meta.call_args
self._transfer_future = transfer_future
self._download_output_manager = download_output_manager
self._io_executor = io_executor
self._transfer_coordinator = transfer_coordinator
self._client = client
self._config = config
self._request_executor = request_executor
self._bandwidth_limiter = bandwidth_limiter
self._fileobj = fileobj
self._progress_callbacks = progress_callbacks
self._get_object_tag = get_object_tag
self._task = None

def __call__(self):
# Always check if we have a task and response first
if not self._task:
# No task means we can't proceed, but we still need to signal completion
final_task = self._download_output_manager.get_final_io_task()
self._transfer_coordinator.submit(self._io_executor, final_task)
return

# Get the needed progress callbacks for the task
progress_callbacks = get_callbacks(transfer_future, 'progress')
response = self._task.get_response()
if not response:
# No response means the GET failed or was cancelled
# Still need to submit final task to signal completion
final_task = self._download_output_manager.get_final_io_task()
self._transfer_coordinator.submit(self._io_executor, final_task)
return

# Get a handle to the file that will be used for writing downloaded
# contents
fileobj = download_output_manager.get_fileobj_for_io_writes(
transfer_future
)
# If transfer is already done (cancelled/failed), don't schedule more work
# but still submit the final task
if self._transfer_coordinator.done():
final_task = self._download_output_manager.get_final_io_task()
self._transfer_coordinator.submit(self._io_executor, final_task)
return

# Determine the number of parts
part_size = config.multipart_chunksize
num_parts = calculate_num_parts(transfer_future.meta.size, part_size)
size, etag = self._extract_metadata(response)
self._transfer_future.meta.provide_transfer_size(size)
self._transfer_future.meta.provide_object_etag(etag)

# Get any associated tags for the get object task.
get_object_tag = download_output_manager.get_download_task_tag()
chunk_size = self._config.multipart_chunksize
if size > chunk_size:
self._schedule_remaining_chunks(size, etag)
else:
final_task = self._download_output_manager.get_final_io_task()
self._transfer_coordinator.submit(self._io_executor, final_task)

def set_task(self, task):
    """Attach the task whose stored response this callback inspects.

    :param task: The task object; calling this callback later reads its
        response through ``get_response()``
    """
    self._task = task

def _extract_metadata(self, response):
content_range = response.get('ContentRange')
if content_range:
# Content-Range format: 'bytes 0-8388607/39542919'
# Extract total size from the part after the slash
size = int(content_range.split('/')[-1])
else:
size = response['ContentLength']
etag = response.get('ETag')
return size, etag

def _schedule_remaining_chunks(self, size, etag):
    """Submit ranged download tasks for every part after the first.

    :param size: Total object size, used to compute the number of parts
    :param etag: Etag of the object; when not ``None`` it is sent as
        ``IfMatch`` on every follow-up request
    """
    call_args = self._transfer_future.meta.call_args
    part_size = self._config.multipart_chunksize
    num_parts = calculate_num_parts(size, part_size)

    # Callback invoker to submit the final io task once all downloads
    # are complete.
    final_task = self._download_output_manager.get_final_io_task()
    finalize_download_invoker = CountCallbackInvoker(
        FunctionContainer(
            self._transfer_coordinator.submit,
            self._io_executor,
            final_task,
        )
    )

    # Start from 1 since chunk 0 was already requested
    for i in range(1, num_parts):
        range_parameter = calculate_range_parameter(
            part_size, i, num_parts
        )

        # Inject extra parameters to be passed in as extra args
        extra_args = {
            'Range': range_parameter,
        }
        # Use IfMatch to ensure object hasn't changed during download
        if etag is not None:
            extra_args['IfMatch'] = etag
        # Applied last, so a caller-supplied key of the same name takes
        # precedence over the values injected above.
        extra_args.update(call_args.extra_args)
        finalize_download_invoker.increment()

        self._transfer_coordinator.submit(
            self._request_executor,
            GetObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': self._client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'fileobj': self._fileobj,
                    'extra_args': extra_args,
                    'callbacks': self._progress_callbacks,
                    'max_attempts': self._config.num_download_attempts,
                    'start_index': i * part_size,
                    'download_output_manager': self._download_output_manager,
                    'io_chunksize': self._config.io_chunksize,
                    'bandwidth_limiter': self._bandwidth_limiter,
                },
                done_callbacks=[finalize_download_invoker.decrement],
            ),
            tag=self._get_object_tag,
        )
    finalize_download_invoker.finalize()

def _get_final_io_task_submission_callback(
    self, download_manager, io_executor
):
    """Build a container that submits the final IO task.

    :param download_manager: Object whose ``get_final_io_task()`` supplies
        the task to submit
    :param io_executor: The executor the final IO task is submitted to
    :returns: A ``FunctionContainer`` wrapping the transfer coordinator's
        ``submit`` with the executor and the final task as arguments
    """
    io_task = download_manager.get_final_io_task()
    submit = self._transfer_coordinator.submit
    return FunctionContainer(submit, io_executor, io_task)

def _calculate_range_param(self, part_size, part_index, num_parts):
# Used to calculate the Range parameter
start_range = part_index * part_size
if part_index == num_parts - 1:
end_range = ''
else:
end_range = start_range + part_size - 1
range_param = f'bytes={start_range}-{end_range}'
return range_param


class GetObjectTask(Task):
def _main(
Expand Down Expand Up @@ -582,6 +609,9 @@ def _main(
response = client.get_object(
Bucket=bucket, Key=key, **extra_args
)
# Store response so callback can extract metadata
self._response = response

self._validate_content_range(
extra_args.get('Range'),
response.get('ContentRange'),
Expand Down Expand Up @@ -643,25 +673,43 @@ def _main(
def _handle_io(self, download_output_manager, fileobj, chunk, index):
download_output_manager.queue_file_io_task(fileobj, chunk, index)

def get_response(self):
    """Return the stored response, or ``None`` when none was stored.

    :returns: The value of ``self._response`` if that attribute exists,
        otherwise ``None``
    """
    try:
        return self._response
    except AttributeError:
        return None

def _validate_content_range(self, requested_range, content_range):
if not requested_range or not content_range:
return
# Unparsed `ContentRange` looks like `bytes 0-8388607/39542919`,
# where `0-8388607` is the fetched range and `39542919` is
# the total object size.
response_range, total_size = content_range.split('/')
# Subtract `1` because range is 0-indexed.
final_byte = str(int(total_size) - 1)
# If it's the last part, the requested range will not include
# the final byte, eg `bytes=33554432-`.
if requested_range.endswith('-'):
requested_range += final_byte
# Request looks like `bytes=0-8388607`.
# Parsed response looks like `bytes 0-8388607`.
if requested_range[6:] != response_range[6:]:
# Parse requested range: `bytes=0-8388607` -> start=0, end=8388607
req_range_part = requested_range[6:] # Remove 'bytes='
if '-' not in req_range_part:
return
req_start, req_end = req_range_part.split('-', 1)
req_start = int(req_start)
# req_end might be empty for open-ended ranges
req_end = int(req_end) if req_end else int(total_size) - 1

# Parse response range: `bytes 0-8388607` -> start=0, end=8388607
resp_range_part = response_range[6:] # Remove 'bytes '
resp_start, resp_end = resp_range_part.split('-', 1)
resp_start = int(resp_start)
resp_end = int(resp_end)

# Validate that response starts where we requested
if resp_start != req_start:
raise S3ValidationError(
f"Response range start `{resp_start}` does not match "
f"requested start `{req_start}`"
)

# Validate that response doesn't exceed what we requested
if resp_end > req_end:
raise S3ValidationError(
f"Requested range: `{requested_range[6:]}` does not match "
f"content range in response: `{response_range[6:]}`"
f"Response range end `{resp_end}` exceeds "
f"requested end `{req_end}`"
)


Expand Down
Loading