[GOBBLIN-1779] Ability to filter datasets that contain non optional unions #3648
Conversation
Force-pushed 6196a0a to 35c2cee
```java
}

@SuppressWarnings("unchecked")
public static <T extends org.apache.gobblin.dataset.Dataset> DatasetsFinder<T> instantiateDatasetFinder(
```
New method used in DatasetFinderFilteringDecorator to instantiate the dataset finder.
Force-pushed 35c2cee to 3966509
```diff
   stopMetastore();
 }

-@BeforeClass
+@BeforeSuite
```
@BeforeClass causes some strange flakiness in when this executes. @BeforeSuite and @AfterSuite give the correct behavior (they act as the entry and exit points for all of the test code that touches Hive).
```java
import org.testng.annotations.Test;

@Slf4j
@Test(dependsOnGroups = "icebergMetadataWriterTest")
```
Putting dependsOnGroups here at the class level allows the class to be run on its own while staying thread-safe with the other Hive metastore tests.
Adding per-method depends would mean I cannot run the Iceberg tests without running all of the Hive tests first (setting a breakpoint in the underlying code then becomes a pain).
```java
@AfterSuite
public void clean() throws Exception {
  FileUtils.forceDeleteOnExit(tmpDir);
```
Note: I don't need to call stopMetastore here because it is handled in the @AfterClass of HiveMetastoreTest.
```java
}

private Optional<HiveTable> getTable(T dataset) throws IOException {
  DbAndTable dbAndTable = getDbAndTable(dataset);
```
In initial iterations, I tried using the HiveRegistrationPolicy to get the HiveSpec based on the dataset URN. But a dataset URN isn't necessarily a path; it depends on the underlying dataset.
So I expect the user to configure the expected pattern for extracting the db and table.
We do something similar in compaction (except hard-coded there).
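For illustration, here is a minimal, hypothetical sketch (not the PR's actual code) of a user-configured extraction pattern: the class name, config shape, and regex are all assumptions, but it shows the idea of the first two capture groups supplying the Hive db and table.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: the user configures a regex whose first two capture
// groups pull the Hive db and table out of a dataset URN. In practice the
// regex string would come from job config rather than a constructor argument.
class DbAndTableExtractor {
  private final Pattern pattern;

  DbAndTableExtractor(String configuredRegex) {
    this.pattern = Pattern.compile(configuredRegex);
  }

  // Returns [db, table] when the URN matches the configured pattern.
  Optional<String[]> extract(String datasetUrn) {
    Matcher m = pattern.matcher(datasetUrn);
    return (m.matches() && m.groupCount() >= 2)
        ? Optional.of(new String[]{m.group(1), m.group(2)})
        : Optional.empty();
  }
}
```

A URN like `/data/tracking/page_views` with pattern `/data/([^/]+)/([^/]+)` would yield db `tracking` and table `page_views`; URNs that don't match simply produce an empty Optional rather than an exception.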
Force-pushed 3966509 to 181b2cc
```java
 * </ul>
 */
@FunctionalInterface
public interface CheckedExceptionPredicate<T, E extends Exception> {
```
Overall a good thought. Should we consider existing Predicate interfaces in Guava or other util libraries?
Existing interfaces don't cover this case AFAIK (but more than happy to take suggestions), and this is a pretty common solution for working around it.
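As a sketch of the pattern being discussed (the names mirror the PR, but the exact signatures here are assumptions): a functional interface whose test method may throw a checked exception, plus a wrapper that "tunnels" the exception through an unchecked one so the predicate fits java.util.stream pipelines.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Predicate;

// Sketch of the checked-exception-predicate pattern (signatures assumed).
@FunctionalInterface
interface CheckedExceptionPredicate<T, E extends Exception> {
  boolean test(T t) throws E;

  // Adapts to java.util.function.Predicate; IOException (the common checked
  // case in dataset code) is rethrown wrapped in UncheckedIOException, which
  // a caller can catch at the stream boundary and unwrap back to IOException.
  static <T> Predicate<T> wrapToTunneled(CheckedExceptionPredicate<T, IOException> wrapped) {
    return t -> {
      try {
        return wrapped.test(t);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    };
  }
}
```

The payoff is that `allMatch`/`noneMatch` can be used unchanged, while the calling method still surfaces an IOException by catching UncheckedIOException and rethrowing its cause.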
Resolved (outdated) review threads on:
- ...in/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java (two threads)
- ...c/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
```java
@Override
public List<T> findDatasets() throws IOException {
  List<T> datasets = datasetFinder.findDatasets();
  List<T> allowedDatasets = null;
```
Instantiate with an empty list to avoid an NPE.
In hindsight, does this value really matter? We could just return allowedDatasets as-is.
But to answer your question: we can never return null from this method. We always either throw an exception or allowedDatasets is overwritten by datasets (which cannot be null).
Force-pushed 181b2cc to f260386
Codecov Report
```
@@             Coverage Diff              @@
##             master    #3648      +/-   ##
============================================
+ Coverage     46.91%   46.98%    +0.06%
- Complexity    10756    10776       +20
============================================
  Files          2135     2138        +3
  Lines         83834    83919       +85
  Branches       9320     9324        +4
============================================
+ Hits          39332    39428       +96
+ Misses        40933    40920       -13
- Partials       3569     3571        +2
```
+1 LGTM
```java
allowedDatasets = datasets.parallelStream()
    .filter(dataset -> allowDatasetPredicates.stream()
        .map(CheckedExceptionPredicate::wrapToTunneled)
        .allMatch(p -> p.test(dataset)))
    .filter(dataset -> denyDatasetPredicates.stream()
        .map(CheckedExceptionPredicate::wrapToTunneled)
        .noneMatch(predicate -> predicate.test(dataset)))
    .collect(Collectors.toList());
```
I like how clean this is :) and it's parallel, which is pretty nice. I recall being told early on that Java 8 streams use more memory than traditional looping, so just be wary of that.
Also, this is at the dataset level, not the file level, so it shouldn't be an issue.
Good callout about there being drawbacks to streams. In general, I don't think this code is a hot spot, so I am okay with being addicted to the syntactic sugar until the profiler shows otherwise.
Something something "premature optimization is the root of all evil." I think even our largest use cases are in the cardinality of thousands, so it should be okay for this specific piece.
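For reference, the allow/deny semantics above can be demonstrated with a dependency-free sketch: a dataset survives only if it matches ALL allow predicates and NO deny predicates. Strings stand in for datasets here, and plain Predicates stand in for the wrapped checked predicates; the class and method names are illustrative, not the PR's.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative allow/deny filtering over a parallel stream: keep elements
// matching every allow predicate and no deny predicate. The real code first
// adapts checked predicates via CheckedExceptionPredicate::wrapToTunneled.
class FilterDemo {
  static List<String> filter(List<String> datasets,
                             List<Predicate<String>> allow,
                             List<Predicate<String>> deny) {
    return datasets.parallelStream()
        .filter(d -> allow.stream().allMatch(p -> p.test(d)))
        .filter(d -> deny.stream().noneMatch(p -> p.test(d)))
        .collect(Collectors.toList());  // list order is preserved even under parallelStream
  }
}
```

Note that `Collectors.toList()` on an ordered source keeps encounter order even when the stream is parallel, so the decorator's output is deterministic.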
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Problem Statement:
The goal is to make it easy for users to filter out datasets with non-optional unions. Avro's and ORC's flexible schemas support non-optional unions, but Iceberg does not support these schemas. This can cause mismatched schemas when writing with different writers; e.g. writing with Iceberg will produce a struct, but writing with the native ORC writer will produce a uniontype.
Future users may want to implement similar complex filtering on the results of their dataset finders, and this filtering may be very complex. The goal is to make it easy for users to build their own filtering predicates.
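To make the "non-optional union" rule concrete, here is a toy model of the convention (an assumed simplification, not Gobblin code): in Avro, a union is conventionally treated as an optional field only when it has exactly two branches, one of which is "null" (e.g. `["null", "string"]`). Any other union, such as `["string", "int"]`, is non-optional and has no direct Iceberg equivalent.

```java
import java.util.Arrays;
import java.util.List;

// Toy model: branches are the names of a union's member types. A union is
// "optional" only in the two-branch null-plus-one-type form; anything else
// is a non-optional union that Iceberg's schema model cannot represent.
class UnionCheck {
  static boolean isOptionalUnion(List<String> branches) {
    return branches.size() == 2 && branches.contains("null");
  }
}
```

The predicate added in this PR applies essentially this kind of check across a dataset's Hive schema to decide whether the dataset should be allowed or denied.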
Design:
A DatasetsFinder decorator that accepts denylist and allowlist filters. In this case, we can specify that all tables with non-optional unions are excluded or should be included.
Alternative designs:
Tests
Hive metastore tests
Dataset finder tests
Commits