
Support Direct Recursive Hive File Listings#12443

Merged
arhimondr merged 5 commits into trinodb:master from pettyjamesm:recursive-s3-file-listing
Jun 23, 2022

Conversation

@pettyjamesm
Member

@pettyjamesm pettyjamesm commented May 17, 2022

Description

This PR comprises three changes designed to improve the efficiency of the Hive plugin's DirectoryLister when used in combination with a FileSystem that supports more efficient recursive listing via FileSystem#listFiles(Path, boolean recursive), such as TrinoS3FileSystem.

  1. The first commit extends the DirectoryLister interface with an additional method that requests a full recursive file listing instead of a shallow, "files and directories" listing. This required altering CachingDirectoryLister and TransactionScopeCachingDirectoryLister to be able to store both kinds of listings for the same path, which was done by adding DirectoryListingCacheKey, which distinguishes the two kinds of keys via the sign bit of their precomputed hash code.
  2. The second commit modifies HiveFileIterator to conditionally call DirectoryLister.listFilesRecursively instead of DirectoryLister.list when recursive behavior is requested. This greatly simplifies the implementation, since recursion and traversal into sub-paths are now handled by the file system itself, but comes at the cost of identifying "hidden sub-paths" between the top-level path and child paths in a slightly more complex fashion.
  3. The third and final commit changes the behavior of HiveFileIterator to avoid eagerly checking FileSystem.exists when ignoreAbsentPartitions is true, instead waiting for the listing to throw an exception and then checking whether the exception was caused by the path not existing. This is especially useful for the TrinoS3FileSystem, since its implementation of FileSystem.exists itself performs an S3 listing operation.
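
The third item above can be sketched as follows. This is a hypothetical, simplified rendering of the "list first, check existence only on failure" pattern, not the actual Trino code; the interface and method names are stand-ins:

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

public class LazyExistenceCheck {
    // Minimal stand-in for the file system surface the third commit touches
    interface SimpleFileSystem {
        Iterator<String> listFiles(String path) throws IOException;
        boolean exists(String path) throws IOException;
    }

    // List the path optimistically; only consult exists() if the listing fails.
    // On S3, exists() itself costs a listing call, so the happy path saves one call.
    static Iterator<String> listIgnoringAbsent(SimpleFileSystem fs, String path, boolean ignoreAbsentPartitions) {
        try {
            return fs.listFiles(path);
        }
        catch (IOException e) {
            if (ignoreAbsentPartitions) {
                boolean absent;
                try {
                    absent = !fs.exists(path);
                }
                catch (IOException inner) {
                    throw new RuntimeException("Failed to list directory: " + path, e);
                }
                if (absent) {
                    // Absent partition: treat it as empty instead of failing
                    return Collections.emptyIterator();
                }
            }
            throw new RuntimeException("Failed to list directory: " + path, e);
        }
    }
}
```

When the listing succeeds, no existence check is ever issued; the extra call happens only on the failure path.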

Overall, this greatly reduces the number of S3 listing operations performed when there is a large number of nested "directories", since we can instead fully enumerate all of the leaf S3 object paths when that's the desired behavior. For instance, the ELB access log S3 path structure described here will generate a separate "directory" per region, per day, which can quickly result in many hundreds of S3 listing calls to recurse through the hierarchy before reaching the actual data files without this improvement.

This change bridges the gap between the BackgroundHiveSplitLoader and the optimized TrinoS3FileSystem#listFiles(Path, boolean recursive) implementation originally contributed in #4825.

Documentation

( ) No documentation is needed.
(x) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

(x) No release notes entries required.
( ) Release notes entries required with the following suggested text:

@cla-bot cla-bot bot added the cla-signed label May 17, 2022
@pettyjamesm pettyjamesm force-pushed the recursive-s3-file-listing branch from 4be5f05 to cd1f29f on May 17, 2022 19:01
@pettyjamesm pettyjamesm marked this pull request as ready for review May 17, 2022 21:19
@pettyjamesm pettyjamesm requested a review from sopel39 May 17, 2022 21:19
@sopel39 sopel39 requested review from ebyhr and findepi and removed request for sopel39 May 17, 2022 22:44
@pettyjamesm pettyjamesm force-pushed the recursive-s3-file-listing branch from cd1f29f to df4839f on May 18, 2022 14:12
@pettyjamesm pettyjamesm requested review from dain, findinpath and sopel39 and removed request for dain May 18, 2022 14:12
@findepi findepi requested a review from electrum May 19, 2022 14:10
@pettyjamesm pettyjamesm force-pushed the recursive-s3-file-listing branch from df4839f to 228183a on May 20, 2022 18:09
@findepi
Member

findepi commented May 20, 2022

@findinpath @alexjo2144 PTAL

@findinpath
Contributor

nit: "Use recurisve listing in HiveFileIterator" has a small typo in "recurisve"

Contributor

Is this correct when we want to list the whole tree under the specified path?

Member Author

Correct, we want to list all files underneath the specified path (not including the intermediate "directory" entries). This is ultimately the goal of a "recursive listing" when generating splits in Hive, but comes with a caveat- we no longer can identify and skip sub-listing "hidden" directories, so we have to filter out the files that are listed underneath those hidden paths after the fact.

@pettyjamesm pettyjamesm force-pushed the recursive-s3-file-listing branch from 228183a to 14743e5 on May 23, 2022 14:37
@pettyjamesm
Member Author

nit: "Use recurisve listing in HiveFileIterator" has a small typo in "recurisve"

Woops, fixed.

@findinpath
Contributor

findinpath commented May 23, 2022

@pettyjamesm please set up a test with hive.recursive-directories set to true which runs on both Hadoop and MinIO/AWS S3 in order to add coverage for regressions in the functionality affected by this PR.
I went over the Trino code and couldn't find any test making use of the recursive DFS directory lister.

We need to add these tests in order to guard Trino against possible future regressions.

@sopel39 sopel39 removed their request for review May 23, 2022 15:52
@pettyjamesm
Member Author

@findinpath - I might need some help figuring out how to put one of those together; the only existing tests I can see bootstrapping MinIO components are in the Delta Lake connector. Is it fine to assume certain bucket names and instance-local credentials for the purposes of integrating with S3 in tests?

@findinpath
Contributor

@pettyjamesm ideally the class io.trino.plugin.deltalake.util.DockerizedMinioDataLake should be extracted to a common place (e.g. trino-testing-containers).

With this common building block, we could build integration tests for all the Lakehouse connectors.

Is it fine to assume certain bucket names and instance local credentials for the purposes of integrating with S3 in tests?

Yes, it should be fine.
Follow along the lines of what is done in io.trino.plugin.deltalake.DeltaLakeQueryRunner#createS3DeltaLakeQueryRunner for making the necessary adaptations for Hive on top of MinIO.

Please take the HDFS-backed integration tests into account as well; ideally we'd have a common base class containing the tests for the directory walker functionality, to which we could later also add Azure/GCS tests (not in scope of this PR).

Member

@alexjo2144 alexjo2144 left a comment

Mostly nit-picks

One thing we might want to think about is that there may be a lot of duplicates in this cache if a directory is asked for in both recursive and non-recursive listings

Member

Can we move this ignoreAbsentPartitions && !fileSystem.exists(path) to the top, before constructing the FileStatusIterator? That way we don't have to rely on exception handling to resolve the condition.

Member Author

The idea behind this change is to avoid that extra check for fileSystem.exists(path) if the listing succeeds.

Member

Suggested change:
- catch (Exception ee) {
+ catch (RuntimeException ee) {

Member Author

We want to handle any Exception type here, since we're promoting it to a TrinoException with a specific error code and message regardless of the underlying cause.

Member

Could we just do something like:

stream(pathSubstring.split(String.valueOf(SEPARATOR_CHAR)))
  .anyMatch(name -> ...));

Member Author

You could, but you'd end up generating a lot of extra allocations in a performance-sensitive path, and it wouldn't allow isHiddenFileOrDirectory and isHiddenOrWithinHiddenParentDirectory to share the same logic.
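
A rough idea of the allocation-free scan the author alludes to; the method names and the exact hidden-file convention (segments starting with '.' or '_', as in Hadoop) are assumptions for illustration, not the actual HiveFileIterator code:

```java
public class HiddenPaths {
    private static final char SEPARATOR = '/';

    // Returns true if any path segment within [start, end) begins with '.' or '_'.
    // Walks the characters in place: no String.split, no substring allocations.
    static boolean containsHiddenPathPart(String path, int start, int end) {
        int segmentStart = start;
        for (int i = start; i <= end; i++) {
            if (i == end || path.charAt(i) == SEPARATOR) {
                if (i > segmentStart) {
                    char first = path.charAt(segmentStart);
                    if (first == '.' || first == '_') {
                        return true;
                    }
                }
                segmentStart = i + 1;
            }
        }
        return false;
    }
}
```

Because the check takes offsets, a "last segment only" check and a "segments between listing root and file name" check can both call the same method with different bounds.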

Member

Is this just here to precompute the hash code? I'd usually just compute it as needed.

Also, we mostly just do Objects.hash(path, recursiveFilesOnly)

Member Author

Precomputing and storing the hash code avoids an extra pointer indirection through the path reference, which is significant in terms of latency when, e.g., rehashing internal Cache structures, an operation that is typically bound by memory access latency.
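
A minimal sketch of a cache key with a precomputed hash whose sign bit distinguishes recursive from shallow listings, as the PR description mentions; the class and field names here are illustrative stand-ins, not the actual DirectoryListingCacheKey:

```java
public class ListingCacheKey {
    private final String path;
    private final boolean recursiveFilesOnly;
    private final int hashCode; // precomputed: avoids re-reading path on every lookup/rehash

    public ListingCacheKey(String path, boolean recursiveFilesOnly) {
        this.path = path;
        this.recursiveFilesOnly = recursiveFilesOnly;
        // Clear the sign bit of the path hash, then set it only for recursive
        // keys, so the two kinds of listings for the same path never collide
        int hash = path.hashCode() & Integer.MAX_VALUE;
        this.hashCode = recursiveFilesOnly ? hash | Integer.MIN_VALUE : hash;
    }

    @Override
    public int hashCode() {
        return hashCode;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ListingCacheKey other)) {
            return false;
        }
        // Comparing the precomputed hash codes first is a cheap early exit
        return hashCode == other.hashCode
                && recursiveFilesOnly == other.recursiveFilesOnly
                && path.equals(other.path);
    }
}
```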

Contributor

Could you please use Objects.hash(path, recursiveFilesOnly)?
The result is slightly different, because Boolean.hashCode returns 1231/1237 instead of 0/1, but it is easier to grasp for the maintainers of the code.

Please also add a comment in the code explaining why the hash code is precomputed.

Member

Suggested change
&& hashCode == other.hashCode

Member

Sounds redundant with isHiddenOrWithinHiddenParentDirectory?

Member Author

isHiddenFileOrDirectory only looks at the "name", i.e. the last path part, and is used when doing shallow listings that don't recurse. When we're doing a recursive listing, we have to check for intermediate hidden folders between the current listing root and the final file name part, but not for shallow listings.

Member

Maybe includesRecursiveFiles or includesRecursiveListing?

Contributor

I see now where the naming you chose comes from: io.trino.plugin.hive.s3.TrinoS3FileSystem.ListingMode#RECURSIVE_FILES_ONLY.

Comment on lines 123 to 126
Member

Why not always call fs.listFiles(cacheKey.getPath(), cacheKey.recursiveFilesOnly())?

Member Author

fs.listFiles(Path, boolean recursive) only lists files, not directories (and does so either recursively, or only at the current level), while fs.listLocatedStatus(Path) returns both.

@pettyjamesm
Member Author

One thing we might want to think about is that there may be a lot of duplicates in this cache if a directory is asked for in both recursive and non-recursive listings

While that's true in theory, I don't think it will actually occur in practice, at least in terms of how the code works today. The only time a directory will be listed recursively is inside the BackgroundHiveSplitLoader when the HiveConfig has hive.recursive-directories=true. Since this is config driven, for any table on any given cluster the listings should always be either all shallow or all recursive.

@pettyjamesm pettyjamesm force-pushed the recursive-s3-file-listing branch from 14743e5 to 169096b on June 2, 2022 16:54
Contributor

As far as I understand, both list operations list ONLY files (and not sub-directories).

Please take into consideration changing the method name, e.g. listRecursively.

Member Author

@pettyjamesm pettyjamesm Jun 9, 2022

That’s incorrect: the list method lists all contents at the requested path (shallowly), including directories, which is why the HiveFileIterator previously handled recursing through child directories (or failing, or ignoring them, per whatever nested directory policy was set). The new method listFilesRecursively lists only files, at any depth below the requested path.

Contributor

I fail to see a scenario in which directories would be returned by the list method with the current NestedDirectoryPolicy values: IGNORED, RECURSE, FAIL.

It would probably make sense to open a separate (preparatory) PR containing the newly added Hive test, to make sure that no regression is introduced by your latest changes. Just to be on the safe side.

Member Author

@pettyjamesm pettyjamesm Jun 9, 2022

The nested directory policy previously only affected how the HiveFileIterator handled encountered directories, but had no bearing on the results of the DirectoryLister. Now, when the mode is set to recurse, we call the appropriate DirectoryLister method which calls the corresponding FileSystem#listFiles(Path, boolean recursive=true) method to handle recursing and returning only files (not directories) for us. Effectively, the directory lister now has two listing modes:

  • shallow, files + directories
  • recursive, files only

The only change here is that we’re now letting the FileSystem handle “recursing” into subdirectories to enumerate the leaf “files” for us, which allows some implementations (like TrinoS3FileSystem, which already has an optimized listFiles overriding implementation from a prior PR but wasn’t previously being used by the DirectoryLister) to provide much more efficient implementations.

Specifically, most blob store APIs (like S3) provide a mechanism to directly list all keys starting with a given prefix, regardless of how many / characters are between the prefix and the end of the key, which greatly reduces the number of unnecessary list calls compared to the behavior before that would issue another batch of listings to traverse into each “sub-directory”.
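
The prefix-listing idea can be illustrated with a toy in-memory "blob store": one scan over a flat, sorted key space finds every leaf object under a prefix, regardless of how many separators the keys contain. This is only an analogy for listObjectsV2 semantics, not real S3 client code:

```java
import java.util.List;
import java.util.NavigableMap;

public class PrefixListing {
    // A blob store keeps a flat, sorted map of keys. One logical scan starting
    // at the prefix returns every "file" under it, no matter the '/' depth.
    // (A real listObjectsV2 call additionally paginates at 1000 keys per call.)
    static List<String> listByPrefix(NavigableMap<String, String> store, String prefix) {
        return store.tailMap(prefix, true).keySet().stream()
                .takeWhile(key -> key.startsWith(prefix))
                .toList();
    }
}
```

Contrast this with a directory-tree walk, which needs one listing call per intermediate "directory" before it ever sees a leaf.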

For most other FileSystem implementations, the behavior should be effectively unchanged, because the FileSystem parent class implementation does essentially the same thing that HiveFileIterator was doing before this change: it builds a Deque of RemoteIterator instances and repeatedly calls FileSystem#listLocatedStatus(Path) to recurse into subdirectories.
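
That fallback strategy can be sketched as a depth-first walk over a Deque of per-directory iterators; this is a hypothetical, simplified stand-in for what Hadoop's default FileSystem#listFiles(Path, true) does, with the real Hadoop types (RemoteIterator, LocatedFileStatus) replaced by plain Java ones:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class RecursiveLister {
    // Simplified stand-in for a FileStatus: a path plus a directory flag
    record Entry(String path, boolean directory) {}

    static List<String> listFilesRecursively(String root, Function<String, List<Entry>> shallowList) {
        List<String> files = new ArrayList<>();
        Deque<Iterator<Entry>> stack = new ArrayDeque<>();
        stack.push(shallowList.apply(root).iterator());
        while (!stack.isEmpty()) {
            Iterator<Entry> current = stack.peek();
            if (!current.hasNext()) {
                stack.pop();
                continue;
            }
            Entry entry = current.next();
            if (entry.directory()) {
                // One extra listing call per directory: exactly the cost the
                // prefix-based S3 implementation avoids
                stack.push(shallowList.apply(entry.path()).iterator());
            }
            else {
                files.add(entry.path());
            }
        }
        return files;
    }
}
```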

Contributor

Got it. I was previously fixated on the HiveFileIterator, but noticed now that the discussion is about the DirectoryLister and its concrete implementations.

@findinpath
Contributor

This change is specifically targeted at the hive.recursive-directories Hive connector setting.
@pettyjamesm do you have any numbers available to share on how this PR improves query times for a fairly large table, with and without caching enabled?

@pettyjamesm do you see any timeout-related issues with the recursive listing of a table containing a huge number of files?

@pettyjamesm
Member Author

pettyjamesm commented Jun 9, 2022

This change is specifically targeted at the hive.recursive-directories Hive connector setting. @pettyjamesm do you have any numbers available to share on how this PR improves query times for a fairly large table, with and without caching enabled?

This greatly depends on the level of nesting and how many files are at the “leaves” at each level, but anecdotally, if you have a directory layout like the one I mentioned and linked in the PR description for ELB logs, the difference can be enormous: on the order of queries going from 30+ minutes (traversing through layers and layers of listing calls and producing no splits for most of the query duration) to ~30 seconds (splits generated immediately and all files enumerated using orders of magnitude fewer list API calls).

@pettyjamesm do you see any timeout-related issues with the recursive listing of a table containing a huge number of files?

No timeouts, although the current behavior without this change does increase the risk of throttling because of the extremely high call volume. Queries just take much much longer when recursive listing is performed against S3 because of inefficient use of the API.

As a worked example, assuming the ELB log layout above (constant-prefix/{region}/{YYYY}/{mm}/{dd}/<file name>)- if we have logs from:

  • 1 region, for one year: that's 379 listObjectsV2 calls to enumerate all files before, and <total file count> / 1000 listings afterward.
  • 2 regions 2 years: would be 1,513 listObjectsV2 calls before, and still <total file count> / 1000 after this change.

@findinpath
Contributor

As discussed on Slack, I would find it beneficial to add a new test class TestCachingDirectoryListerRecursiveFilesOnly, similar to TestCachingDirectoryLister, with the query runner having the property:

"hive.recursive-directories", "true"

in order to ensure the accuracy of the new implementation.

@pettyjamesm pettyjamesm force-pushed the recursive-s3-file-listing branch from c0521f0 to 87fce3d on June 13, 2022 17:31
@pettyjamesm
Member Author

@findepi / @findinpath - can we now consider this ready to merge?

Contributor

@findinpath findinpath left a comment

Code-wise this PR is great.

Functionality-wise, it seems to handle an exotic/not-so-generic use case that, without proper documentation, will not be spotted by Trino users.

Contributor

Quite interesting.

Without hive.recursive-directories set to true, reading from such a table would not work at all.

@findepi , @alexjo2144 is this use-case too exotic for trino-hive?

Member Author

The partitioned table plus the drop/recreate-as-unpartitioned sequence is just a convenience for this test. The FileHiveMetastore implementation doesn't allow duplicate tables with the same name, and table names must correspond to their file system location, so I'm using the partitioned table to insert new records into sub-paths, dropping it (without deleting the data), and then creating a new table at the same path location using the same name.

This isn't a typical usage pattern, but it does work to exercise the recursive listing behaviors in combination with caching.

Contributor

Instead of doing the low-level table dropping and creation, could we create two tables with the same external_location:

  • for writing use the partitioned configuration partitioned_by = ARRAY['day', 'country']
  • for reading, use the plain configuration without partitioning, on which we can use the recursive file listing superpower

This seems to me (although it screams anti-pattern) to be the way such a scenario could be implemented in the real world.

Please take this note with a grain of salt.

Member Author

I tried to do something along those lines, but I couldn't get it to work very easily with the way the tests are set up, so I just went with this approach for the purposes of this test. It's not intended to represent a realistic usage pattern outside of this test scenario.

@pettyjamesm pettyjamesm force-pushed the recursive-s3-file-listing branch from f15ab74 to e6abd74 on June 17, 2022 13:29
Adds support for directly listing all recursive files in directory
listing and associated caches.
Removes a pre-emptive FileSystem#exists(Path) check before attempting
to list path contents when absent partition errors are configured to
be ignored. Instead, ignoring absent partitions can be done as part
of a check only in the case where partition listing actually fails.

For file systems like S3, checking the existence of a "directory"
already incurs an S3 listing call, which are relatively expensive in
terms of API rate limits and latency.
@pettyjamesm pettyjamesm force-pushed the recursive-s3-file-listing branch from e6abd74 to 9ac2278 on June 17, 2022 14:15
@pettyjamesm
Member Author

@findepi - this should be ready for final approval / merge

@sopel39 sopel39 removed their request for review June 23, 2022 08:37
@arhimondr arhimondr merged commit 726fbd7 into trinodb:master Jun 23, 2022
@github-actions github-actions bot added this to the 388 milestone Jun 23, 2022
@pettyjamesm pettyjamesm deleted the recursive-s3-file-listing branch June 23, 2022 18:06
@findinpath
Contributor

@arhimondr / @findepi this change is worthy of having release notes

On the order of queries taking 30+ minutes (traversing through layers and layers of listing calls and producing no splits for most of the query duration) to ~30 seconds (splits generated immediately and all files enumerated using orders of magnitude fewer list API calls).

@pettyjamesm
Member Author

pettyjamesm commented Jun 24, 2022

@arhimondr / @findepi this change is worthy of having release notes

Proposed release notes:

Hive Connector:

  • Improve efficiency of listing files and generating splits when recursive directory listings are enabled and tables are stored in S3

@colebow
Member

colebow commented Jun 27, 2022

@arhimondr / @findepi this change is worthy of having release notes

Proposed release notes:

Hive Connector:

  • Improve efficiency of listing files and generating splits when recursive directory listings are enabled and tables are stored in S3

In the future, could you please edit the original PR message to include release notes so it's easier for me to find? I still got here, but it took me a few minutes.

@pettyjamesm
Member Author

In the future, could you please edit the original PR message to include release notes so it's easier for me to find? I still got here, but it took me a few minutes

Sure thing, sorry about that.

