
Conversation

@rymurr
Contributor

@rymurr rymurr commented Nov 18, 2020

I wanted to start a conversation about how to make IcebergSource compatible with custom Iceberg Catalogs in both Spark 2&3.

This first attempt is meant to give a straw man for how this could be done. It uses code from LookupCatalog to fetch a Catalog and Identifier from a path. We then try to extract an Iceberg table from it.

This works by specifying the catalog (iceberg_catalog in the example below) as part of the path, or by setting a default catalog in the Spark config.
df.write.format("iceberg").mode("append").save("iceberg_catalog.testing.foo")

Several problems:

  1. We still have the fragile path check to delegate to the Hadoop catalog.
  2. It is not strictly backwards compatible: one has to specify a catalog as part of the path or set a default catalog.
  3. I am not 100% sure this will work for Spark 2.
  4. It copies code from LookupCatalog; it would be nice to use that (or something similar) directly, but that would involve putting Scala code in the code path.
  5. It is hard to pass parameters to the catalog as we do now.

I am soliciting opinions on what might be a better way of doing custom catalogs. Has anyone else been thinking about this?

cc @rdblue @jacques-n @laurentgo

@github-actions github-actions bot added the spark label Nov 18, 2020
@jackye1995
Contributor

Why not do something like the following, which can make the whole thing more backwards compatible:

df.write.format("iceberg")
  .mode("append")
  .option("catalog-impl", "com.my.own.CatalogImpl")
  .save("testing.foo")

@rymurr
Contributor Author

rymurr commented Nov 19, 2020

Why not do something like the following, which can make the whole thing more backwards compatible:

I was thinking something similar, but as per #1587 we don't want to hold catalog configs in the Hadoop Configuration. Logically we would then put the catalog config into options, but to me this leads to verbose/repetitive read/write statements (see below). I am looking for a solution that still allows session-wide catalog configs to be set, and this comment implied that it is technically possible.

df.write.format("iceberg")
  .mode("append")
  .option("catalog-impl", "com.my.own.CatalogImpl")
  .option("catalog-option1", "value1")
  .option("catalog-option2", "value2")
  ...
  .save("testing.foo")

@rymurr
Contributor Author

rymurr commented Nov 20, 2020

@jackye1995 I think I misunderstood your original comment. When you said:

df.write.format("iceberg")
  .mode("append")
  .option("catalog-impl", "com.my.own.CatalogImpl")
  .save("testing.foo")

Did you mean to then use the catalog-impl parameter to look up an existing Iceberg catalog by type, which (hopefully) would have been constructed previously, so we wouldn't have to set extra options like I mentioned above? Does that sound right? While that is much simpler, I think it's important that the user doesn't have to set options in the read/write Spark commands, but rather sets them once in SparkConf.

I wonder if we could set a default catalog in SparkConf (e.g. spark.iceberg.default_catalog=com.my.own.CatalogImpl)? If set, the readers/writers would use that unless an option is passed. If neither is set, it would default to the current Hive/HDFS impl.
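Something like this is what I have in mind, just to illustrate (the property name spark.iceberg.default_catalog is made up, not an existing config):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// hypothetical session-wide default; property name and value are illustrative only
val conf = new SparkConf()
  .set("spark.iceberg.default_catalog", "com.my.own.CatalogImpl")

val spark = SparkSession.builder().config(conf).getOrCreate()

// readers/writers would then pick up the default without per-statement options
spark.read.format("iceberg").load("testing.foo")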

I believe that this still causes an issue in Spark 2, as it doesn't have the same (Spark) catalog support.

Thoughts?

@rdblue
Contributor

rdblue commented Nov 20, 2020

I agree that we will need to mimic the behavior of LookupCatalog and use the catalog manager. I think we can fix some of the problems with this by using the current catalog from the catalog manager if there is no catalog identifier in the table name.

Here's what I think is the correct thing to do for table identifiers in Spark 3:

  1. Parse the identifier into parts
  2. Follow the logic in LookupCatalog: if there is only one part, use the current catalog and current namespace. If there are multiple parts, check whether the first one is a catalog and use it if it is. After this, we have a catalog and a table identifier.
  3. Check that the catalog is an Iceberg catalog. If not, throw an exception. Not entirely sure about this one.
  4. Return the catalog and identifier through SupportsCatalogOptions.

That way, we're just using the source to translate back to catalogs.
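As a rough illustration of steps 1 and 2 (this is only a sketch; resolve and registeredCatalogs are made-up names, not the actual IcebergSource code):

import org.apache.spark.sql.connector.catalog.Identifier

def resolve(
    name: String,
    registeredCatalogs: Set[String],
    currentCatalog: String,
    currentNamespace: Array[String]): (String, Identifier) = {
  val parts = name.split("\\.")
  if (parts.length == 1) {
    // one part: use the current catalog and current namespace
    (currentCatalog, Identifier.of(currentNamespace, parts.head))
  } else if (registeredCatalogs.contains(parts.head)) {
    // the first part names a registered catalog: strip it off
    (parts.head, Identifier.of(parts.tail.init, parts.last))
  } else {
    // no catalog prefix: the whole name is an identifier in the current catalog
    (currentCatalog, Identifier.of(parts.init, parts.last))
  }
}

The source would then return the resolved catalog name and identifier through extractCatalog and extractIdentifier.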

The problem with this is the behavior when there is no catalog specified in the identifier. The current behavior is to use a HiveCatalog that connects to the URI from hive-site.xml like the built-in Spark session catalog. That conflicts with the SQL behavior of using the current catalog, but may be reasonable to keep compatibility. But then the problem is that we may not have a registered Iceberg catalog that uses that URI. If not, then there is no catalog for the source to delegate to and we need to create one because a catalog is required if the source implements SupportsCatalogOptions.

Another option is to change the current behavior slightly and go with the "correct" logic for Spark 3 and delegate to the current catalog. That would mean we always have a catalog to delegate to without creating one. But the trade-off is that when the current catalog has changed, the table loaded by IcebergSource would change, too. I'd be open to this option.

Last, how to handle path URIs: I talked with Anton and Russell about this yesterday and we think that we should make it so that every Spark catalog can load special path identifiers, just like we do today in IcebergSource. To do this, we would need to have a way to pass an identifier that signals this behavior back from SupportsCatalogOptions, like iceberg.hdfs://nn:8020/path/to/table. Then detect those and return Hadoop tables for them.
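For example, a catalog could recognize those special identifiers with a check along these lines (a hypothetical helper, not something in this PR):

import org.apache.spark.sql.connector.catalog.Identifier

// Hypothetical check: treat a single-part identifier that looks like a URI or path
// as a table location to load as a Hadoop table, rather than a namespace.table name.
def isPathIdentifier(ident: Identifier): Boolean =
  ident.namespace.isEmpty && ident.name.contains("/")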

What do you think?

@aokolnychyi
Contributor

A couple of questions before we try to come up with a solution.

  1. How important or even valid is it to load non-path-based tables through our source in Spark 3? We decided to interpret the path option as a table identifier in 2.4 to avoid limitations of the DS V2 API at that point. Since we have a proper catalog API in Spark 3, shall we assume all catalog-based tables should be loaded through it and not through the source? We've replicated the LookupCatalog logic in Spark3Util, but I am not sure it is a good idea to use that in the source.

  2. It is a bit unclear to me how we are going to produce the catalog name from SupportsCatalogOptions. The catalog manager does not provide a list of catalogs that were registered. Will we require a catalog name in options?

@rdblue
Contributor

rdblue commented Nov 23, 2020

How important or even valid is it to load non-path based tables through our source in Spark 3?

The purpose is to maintain compatibility with 2.4. If you have a working job that used IcebergSource to load a table from the Hive catalog, then that should continue to work.

It is a bit unclear to me how we are going to produce the catalog name from SupportsCatalogOptions.

The same way that we look up catalogs in Spark: if the first part is a catalog, return it. Otherwise, use a default.

The problem with this is how we want to maintain compatibility with Spark 2.4. In 2.4, the behavior is that the default catalog is the Hive session catalog, and that can't be changed. In 3, I think there is a good argument that, since the session catalog is the default unless you configure a different one (spark.sql.defaultCatalog), we can respect the default catalog.
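So a user who wants the source to delegate to an Iceberg catalog by default could opt in with something like this (the catalog name and warehouse location are only examples, assuming the configuration behaves as described above):

// register an Iceberg catalog and make it Spark's default catalog
spark.conf.set("spark.sql.catalog.my_iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.my_iceberg.type", "hadoop")
spark.conf.set("spark.sql.catalog.my_iceberg.warehouse", "hdfs://nn:8020/warehouse")
spark.conf.set("spark.sql.defaultCatalog", "my_iceberg")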

@aokolnychyi
Contributor

Last, how to handle path URIs: I talked with Anton and Russell about this yesterday and we think that we should make it so that every Spark catalog can load special path identifiers, just like we do today in IcebergSource. To do this, we would need to have a way to pass an identifier that signals this behavior back from SupportsCatalogOptions, like iceberg.hdfs://nn:8020/path/to/table. Then detect those and return Hadoop tables for them.

+1 to that. I think we have enough info to get #1306 done. While it definitely makes sense to accept iceberg.path in the session catalog, shall we also allow just the path alone in non-session catalogs? Otherwise, it is going to be weird if the catalog name is iceberg too; the identifier would then be iceberg.iceberg.path.

@aokolnychyi
Contributor

The same way that we lookup catalogs in Spark: If the first part is a catalog, return it. Otherwise, use a default.

I like this approach, but will it be backward compatible if the user does not set the session catalog to the Iceberg session catalog?

@rdblue
Contributor

rdblue commented Nov 23, 2020

I like this approach, but will it be backward compatible if the user does not set the session catalog to the Iceberg session catalog?

It's hard to define because we would be respecting a setting that wasn't present in 2.4. I think it's a reasonable thing to do, as long as we maintain compatibility with 2.4 when no catalog settings are changed. Otherwise, the user is opting into different behavior by changing the default catalog.

@rymurr
Contributor Author

rymurr commented Nov 25, 2020

Thanks all for the input.

So I think there is enough info that I can clean up this PR to use SupportsCatalogOptions and the process described above to extract a table/catalog from the source.

  • If there is a catalog in the identifier -> use that catalog.
  • If there is no catalog in the identifier -> use the session/default catalog (favouring the Spark 3 convention over the 2.4 IcebergSource convention).
  • If the catalog in the identifier isn't an Iceberg catalog -> throw.
  • If the identifier is a path, handle it with the iceberg.path option as per above. It seems like this should be done as a separate patch?

Does that make sense? Am I missing anything?

PS: Any thoughts on custom catalogs in Spark 2.4? I don't know what the migration plan looks like, but I feel like Spark 2.4 will be a thing for some time.

@rdblue
Contributor

rdblue commented Nov 25, 2020

if path then handle with iceberg.path option as per above. Seems like should be done as a separate patch?

I think that the problem earlier was that once you implement SupportsCatalogOptions, you can't also return a table. You have to return a catalog and identifier for everything.

I think that will require that all of our catalogs support an identifier that contains a path and use that path to load the table.

Any thoughts on custom catalogs in Spark2.4?

We could do this in the IcebergSource for 2.4. I think that could be a nice feature.

@rymurr
Contributor Author

rymurr commented Nov 25, 2020

I think that will require that all of our catalogs support an identifier that contains a path and use that path to load the table.

Agreed. I was going to open a PR for that to merge before this one. Unless you want it all in 1 PR?

@rdblue
Contributor

rdblue commented Nov 25, 2020

Separate PRs work for me. I think you'll need to add that first, or else you won't be able to get tests passing.

@rymurr rymurr force-pushed the iceberg-source-catalog branch from 85ed069 to 2e5ab04 on November 30, 2020 17:30
@rymurr
Contributor Author

rymurr commented Nov 30, 2020

FYI this has been updated based on our discussions last week. Build is still failing but once #1843 is closed it should be good to go.

@rymurr rymurr force-pushed the iceberg-source-catalog branch from 2e5ab04 to ac33ea3 on December 1, 2020 13:00
Contributor Author

A potential complication: SupportsCatalogOptions doesn't allow for specifying the schema. I don't know how much people rely on this feature, but it is a breaking change for the IcebergSource.
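For example, a read that supplies its own schema (illustrative column names) is the kind of usage that would be affected:

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// illustrative projected schema; with SupportsCatalogOptions a reader-supplied
// schema can no longer be passed through to the source
val projectedSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("data", StringType)))

spark.read.format("iceberg")
  .schema(projectedSchema)
  .load("testing.foo")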

} catch (NoSuchTableException e) {
throw new org.apache.iceberg.exceptions.NoSuchTableException(e, "Cannot find table for %s.", ident);
}
throw new org.apache.iceberg.exceptions.NoSuchTableException("Cannot find table for %s.", ident);
Contributor

I think the correct exception is the Spark exception since this is going to be called from Spark code.

Contributor Author

The Spark NoSuchTableException is typed, which would change the interface. It appears there is no well-defined way to return from this function without a table being found. It NPEs if you return null, so an untyped exception seems to be the expected way to denote that no table was found. I thought the Iceberg NoSuchTableException was the best compromise here.

Contributor

Got it, I agree in that case. Thanks for explaining! You may want to add a comment here to explain in the code as well.

throw new RuntimeException(e);
}
}
if (ident.size() == 1) {
Contributor

Nit: could you separate the control flow statements with a newline?

Contributor Author

👍

@rymurr
Contributor Author

rymurr commented Dec 2, 2020

Thanks for the comments, @rdblue. I think this is close once we get #1804 merged and this build passing :-)

@rymurr rymurr force-pushed the iceberg-source-catalog branch from 3aa4d29 to a4c08d1 on December 22, 2020 16:46
@rdblue
Contributor

rdblue commented Dec 22, 2020

Thanks for checking, @xabriel!

I'll merge this when tests are passing.

@rdblue rdblue merged commit 2f136e9 into apache:master Dec 22, 2020
@rdblue
Contributor

rdblue commented Dec 22, 2020

Thanks for all your hard work on this, @rymurr! Great to have this working!

@rymurr rymurr deleted the iceberg-source-catalog branch December 22, 2020 18:11
wypoon added a commit to wypoon/iceberg that referenced this pull request Jan 26, 2021
The changes are mostly in spark3. They are necessitated by the catalog
support introduced by apache#1783.
As the spark3 IcebergSource now implements SupportsCatalogOptions,
DataFrameReader#load no longer calls IcebergSource#getTable but calls
SparkCatalog#loadTable directly. In order for the SparkTable returned by
SparkCatalog#loadTable(Identifier) to be aware of the snapshot, the
information about the snapshot needs to be present in the Identifier.
For this reason, we introduce a SnapshotAwareIdentifier interface
extending Identifier.
As SupportsCatalogOptions does not allow a schema to be specified
(requested), SparkTable no longer needs a requestedSchema field, so
some dead code is removed from it.