
Conversation

@rymurr (Contributor) commented Nov 27, 2020

No description provided.

@github-actions github-actions bot added the spark label Nov 27, 2020
@rymurr rymurr changed the title WIP create and drop support for file paths WIP Support for file paths in SparkCatalogs via HadoopTables Nov 27, 2020
@rymurr (Contributor, Author) commented Nov 27, 2020

This is still a bit rough: it doesn't pass one test and I haven't tested ALTER commands, but I wanted to get it out for review early.

One key issue:
It appears that deleting tables is currently only a V1 datasource operation from the Session Catalog in Spark (see here). So when using a Session catalog, trying to drop a table via a file path, e.g. DROP TABLE `file://path/to/iceberg/table`, gives an error because the V1 Session Catalog looks in Hive and doesn't find the table.

The DROP commands do work when using a HiveCatalog together with the Session Catalog: the table was created via a HiveCatalog through our SparkSessionCatalog and deleted via the V1 Session Catalog, but both are talking to Hive, so we don't notice the problem.

In my mind this means the Spark 3 tests that run against the session catalog are (partially) broken, as the DROP command won't work for anything other than a Hive-based table.

So:

  • Should we document this limitation and ignore it?
  • Should we fix it on the Spark side?
  • Should we fix it on our side (I honestly don't know how to, though)?

@rdblue @aokolnychyi any thoughts?

@RussellSpitzer (Member) commented Nov 27, 2020

I was thinking about this a bit differently: rather than providing a full catalog, we could have the various Spark catalogs treat file types as namespaces, and then have them switch to returning Hadoop tables in the load-table method.

Here is an example @aokolnychyi was typing up on the create ticket:

// Case 2.1: migrate a location using HadoopTables (or path-based catalog) for target

MIGRATE TABLE parquet.`path/to/table`
USING iceberg
TBLPROPERTIES (
  'key' 'value'
)

@rymurr (Contributor, Author) commented Nov 27, 2020

> I was thinking about this a bit differently: rather than providing a full catalog, we could have the various Spark catalogs treat file types as namespaces, and then have them switch to returning Hadoop tables in the load-table method.

I was thinking this way originally as well. My first attempt at it made for weird interactions between BaseMetastoreCatalog and the individual catalogs, and a bunch of duplicated code in the various Catalog impls. I am still not satisfied with this solution, as the hadoop delegate catalog is a bit strange, but at least it centralises the logic a bit.

@rymurr (Contributor, Author) commented Nov 27, 2020

> Here is an example @aokolnychyi was typing up on the create ticket
>
> // Case 2.1: migrate a location using HadoopTables (or path-based catalog) for target
>
> MIGRATE TABLE parquet.`path/to/table`
> USING iceberg
> TBLPROPERTIES (
>   'key' 'value'
> )

I like this example too; a very useful feature. This is slightly different/more complicated though, right? That would add a SQL extension and a separate impl to write the metadata of the new table.

Or do you mean the path-based table is already an Iceberg table and the migrate is effectively reduced to a rename from path to table name?

@rymurr (Contributor, Author) commented Nov 27, 2020

> I was thinking about this a bit differently: rather than providing a full catalog, we could have the various Spark catalogs treat file types as namespaces, and then have them switch to returning Hadoop tables in the load-table method.
>
> I was thinking this way originally as well. My first attempt at it made for weird interactions between BaseMetastoreCatalog and the individual catalogs, and a bunch of duplicated code in the various Catalog impls. I am still not satisfied with this solution, as the hadoop delegate catalog is a bit strange, but at least it centralises the logic a bit.

I re-read what you said earlier @RussellSpitzer and I realized I had misread it. Your suggestion to just modify the Spark catalogs may be easier, but it still has a lot of duplication and would require a mechanism to handle StagedTables with HadoopTables. I believe the change will end up very similar to my current patch. I can still do the change if you want to see the difference?

Note that regardless of how we implement the change, DROP still causes a problem in session catalogs: DROP never passes through our session catalog.

@github-actions github-actions bot added the core label Nov 30, 2020
@rymurr rymurr changed the title WIP Support for file paths in SparkCatalogs via HadoopTables Support for file paths in SparkCatalogs via HadoopTables Dec 1, 2020
@rymurr (Contributor, Author) commented Dec 1, 2020

I have added ALTER tests and have skipped DROP when using the SessionCatalog. I have also used @RussellSpitzer's suggestion for the implementation. It's a lot cleaner and smaller, but it means we can't perform certain operations on filesystem tables: some transactions, renames, and listing by namespace.

I am not a huge fan of identifying a path-based table by simply checking whether there is a / in the name. Any thoughts on how to make this less fragile?

@RussellSpitzer (Member)

I'm not sure we want to allow those other operations, since I'm not sure they actually have a lot of meaning. For example, a rename on a path-based table in a filesystem without renames means a full copy of the entire dataset. So it may be best to treat file-based paths as static, or at least immovable.

I think we could probably identify path-based tables based on the namespace too, like "parquet" or "file" or something. I'll take a look today.

@rdblue (Contributor) commented Dec 1, 2020

> In my mind this means the Spark 3 tests that run against the session catalog are (partially) broken, as the DROP command won't work for anything other than a Hive-based table.

I ran into this and ended up skipping non-Hive tests in other suites. We still have fairly good coverage because the session catalog is simply delegating. So as long as it works for a SparkCatalog backed by Hive, it will work for other catalog types.

Fixing this in Spark would be great.

@rdblue (Contributor) commented Dec 1, 2020

I agree with the approach of using HadoopTables to return tables when a path identifier is found. What I'm not sure about is how to pass a path as an identifier. Supporting a namespace element that carries the file format is useful for imports, but not for the IcebergSource, because I don't think we want the source to support those identifiers.

For table imports, I think we have more options because we control the parsed statement or the stored procedure. That procedure could use optional arguments like file_format or location. Similarly, a parsed statement can be transformed with rules to pass the reference as something other than an Identifier. What is relevant here is that we have options for those cases, so I think we should focus in this PR on how to pass paths from IcebergSource to a catalog, and how to load them in that catalog.

Looking at the Spark code, the identifier is immediately used to load the table, or is added to a CTAS plan. I don't think that the identifier is modified after it is returned. We could use a custom Identifier implementation, PathIdentifier, to signal that a path should be loaded.

import java.util.List;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import org.apache.spark.sql.connector.catalog.Identifier;

public class PathIdentifier implements Identifier {
  private final String location;
  private final String name;

  public PathIdentifier(String location) {
    this.location = location;
    // use the last path segment as the table name
    List<String> parts = Splitter.on("/").splitToList(location);
    this.name = Iterables.getLast(parts);
  }

  public String location() {
    return location;
  }

  @Override
  public String[] namespace() {
    // expose the full location as a single-element namespace
    return new String[] { location };
  }

  @Override
  public String name() {
    return name;
  }
}

This uses the last part of the location string as the table name, so that the default subquery aliases added in Spark work.

Then each method in SparkCatalog would just need to check instanceof PathIdentifier, which is a bit cleaner than the / check -- although that would still need to be done in IcebergSource.
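For illustration, here is a minimal sketch of that instanceof check (the PathAwareLoader wrapper and its wiring are hypothetical helpers for this discussion, not the PR's actual SparkCatalog code):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.spark.sql.connector.catalog.Identifier;

class PathAwareLoader {
  private final Catalog icebergCatalog;
  private final HadoopTables tables;

  PathAwareLoader(Catalog icebergCatalog, Configuration conf) {
    this.icebergCatalog = icebergCatalog;
    this.tables = new HadoopTables(conf);
  }

  Table load(Identifier ident) {
    if (ident instanceof PathIdentifier) {
      // path-based identifier: load the table directly from its location
      return tables.load(((PathIdentifier) ident).location());
    }
    // regular identifier: delegate to the configured Iceberg catalog
    return icebergCatalog.loadTable(
        TableIdentifier.of(Namespace.of(ident.namespace()), ident.name()));
  }
}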

      Map<String, String> properties) throws TableAlreadyExistsException {
    Schema icebergSchema = SparkSchemaUtil.convert(schema);
    try {
      // can't stage a hadoop table
Contributor:

I think this is okay, but if staging isn't supported then a path identifier should fail instead of going ahead without using the path. Otherwise, this will attempt to create a table in the Iceberg catalog with a crazy name, which would probably fail.

Contributor:

After thinking about this a little more, I think we will need to support some form of staging for Hadoop tables. Because this catalog implements the atomic operation mix-in, the staging calls will be used for all CTAS plans. Using SupportsCatalogOptions would mean that save() gets turned into a CTAS. So if we don't want the existing creates to fail, we have to support a staged table.

We can do that in a couple of ways. First, we could create a table builder based on HadoopCatalogTableBuilder that supports a location. Second, we could reuse the fake staged table from the session catalog (for non-Iceberg tables). I'd prefer to create a builder that can construct the transactions for path tables. We could add it to HadoopTables.

Contributor Author:

> I'd prefer to create a builder that can construct the transactions for path tables. We could add it to HadoopTables.

Agreed. Added in the most recent update.

@rymurr (Contributor, Author) commented Dec 2, 2020

Just a bit of clarification on PathIdentifier and your comment above, @rdblue:

You are suggesting that the change in #1783 returns a PathIdentifier from the SupportsCatalogOptions.extractIdentifier method? I like that, and it simplifies some of the logic in this PR. If I understand correctly, in the case of stored procedures or other places where we have control, we can use PathIdentifier or use the other mechanisms available to pass path information. Correct?

What I am unsure about is how to handle path-based CREATE, DROP, and ALTER commands. We don't have much control over the Identifier, and we would still have to inspect whether it is a path. Unless we do a CREATE TABLE x AS iceberg LOCATION path, which works but the table name is redundant/ignored. Or are you suggesting this PR doesn't handle path-based DDL coming from Spark?

FWIW I agree that we shouldn't have a 'special' namespace for importing path-based tables; however, my approach of ignoring namespaces if the name is a path probably isn't quite right either.

@jackye1995 (Contributor)

Sorry, I think I am pretty late to the whole conversation; I completely forgot to follow up on #1783 after my initial comment and am still trying to catch up...

I also like the idea of a custom PathIdentifier, but it seems to only work for loading an existing table through SupportsCatalogOptions, and as @rymurr says, I don't think it handles those TableCatalog operations well. The Spark plan would directly use Identifier.of(...), which uses its default IdentifierImpl.

For these DDL statements, I actually like the approach that Anton mentioned in #1306 about wrapping HadoopTables into a catalog, for example HadoopPathCatalog, so that we can run something like CREATE TABLE catalog_name."path/to/table" USING iceberg. I don't think using the LOCATION keyword is an option for us, not because of the dummy table name, but because not all ALTER and DELETE statements support it natively in open source Spark. That proposal was the last conversation in the issue and never got a follow-up; was there any concern with that approach not mentioned there?

@rymurr (Contributor, Author) commented Dec 3, 2020

Hey @jackye1995, thanks for the comments! The first approach in this patch tried to tackle the problem with a new catalog; you can see it here. On the plus side it keeps all the filesystem handling in one class, but it has the downside of creating a lot of duplicate code and wasn't super nice. @RussellSpitzer suggested the approach in this PR and I think I prefer it now too; it also keeps the scope of this change very small.

What I am struggling with at the moment is:

  • how to robustly identify a path in an Identifier (e.g. not relying solely on the presence of /)
  • how to correctly handle edge cases like catalog.namespace.`file://path/to/table`. What does a namespace mean in this case, and how do we represent a path as an Identifier or TableIdentifier?

For the second point I think we should reject any namespace elements and treat a path as a namespaceless table with the full table path as the table/identifier name. For the first, I think the only reliable way is to try to do IO with the path: can we get an FS from the path, is it the local FS, is it a relative path, etc.

Thoughts?

@jackye1995 (Contributor)

> how to robustly identify a path in an Identifier (e.g. not relying solely on the presence of /)

With all the given constraints, I think we have to rely on some string pattern to determine whether it is a path or a table. Using / is probably not too bad a choice, considering that people won't normally put / in a table name. If we restrict the path to always be a full file path and do not allow relative paths, then we can use a URI check (or an IO check, as you said, for extra safety) to see if it is a Hadoop table. Is there a use case for relative paths for Hadoop tables?

And whichever way we go for the check, I think it should be somewhere in the core package, instead of a protected method isHadoopTable, so that it can be used across compute engines for consistent behavior. One way to go is to add it as a method in HadoopTables; a rough sketch of such a check is shown at the end of this comment.

> how to correctly handle edge cases like catalog.namespace.`file://path/to/table`. What does a namespace mean in this case, and how do we represent a path as an Identifier or TableIdentifier?

+1 for

> reject any namespace elements and treat a path as a namespaceless table with the full table path as the table/identifier name
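A rough illustration of the kind of URI/absolute-path check discussed above (a hypothetical helper, not the check that was ultimately added to HadoopTables):

import java.net.URI;
import java.net.URISyntaxException;

public class LocationChecks {
  // Treat the string as a table path only if it contains a '/' and either has a
  // URI scheme (e.g. file:, hdfs:, s3a:) or starts with '/'; relative paths are rejected.
  public static boolean looksLikeTablePath(String location) {
    if (location == null || !location.contains("/")) {
      return false;
    }
    try {
      URI uri = new URI(location);
      return uri.getScheme() != null || location.startsWith("/");
    } catch (URISyntaxException e) {
      return false;
    }
  }
}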

@rymurr (Contributor, Author) commented Dec 4, 2020

Thanks for the comment @jackye1995; I have added a check to reject a path table if it has a namespace. I have also moved the checks to HadoopTables, good suggestion.

Re path identifiers: Delta allows absolute paths only, so I don't think it's outrageous if we limit paths to absolute paths only. I am inclined to use a check similar to what they do. Thoughts?

@rdblue (Contributor) commented Dec 4, 2020

I think we should keep the scope of this small. There isn't a need to address how to embed path-based tables in SQL commands right now; however we choose to do that would probably be a different solution. Right now, we need to unblock multi-catalog support in IcebergSource by having a way to pass the path through as a table identifier, and the PathIdentifier option works well for that.

For the larger question about ALTER, DROP, etc., I think the short-term solution is to define a HadoopCatalog that can handle those SQL commands.

In the long term, I've advocated that Spark should have some way of identifying and passing path identifiers to plugins. The problem here is that no one seems to know what the behavior of path-based tables in SQL is or should be. But I think that a similar PathIdentifier approach would work once that is addressed in the upstream community.

Last, if we want to support identifiers with a catalog and a quoted path, we can do that any time by choosing when to interpret the catalog name as a quoted path.

@rdblue (Contributor) commented Dec 4, 2020

> Delta allows absolute paths only . . .

Considering the behavior for SQL is undefined, I would rather not add support for this right away. Delta needs this because it already supported these identifiers in v1 and has to be compatible. But I think we should have a use case that requires this before adding support.

  /**
   * Check to see if the location is a potential Hadoop table by checking if it is an absolute path on some filesystem.
   */
  public static boolean isHadoopTable(String location) {
Contributor:

How about isValidLocation?

  }

  private String[] currentNamespace() {
    return SparkSession.active().sessionState().catalogManager().currentNamespace();
Contributor:

If this uses a session, then it should be identified when the catalog is created and stored as an instance field. Catalogs are specific to a SQL session, so they should not use the active session except for in initialization. After that, the same session should always be used.
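A minimal sketch of capturing that session-scoped state once, at catalog initialization (class and field names are illustrative, not the PR's code):

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class SessionScopedCatalog implements CatalogPlugin {
  private String catalogName;
  private String[] defaultNamespace;

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    // capture session-scoped state here; after initialization the catalog keeps
    // using what it captured instead of reaching back to SparkSession.active()
    this.catalogName = name;
    this.defaultNamespace =
        SparkSession.active().sessionState().catalogManager().currentNamespace();
  }

  @Override
  public String name() {
    return catalogName;
  }

  @Override
  public String[] defaultNamespace() {
    return defaultNamespace;
  }
}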

Contributor Author:

Fixed. Just for my own learning, why is it OK to get the Hadoop config from the active session but not the current namespace? I have been following the commonly used SparkSession.active().sessionState().newHadoopConf() pattern.

@jackye1995 (Contributor)

> Considering the behavior for SQL is undefined, I would rather not add support for this right away. Delta needs this because it already supported these identifiers in v1 and has to be compatible. But I think we should have a use case that requires this before adding support.

👍

@rymurr (Contributor, Author) commented Dec 5, 2020

Thanks for the comments @rdblue, I have reduced the scope as suggested and updated the path identifier check.

  @Override
  public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
    try {
      // can't rename hadoop tables
Contributor:

This should throw an exception if it can't rename, right?

Contributor:

I think this should have a method like checkNotPathIdentifier that throws an IllegalArgumentException if a PathIdentifier is passed, so that we can ensure that they aren't passed to methods that don't support paths.
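For concreteness, a sketch of that guard (using the PathIdentifier sketch from earlier in the thread; the message text is illustrative):

private static void checkNotPathIdentifier(Identifier identifier, String method) {
  if (identifier instanceof PathIdentifier) {
    throw new IllegalArgumentException(String.format(
        "Cannot pass a path-based identifier to %s: %s", method, identifier));
  }
}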

Contributor Author:

done


  @Override
  public Identifier[] listTables(String[] namespace) {
    // no way to identify if this is a path and we should use tables instead of catalog.
Contributor:

There should also be no way to pass a path as a namespace.

Contributor Author:

Not sure how to identify a path here? A single element with '/' in it? Seems arbitrary. The HadoopCatalog treats directory names as namespaces, so there would be no way to identify that here.

Contributor:

Sorry, I wasn't very clear. If anything calls this, then it went through a path for normal identifiers. So I don't think that there is a need to worry about paths here because there isn't a way to pass a path in here.

Contributor Author:

Ah I see, have removed the comment.

@rdblue (Contributor) commented Dec 7, 2020

> Should we also respect spark.sql.runSQLOnFiles in Spark? I am not sure whether we should match built-in sources here.

I don't think so. That setting is for v1. Instead, Spark needs to define how it will pass path-based tables to catalogs, and what the behavior requirements are for those tables. Right now, no one has done the work to find out what the behavior of v1 is or to try to build consensus in the Spark community about what it should be. I think that means that Iceberg should avoid Spark's path-based syntax for now and focus in this PR on the narrower case of passing identifiers for paths used in DataFrameReader.

@rdblue rdblue closed this Dec 7, 2020
@rymurr (Contributor, Author) commented Dec 7, 2020

Just rebased with your caching catalog change @rdblue. Hopefully the build should be green now.

@rdblue rdblue reopened this Dec 7, 2020
@rdblue (Contributor) commented Dec 7, 2020

Looks good overall. I think I'd prefer to refactor the load and builder creation a little bit, and add that check to the Hadoop builder. Thanks @rymurr!

@RussellSpitzer (Member)

What are the plans for a test suite on this? I think this is all going in the right direction; I just want to make sure we have a good set of checks to go along with it.

@rymurr (Contributor, Author) commented Dec 7, 2020

@RussellSpitzer I've been wondering the same. It is hard to test till #1783 is merged. I will have a think though.

@rymurr (Contributor, Author) commented Dec 7, 2020

Thanks @rdblue! Both the check in HadoopTables and the simplification in SparkCatalog have been added.

@rdblue (Contributor) commented Dec 7, 2020

> What are the plans for a test suite on this?

We can always add a suite that passes PathIdentifier to a catalog. I was also thinking that once this is included with #1783, all of the existing tests for IcebergSource with path tables will be applied.
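As a rough illustration, such a suite could start with something as small as this JUnit check against the PathIdentifier sketch shown earlier (names are illustrative):

import org.junit.Assert;
import org.junit.Test;

public class TestPathIdentifier {
  @Test
  public void testNameAndLocation() {
    PathIdentifier ident = new PathIdentifier("file:/tmp/warehouse/db/table");
    // the last path segment becomes the table name
    Assert.assertEquals("table", ident.name());
    // the full location is preserved for loading via HadoopTables
    Assert.assertEquals("file:/tmp/warehouse/db/table", ident.location());
  }
}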

@rymurr (Contributor, Author) commented Dec 8, 2020

I have added a simple test to ensure the correct behaviour of the SparkCatalog when passed a PathIdentifier. It smoked out one minor bug so thanks @RussellSpitzer!

All is up to date now and I think we are just about ready!

  public Catalog.TableBuilder withLocation(String newLocation) {
    Preconditions.checkArgument(newLocation == null || location.equals(newLocation),
        String.format("Table location %s differs from the table location (%s) from the PathIdentifier",
            newLocation, location));
Contributor:

Nit: Preconditions already supports argument formatting, so the String.format is redundant.
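In other words, something along these lines (a sketch of the suggested simplification; Guava's Preconditions formats the %s placeholders itself):

Preconditions.checkArgument(newLocation == null || location.equals(newLocation),
    "Table location %s differs from the table location (%s) from the PathIdentifier",
    newLocation, location);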

@rdblue rdblue merged commit f3dc93f into apache:master Dec 8, 2020
@rdblue (Contributor) commented Dec 8, 2020

Thanks for the fixes! I merged this.

rymurr pushed a commit to rymurr/iceberg that referenced this pull request Dec 9, 2020
rymurr pushed a commit to rymurr/iceberg that referenced this pull request Dec 10, 2020
rymurr pushed a commit to rymurr/iceberg that referenced this pull request Dec 22, 2020
Comment on lines +342 to +344
        metadata = ops.current().buildReplacement(schema, spec, SortOrder.unsorted(), location, properties);
      } else {
        metadata = tableMetadata(schema, spec, null, properties, location);
Member:

@rymurr Here we use SortOrder.unsorted() or null to set the sort order rather than the sortOrder set from here? Is that correct? IMO, we've ignored the user-provided sort order.

Contributor Author:

Good spot @openinx, not sure how that sneaked in! I have raised #2009 to address it.
