diff --git a/gems/aws-sdk-s3/CHANGELOG.md b/gems/aws-sdk-s3/CHANGELOG.md index acfff814db8..51b202d280f 100644 --- a/gems/aws-sdk-s3/CHANGELOG.md +++ b/gems/aws-sdk-s3/CHANGELOG.md @@ -1,6 +1,8 @@ Unreleased Changes ------------------ +* Feature - Added `#upload_directory` and `#download_directory` to `Aws::S3::TransferManager` for bulk directory transfers. + 1.211.0 (2026-01-08) ------------------ diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb index 9a3914a2b39..bf91df81e8a 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb @@ -7,15 +7,23 @@ module S3 autoload :Encryption, 'aws-sdk-s3/encryption' autoload :EncryptionV2, 'aws-sdk-s3/encryption_v2' autoload :EncryptionV3, 'aws-sdk-s3/encryption_v3' + autoload :LegacySigner, 'aws-sdk-s3/legacy_signer' + + # transfer manager + multipart upload/download utilities + autoload :DefaultExecutor, 'aws-sdk-s3/default_executor' autoload :FilePart, 'aws-sdk-s3/file_part' autoload :DefaultExecutor, 'aws-sdk-s3/default_executor' autoload :FileUploader, 'aws-sdk-s3/file_uploader' autoload :FileDownloader, 'aws-sdk-s3/file_downloader' - autoload :LegacySigner, 'aws-sdk-s3/legacy_signer' autoload :MultipartDownloadError, 'aws-sdk-s3/multipart_download_error' autoload :MultipartFileUploader, 'aws-sdk-s3/multipart_file_uploader' autoload :MultipartStreamUploader, 'aws-sdk-s3/multipart_stream_uploader' autoload :MultipartUploadError, 'aws-sdk-s3/multipart_upload_error' + autoload :DirectoryProgress, 'aws-sdk-s3/directory_progress' + autoload :DirectoryDownloadError, 'aws-sdk-s3/directory_download_error' + autoload :DirectoryDownloader, 'aws-sdk-s3/directory_downloader' + autoload :DirectoryUploadError, 'aws-sdk-s3/directory_upload_error' + autoload :DirectoryUploader, 'aws-sdk-s3/directory_uploader' autoload :ObjectCopier, 'aws-sdk-s3/object_copier' autoload :ObjectMultipartCopier, 'aws-sdk-s3/object_multipart_copier' autoload :PresignedPost, 'aws-sdk-s3/presigned_post' diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb new file mode 100644 index 00000000000..7ec3b35a35d --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Aws + module S3 + # Raised when DirectoryDownloader fails to download objects from S3 bucket + class DirectoryDownloadError < StandardError + def initialize(message, errors = []) + @errors = errors + super(message) + end + + # @return [Array] The list of errors encountered when downloading objects + attr_reader :errors + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb new file mode 100644 index 00000000000..092eaf0e1a7 --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb @@ -0,0 +1,212 @@ +# frozen_string_literal: true + +module Aws + module S3 + # @api private + class DirectoryDownloader + def initialize(options = {}) + @client = options[:client] + @executor = options[:executor] + @abort_requested = false + @mutex = Mutex.new + end + + attr_reader :client, :executor + + def abort_requested + @mutex.synchronize { @abort_requested } + end + + def request_abort + @mutex.synchronize { @abort_requested = true } + end + + def download(destination, bucket:, **options) + if File.exist?(destination) + raise ArgumentError, 'invalid destination, 
expected a directory' unless File.directory?(destination) + else + FileUtils.mkdir_p(destination) + end + + download_opts, producer_opts = build_opts(destination, bucket, options) + downloader = FileDownloader.new(client: @client, executor: @executor) + producer = ObjectProducer.new(producer_opts) + downloads, errors = process_download_queue(producer, downloader, download_opts) + build_result(downloads, errors) + ensure + @abort_requested = false + end + + private + + def build_opts(destination, bucket, opts) + download_opts = { + progress_callback: opts[:progress_callback], + destination: destination, + ignore_failure: opts[:ignore_failure] || false + } + producer_opts = { + client: @client, + directory_downloader: self, + destination: destination, + bucket: bucket, + s3_prefix: opts[:s3_prefix], + filter_callback: opts[:filter_callback], + request_callback: opts[:request_callback] + } + [download_opts, producer_opts] + end + + def build_result(download_count, errors) + if abort_requested + msg = "directory download failed: #{errors.map(&:message).join('; ')}" + raise DirectoryDownloadError.new(msg, errors) + else + { + completed_downloads: [download_count - errors.count, 0].max, + failed_downloads: errors.count, + errors: errors.any? ? errors : nil + }.compact + end + end + + def download_object(entry, downloader, opts, progress, errors) + raise entry.error if entry.error + + FileUtils.mkdir_p(File.dirname(entry.path)) unless Dir.exist?(File.dirname(entry.path)) + downloader.download(entry.path, entry.params) + progress&.call(File.size(entry.path)) + rescue StandardError => e + @mutex.synchronize { errors << e } + request_abort unless opts[:ignore_failure] + end + + def process_download_queue(producer, downloader, opts) + progress = DirectoryProgress.new(opts[:progress_callback]) if opts[:progress_callback] + queue_executor = DefaultExecutor.new + completion_queue = Queue.new + download_attempts = 0 + errors = [] + begin + producer.each do |object| + break if abort_requested + + download_attempts += 1 + queue_executor.post(object) do |o| + download_object(o, downloader, opts, progress, errors) + ensure + completion_queue << :done + end + end + rescue StandardError => e + @mutex.synchronize { errors << e } + request_abort + end + download_attempts.times { completion_queue.pop } + [download_attempts, errors] + ensure + queue_executor&.shutdown + end + + # @api private + class ObjectProducer + include Enumerable + + DEFAULT_QUEUE_SIZE = 100 + DONE_MARKER = :done + + def initialize(opts = {}) + @directory_downloader = opts[:directory_downloader] + @destination_dir = opts[:destination] + @bucket = opts[:bucket] + @client = opts[:client] + @s3_prefix = opts[:s3_prefix] + @filter_callback = opts[:filter_callback] + @request_callback = opts[:request_callback] + @object_queue = SizedQueue.new(DEFAULT_QUEUE_SIZE) + end + + def each + producer_thread = Thread.new do + stream_objects + ensure + @object_queue << DONE_MARKER + end + + # Yield objects from internal queue + while (object = @object_queue.shift) != DONE_MARKER + break if @directory_downloader.abort_requested + + yield object + end + ensure + producer_thread.join + end + + private + + def apply_request_callback(key, params) + @request_callback&.call(key, params.dup) + end + + def build_object_entry(key) + params = { bucket: @bucket, key: key } + params = apply_request_callback(key, params) if @request_callback + normalized_key = normalize_key(key) + full_path = File.join(@destination_dir, normalized_key) + error = validate_path(full_path, 
key) + DownloadEntry.new(path: full_path, params: params, error: error) + end + + def include_object?(key) + return true unless @filter_callback + + @filter_callback.call(key) + end + + def directory_marker?(obj) + obj.key.end_with?('/') && obj.size.zero? + end + + def normalize_key(key) + if @s3_prefix + prefix = @s3_prefix.end_with?('/') ? @s3_prefix : "#{@s3_prefix}/" + key = key.delete_prefix(prefix) + end + File::SEPARATOR == '/' ? key : key.tr('/', File::SEPARATOR) + end + + def stream_objects(continuation_token: nil) + resp = @client.list_objects_v2(bucket: @bucket, prefix: @s3_prefix, continuation_token: continuation_token) + resp.contents&.each do |o| + break if @directory_downloader.abort_requested + + next if directory_marker?(o) + next unless include_object?(o.key) + + @object_queue << build_object_entry(o.key) + end + stream_objects(continuation_token: resp.next_continuation_token) if resp.next_continuation_token + end + + def validate_path(path, key) + segments = path.split('/') + return unless segments.any? { |s| %w[. ..].include?(s) } + + DirectoryDownloadError.new("invalid key '#{key}': contains '.' or '..' path segments") + end + + # @api private + class DownloadEntry + def initialize(opts = {}) + @path = opts[:path] + @params = opts[:params] + @error = opts[:error] + end + + attr_reader :path, :params, :error + end + end + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_progress.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_progress.rb new file mode 100644 index 00000000000..d2100782977 --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_progress.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +module Aws + module S3 + # @api private + class DirectoryProgress + def initialize(progress_callback) + @transferred_bytes = 0 + @transferred_files = 0 + @progress_callback = progress_callback + @mutex = Mutex.new + end + + def call(bytes_transferred) + @mutex.synchronize do + @transferred_bytes += bytes_transferred + @transferred_files += 1 + + @progress_callback.call(@transferred_bytes, @transferred_files) + end + end + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_upload_error.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_upload_error.rb new file mode 100644 index 00000000000..1818e103805 --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_upload_error.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Aws + module S3 + # Raised when DirectoryUploader fails to upload files to S3 bucket + class DirectoryUploadError < StandardError + def initialize(message, errors = []) + @errors = errors + super(message) + end + + # @return [Array] The list of errors encountered when uploading files + attr_reader :errors + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb new file mode 100644 index 00000000000..ae883a41380 --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb @@ -0,0 +1,262 @@ +# frozen_string_literal: true + +require 'set' + +module Aws + module S3 + # @api private + class DirectoryUploader + def initialize(opts = {}) + @client = opts[:client] + @executor = opts[:executor] + @abort_requested = false + @mutex = Mutex.new + end + + attr_reader :client, :executor + + def abort_requested + @mutex.synchronize { @abort_requested } + end + + def request_abort + @mutex.synchronize { @abort_requested = true } + end + + def upload(source_directory, bucket, **opts) + raise ArgumentError, 'Invalid directory' 
unless Dir.exist?(source_directory) + + uploader = FileUploader.new( + multipart_threshold: opts.delete(:multipart_threshold), + http_chunk_size: opts.delete(:http_chunk_size), + client: @client, + executor: @executor + ) + upload_opts, producer_opts = build_opts(source_directory, bucket, opts) + producer = FileProducer.new(producer_opts) + uploads, errors = process_upload_queue(producer, uploader, upload_opts) + build_result(uploads, errors) + ensure + @abort_requested = false + end + + private + + def build_opts(source_directory, bucket, opts) + uploader_opts = { progress_callback: opts[:progress_callback], ignore_failure: opts[:ignore_failure] || false } + producer_opts = { + directory_uploader: self, + source_dir: source_directory, + bucket: bucket, + s3_prefix: opts[:s3_prefix], + recursive: opts[:recursive] || false, + follow_symlinks: opts[:follow_symlinks] || false, + filter_callback: opts[:filter_callback], + request_callback: opts[:request_callback] + } + [uploader_opts, producer_opts] + end + + def build_result(upload_count, errors) + if abort_requested + msg = "directory upload failed: #{errors.map(&:message).join('; ')}" + raise DirectoryUploadError.new(msg, errors) + else + { + completed_uploads: [upload_count - errors.count, 0].max, + failed_uploads: errors.count, + errors: errors.any? ? errors : nil + }.compact + end + end + + def process_upload_queue(producer, uploader, opts) + progress = DirectoryProgress.new(opts[:progress_callback]) if opts[:progress_callback] + queue_executor = DefaultExecutor.new + completion_queue = Queue.new + upload_attempts = 0 + errors = [] + begin + producer.each do |file| + break if abort_requested + + upload_attempts += 1 + queue_executor.post(file) do |f| + upload_file(f, uploader, opts, progress, errors) + ensure + completion_queue << :done + end + end + rescue StandardError => e + @mutex.synchronize { errors << e } + request_abort + end + upload_attempts.times { completion_queue.pop } + [upload_attempts, errors] + ensure + queue_executor&.shutdown + end + + def upload_file(entry, uploader, opts, progress, errors) + uploader.upload(entry.path, entry.params) + progress&.call(File.size(entry.path)) + rescue StandardError => e + @mutex.synchronize { errors << e } + request_abort unless opts[:ignore_failure] + end + + # @api private + class FileProducer + include Enumerable + + DEFAULT_QUEUE_SIZE = 100 + DONE_MARKER = :done + + def initialize(opts = {}) + @directory_uploader = opts[:directory_uploader] + @source_dir = opts[:source_dir] + @bucket = opts[:bucket] + @s3_prefix = opts[:s3_prefix] + @recursive = opts[:recursive] + @follow_symlinks = opts[:follow_symlinks] + @filter_callback = opts[:filter_callback] + @request_callback = opts[:request_callback] + @file_queue = SizedQueue.new(DEFAULT_QUEUE_SIZE) + end + + def each + err = nil + producer_thread = Thread.new do + if @recursive + find_recursively + else + find_directly + end + rescue StandardError => e + @directory_uploader.request_abort + @file_queue.clear + + err = DirectoryUploadError.new("Directory traversal failed for '#{@source_dir}': #{e.message}") + ensure + @file_queue << DONE_MARKER + end + + while (file = @file_queue.shift) != DONE_MARKER + break if @directory_uploader.abort_requested + + yield file + end + ensure + producer_thread.join + raise err if err + end + + private + + def apply_request_callback(file_path, params) + callback_params = @request_callback.call(file_path, params.dup) + return params unless callback_params.is_a?(Hash) && callback_params.any? 
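+ # merge the callback-provided overrides into the default params for this file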
+ + params.merge(callback_params) + end + + def build_upload_entry(file_path, key) + params = { bucket: @bucket, key: @s3_prefix ? File.join(@s3_prefix, key) : key } + params = apply_request_callback(file_path, params) if @request_callback + UploadEntry.new(path: file_path, params: params) + end + + def find_directly + Dir.each_child(@source_dir) do |entry| + break if @directory_uploader.abort_requested + + entry_path = File.join(@source_dir, entry) + stat = nil + + if @follow_symlinks + stat = File.stat(entry_path) + next if stat.directory? + else + stat = File.lstat(entry_path) + next if stat.symlink? || stat.directory? + end + + next unless stat.file? + next unless include_file?(entry_path, entry) + + @file_queue << build_upload_entry(entry_path, entry) + end + end + + def find_recursively + if @follow_symlinks + ancestors = Set.new + ancestors << File.stat(@source_dir).ino + scan_directory(@source_dir, ancestors: ancestors) + else + scan_directory(@source_dir) + end + end + + def include_file?(file_path, file_name) + return true unless @filter_callback + + @filter_callback.call(file_path, file_name) + end + + def scan_directory(dir_path, key_prefix: '', ancestors: nil) + return if @directory_uploader.abort_requested + + Dir.each_child(dir_path) do |entry| + break if @directory_uploader.abort_requested + + full_path = File.join(dir_path, entry) + next unless include_file?(full_path, entry) + + stat = get_file_stat(full_path) + next unless stat + + if stat.directory? + handle_directory(full_path, entry, key_prefix, ancestors) + elsif stat.file? # skip non-file types + key = key_prefix.empty? ? entry : File.join(key_prefix, entry) + @file_queue << build_upload_entry(full_path, key) + end + end + end + + def get_file_stat(full_path) + return File.stat(full_path) if @follow_symlinks + + lstat = File.lstat(full_path) + return if lstat.symlink? + + lstat + end + + def handle_directory(dir_path, dir_name, key_prefix, ancestors) + ino = nil + if @follow_symlinks && ancestors + ino = File.stat(dir_path).ino + return if ancestors.include?(ino) # cycle detected - skip + + ancestors.add(ino) + end + new_prefix = key_prefix.empty? ? dir_name : File.join(key_prefix, dir_name) + scan_directory(dir_path, key_prefix: new_prefix, ancestors: ancestors) + ancestors.delete(ino) if @follow_symlinks && ancestors + end + + # @api private + class UploadEntry + def initialize(opts = {}) + @path = opts[:path] + @params = opts[:params] + end + + attr_reader :path, :params + end + end + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb index 6ee2d09deb3..d9adf5893a6 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb @@ -7,7 +7,9 @@ module S3 # # * upload a file with multipart upload # * upload a stream with multipart upload - # * download a S3 object with multipart download + # * upload all files in a directory to an S3 bucket recursively or non-recursively + # * download an S3 object with multipart download + # * download all objects in an S3 bucket with same prefix to a local directory # * track transfer progress by using progress listener # # ## Executor Management @@ -49,7 +51,6 @@ module S3 # executor.shutdown # You must shutdown custom executors # class TransferManager - # @param [Hash] options # @option options [S3::Client] :client (S3::Client.new) # The S3 client to use for {TransferManager} operations. 
If not provided, a new default client @@ -70,6 +71,90 @@ def initialize(options = {}) # @return [Object] attr_reader :executor + # Downloads objects in an S3 bucket to a local directory. + # + # The downloaded directory structure will mirror the virtual directory structure of the keys in the bucket. For example, + # assume that you have the following keys in your bucket: + # + # * sample.jpg + # * photos/2022/January/sample.jpg + # * photos/2022/February/sample1.jpg + # * photos/2022/February/sample2.jpg + # * photos/2022/February/sample3.jpg + # + # Given a request to download the bucket to a destination path of `/test`, the downloaded + # directory would look like this: + # + # ``` + # |- test + # |- sample.jpg + # |- photos + # |- 2022 + # |- January + # |- sample.jpg + # |- February + # |- sample1.jpg + # |- sample2.jpg + # |- sample3.jpg + # ``` + # + # Directory markers (zero-byte objects ending with `/`) are skipped during download. + # Existing files with the same names as downloaded objects will be overwritten. + # + # Object keys containing path traversal segments (`.` or `..`) will raise an error. + # + # @example Downloading a bucket to a local directory + # tm = TransferManager.new + # tm.download_directory('/local/path', bucket: 'my-bucket') + # # => {completed_downloads: 7, failed_downloads: 0} + # + # @param [String] destination + # The local directory path to download objects to. Created if it doesn't exist. + # If files with the same names already exist in the destination, they will be overwritten. + # + # @param [String] bucket + # The name of the bucket to download from. + # + # @param [Hash] options + # + # @option options [String] :s3_prefix (nil) + # Limits the download to objects whose keys begin with the given prefix. The prefix is stripped from + # the object key when downloading. + # For example, with prefix `photos/2024/`, an object `photos/2024/vacation/beach.jpg` + # is downloaded to `vacation/beach.jpg` under the destination directory. + # + # @option options [Boolean] :ignore_failure (false) + # How to handle individual object download failures: + # * `false` (default) - Cancel all ongoing requests, terminate the ongoing downloads, and raise an exception + # * `true` - Continue downloading the remaining objects and report failures in the result. + # + # @option options [Proc] :filter_callback (nil) + # A Proc to filter which objects to download. Called with `(key)` for each object. + # Return `true` to download the object, `false` to skip it. + # + # @option options [Proc] :request_callback (nil) + # A Proc to modify download parameters for each object. Called with `(key, params)`. + # Must return the modified parameters. + # + # @option options [Proc] :progress_callback (nil) + # A Proc that will be called as objects are downloaded. + # It will be invoked with `transferred_bytes` and `transferred_files`.
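+ # For example, a minimal progress callback might look like this (illustrative sketch; `on_progress` is just a local name): + # + # on_progress = proc { |bytes, files| puts "#{files} files, #{bytes} bytes downloaded" } + # tm.download_directory('/local/path', bucket: 'my-bucket', progress_callback: on_progress)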
+ # + # @raise [DirectoryDownloadError] Raised when download fails with `ignore_failure: false` (default) + # + # @return [Hash] Returns a hash with download statistics: + # + # * `:completed_downloads` - Number of objects successfully downloaded + # * `:failed_downloads` - Number of objects that failed to download + # * `:errors` - Array of errors for failed downloads (only present when failures occur) + def download_directory(destination, bucket:, **options) + executor = @executor || DefaultExecutor.new + downloader = DirectoryDownloader.new(client: @client, executor: executor) + result = downloader.download(destination, bucket: bucket, **options) + executor.shutdown unless @executor + result + end + # Downloads a file in S3 to a path on disk. # # # small files (< 5MB) are downloaded in a single API call @@ -153,6 +238,120 @@ def download_file(destination, bucket:, key:, **options) true end + # Uploads all files under the given directory to the provided S3 bucket. + # Each object key is derived from the file's path relative to the source directory, with the optional `:s3_prefix` prepended. + # + # By default, the upload is non-recursive (subdirectories are skipped) and symbolic links are not + # followed. Assume you have a local directory `/test` with the following structure: + # + # ``` + # |- test + # |- sample.jpg + # |- photos + # |- 2022 + # |- January + # |- sample.jpg + # |- February + # |- sample1.jpg + # |- sample2.jpg + # |- sample3.jpg + # ``` + # + # Given a request to upload directory `/test` to an S3 bucket with default settings, the target bucket will have the + # following S3 objects: + # + # * sample.jpg + # + # If `:recursive` is set to `true`, the target bucket will have the following S3 objects: + # + # * sample.jpg + # * photos/2022/January/sample.jpg + # * photos/2022/February/sample1.jpg + # * photos/2022/February/sample2.jpg + # * photos/2022/February/sample3.jpg + # + # Only regular files are uploaded; special files (sockets, pipes, devices) are skipped. + # Symlink cycles are detected and skipped when following symlinks. + # Empty directories are not represented in S3. Existing S3 objects with the same key are + # overwritten. + # + # @example Uploading a directory + # tm = TransferManager.new + # tm.upload_directory('/path/to/directory', bucket: 'bucket') + # # => {completed_uploads: 7, failed_uploads: 0} + # + # @example Using filter callback to upload only text files + # tm = TransferManager.new + # filter = proc do |file_path, file_name| + # File.extname(file_name) == '.txt' # Only upload .txt files + # end + # tm.upload_directory('/path/to/directory', bucket: 'bucket', filter_callback: filter) + # + # @param [String, Pathname, File, Tempfile] source + # The source directory to upload. + # + # @param [String] bucket + # The name of the bucket to upload objects to. + # + # @param [Hash] options + # + # @option options [String] :s3_prefix (nil) + # The S3 key prefix to use for each object. If not provided, files will be uploaded to the root of the bucket. + # + # @option options [Boolean] :recursive (false) + # Whether to upload directories recursively: + # + # * `false` (default) - only files in the top-level directory are uploaded, subdirectories are ignored. + # * `true` - all files and subdirectories are uploaded recursively. + # + # @option options [Boolean] :follow_symlinks (false) + # Whether to follow symbolic links when traversing the file tree: + # + # * `false` (default) - symbolic links are ignored and not uploaded. + # * `true` - symbolic links are followed and their target files/directories are uploaded.
Symlink cycles + # are detected and skipped. + # + # @option options [Boolean] :ignore_failure (false) + # How to handle individual file upload failures: + # + # * `false` (default) - Cancel all ongoing requests, terminate the directory upload, and raise an exception + # * `true` - Ignore the failure and continue uploading the remaining files + # + # @option options [Proc] :filter_callback (nil) + # A Proc to filter which files to upload. Called with `(file_path, file_name)` for each file. + # Return `true` to upload the file, `false` to skip it. + # + # @option options [Proc] :request_callback (nil) + # A Proc to modify upload parameters for each file. Called with `(file_path, params)`. + # Must return the modified parameters. + # + # @option options [Proc] :progress_callback (nil) + # A Proc that will be called as files are uploaded. + # It will be invoked with `transferred_bytes` and `transferred_files`. + # + # @option options [Integer] :http_chunk_size (16384) Size in bytes for each chunk when streaming request bodies + # over HTTP. Controls the buffer size used when sending data to S3. Larger values may improve throughput by + # reducing the number of network writes, but use more memory. Custom values must be at least 16 KB (16,384 bytes). + # Only supported on Ruby MRI. + # + # @raise [DirectoryUploadError] Raised when: + # + # * an upload fails with `ignore_failure: false` (default) + # * directory traversal fails (permission denied, broken symlink, etc.) + # + # @return [Hash] Returns a hash with upload statistics: + # + # * `:completed_uploads` - Number of files successfully uploaded + # * `:failed_uploads` - Number of files that failed to upload + # * `:errors` - Array of error objects for failed uploads (only present when failures occur) + def upload_directory(source, bucket:, **options) + executor = @executor || DefaultExecutor.new + uploader = DirectoryUploader.new(client: @client, executor: executor) + result = uploader.upload(source, bucket, **options.merge(http_chunk_size: resolve_http_chunk_size(options))) + executor.shutdown unless @executor + result + end + # Uploads a file from disk to S3.
# # # a small file are uploaded with PutObject API @@ -226,17 +425,7 @@ def download_file(destination, bucket:, key:, **options) # @see Client#upload_part def upload_file(source, bucket:, key:, **options) upload_opts = options.merge(bucket: bucket, key: key) - http_chunk_size = - if defined?(JRUBY_VERSION) - nil - else - chunk = upload_opts.delete(:http_chunk_size) - if chunk && chunk < Aws::Plugins::ChecksumAlgorithm::DEFAULT_TRAILER_CHUNK_SIZE - raise ArgumentError, ':http_chunk_size must be at least 16384 bytes (16KB)' - end - - chunk - end + http_chunk_size = resolve_http_chunk_size(upload_opts) executor = @executor || DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count)) uploader = FileUploader.new( @@ -316,6 +505,19 @@ def upload_stream(bucket:, key:, **options, &block) executor.shutdown unless @executor true end + + private + + def resolve_http_chunk_size(opts) + return if defined?(JRUBY_VERSION) + + chunk = opts.delete(:http_chunk_size) + if chunk && chunk < Aws::Plugins::ChecksumAlgorithm::DEFAULT_TRAILER_CHUNK_SIZE + raise ArgumentError, ':http_chunk_size must be at least 16384 bytes (16KB)' + end + + chunk + end end end end diff --git a/gems/aws-sdk-s3/spec/directory_downloader_spec.rb b/gems/aws-sdk-s3/spec/directory_downloader_spec.rb new file mode 100644 index 00000000000..25f77e4a9e0 --- /dev/null +++ b/gems/aws-sdk-s3/spec/directory_downloader_spec.rb @@ -0,0 +1,177 @@ +# frozen_string_literal: true + +require_relative 'transfer_manger_spec_helper' + +module Aws + module S3 + describe DirectoryDownloader do + let(:client) { Aws::S3::Client.new(stub_responses: true) } + let(:executor) { DefaultExecutor.new } + let(:downloader) { DirectoryDownloader.new(client: client, executor: executor) } + + describe '#initialize' do + it 'constructs with default options' do + downloader = DirectoryDownloader.new + expect(downloader.abort_requested).to be false + end + + it 'accepts client and executor options' do + expect(downloader.client).to be(client) + expect(downloader.executor).to be(executor) + end + end + + describe '#download' do + let(:temp_dir) { Dir.mktmpdir } + + before do + client.stub_responses( + :list_objects_v2, + { + contents: [ + { key: 'file1.txt', size: 100 }, + { key: 'file2.json', size: 100 }, + { key: 'file3.txt', size: 100 } + ], + is_truncated: false + } + ) + client.stub_responses(:get_object, { body: 'content' }) + end + + after do + FileUtils.rm_rf(temp_dir) + end + + it 'handles empty bucket' do + client.stub_responses(:list_objects_v2, { contents: [], is_truncated: false }) + result = downloader.download(temp_dir, bucket: 'test-bucket') + + expect(result[:completed_downloads]).to eq(0) + expect(result[:failed_downloads]).to eq(0) + end + + it 'raises when given an invalid destination' do + file_path = File.join(temp_dir, 'file.txt') + File.write(file_path, 'content') + + expect do + downloader.download(file_path, bucket: 'test-bucket') + end.to raise_error(ArgumentError, /invalid destination/) + end + + it 'raises when object key contains path traversal sequences' do + client.stub_responses( + :list_objects_v2, + { contents: [{ key: 'foo/../bar.txt', size: 100 }], is_truncated: false } + ) + + expect do + downloader.download(temp_dir, bucket: 'bucket') + end.to raise_error(DirectoryDownloadError, /invalid key/) + end + + context 's3 prefix' do + it 'removes prefixes to all keys when set' do + client.stub_responses( + :list_objects_v2, + { + contents: [ + { key: 'prefix/file1.txt', size: 100 }, + { key: 'prefix/subdir/file2.txt', size: 100 } + 
], + is_truncated: false + } + ) + result = downloader.download(temp_dir, bucket: 'test-bucket', s3_prefix: 'prefix') + + expect(result[:completed_downloads]).to eq(2) + expect(File.exist?(File.join(temp_dir, 'file1.txt'))).to be true + expect(File.exist?(File.join(temp_dir, 'subdir', 'file2.txt'))).to be true + end + end + + context 'ignore_failure option' do + it 'stops downloading after failure by default' do + client.stub_responses(:get_object, 'AccessDenied') + + expect do + downloader.download(temp_dir, bucket: 'test-bucket') + end.to raise_error(DirectoryDownloadError) + end + + it 'continues downloading after failure when true' do + client.stub_responses(:get_object, lambda { |context| + if context.params[:key] == 'file2.json' + 'AccessDenied' + else + { body: 'content' } + end + }) + result = downloader.download(temp_dir, bucket: 'test-bucket', ignore_failure: true) + + expect(result[:completed_downloads]).to eq(2) + expect(result[:failed_downloads]).to eq(1) + expect(result[:errors].count).to eq(1) + end + end + + context 'filter callbacks' do + it 'excludes objects' do + filter = ->(key) { key.end_with?('.txt') } + result = downloader.download(temp_dir, bucket: 'test-bucket', filter_callback: filter) + + expect(result[:completed_downloads]).to eq(2) + end + end + + context 'request callbacks' do + it 'modifies download parameters' do + client.stub_responses( + :list_objects_v2, + { + contents: [{ key: 'file.txt', size: 100 }], + is_truncated: false + } + ) + + client.stub_responses( + :get_object, + lambda { |context| + received_params = context.params + expect(received_params[:version_id]).to eq('v1') + { body: 'content' } + } + ) + callback = lambda { |_key, params| + params[:version_id] = 'v1' + params + } + downloader.download(temp_dir, bucket: 'test-bucket', request_callback: callback) + end + end + + context 'progress callbacks' do + it 'reports progress' do + client.stub_responses( + :list_objects_v2, + { + contents: [ + { key: 'file1.txt', size: 100 }, + { key: 'file2.txt', size: 200 } + ], + is_truncated: false + } + ) + client.stub_responses(:get_object, { body: 'x' * 100 }) + progress_calls = [] + callback = ->(bytes, _files) { progress_calls << bytes } + downloader.download(temp_dir, bucket: 'test-bucket', progress_callback: callback) + + expect(progress_calls.length).to eq(2) + end + end + end + end + end +end diff --git a/gems/aws-sdk-s3/spec/directory_uploader_spec.rb b/gems/aws-sdk-s3/spec/directory_uploader_spec.rb new file mode 100644 index 00000000000..68303c5a91a --- /dev/null +++ b/gems/aws-sdk-s3/spec/directory_uploader_spec.rb @@ -0,0 +1,166 @@ +# frozen_string_literal: true + +require_relative 'transfer_manger_spec_helper' + +module Aws + module S3 + describe DirectoryUploader do + let(:client) { Aws::S3::Client.new(stub_responses: true) } + let(:executor) { DefaultExecutor.new } + let(:uploader) { DirectoryUploader.new(client: client, executor: executor) } + + describe '#initialize' do + it 'constructs with default options' do + uploader = DirectoryUploader.new + expect(uploader.abort_requested).to be false + end + + it 'accepts client and executor options' do + expect(uploader.client).to be(client) + expect(uploader.executor).to be(executor) + end + end + + describe '#upload' do + let(:temp_dir) { Dir.mktmpdir } + + before do + TransferManagerSpecHelper.create_test_directory_structure(temp_dir) + end + + after do + FileUtils.rm_rf(temp_dir) + end + + it 'handles empty directory' do + empty_dir = Dir.mktmpdir + result = uploader.upload(empty_dir, 
'test-bucket') + + expect(result[:completed_uploads]).to eq(0) + expect(result[:failed_uploads]).to eq(0) + FileUtils.rm_rf(empty_dir) + end + + it 'raises when directory does not exist' do + expect do + uploader.upload('/nonexistent/path', 'test-bucket') + end.to raise_error(ArgumentError, /Invalid directory/) + end + + it 'can be aborted mid-upload' do + call_count = 0 + allow(client).to receive(:put_object) do + call_count += 1 + uploader.request_abort if call_count == 2 + end + + expect do + uploader.upload(temp_dir, 'test-bucket') + end.to raise_error(DirectoryUploadError) + end + + it 'raises when directory traversal fails' do + allow(File).to receive(:lstat).and_call_original + allow(File).to receive(:lstat).with(/small\.txt/).and_raise(Errno::EACCES, 'Permission denied') + expect do + uploader.upload(temp_dir, 'test-bucket', recursive: true, ignore_failure: true) + end.to raise_error(DirectoryUploadError, /Directory traversal failed/) + end + + context 'recursive' do + it 'uploads recursively when true' do + result = uploader.upload(temp_dir, 'test-bucket', recursive: true) + + expect(result[:completed_uploads]).to eq(9) + expect(result[:failed_uploads]).to eq(0) + end + + it 'uploads only direct files when false' do + result = uploader.upload(temp_dir, 'test-bucket') + + expect(result[:completed_uploads]).to eq(5) + end + end + + context 's3 prefix' do + it 'applies prefixes to all keys when set' do + uploaded_keys = [] + allow(client).to receive(:put_object) { |p| uploaded_keys << p[:key] } + result = uploader.upload(temp_dir, 'test-bucket', s3_prefix: 'uploads', recursive: true) + + expect(uploaded_keys).to all(start_with('uploads/')) + expect(uploaded_keys.length).to eq(9) + expect(result[:completed_uploads]).to eq(9) + end + end + + context 'follow_symlinks option' do + it 'follows symlinks when true' do + result = uploader.upload(temp_dir, 'test-bucket', recursive: true, follow_symlinks: true) + expect(result[:completed_uploads]).to eq(14) + expect(result[:failed_uploads]).to eq(0) + end + end + + context 'ignore_failure option' do + it 'stops uploading after failure by default' do + client.stub_responses(:put_object, 'AccessDenied') + expect do + uploader.upload(temp_dir, 'test-bucket', ignore_failure: false) + end.to raise_error(DirectoryUploadError) + end + + it 'continues uploading after failure when true' do + client.stub_responses(:put_object, lambda { |context| + %w[small.txt medium.log].include?(context.params[:key]) ? 
'AccessDenied' : {} + }) + + result = uploader.upload(temp_dir, 'test-bucket', ignore_failure: true) + expect(result[:completed_uploads]).to eq(3) + expect(result[:failed_uploads]).to eq(2) + expect(result[:errors].length).to eq(2) + end + end + + context 'filter callbacks' do + it 'excludes files' do + uploaded_keys = [] + allow(client).to receive(:put_object) { |p| uploaded_keys << p[:key] } + filter_callback = ->(_path, file) { !file.end_with?('.bin') } + result = uploader.upload(temp_dir, 'test-bucket', filter_callback: filter_callback) + + expect(uploaded_keys).not_to include('huge.bin') + expect(result[:completed_uploads]).to eq(4) + end + end + + context 'request callbacks', :jruby_flaky do + it 'modifies upload parameters' do + uploaded_params = [] + allow(client).to receive(:put_object) { |p| uploaded_params << p } + request_callback = lambda do |_path, params| + params[:storage_class] = 'GLACIER' + params + end + + uploader.upload(temp_dir, 'test-bucket', request_callback: request_callback) + expect(uploaded_params).to all(include(storage_class: 'GLACIER')) + end + end + + context 'progress callbacks' do + it 'reports progress' do + progress_calls = [] + + callback = proc do |bytes, files| + progress_calls << { total_bytes: bytes, files_completed: files } + end + + uploader.upload(temp_dir, 'test-bucket', recursive: false, progress_callback: callback) + expect(progress_calls.length).to eq(5) + end + end + end + end + end +end diff --git a/gems/aws-sdk-s3/spec/transfer_manager_spec.rb b/gems/aws-sdk-s3/spec/transfer_manager_spec.rb index 659efae471b..bc7a802b2c3 100644 --- a/gems/aws-sdk-s3/spec/transfer_manager_spec.rb +++ b/gems/aws-sdk-s3/spec/transfer_manager_spec.rb @@ -1,8 +1,6 @@ # frozen_string_literal: true -require_relative 'spec_helper' -require 'socket' -require 'tempfile' +require_relative 'transfer_manger_spec_helper' module Aws module S3 @@ -22,6 +20,47 @@ module S3 end end + describe '#download_directory' do + let(:temp_dir) { Dir.mktmpdir } + + before do + client.stub_responses( + :list_objects_v2, + { + contents: [{ key: 'file1.txt', size: 100 }, { key: 'file2.txt', size: 100 }], + is_truncated: false + } + ) + client.stub_responses(:get_object, { body: 'content' }) + end + + after do + FileUtils.rm_rf(temp_dir) + end + + it 'returns results when download succeeds' do + result = subject.download_directory(temp_dir, bucket: 'bucket') + expect(result[:completed_downloads]).to eq(2) + expect(result[:failed_downloads]).to eq(0) + end + + it 'raises when download errors' do + client.stub_responses(:get_object, 'AccessDenied') + + expect do + subject.download_directory(temp_dir, bucket: 'bucket', ignore_failure: false) + end.to raise_error(DirectoryDownloadError) + end + + it 'calls progress callback when given' do + progress_calls = [] + callback = proc { |bytes, files| progress_calls << { bytes: bytes, files: files } } + + subject.download_directory(temp_dir, bucket: 'bucket', progress_callback: callback) + expect(progress_calls.length).to eq(2) + end + end + describe '#download_file', :jruby_flaky do let(:path) { Tempfile.new('destination').path } @@ -51,6 +90,39 @@ module S3 end end + describe '#upload_directory' do + let(:temp_dir) { Dir.mktmpdir } + + before do + TransferManagerSpecHelper.create_test_directory_structure(temp_dir) + end + + after do + FileUtils.rm_rf(temp_dir) + end + + it 'returns upload results when upload succeeds' do + result = subject.upload_directory(temp_dir, bucket: 'bucket') + expect(result[:completed_uploads]).to eq(5) + 
expect(result[:failed_uploads]).to eq(0) + end + + it 'raises when upload errors' do + client.stub_responses(:put_object, 'AccessDenied') + expect do + subject.upload_directory(temp_dir, bucket: 'bucket', ignore_failure: false) + end.to raise_error(DirectoryUploadError) + end + + it 'calls progress callback when given' do + progress_calls = [] + callback = proc { |bytes, files| progress_calls << { bytes: bytes, files: files } } + + subject.upload_directory(temp_dir, bucket: 'bucket', progress_callback: callback) + expect(progress_calls.length).to eq(5) + end + end + describe '#upload_file' do let(:file) do Tempfile.new('ten-meg-file').tap do |f| @@ -105,43 +177,10 @@ module S3 end end - def start_mirror_server(chunk_size) - server = TCPServer.new('127.0.0.1', 0) - port = server.addr[1] - chunks = [] - - server_thread = Thread.new do - Timeout.timeout(10) do - client = server.accept - headers = '' - while (line = client.gets) - headers += line - break if line.strip.empty? - end - - if headers.include?('Expect: 100-continue') - client.write("HTTP/1.1 100 Continue\r\n\r\n") - - loop do - sleep(0.01) # needs wait between reads - data = client.read_nonblock(chunk_size, exception: false) - break if data == :wait_readable || data.nil? - - chunks << data.size - end - end - client.write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - ensure - client.close - end - end - [server, server_thread, port] - end - it 'uses the given chunk size when uploading' do WebMock.disable! chunk_size = 32_768 - server, server_thread, port = start_mirror_server(chunk_size) + server, server_thread, port = TransferManagerSpecHelper.start_mirror_server(chunk_size) client = Aws::S3::Client.new( endpoint: "http://127.0.0.1:#{port}", region: 'us-east-1', @@ -155,8 +194,8 @@ def start_mirror_server(chunk_size) .to receive(:custom_stream).and_call_original allow_any_instance_of(Aws::Plugins::ChecksumAlgorithm::AwsChunkedTrailerDigestIO) .to receive(:read).and_wrap_original do |method, size| - read_sizes << size - method.call(size) + read_sizes << size + method.call(size) end tm.upload_file(test_file, bucket: 'test-bucket', key: 'test-key', http_chunk_size: chunk_size) @@ -170,7 +209,7 @@ def start_mirror_server(chunk_size) it 'uses default chunk size' do WebMock.disable! 
chunk_size = 16_384 - server, server_thread, port = start_mirror_server(chunk_size) + server, server_thread, port = TransferManagerSpecHelper.start_mirror_server(chunk_size) client = Aws::S3::Client.new( endpoint: "http://127.0.0.1:#{port}", region: 'us-east-1', @@ -182,8 +221,8 @@ def start_mirror_server(chunk_size) allow_any_instance_of(Aws::Plugins::ChecksumAlgorithm::AwsChunkedTrailerDigestIO) .to receive(:read).and_wrap_original do |method, size| - read_sizes << size - method.call(size) + read_sizes << size + method.call(size) end tm.upload_file(test_file, bucket: 'test-bucket', key: 'test-key') server_thread.join diff --git a/gems/aws-sdk-s3/spec/transfer_manger_spec_helper.rb b/gems/aws-sdk-s3/spec/transfer_manger_spec_helper.rb new file mode 100644 index 00000000000..26a4fc81e18 --- /dev/null +++ b/gems/aws-sdk-s3/spec/transfer_manger_spec_helper.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +require_relative 'spec_helper' +require 'tempfile' +require 'tmpdir' +require 'socket' + +# Spec helper for transfer manager-related tests +module TransferManagerSpecHelper + class << self + def create_test_directory_structure(base_dir) + # Root files + create_file_with_size(base_dir, 'small.txt', 1024) # 1KB + create_file_with_size(base_dir, 'medium.log', 1024 * 1024) # 1MB + create_file_with_size(base_dir, 'large.dat', 10 * 1024 * 1024) # 10MB + create_file_with_size(base_dir, 'huge.bin', 20 * 1024 * 1024) # 20MB + create_file_with_size(base_dir, 'target.txt', 1024 * 1024) # 1MB + + # Nested directories + subdir1 = File.join(base_dir, 'documents') + Dir.mkdir(subdir1) + create_file_with_size(subdir1, 'readme.md', 2048) # 2KB + create_file_with_size(subdir1, 'backup.zip', 10 * 1024 * 1024) # 10MB + + subdir2 = File.join(base_dir, 'images') + Dir.mkdir(subdir2) + create_file_with_size(subdir2, 'photo1.jpg', 2 * 1024 * 1024) # 2MB + + # Deep nesting + subdir3 = File.join(subdir2, 'thumbnails') + Dir.mkdir(subdir3) + create_file_with_size(subdir3, 'thumb.jpg', 50 * 1024) # 50KB + + # Symlinks for testing + File.symlink(File.join(base_dir, 'small.txt'), File.join(base_dir, 'small_link.txt')) + File.symlink(subdir1, File.join(base_dir, 'docs_link')) + + # Recursive symlink (points back to parent directory) + File.symlink(base_dir, File.join(subdir1, 'parent_link')) + File.symlink(File.join(base_dir, 'target.txt'), File.join(base_dir, 'link1.txt')) + File.symlink(File.join(base_dir, 'link1.txt'), File.join(base_dir, 'link2.txt')) + end + + def start_mirror_server(chunk_size) + server = TCPServer.new('127.0.0.1', 0) + port = server.addr[1] + chunks = [] + + server_thread = Thread.new do + Timeout.timeout(10) do + client = server.accept + headers = '' + while (line = client.gets) + headers += line + break if line.strip.empty? + end + + if headers.include?('Expect: 100-continue') + client.write("HTTP/1.1 100 Continue\r\n\r\n") + + loop do + sleep(0.01) # needs wait between reads + data = client.read_nonblock(chunk_size, exception: false) + break if data == :wait_readable || data.nil? + + chunks << data.size + end + end + client.write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + ensure + client.close + end + end + [server, server_thread, port] + end + + private + + def create_file_with_size(dir, filename, size_bytes) + file_path = File.join(dir, filename) + File.write(file_path, 'x' * size_bytes) + end + end +end