Open
78 commits
8c7ca45
Add executor support
jterapin Oct 7, 2025
c21969a
Add changelog entry
jterapin Oct 7, 2025
39ecf0a
Update TM with executor changes
jterapin Oct 7, 2025
a3f2b9f
Remove thread count support from MPU
jterapin Oct 7, 2025
3156f7c
Update Object usage of executor
jterapin Oct 7, 2025
84c9966
Add documentation/remove unused methods from DefaultExecutor
jterapin Oct 8, 2025
8e16a3b
Add Default Executor specs
jterapin Oct 8, 2025
db1cb62
Update TM docs and impl
jterapin Oct 8, 2025
f907c3b
Update streaming MPU to use executor
jterapin Oct 9, 2025
7cb940a
More MP Stream updates
jterapin Oct 9, 2025
4003536
Update specs
jterapin Oct 9, 2025
7dddda9
Update interfaces
jterapin Oct 9, 2025
481f198
Update specs
jterapin Oct 9, 2025
88bf44a
Update changelog
jterapin Oct 9, 2025
c1a25cd
Minor updates
jterapin Oct 9, 2025
7522a16
Fix failing specs
jterapin Oct 9, 2025
89cffe7
Merge branch 'version-3' into s3-executor-support
jterapin Oct 10, 2025
9eea233
Feedback - address sleep in specs
jterapin Oct 10, 2025
75b0d96
Feedback - update method name for cleanup_team_file
jterapin Oct 10, 2025
ad943ee
Feedback - wrap checksum callback
jterapin Oct 10, 2025
f1fc86a
Feedback - update method name in MPU
jterapin Oct 10, 2025
09eae68
Feedback - streamline handling of progress callbacks
jterapin Oct 10, 2025
e824de0
Feedback - streamline docs
jterapin Oct 10, 2025
c073349
Merge branch 'version-3' into s3-executor-support
jterapin Oct 13, 2025
cd91eb7
Feedback - streamline opts
jterapin Oct 13, 2025
abf78d6
Feedback - remove sleep from specs when possible
jterapin Oct 13, 2025
04a287f
Feedback - update to use 10 threads only
jterapin Oct 13, 2025
54b9add
Add directory features
jterapin Oct 13, 2025
ca6c2ae
Add temp changelog entry
jterapin Oct 13, 2025
c9bf8ed
Minor updates
jterapin Oct 13, 2025
5c6caa7
Improve directory uploader
jterapin Oct 13, 2025
7eafc7c
Update uploader
jterapin Oct 13, 2025
d7d5738
Remove keyword args
jterapin Oct 14, 2025
845aa13
Add documentation
jterapin Oct 14, 2025
5201f35
Merge version-3
jterapin Dec 31, 2025
f9e0758
Refactor upload directory
jterapin Dec 31, 2025
bacc68d
Minor refactors to uploader
jterapin Dec 31, 2025
51b76dc
Add upload test holders
jterapin Dec 31, 2025
062ff5e
Merge from version-3
jterapin Jan 5, 2026
0d0fa59
Merge from version-3
jterapin Jan 6, 2026
07d1d43
Add minor edits
jterapin Jan 6, 2026
4e7fddd
Merge from version-3
jterapin Jan 8, 2026
ca0b9ca
Uploader refactors
jterapin Jan 8, 2026
5d0a773
Add testing
jterapin Jan 8, 2026
ee90f2f
Add todo
jterapin Jan 8, 2026
f9b686d
Merge from version-3
jterapin Jan 8, 2026
fb6761b
Fix failure
jterapin Jan 9, 2026
787d81a
Modify uploader specs
jterapin Jan 9, 2026
7c5a803
Merge branch 'version-3' into s3-directory-support
jterapin Jan 12, 2026
25c7dd6
Handle edge cases
jterapin Jan 12, 2026
9412b63
Update upload directory inputs
jterapin Jan 12, 2026
3a82930
Test fixes
jterapin Jan 12, 2026
22df10c
Add rubocop fix
jterapin Jan 12, 2026
3dae2ff
Add directory uploader to tm
jterapin Jan 12, 2026
58fcfe5
Refactor Directory Downloader
jterapin Jan 13, 2026
f83d174
Add fixes
jterapin Jan 13, 2026
569f1f7
Add specs
jterapin Jan 13, 2026
a173b2d
Fixes
jterapin Jan 13, 2026
9c331eb
Adjustments
jterapin Jan 13, 2026
ce9b65f
Mini refactors
jterapin Jan 13, 2026
78fcf15
Refactors
jterapin Jan 13, 2026
31867db
Update changelog
jterapin Jan 13, 2026
853be51
Clean up
jterapin Jan 13, 2026
95ec3db
Fixes
jterapin Jan 13, 2026
12f0338
Populate errors differently
jterapin Jan 13, 2026
7a0f135
Fix
jterapin Jan 13, 2026
e041a1e
Shutdown executor gracefully
jterapin Jan 14, 2026
a76bd21
More streamlining
jterapin Jan 14, 2026
a9f89c2
Merge branch 'version-3' into s3-directory-support
jterapin Jan 14, 2026
ea4f261
Update documentation
jterapin Jan 14, 2026
9d9f6c6
Clean specs
jterapin Jan 14, 2026
c5238ca
Block path traversal keys
jterapin Jan 14, 2026
735c2b3
Remove comments
jterapin Jan 14, 2026
9341672
Refactor
jterapin Jan 14, 2026
bc13fae
Refactors
jterapin Jan 14, 2026
78200c9
Mini refactors
jterapin Jan 14, 2026
36042f2
Merge version-3 into branch
jterapin Jan 15, 2026
7812d29
Scope queue executor
jterapin Jan 15, 2026
2 changes: 2 additions & 0 deletions gems/aws-sdk-s3/CHANGELOG.md
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Feature - Added `#upload_directory` and `#download_directory` to `Aws::S3::TransferManager` for bulk directory transfers.

1.211.0 (2026-01-08)
------------------

10 changes: 9 additions & 1 deletion gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb
@@ -7,15 +7,23 @@ module S3
    autoload :Encryption, 'aws-sdk-s3/encryption'
    autoload :EncryptionV2, 'aws-sdk-s3/encryption_v2'
    autoload :EncryptionV3, 'aws-sdk-s3/encryption_v3'
    autoload :LegacySigner, 'aws-sdk-s3/legacy_signer'

    # transfer manager + multipart upload/download utilities
    autoload :DefaultExecutor, 'aws-sdk-s3/default_executor'
Contributor

Nit: it looks like default_executor is listed twice here.

    autoload :FilePart, 'aws-sdk-s3/file_part'
    autoload :DefaultExecutor, 'aws-sdk-s3/default_executor'
    autoload :FileUploader, 'aws-sdk-s3/file_uploader'
    autoload :FileDownloader, 'aws-sdk-s3/file_downloader'
    autoload :LegacySigner, 'aws-sdk-s3/legacy_signer'
    autoload :MultipartDownloadError, 'aws-sdk-s3/multipart_download_error'
    autoload :MultipartFileUploader, 'aws-sdk-s3/multipart_file_uploader'
    autoload :MultipartStreamUploader, 'aws-sdk-s3/multipart_stream_uploader'
    autoload :MultipartUploadError, 'aws-sdk-s3/multipart_upload_error'
    autoload :DirectoryProgress, 'aws-sdk-s3/directory_progress'
    autoload :DirectoryDownloadError, 'aws-sdk-s3/directory_download_error'
    autoload :DirectoryDownloader, 'aws-sdk-s3/directory_downloader'
    autoload :DirectoryUploadError, 'aws-sdk-s3/directory_upload_error'
    autoload :DirectoryUploader, 'aws-sdk-s3/directory_uploader'
    autoload :ObjectCopier, 'aws-sdk-s3/object_copier'
    autoload :ObjectMultipartCopier, 'aws-sdk-s3/object_multipart_copier'
    autoload :PresignedPost, 'aws-sdk-s3/presigned_post'
16 changes: 16 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb
@@ -0,0 +1,16 @@
# frozen_string_literal: true

module Aws
  module S3
    # Raised when DirectoryDownloader fails to download objects from S3 bucket
    class DirectoryDownloadError < StandardError
      def initialize(message, errors = [])
        @errors = errors
        super(message)
      end

      # @return [Array<StandardError>] The list of errors encountered when downloading objects
      attr_reader :errors
    end
  end
end
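
A minimal sketch of how a caller might inspect the aggregated failures carried by this error. The class body is copied from the diff into a hypothetical `Sketch` module so the snippet runs without the SDK; the `summarize` helper and the sample messages are illustrative assumptions, not part of this change.

```ruby
# Standalone copy of the error class from the diff, wrapped in a
# hypothetical Sketch module so it does not require aws-sdk-s3.
module Sketch
  class DirectoryDownloadError < StandardError
    def initialize(message, errors = [])
      @errors = errors
      super(message)
    end

    # @return [Array<StandardError>] individual failures behind this error
    attr_reader :errors
  end
end

# Hypothetical helper: turn the aggregate error into printable lines.
def summarize(error)
  error.errors.map { |e| "#{e.class}: #{e.message}" }
end

begin
  failures = [IOError.new('disk full'), ArgumentError.new('bad key')]
  raise Sketch::DirectoryDownloadError.new('directory download failed', failures)
rescue Sketch::DirectoryDownloadError => e
  puts e.message
  puts summarize(e)
end
```

Because `errors` defaults to an empty array, code that rescues the error can iterate over it without a nil check.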
212 changes: 212 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb
@@ -0,0 +1,212 @@
# frozen_string_literal: true

module Aws
  module S3
    # @api private
    class DirectoryDownloader
      def initialize(options = {})
        @client = options[:client]
        @executor = options[:executor]
        @abort_requested = false
        @mutex = Mutex.new
      end

      attr_reader :client, :executor

      def abort_requested
Contributor

This might be clearer with a ? suffix (i.e. abort_requested?) to make it clear that it's a status check rather than an action.

        @mutex.synchronize { @abort_requested }
      end

      def request_abort
Contributor

"request_abort" feels more verbose than just "abort". The internal @abort_requested makes sense, but I think the method itself could just be named abort. I assume request_abort is meant to make it clear that aborting is async, but I think that's already implied by abort. What do you think?

        @mutex.synchronize { @abort_requested = true }
      end
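
For reference, a minimal sketch of what the two naming suggestions in the threads above could look like together. `AbortFlagSketch` is a hypothetical standalone class, not code from this PR; it only mirrors the mutex-guarded flag shown in the diff.

```ruby
# Hypothetical standalone class illustrating the naming suggested in review:
# `abort` performs the action, `abort_requested?` reports the status.
class AbortFlagSketch
  def initialize
    @abort_requested = false
    @mutex = Mutex.new
  end

  # Status check; the `?` suffix signals that nothing is changed.
  def abort_requested?
    @mutex.synchronize { @abort_requested }
  end

  # Action; workers notice the flag the next time they poll it.
  def abort
    @mutex.synchronize { @abort_requested = true }
  end
end

flag = AbortFlagSketch.new
flag.abort_requested? # => false
flag.abort
flag.abort_requested? # => true
```

One thing to weigh when choosing the shorter name: an instance method called `abort` shadows `Kernel#abort` inside the class, so an unqualified `abort` call there would set the flag instead of terminating the process.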

      def download(destination, bucket:, **options)
        if File.exist?(destination)
          raise ArgumentError, 'invalid destination, expected a directory' unless File.directory?(destination)
        else
          FileUtils.mkdir_p(destination)
        end

        download_opts, producer_opts = build_opts(destination, bucket, options)
        downloader = FileDownloader.new(client: @client, executor: @executor)
        producer = ObjectProducer.new(producer_opts)
        downloads, errors = process_download_queue(producer, downloader, download_opts)
        build_result(downloads, errors)
      ensure
        @abort_requested = false
Contributor

Why are we setting @abort_requested to false here?

Contributor

Does this need @mutex.synchronize?

      end
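
One possible reading of the two questions above, sketched under assumptions: resetting the flag in `ensure` lets the same instance be reused after an aborted run, and doing the reset under the mutex keeps every access to the flag behind the same lock. `ReusableTransferSketch` and its `run` method are hypothetical stand-ins; whether this matches the author's intent is not confirmed by the diff.

```ruby
# Hypothetical stand-in for DirectoryDownloader, reduced to the abort flag.
class ReusableTransferSketch
  def initialize
    @abort_requested = false
    @mutex = Mutex.new
  end

  def abort_requested?
    @mutex.synchronize { @abort_requested }
  end

  def request_abort
    @mutex.synchronize { @abort_requested = true }
  end

  # Runs the given block as one "transfer" and reports how it ended.
  def run
    yield self
    abort_requested? ? :aborted : :completed
  ensure
    # Reset under the same lock used by readers and writers, so the next
    # call on this instance starts from a clean state.
    @mutex.synchronize { @abort_requested = false }
  end
end

transfer = ReusableTransferSketch.new
transfer.run { |t| t.request_abort } # => :aborted
transfer.run { |_t| nil }            # => :completed
```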

      private

      def build_opts(destination, bucket, opts)
        download_opts = {
          progress_callback: opts[:progress_callback],
          destination: destination,
          ignore_failure: opts[:ignore_failure] || false
        }
        producer_opts = {
          client: @client,
          directory_downloader: self,
          destination: destination,
          bucket: bucket,
          s3_prefix: opts[:s3_prefix],
          filter_callback: opts[:filter_callback],
          request_callback: opts[:request_callback]
        }
        [download_opts, producer_opts]
      end

      def build_result(download_count, errors)
        if abort_requested
          msg = "directory download failed: #{errors.map(&:message).join('; ')}"
          raise DirectoryDownloadError.new(msg, errors)
        else
          {
            completed_downloads: [download_count - errors.count, 0].max,
            failed_downloads: errors.count,
            errors: errors.any? ? errors : nil
          }.compact
        end
      end

      def download_object(entry, downloader, opts, progress, errors)
        raise entry.error if entry.error

        FileUtils.mkdir_p(File.dirname(entry.path)) unless Dir.exist?(File.dirname(entry.path))
        downloader.download(entry.path, entry.params)
        progress&.call(File.size(entry.path))
      rescue StandardError => e
        @mutex.synchronize { errors << e }
        request_abort unless opts[:ignore_failure]
      end

      def process_download_queue(producer, downloader, opts)
Contributor Author

In a situation where we need to raise, I decided against calling @queue_executor.kill since there might be work in-flight that hangs. It would be best to exit gracefully and let the error bubble up to be raised within #build_result.

        progress = DirectoryProgress.new(opts[:progress_callback]) if opts[:progress_callback]
        queue_executor = DefaultExecutor.new
        completion_queue = Queue.new
        download_attempts = 0
        errors = []
        begin
          producer.each do |object|
            break if abort_requested

            download_attempts += 1
            queue_executor.post(object) do |o|
              download_object(o, downloader, opts, progress, errors)
            ensure
              completion_queue << :done
            end
          end
        rescue StandardError => e
          @mutex.synchronize { errors << e }
          request_abort
        end
        download_attempts.times { completion_queue.pop }
Contributor

Is it guaranteed that download_attempts will be less than or equal to completion_queue size?

        [download_attempts, errors]
      ensure
        queue_executor&.shutdown
      end
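
On the question above about `download_attempts` versus the completion queue: the pattern relies on each posted task pushing exactly one marker from its `ensure` block, so popping `download_attempts` markers blocks until every posted task has finished. A minimal sketch of that invariant, using plain threads in place of `DefaultExecutor` (whose interface is not shown in this diff); `run_tasks_sketch` and its inputs are hypothetical.

```ruby
# Each task pushes one marker in `ensure`, so popping exactly `attempts`
# markers blocks until every started task has finished, pass or fail.
def run_tasks_sketch(inputs)
  completion_queue = Queue.new
  failures = Queue.new
  attempts = 0

  workers = inputs.map do |input|
    attempts += 1
    Thread.new(input) do |value|
      raise ArgumentError, "bad input #{value}" if value.negative?
    rescue StandardError => e
      failures << e.message
    ensure
      completion_queue << :done
    end
  end

  attempts.times { completion_queue.pop }
  workers.each(&:join)
  { attempts: attempts, leftover_markers: completion_queue.size, failures: failures.size }
end

run_tasks_sketch([1, -2, 3, -4])
# => { attempts: 4, leftover_markers: 0, failures: 2 }
```

The invariant only holds if every counted attempt actually reaches a task body. As far as the diff shows, the counter is incremented before `post` is called, so if `post` itself raised, the final `pop` loop would wait for a marker that never arrives; that case may be worth a spec.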

      # @api private
      class ObjectProducer
        include Enumerable

        DEFAULT_QUEUE_SIZE = 100
        DONE_MARKER = :done

        def initialize(opts = {})
          @directory_downloader = opts[:directory_downloader]
          @destination_dir = opts[:destination]
          @bucket = opts[:bucket]
          @client = opts[:client]
          @s3_prefix = opts[:s3_prefix]
          @filter_callback = opts[:filter_callback]
          @request_callback = opts[:request_callback]
          @object_queue = SizedQueue.new(DEFAULT_QUEUE_SIZE)
        end

        def each
          producer_thread = Thread.new do
            stream_objects
          ensure
            @object_queue << DONE_MARKER
          end

          # Yield objects from internal queue
          while (object = @object_queue.shift) != DONE_MARKER
            break if @directory_downloader.abort_requested

            yield object
          end
        ensure
          producer_thread.join
        end

        private

        def apply_request_callback(key, params)
          @request_callback&.call(key, params.dup)
        end

        def build_object_entry(key)
          params = { bucket: @bucket, key: key }
          params = apply_request_callback(key, params) if @request_callback
          normalized_key = normalize_key(key)
          full_path = File.join(@destination_dir, normalized_key)
          error = validate_path(full_path, key)
          DownloadEntry.new(path: full_path, params: params, error: error)
        end

        def include_object?(key)
          return true unless @filter_callback

          @filter_callback.call(key)
        end

        def directory_marker?(obj)
          obj.key.end_with?('/') && obj.size.zero?
        end

        def normalize_key(key)
          if @s3_prefix
            prefix = @s3_prefix.end_with?('/') ? @s3_prefix : "#{@s3_prefix}/"
            key = key.delete_prefix(prefix)
          end
          File::SEPARATOR == '/' ? key : key.tr('/', File::SEPARATOR)
        end

        def stream_objects(continuation_token: nil)
          resp = @client.list_objects_v2(bucket: @bucket, prefix: @s3_prefix, continuation_token: continuation_token)
          resp.contents&.each do |o|
            break if @directory_downloader.abort_requested
Contributor

This will block on the mutex - I'm not sure we always need to do that here. It is definitely safe to do so, but it likely has a performance hit. Ditto, I think, for the check on line 91:

        begin
          producer.each do |object|
            break if abort_requested

Since we're using a SizedQueue for communicating between threads, maybe we could use clear/close on it rather than constantly blocking on the mutex? If we used close, it would cause waiting threads to raise ClosedQueueError, which we could catch and handle to detect aborts. I haven't fully thought that out, but I think it might simplify the code and avoid locking as much.


            next if directory_marker?(o)
            next unless include_object?(o.key)

            @object_queue << build_object_entry(o.key)
          end
          stream_objects(continuation_token: resp.next_continuation_token) if resp.next_continuation_token
        end
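
A rough sketch of the `SizedQueue#close` idea raised in the thread above, to make the proposal concrete. It is not the PR's implementation: `closed_queue_abort_sketch`, the queue size of 2, and the integer items are all assumptions chosen for illustration. The consumer aborts by closing the queue, and the producer learns about it through `ClosedQueueError` rather than by polling a mutex-guarded flag.

```ruby
# Producer fills a bounded queue; the consumer signals abort by closing it.
def closed_queue_abort_sketch(take:)
  queue = SizedQueue.new(2)

  producer = Thread.new do
    1_000.times { |i| queue << i } # blocks whenever the queue is full
    :finished
  rescue ClosedQueueError
    # Raised both for a push that was blocked when the queue closed and for
    # any later push attempt, so no separate abort flag is polled here.
    :aborted
  end

  consumed = Array.new(take) { queue.pop }
  queue.close # abort: wakes the blocked producer
  [consumed, producer.value]
end

closed_queue_abort_sketch(take: 3) # => [[0, 1, 2], :aborted]
```

One detail to keep in mind if this route is taken: `ClosedQueueError` is a subclass of `StopIteration`, so a producer written with `Kernel#loop` would silently swallow it and exit the loop. The sketch uses `Integer#times` so the error reaches the `rescue`.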

        def validate_path(path, key)
          segments = path.split('/')
Contributor

Should this be File::SEPARATOR rather than / here? I think the path at this point comes from File.join and so would have the OS separator. I might be wrong about that though.

          return unless segments.any? { |s| %w[. ..].include?(s) }

          DirectoryDownloadError.new("invalid key '#{key}': contains '.' or '..' path segments")
        end
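
A small sketch related to the separator question above, under stated assumptions. In Ruby, `File::SEPARATOR` is `"/"` and `File.join` joins with it; `File::ALT_SEPARATOR` is `"\\"` on Windows and `nil` on most other platforms. Splitting on both, as below, is one hedged way to also catch `..` segments written with backslashes. `traversal_segments_sketch?` is a hypothetical helper, not part of this PR.

```ruby
# Returns true when any segment of +path+ is '.' or '..'.
# Splits on File::SEPARATOR and, where the platform defines one,
# File::ALT_SEPARATOR (nil on most non-Windows platforms).
def traversal_segments_sketch?(path)
  separators = [File::SEPARATOR, File::ALT_SEPARATOR].compact
  path.split(Regexp.union(separators)).any? { |segment| %w[. ..].include?(segment) }
end

traversal_segments_sketch?('dest/../etc/passwd') # => true
traversal_segments_sketch?('dest/a..b/file.txt') # => false
```

Only whole segments are matched, so names that merely contain dots (`a..b`, `.hidden`) are not rejected.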

        # @api private
        class DownloadEntry
          def initialize(opts = {})
            @path = opts[:path]
            @params = opts[:params]
            @error = opts[:error]
          end

          attr_reader :path, :params, :error
        end
      end
    end
  end
end
24 changes: 24 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_progress.rb
@@ -0,0 +1,24 @@
# frozen_string_literal: true

module Aws
  module S3
    # @api private
    class DirectoryProgress
      def initialize(progress_callback)
        @transferred_bytes = 0
        @transferred_files = 0
        @progress_callback = progress_callback
        @mutex = Mutex.new
      end

      def call(bytes_transferred)
        @mutex.synchronize do
          @transferred_bytes += bytes_transferred
          @transferred_files += 1

          @progress_callback.call(@transferred_bytes, @transferred_files)
Contributor

I see why we're calling the progress_callback inside the synchronize block - it ensures progress is linear for users. However, depending on what they do in the callback (e.g. IO such as printing or writing to a file), this could end up being a small bottleneck. I'm not sure how much that matters versus fully ordered progress callbacks.

        end
      end
    end
  end
end
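
To illustrate the trade-off raised in the review thread above, here is a hedged sketch of the alternative: update the totals under the lock, then invoke the user callback after releasing it. `UnlockedCallbackProgressSketch` is a hypothetical variant written for comparison, not a proposed replacement.

```ruby
# Hypothetical variant of DirectoryProgress: totals are updated under the
# lock, but the user callback runs after the lock is released.
class UnlockedCallbackProgressSketch
  def initialize(progress_callback)
    @transferred_bytes = 0
    @transferred_files = 0
    @progress_callback = progress_callback
    @mutex = Mutex.new
  end

  def call(bytes_transferred)
    snapshot = @mutex.synchronize do
      @transferred_bytes += bytes_transferred
      @transferred_files += 1
      [@transferred_bytes, @transferred_files]
    end
    # Slow callbacks no longer block other workers, but two callbacks may
    # now be delivered out of order.
    @progress_callback.call(*snapshot)
  end
end
```

Each callback still receives a consistent (bytes, files) pair, since the snapshot is taken inside the lock. What is lost is the guarantee that callbacks arrive in increasing order, which is the behaviour the version in the diff provides.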
16 changes: 16 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_upload_error.rb
@@ -0,0 +1,16 @@
# frozen_string_literal: true

module Aws
  module S3
    # Raised when DirectoryUploader fails to upload files to S3 bucket
    class DirectoryUploadError < StandardError
      def initialize(message, errors = [])
        @errors = errors
        super(message)
      end

      # @return [Array<StandardError>] The list of errors encountered when uploading files
      attr_reader :errors
    end
  end
end