Nessie support for core #1587
  }

  private String getWarehouseLocation() {
    String nessieWarehouseDir = config.get("nessie.warehouse.dir");
Not sure if this is the best way to get hold of a directory to write tables into. Does anyone have any suggestions?
In general, I would discourage depending so heavily on Hadoop Configuration. Spark and Flink have a way to pass catalog-specific options, which is the best way to configure catalogs.
There is some discussion about this in #1640. I think that catalogs should primarily depend on config passed in a string map, and should only use Hadoop Configuration when dependencies (like HadoopFileIO or HiveClient) require it.
I have cleaned this up a bit and tried to follow the pattern you suggested in #1640
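For readers following along, a minimal sketch of that pattern, assuming a catalog configured from a plain string map that only reaches for Hadoop Configuration when a file-IO dependency needs it. The property keys and class name here are illustrative, not necessarily what the final PR uses:

```java
// Illustrative sketch: catalog options arrive as a string map from the engine;
// Hadoop Configuration is only used where a dependency (e.g. HadoopFileIO) requires it.
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

public class StringMapConfigSketch {
  private String warehouseLocation;
  private String nessieUri;
  private Configuration hadoopConf;

  public void initialize(String name, Map<String, String> properties) {
    // Illustrative keys; the real option names come from the Nessie client constants.
    this.warehouseLocation = properties.get("warehouse");
    this.nessieUri = properties.get("uri");
    this.hadoopConf = new Configuration();  // only needed to construct HadoopFileIO later
  }
}
```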
  private NessieClient client;
  private String branch;

  private Configuration getConfig() throws IOException {
The Nessie-specific tests all modify Spark settings and reset them at the end. This is to interfere as little as possible w/ the 'normal' Iceberg path.
  if (path.get().contains("/")) {
    HadoopTables tables = new HadoopTables(conf);
    return tables.load(path.get());
  if (nessie(options.asMap(), conf)) {
We identify Nessie as the core catalog/source when specific parameters are available on the classpath or in the Hadoop config. The idea here is to be fully backwards compatible w/ the Hive and Hadoop catalogs.
This is probably an area to revisit. Right now, this is written to have minimal changes between 2.4.x and 3.0.x, but I think we will probably want to route all loading from here through a catalog. That will allow us to delegate all of this to Nessie or Hive the same way.
  this.client = new NessieClient(NessieClient.AuthType.NONE, path, null, null);
  try {
    try {
      this.client.getTreeApi().createEmptyBranch(branch);
All Nessie tests are run in their own branch so they do not interfere with parallel test execution.
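For illustration, the per-branch isolation could be wired up roughly like this. The JUnit setup and the branch-name scheme are assumptions; `createEmptyBranch` matches the call in the diff above:

```java
// Sketch: give each test class its own Nessie branch so parallel runs do not collide.
import java.util.UUID;
import org.junit.Before;

public abstract class NessieBranchIsolationSketch {
  protected NessieClient client;  // constructed elsewhere, as in the diff above
  protected String branch;

  @Before
  public void createTestBranch() throws Exception {
    // Unique, throwaway branch per test class (naming scheme is illustrative).
    branch = "iceberg-test-" + UUID.randomUUID();
    client.getTreeApi().createEmptyBranch(branch);
  }
}
```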
  @Test
  public void testCreateNamespace() {
    // Nessie namespaces are implicit and do not need to be explicitly managed
    Assume.assumeFalse(catalogName.endsWith("testnessie"));
The concept of a namespace is implicit in Nessie, so namespaces are not managed through the normal SupportsNamespaces interface. We skip tests of this interface when the catalog is a NessieCatalog.
There are a lot of tests that need this. Should we separate the test cases into different suites?
Sure, the Hadoop catalog is also skipped for most of these. It makes sense to have separate tests.
/**
 * Nessie implementation of Iceberg Catalog.
 */
public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable {
We do not extend SupportsNamespaces as a Nessie object store supports the concept of namespaces implicitly. A Nessie namespace can be arbitrarily deep but is not explicitly created or stored. Similar to empty folders in git.
Should be fine, but I think the trade-off is that you won't be able to list namespaces in a namespace. It will be harder to find the namespaces themselves.
I will take another pass at this today, I can see totally valid reasons to support listing namespaces if they have tables in them. The problem as I see it comes from creating or deleting namespaces, and storing namespace metadata.
- create/delete: in Nessie (similar to git) a namespace would be created implicitly with the first table in that namespace tree and deleted with the last table in that namespace tree. Separate create/delete operations in Nessie are either no-ops or require a dummy object to be placed in that namespace, both of which are odd operations. For example, if create is a no-op, then creating namespace `foo.bar` and then asking if `foo.bar` exists will return `false`.
- namespace metadata: What is the use case envisioned for those operations? I think for Nessie we would start with the same behaviour as the HDFS catalog, but I am curious to know the benefit of supporting those APIs.
Having another look, we could add valid impls for namespaceExists and listNamespaces, and make the others no-ops or throw. Then clients can still navigate namespaces. Thoughts?
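A hedged sketch of that middle ground, deriving namespaces from the table keys on the current ref. The `listTableKeys` helper is a placeholder for whatever listing call the Nessie client actually exposes:

```java
// Sketch only: namespaces exist implicitly, so derive them from the table keys on the ref,
// support the read-only operations, and reject the ones Nessie has no equivalent for.
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

public abstract class ImplicitNamespaceSketch {
  // Placeholder: list every table identifier reachable on the current reference.
  protected abstract List<TableIdentifier> listTableKeys();

  public List<Namespace> listNamespaces() {
    return listTableKeys().stream()
        .map(TableIdentifier::namespace)
        .distinct()
        .collect(Collectors.toList());
  }

  public boolean namespaceExists(Namespace namespace) {
    // A namespace "exists" as soon as at least one table lives in or below it.
    return listTableKeys().stream()
        .anyMatch(id -> id.namespace().toString().startsWith(namespace.toString()));
  }

  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
    // No Nessie equivalent: namespaces appear with the first table and vanish with the last.
    throw new UnsupportedOperationException("Cannot create namespace: Nessie namespaces are implicit");
  }
}
```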
Opening this up as a reviewable PR to get early feedback.
}

project(':iceberg-nessie') {
  apply plugin: 'org.projectnessie'
What's happening in the Nessie plugin?
It uses the quarkusAppRunnerConfig dependencies to discover the Nessie Quarkus server and its dependencies, then uses that to start a server. Some of the operations to discover all runtime dependencies are non-trivial and require a full Gradle dependency graph, which is why this is hard to do in a test suite. I believe the primary reason for all this is to make it easy to build GraalVM native images.
See https://github.com/projectnessie/nessie/tree/main/tools/apprunner-gradle-plugin for the actual code
build.gradle:

  maxHeapSize '2500m'
}
// start and stop quarkus for nessie tests
tasks.test.dependsOn("quarkus-start").finalizedBy("quarkus-stop")
From the comments in the Iceberg sync, it sounds like this is running a stand-alone Nessie server? Is that something we could handle like the current Hive MetaStore tests, where each test suite creates a new metastore and tears it down after the suite runs?
Quarkus (the underlying HTTP framework) behaves slightly counterintuitively here: it doesn't offer a way to start Nessie the way you can start the Hive metastore. Hence we start it once per module, and test suites are responsible for cleanup.
    .stream()
    .filter(namespacePredicate(namespace))
    .map(NessieCatalog::toIdentifier)
    .collect(Collectors.toList());
Looks like this will return all tables underneath the given namespace, even if they are nested in other namespaces?
I haven't tested this in Spark; does it work as expected?
You are correct, it will return everything in and below the namespace. What is the contract supposed to be? Only tables directly in this namespace?
Just checked, and the contract is "Return all the identifiers under this namespace". I took this to mean everything under this and all sub-namespaces. If that was not the intention of the method, I will fix the predicate.
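If the intended contract turns out to be direct children only, the predicate could be tightened roughly like this. This is a sketch over Iceberg identifiers; the current code filters Nessie keys, so the real change would live in `namespacePredicate`:

```java
// Sketch: keep only tables whose namespace matches exactly, rather than
// everything nested anywhere below the requested namespace.
import java.util.function.Predicate;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

class DirectChildrenPredicateSketch {
  static Predicate<TableIdentifier> directChildrenOf(Namespace namespace) {
    return identifier -> identifier.namespace().equals(namespace);
  }
}
```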
      .map(NessieCatalog::toIdentifier)
      .collect(Collectors.toList());
} catch (NessieNotFoundException ex) {
  throw new RuntimeException("Unable to list tables due to missing ref.", ex);
Probably shouldn't use RuntimeException here. How about NoSuchNamespaceException?
👍
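A minimal sketch of the agreed change, using the "Cannot &lt;action&gt;: &lt;reason&gt;" message style mentioned below. The format-string constructor is assumed to match the other Iceberg exception types; attaching the Nessie exception as a cause depends on which constructor is available:

```java
// Sketch: translate a missing Nessie ref into the Iceberg exception callers expect.
import org.apache.iceberg.exceptions.NoSuchNamespaceException;

class ListTablesErrorSketch {
  static RuntimeException missingRef(String ref) {
    return new NoSuchNamespaceException("Cannot list tables: ref '%s' does not exist", ref);
  }
}
```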
try {
  Contents contents = client.getContentsApi().getContents(key, reference.getHash());
  this.table = contents.unwrap(IcebergTable.class)
      .orElseThrow(() -> new IllegalStateException("Nessie points to a non-Iceberg object for that path."));
Style: most Iceberg error messages use the form "Cannot <some action>: <reason> (<workaround>)". Consistency here tends to make at least Iceberg errors more readable and easy to consume.
fixed
      .orElseThrow(() -> new IllegalStateException("Nessie points to a non-Iceberg object for that path."));
  metadataLocation = table.getMetadataLocation();
} catch (NessieNotFoundException ex) {
  this.table = null;
I think this should throw NoSuchTableException if the existing metadata is not null because the table was deleted under the reference. You'll probably want to follow the same behavior as the Hive catalog.
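A sketch of that suggested refresh behaviour, mirroring the Hive catalog: if metadata was seen before but the key is now gone from the ref, fail instead of silently nulling out. The method and variable names are illustrative:

```java
// Sketch: decide how refresh should react when the Nessie key is no longer found.
import org.apache.iceberg.exceptions.NoSuchTableException;

class RefreshAfterDeleteSketch {
  static void checkNotDeleted(String previousMetadataLocation, String tableName) {
    if (previousMetadataLocation != null) {
      // The table existed on an earlier refresh, so it was deleted underneath us.
      throw new NoSuchTableException("Cannot refresh table %s: it was deleted on the current ref", tableName);
    }
    // Otherwise the table simply does not exist yet; leaving metadata null is fine.
  }
}
```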
client.getContentsApi().setContents(key,
    reference.getAsBranch().getName(),
    reference.getHash(),
    String.format("iceberg commit%s", applicationId()),
Doesn't look like the format here is quite correct. Missing a space?
Good eye: the first char of the applicationId is a newline. I've put no space between "commit" and %s so there is no extra trailing whitespace in the message.
Also note that the handling of commit messages in nessie is still fairly primitive. This should get replaced by a structured object in the near future.
    reference.getHash(),
    String.format("iceberg commit%s", applicationId()),
    newTable);
} catch (NessieNotFoundException | NessieConflictException ex) {
Is this right for NotFoundException? Iceberg will retry failed commits.
Good eye. I cleaned up the exception message and improved how the exceptions are thrown.
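For illustration, the distinction being discussed could look roughly like this. `CommitFailedException` is what Iceberg's commit retry logic looks for; the Nessie exception types are the ones from the diff above, and their import is omitted here since the package moved between early Nessie releases:

```java
// Sketch: a Nessie conflict is a normal, retryable commit failure;
// a missing ref means retrying can never succeed, so fail hard instead.
import org.apache.iceberg.exceptions.CommitFailedException;

class CommitErrorTranslationSketch {
  static RuntimeException translate(Exception ex, String ref) {
    if (ex instanceof NessieConflictException) {
      return new CommitFailedException(ex, "Cannot commit: ref %s has changed, retry the commit", ref);
    }
    // NessieNotFoundException (or anything unexpected): not retryable.
    return new RuntimeException("Cannot commit: ref " + ref + " no longer exists", ex);
  }
}
```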
sparkEnvMethod = sparkEnvClazz.getMethod("get");
Class sparkConfClazz = Class.forName("org.apache.spark.SparkConf");
sparkConfMethod = sparkEnvClazz.getMethod("conf");
appIdMethod = sparkConfClazz.getMethod("getAppId");
You can use the DynFields helpers to do this a bit more easily.
👍
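A sketch of the same lookup using Iceberg's `DynMethods` helper; the class and method names mirror the reflective calls in the diff, while the exact builder usage may differ slightly by Iceberg version:

```java
// Sketch: resolve SparkEnv.get().conf().getAppId() reflectively via DynMethods,
// so the module does not need a compile-time Spark dependency.
import org.apache.iceberg.common.DynMethods;

class SparkAppIdSketch {
  private static final DynMethods.StaticMethod SPARK_ENV_GET =
      DynMethods.builder("get").impl("org.apache.spark.SparkEnv").buildStatic();
  private static final DynMethods.UnboundMethod CONF =
      DynMethods.builder("conf").impl("org.apache.spark.SparkEnv").build();
  private static final DynMethods.UnboundMethod GET_APP_ID =
      DynMethods.builder("getAppId").impl("org.apache.spark.SparkConf").build();

  static String applicationId() {
    Object sparkEnv = SPARK_ENV_GET.invoke();   // SparkEnv.get()
    Object sparkConf = CONF.invoke(sparkEnv);   // env.conf()
    return GET_APP_ID.invoke(sparkConf);        // conf.getAppId()
  }
}
```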
  ParsedTableIdentifier.getParsedTableIdentifier(path, new HashMap<>());
}

@Test(expected = IllegalArgumentException.class)
We prefer using AssertHelpers.assertThrows so that state after the exception was thrown can be validated. For example, testing catalog.createTable(invalid) would not only check ValidationException but also verify that the table was not created.
fixed
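For illustration, the suggested pattern looks something like this. The invalid identifier string and the expected message fragment are made up, and the import for the module's own ParsedTableIdentifier class is omitted:

```java
// Sketch: assert the exception type and message, then keep asserting on state afterwards.
import java.util.HashMap;
import org.apache.iceberg.AssertHelpers;
import org.junit.Test;

public class ParsedIdentifierExceptionSketch {
  @Test
  public void testInvalidIdentifier() {
    AssertHelpers.assertThrows("Should refuse to parse an invalid identifier",
        IllegalArgumentException.class, "Cannot",
        () -> {
          ParsedTableIdentifier.getParsedTableIdentifier("table@branch@extra", new HashMap<>());
        });
    // Unlike @Test(expected = ...), execution continues here, so the test can also
    // verify that the failed call left no partial state behind.
  }
}
```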
| case "nessie": | ||
| String defaultBranch = options.getOrDefault("nessie_ref", "main"); | ||
| String nessieUrl = options.get("nessie_url"); | ||
| return new NessieCatalog(name, conf, defaultBranch, nessieUrl); |
Please have a look at #1640, I'd like to standardize how we do this. I do like using type = nessie, so we may want to have a lookup that points to the NessieCatalog implementation.
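For context, the kind of lookup being suggested might look like this. The option keys and constructor mirror the diff above, while the dispatch itself and the default are hypothetical, pending the outcome of #1640; the import for the module's NessieCatalog is omitted:

```java
// Hypothetical dispatch: a "type" option selects the catalog implementation to construct.
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;

class CatalogTypeLookupSketch {
  static Catalog load(String name, Map<String, String> options, Configuration conf) {
    String type = options.getOrDefault("type", "hive");
    switch (type) {
      case "nessie":
        String defaultBranch = options.getOrDefault("nessie_ref", "main");
        String nessieUrl = options.get("nessie_url");
        return new NessieCatalog(name, conf, defaultBranch, nessieUrl);
      default:
        throw new IllegalArgumentException("Cannot load catalog: unknown type " + type);
    }
  }
}
```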
@After
public void removeTables() {
  sql("DROP TABLE IF EXISTS %s", tableName);
  sql("DROP TABLE IF EXISTS %s", sourceName);
Why was this needed?
The way I was running the test made it get deleted on the backend Nessie server but not in the cached Spark context. I will clean this up as part of the Spark rework.
Thanks, @rymurr! This looks like a great start. I commented in a few places where I noticed some things. Overall, you're going in the right direction. How do you want to start getting this in? I think it would be good to break it up a bit into smaller commits with a few tests. That way, we can iterate more quickly and we reduce the amount of scope that reviewers need to keep track of. Would it be possible to add just the Nessie module with a few tests and then move on to updating Spark modules?

Thanks a lot for the feedback @rdblue. I will rework this PR to be just the Nessie module and will open another for Spark. Will follow the pattern from #1640 for the Spark PR.
Force-pushed from 834c354 to da33d55.
Hey @rdblue, I have addressed the bulk of your comments above. Left to do:

We should be publishing 0.2.0 of Nessie in the next day or two. Once that is pushed I will update this PR with the new versions and we should have a green build.
Force-pushed from da33d55 to 4ac611f.
@rymurr, I did another thorough review with more time looking through the tests. It's looking close, but I found a few things.
* nessie catalog/table ops
* modifications to catalog/source for spark
* add nessie to tests

left to do:
* support namespaces
* start/stop nessie as part of gradle build

* remove namespace for nessie, handled implicitly
* add gradle plugin
Force-pushed from b46dbf7 to f9bc7ba.
Thanks again for the thorough review @rdblue. I have updated w/ your suggestions and rebased. Hope I didn't miss anything!
 * </p>
 */
public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable, SupportsNamespaces, Configurable {
  private static final Logger logger = LoggerFactory.getLogger(NessieCatalog.class);
Nit: static final constants should use upper case names, like LOGGER. I'm not sure why style checks didn't catch this.
(Not a blocker)
fixed. I was just arguing w/ @jacques-n on this point on Fri ;-) He sided with you.
    .stopRetryOn(NessieNotFoundException.class)
    .throwFailureWhenFinished()
    .run(this::dropTableInner, BaseNessieClientServerException.class);
threw = false;
Nit: `threw` is no longer needed, so this could be simply `return true`. That simplifies the logic at the end of the method to just `return false`.
Up to you whether to change this or not. I know some people strongly prefer only one exit point from a method.
Fixed. I like your way better too; just a hangover from the refactor.
if (warehouseLocation == null) {
  throw new IllegalStateException("Parameter warehouse not set, nessie can't store data.");
}
final String requestedRef = options.get(removePrefix.apply(NessieClient.CONF_NESSIE_REF));
Did you intend to change this to "ref"? Your reply seemed to imply that: #1587 (comment)
It is just `ref` now; the removePrefix method strips the `nessie.` prefix from the constant in the Nessie client class. I didn't want to duplicate the constants already in NessieClient.
I missed the removePrefix call. Thanks!
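For readers following along, removePrefix as described is essentially the following one-liner, assuming the client constants are of the form `nessie.ref`, `nessie.url`, and so on:

```java
// Sketch: strip the "nessie." prefix so catalog options can use short keys like "ref".
import java.util.function.Function;

class RemovePrefixSketch {
  static final Function<String, String> REMOVE_PREFIX = key -> key.replace("nessie.", "");
  // e.g. REMOVE_PREFIX.apply(NessieClient.CONF_NESSIE_REF) yields "ref"
}
```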
@rymurr, thanks for all of the test changes, it is now much easier to understand! I don't see any blockers, although it looks like you may have intended to change the

Thanks for all your hard work getting this ready! I actually quite like the way the Nessie reference works and simplifies assumptions in the catalog and table operations.
Thanks for the merge @rdblue!! Super pumped to have this merged. I have the last round of changes ready and will post them with the PR to support timestamps in the table name ASAP.
As per the mailing list announcement, we would like to contribute the integration between Iceberg and Nessie to the Iceberg project.

This PR does the following:

- NessieCatalog for core Iceberg ACID operations

Please have a look at Iceberg Spark for a more complete description of Nessie's capabilities with Iceberg, and Nessie Features for a broader introduction to Nessie.

Note this is currently in draft until a Gradle plugin required for testing Nessie has been published.