AVRO-3594: FsInput to use openFile() API #1807
base: main
Conversation
Except that the hadoop-2 profile still exists, doesn't it? Which means that even though the hadoop 3 profile is full of features, the 2.x one blocks things from working.
// Filesystems which don't recognize the options will ignore them
this.stream = awaitFuture(fileSystem.openFile(path).opt("fs.option.openfile.read.policy", "adaptive")
    .opt("fs.option.openfile.length", Long.toString(len)).build());
Nice optimization. It may be better to use the Options.OpenFileOptions constants:
FS_OPTION_OPENFILE_LENGTH
FS_OPTION_OPENFILE_READ_POLICY
FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE
We only added those options explicitly in branch-3.3; the code wouldn't compile/link against 3.3.0, hence the strings. Unfortunately it also means you don't get that speedup until 3.3.5 ships... but the release will be ready.
3.3.0 did add the withFileStatus(FileStatus) param, which was my original design for passing in all the filestatus info, including etag and maybe version, so you can go straight from listing to opening.
First in s3a; abfs was added in 3.3.5. But it is too brittle, because the path checking requires status.getPath() to equal the path opened, and hive with its wrapper fs doesn't always do that.
Passing in the file length is guaranteed to be either ignored or actually used... no brittleness. It also suits hive, where workers know the length of the file but don't have a status.
One thing I can add with immediate benefit in 3.3.0 is the older fs.s3a.experimental.fadvise option, which again can mandate adaptive, even on hive clusters where they explicitly set the read policy to random (which some do for max orc/parquet performance). The new fs.option.openfile.read.policy option is an evolution of that: you can now specify a list of policies and the first one recognised is used. If someone ever implemented explicit "parquet", "orc", and "avro" policies, for example, you could say the read policy is "orc, random, adaptive" and get the first one known.
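The "first recognised policy wins" behaviour described above can be sketched in a self-contained way. This is a hypothetical illustration of what a store's option parsing might look like, not Hadoop's actual code; the class and method names are made up for the example.

```java
// Hypothetical sketch (not Hadoop code) of how a filesystem could resolve a
// comma-separated fs.option.openfile.read.policy list, taking the first
// policy it recognises and ignoring the rest.
import java.util.Arrays;
import java.util.List;

public class ReadPolicyResolver {

  // Policies this (imaginary) store understands; "avro" and "orc" would be
  // absent until a release that actually implements them.
  private static final List<String> KNOWN = Arrays.asList("adaptive", "random", "sequential");

  // Returns the first recognised policy in the list, falling back to a
  // default when none match.
  public static String resolve(String policyList, String dflt) {
    for (String p : policyList.split(",")) {
      String policy = p.trim().toLowerCase();
      if (KNOWN.contains(policy)) {
        return policy;
      }
    }
    return dflt;
  }

  public static void main(String[] args) {
    // "orc" is unknown to this store, so the resolver falls through to "random".
    System.out.println(resolve("orc, random, adaptive", "adaptive"));
  }
}
```

So a client asking for "orc, random, adaptive" degrades gracefully on stores that have never heard of an "orc" policy.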
OK, thanks for the explanation.
note that org.apache.hadoop.fs.AvroFSInput does this
clesaec left a comment
LGTM
@clesaec this can't go in until hadoop2 is cut as a profile. I am actually doing a shim library to do reflection invocation of the newer operations, with fallbacks if not found. But even so, the sooner avro goes to recent hadoop 3.x releases only, the better.
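The "reflection invocation with fallbacks" idea mentioned above can be illustrated with a minimal self-contained sketch. FileOpener here is a stand-in for a FileSystem that may or may not ship a newer method; it is not Hadoop code, and all names are invented for the example.

```java
// Sketch of the reflection-shim pattern: probe for a newer method at runtime
// and fall back to the older call when the method is absent.
import java.lang.reflect.Method;

public class OpenFileShim {

  // Stand-in "old" filesystem: it only has open(), no openFile().
  public static class FileOpener {
    public String open(String path) {
      return "open:" + path;
    }
  }

  // Try to invoke an openFile(String) method reflectively; if the target
  // class does not declare it (an older release), fall back to open().
  public static String openBest(FileOpener fs, String path) {
    try {
      Method m = fs.getClass().getMethod("openFile", String.class);
      return (String) m.invoke(fs, path);
    } catch (NoSuchMethodException e) {
      return fs.open(path); // graceful fallback on older releases
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // FileOpener lacks openFile(), so the shim drops back to open().
    System.out.println(openBest(new FileOpener(), "/data/file.avro"));
  }
}
```

The method lookup happens once per call here; a real shim would cache the Method (or a MethodHandle) to avoid repeated reflection cost.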
Boost performance reading from object stores in hadoop by using the openFile() builder API, passing in the file length as an option (which can save a HEAD request) and asking for adaptive IO (sequential, switching to random if the client starts seeking).
19c0695 to d70a1ba
Change-Id: Ie7099208b601a08823775e901359a6727f0c6fe6
d70a1ba to a20c7fb
@opwvhk thanks for the review, will do a merge rebase to make sure it is good.
// "random"
// will not suffer when reading large avro files.
this.stream = awaitFuture(fileSystem.openFile(path)
    .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE).withFileStatus(st).build());
Need to add the same fallback code as in parquet; the s3a connector in 3.3.0-3.3.4 validates the whole path, and overreacts to hive's wrapping of the fs by its own VFS; 3.3.5+ only looks at the filename.
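The fallback pattern being asked for can be sketched in a self-contained way. This is an illustration of the control flow, not the parquet or s3a code: the Status class and method names are invented, with the strict path check standing in for s3a 3.3.0-3.3.4's validation.

```java
// Self-contained illustration of the fallback pattern: strict releases reject
// a FileStatus whose path differs from the path being opened, so the open is
// retried without the status.
public class OpenWithStatusFallback {

  static class Status {
    final String path;
    Status(String path) { this.path = path; }
  }

  // Mimics the strict connector: refuses a status whose path does not match
  // the path being opened.
  static String openWithStatus(String path, Status st) {
    if (st != null && !st.path.equals(path)) {
      throw new IllegalArgumentException("wrong path in status: " + st.path);
    }
    return "opened:" + path;
  }

  // The fallback: try with the status, retry without it if rejected.
  static String openResilient(String path, Status st) {
    try {
      return openWithStatus(path, st);
    } catch (IllegalArgumentException e) {
      return openWithStatus(path, null); // lose the optimisation, keep working
    }
  }

  public static void main(String[] args) {
    // A wrapper FS may hand back a status with a rewritten path; the open
    // still succeeds via the fallback.
    System.out.println(openResilient("s3a://b/f.avro", new Status("vfs://b/f.avro")));
  }
}
```

The cost of the fallback is only the lost HEAD-saving optimisation; correctness is preserved on every release.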
Change-Id: I4ad736a35d625db9d67deda1d11a2d6e51a789e6
Change-Id: If4c9726a97d1d550d0256d4798ce1b484a72fe3a
* Update for the specific "avro" read policy in Hadoop 3.4.1. (S3A maps it to sequential/forward-seeking IO)
* Add the file status if the path names all match.
Change-Id: If755c047a6fbafff20e9eaacee257be72ca76ddb
* spotlessness
Change-Id: Ia7b0336853c5b5f24bb015e1d27d3401cfedd39a
Change-Id: I3f3ac8c579d0b4e93789e9fc952a503943e4b659
Key one: what happens when you try to open a file that isn't there. The other two just check the semantics of out-of-bounds seeks/reads.
Change-Id: I2417506354ae92e27de3b18c357e97ed73369baa
new test failure
Build flagged that two existing tests weren't using the try-with-resources input stream `in`. This means they weren't actually testing that the constructors produced valid streams.
Change-Id: Iee35ef8931c71d940640ad67dc06d963ef42cd63
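The fix described in that commit message boils down to actually exercising the stream inside the try-with-resources block. A minimal sketch of the pattern, using plain java.io instead of Avro's FsInput (the class and method here are invented for the example):

```java
// Illustration of the test fix: read from the stream inside
// try-with-resources, so the test fails if the constructor returned a
// broken stream instead of silently opening and closing it.
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class TryWithResourcesRead {

  public static int readFirstByte(byte[] data) throws IOException {
    try (InputStream in = new ByteArrayInputStream(data)) {
      return in.read(); // using `in` proves the stream is actually valid
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(readFirstByte(new byte[] { 42, 7 }));
  }
}
```

Without the `in.read()` call, the test only verifies that construction and close() don't throw, which is what the build flagged.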
// optimize read performance and save on a HEAD request when opening
// a file.
final FutureDataInputStreamBuilder builder = fileSystem.openFile(path).opt(FS_OPTION_OPENFILE_READ_POLICY,
    "avro, sequential, adaptive");
Since these read policies are going to be used by the file system to understand the user's intended pattern, do we have a way to standardise these values via constants or an enum class that can be used across projects?
I might be missing a similar thing already existing in hadoop. Please let me know if it's already there.
We do have them in hadoop, but that hard-codes to specific versions (the "avro" one is fairly recent). Better for me to put them into an avro class.
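An Avro-side constants class of the kind suggested above could look like the sketch below. The class name is hypothetical; the string values are the stable option names already used elsewhere in this PR, so compiling against them creates no link-time dependency on newer Hadoop releases.

```java
// Sketch of an Avro-side constants class for the openFile() option names,
// so the strings compile against any Hadoop 3.x release.
public final class HadoopOpenFileOptions {

  // Option key for the read policy list passed to openFile().
  public static final String FS_OPTION_OPENFILE_READ_POLICY = "fs.option.openfile.read.policy";

  // Option key for declaring the file length up front (can save a HEAD).
  public static final String FS_OPTION_OPENFILE_LENGTH = "fs.option.openfile.length";

  // Read policy: sequential IO, switching to random if the client seeks.
  public static final String READ_POLICY_ADAPTIVE = "adaptive";

  private HadoopOpenFileOptions() {
    // constants only; no instances
  }

  public static void main(String[] args) {
    System.out.println(FS_OPTION_OPENFILE_READ_POLICY);
  }
}
```

Filesystems that don't recognize the option keys simply ignore them, so referencing the strings is safe even on stores that predate the builder options.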
Boost performance reading from object stores in hadoop 3.3.0+ by using the openFile() builder API, passing in the file length as an option (which can save a HEAD request) and asking for adaptive IO (sequential, switching to random if the client starts seeking).
Saving that HEAD request is a key benefit against s3, as it can save 50-100 ms per file.
Jira
Tests
Integration tests would be the way to do this, but the foundational setup is pretty complex. My cloudstream project downstream of spark is set up to do this.
Commits
Documentation
no new docs