S3 Directory Support #3304
base: version-3
Changes from all commits
@@ -0,0 +1,16 @@ (new file: Aws::S3::DirectoryDownloadError)

# frozen_string_literal: true

module Aws
  module S3
    # Raised when DirectoryDownloader fails to download objects from an S3 bucket
    class DirectoryDownloadError < StandardError
      def initialize(message, errors = [])
        @errors = errors
        super(message)
      end

      # @return [Array<StandardError>] The list of errors encountered when downloading objects
      attr_reader :errors
    end
  end
end
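For orientation, a small self-contained sketch of how a caller could inspect this error after a partial download; the message and values here are made up for illustration:

# Illustrative only: the error aggregates the individual per-object
# failures so a caller can report them after a partial download.
error = Aws::S3::DirectoryDownloadError.new(
  'directory download failed: access denied',
  [StandardError.new('access denied')]
)
error.message       # => "directory download failed: access denied"
error.errors.size   # => 1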
@@ -0,0 +1,212 @@ (new file: Aws::S3::DirectoryDownloader)

# frozen_string_literal: true

module Aws
  module S3
    # @api private
    class DirectoryDownloader
      def initialize(options = {})
        @client = options[:client]
        @executor = options[:executor]
        @abort_requested = false
        @mutex = Mutex.new
      end

      attr_reader :client, :executor

      def abort_requested

Contributor: This might be more clear with a …
        @mutex.synchronize { @abort_requested }
      end

      def request_abort

Contributor: "request_abort" feels more verbose than just "abort". I think the internal …
        @mutex.synchronize { @abort_requested = true }
      end

      def download(destination, bucket:, **options)
        if File.exist?(destination)
          raise ArgumentError, 'invalid destination, expected a directory' unless File.directory?(destination)
        else
          FileUtils.mkdir_p(destination)
        end

        download_opts, producer_opts = build_opts(destination, bucket, options)
        downloader = FileDownloader.new(client: @client, executor: @executor)
        producer = ObjectProducer.new(producer_opts)
        downloads, errors = process_download_queue(producer, downloader, download_opts)
        build_result(downloads, errors)
      ensure
        @abort_requested = false

Contributor: Why are we setting @abort_requested to false here?

Contributor: Does this need …
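One hedged way to address both questions (the helper name reset_abort is hypothetical, not part of the PR): the reset presumably lets a reused DirectoryDownloader start its next download with a clean flag, and if the truncated second question is about synchronization, routing the write through the mutex keeps it consistent with the other flag accessors.

# Hypothetical sketch, not the PR's code: synchronize the reset the same
# way as the other flag accessors.
def reset_abort
  @mutex.synchronize { @abort_requested = false }
end

# ...and in #download:
#   ensure
#     reset_abort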
      end

      private

      def build_opts(destination, bucket, opts)
        download_opts = {
          progress_callback: opts[:progress_callback],
          destination: destination,
          ignore_failure: opts[:ignore_failure] || false
        }
        producer_opts = {
          client: @client,
          directory_downloader: self,
          destination: destination,
          bucket: bucket,
          s3_prefix: opts[:s3_prefix],
          filter_callback: opts[:filter_callback],
          request_callback: opts[:request_callback]
        }
        [download_opts, producer_opts]
      end

      def build_result(download_count, errors)
        if abort_requested
          msg = "directory download failed: #{errors.map(&:message).join('; ')}"
          raise DirectoryDownloadError.new(msg, errors)
        else
          {
            completed_downloads: [download_count - errors.count, 0].max,
            failed_downloads: errors.count,
            errors: errors.any? ? errors : nil
          }.compact
        end
      end

      def download_object(entry, downloader, opts, progress, errors)
        raise entry.error if entry.error

        FileUtils.mkdir_p(File.dirname(entry.path)) unless Dir.exist?(File.dirname(entry.path))
        downloader.download(entry.path, entry.params)
        progress&.call(File.size(entry.path))
      rescue StandardError => e
        @mutex.synchronize { errors << e }
        request_abort unless opts[:ignore_failure]
      end

      def process_download_queue(producer, downloader, opts)

Contributor (author): In a situation where we need to raise, I decided against doing …
        progress = DirectoryProgress.new(opts[:progress_callback]) if opts[:progress_callback]
        queue_executor = DefaultExecutor.new
        completion_queue = Queue.new
        download_attempts = 0
        errors = []
        begin
          producer.each do |object|
            break if abort_requested

            download_attempts += 1
            queue_executor.post(object) do |o|
              download_object(o, downloader, opts, progress, errors)
            ensure
              completion_queue << :done
            end
          end
        rescue StandardError => e
          @mutex.synchronize { errors << e }
          request_abort
        end
        download_attempts.times { completion_queue.pop }

Contributor: Is it guaranteed that download_attempts will be less than or equal to completion_queue size?
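If it helps future readers: the invariant the question points at comes from the ensure inside the posted block, which pushes exactly one :done per increment of download_attempts whether the download succeeds or fails, so the pops cannot outnumber the pushes as long as the executor eventually runs every posted block. A self-contained sketch of the pattern, with plain threads standing in for DefaultExecutor (an assumption for illustration):

completion_queue = Queue.new
attempts = 0

workers = 5.times.map do |i|
  attempts += 1
  Thread.new do
    begin
      raise 'boom' if i.odd?        # even a failing task still signals completion
    rescue StandardError
      # swallowed here; the real code collects it in `errors`
    ensure
      completion_queue << :done     # exactly one :done per attempt
    end
  end
end

attempts.times { completion_queue.pop }  # blocks until every task has signalled
workers.each(&:join)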
        [download_attempts, errors]
      ensure
        queue_executor&.shutdown
      end

      # @api private
      class ObjectProducer
        include Enumerable

        DEFAULT_QUEUE_SIZE = 100
        DONE_MARKER = :done

        def initialize(opts = {})
          @directory_downloader = opts[:directory_downloader]
          @destination_dir = opts[:destination]
          @bucket = opts[:bucket]
          @client = opts[:client]
          @s3_prefix = opts[:s3_prefix]
          @filter_callback = opts[:filter_callback]
          @request_callback = opts[:request_callback]
          @object_queue = SizedQueue.new(DEFAULT_QUEUE_SIZE)
        end

        def each
          producer_thread = Thread.new do
            stream_objects
          ensure
            @object_queue << DONE_MARKER
          end

          # Yield objects from internal queue
          while (object = @object_queue.shift) != DONE_MARKER
            break if @directory_downloader.abort_requested

            yield object
          end
        ensure
          producer_thread.join
        end

        private

        def apply_request_callback(key, params)
          @request_callback&.call(key, params.dup)
        end

        def build_object_entry(key)
          params = { bucket: @bucket, key: key }
          params = apply_request_callback(key, params) if @request_callback
          normalized_key = normalize_key(key)
          full_path = File.join(@destination_dir, normalized_key)
          error = validate_path(full_path, key)
          DownloadEntry.new(path: full_path, params: params, error: error)
        end

        def include_object?(key)
          return true unless @filter_callback

          @filter_callback.call(key)
        end

        def directory_marker?(obj)
          obj.key.end_with?('/') && obj.size.zero?
        end

        def normalize_key(key)
          if @s3_prefix
            prefix = @s3_prefix.end_with?('/') ? @s3_prefix : "#{@s3_prefix}/"
            key = key.delete_prefix(prefix)
          end
          File::SEPARATOR == '/' ? key : key.tr('/', File::SEPARATOR)
        end

        def stream_objects(continuation_token: nil)
          resp = @client.list_objects_v2(bucket: @bucket, prefix: @s3_prefix, continuation_token: continuation_token)
          resp.contents&.each do |o|
            break if @directory_downloader.abort_requested
Contributor: This will block on the mutex - I'm not sure we always need to do that here. It is definitely safe to do so, but likely has a performance hit. Ditto, I think, with the check on line 91. Since we're using a SizedQueue for communicating between threads, maybe we could use clear/close on it rather than constantly blocking on the mutex? If we used …
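For reference, a hedged sketch of what the close-based alternative could look like, using standard Ruby Queue#close semantics rather than anything from this PR (pop returns nil once a closed queue is drained; a blocked push raises ClosedQueueError; closing an already-closed queue is a no-op):

# Hedged sketch of the suggestion, not the PR's code: signal abort by
# closing the SizedQueue instead of checking a mutex-guarded flag.
queue = SizedQueue.new(100)

producer = Thread.new do
  begin
    100.times { |i| queue << "object-#{i}" }   # a blocked push raises once the queue is closed
  rescue ClosedQueueError
    # consumer aborted; stop producing
  ensure
    queue.close                                # normal end-of-stream
  end
end

while (object = queue.pop)                     # nil once the queue is closed and drained
  if object == 'object-7'                      # e.g. a download failed
    queue.close                                # wakes the producer out of a blocked push
    break
  end
end
producer.join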
            next if directory_marker?(o)
            next unless include_object?(o.key)

            @object_queue << build_object_entry(o.key)
          end
          stream_objects(continuation_token: resp.next_continuation_token) if resp.next_continuation_token
        end

        def validate_path(path, key)
          segments = path.split('/')

Contributor: Should this be …
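The comment is cut off, but in context the question is presumably about the separator: normalize_key has already converted the key to File::SEPARATOR, while this check splits on a literal '/'. A possible variant, offered as an assumption rather than the author's intended change:

# Hedged variant, not from the PR: split on the separator the path was
# normalized to, so '.'/'..' segments are still caught when the platform
# separator differs from '/'.
def validate_path(path, key)
  segments = path.split(File::SEPARATOR)
  return unless segments.any? { |s| %w[. ..].include?(s) }

  DirectoryDownloadError.new("invalid key '#{key}': contains '.' or '..' path segments")
end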
          return unless segments.any? { |s| %w[. ..].include?(s) }

          DirectoryDownloadError.new("invalid key '#{key}': contains '.' or '..' path segments")
        end

        # @api private
        class DownloadEntry
          def initialize(opts = {})
            @path = opts[:path]
            @params = opts[:params]
            @error = opts[:error]
          end

          attr_reader :path, :params, :error
        end
      end
    end
  end
end
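To show how the pieces above fit together, a hedged usage sketch; the class is marked @api private, so the eventual public entry point may differ, and the client and executor here are assumed to be constructed elsewhere:

# Usage sketch under assumptions: `client` is an Aws::S3::Client and
# `executor` is the thread-pool executor this private API expects.
downloader = Aws::S3::DirectoryDownloader.new(client: client, executor: executor)

result = downloader.download(
  '/tmp/photos',
  bucket: 'my-bucket',
  s3_prefix: 'photos/2024',
  ignore_failure: true,                                # collect errors instead of aborting
  filter_callback: ->(key) { key.end_with?('.jpg') },  # skip objects the caller does not want
  progress_callback: ->(bytes, files) { puts "#{files} files, #{bytes} bytes" }
)

result[:completed_downloads] # => count of objects downloaded
result[:failed_downloads]    # => count of per-object failures
result[:errors]              # key is omitted when there were no failures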
@@ -0,0 +1,24 @@ (new file: Aws::S3::DirectoryProgress)

# frozen_string_literal: true

module Aws
  module S3
    # @api private
    class DirectoryProgress
      def initialize(progress_callback)
        @transferred_bytes = 0
        @transferred_files = 0
        @progress_callback = progress_callback
        @mutex = Mutex.new
      end

      def call(bytes_transferred)
        @mutex.synchronize do
          @transferred_bytes += bytes_transferred
          @transferred_files += 1

          @progress_callback.call(@transferred_bytes, @transferred_files)

Contributor: I see why we're calling the progress_callback inside the …
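The comment trails off, but if the concern is holding the lock while a user callback runs, one alternative (a sketch, not the PR's code) is to snapshot the totals inside the mutex and invoke the callback outside it:

# Sketch of one alternative: a slow user callback no longer holds the
# lock across all download threads.
def call(bytes_transferred)
  bytes = files = nil
  @mutex.synchronize do
    @transferred_bytes += bytes_transferred
    @transferred_files += 1
    bytes = @transferred_bytes
    files = @transferred_files
  end
  @progress_callback.call(bytes, files)
end

The trade-off is that updates can reach the callback out of order, which is why keeping the call inside the synchronize block is also a defensible choice.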
        end
      end
    end
  end
end
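As wired up in download_object earlier in this diff, each completed object reports its on-disk size and DirectoryProgress folds it into running totals before invoking the user callback. A small illustration with made-up numbers:

# Illustrative only; the callback receives (total_bytes, total_files).
progress = Aws::S3::DirectoryProgress.new(
  ->(total_bytes, total_files) { puts "#{total_files} file(s), #{total_bytes} byte(s) so far" }
)

progress.call(1_024) # prints "1 file(s), 1024 byte(s) so far"
progress.call(2_048) # prints "2 file(s), 3072 byte(s) so far"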
@@ -0,0 +1,16 @@ (new file: Aws::S3::DirectoryUploadError)

# frozen_string_literal: true

module Aws
  module S3
    # Raised when DirectoryUploader fails to upload files to an S3 bucket
    class DirectoryUploadError < StandardError
      def initialize(message, errors = [])
        @errors = errors
        super(message)
      end

      # @return [Array<StandardError>] The list of errors encountered when uploading files
      attr_reader :errors
    end
  end
end
Contributor: Nit: it looks like default_executor is listed twice here.