-
Notifications
You must be signed in to change notification settings - Fork 3k
AWS Glue catalog and table operations #1633
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| } | ||
|
|
||
| /** | ||
| * Use the same format as Hive for default warehouse location |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This java doc isn't clear to me. I think the summary here should describe what the method is doing "Returns the default warehouse location as stored in the glue service (?). If not present, uses X .... "
Then under return "default warehouse path as a string using Hive's formatting"
Does that sound right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah let me just be precise with the behavior of it instead of saying it's the same as Hive.
| // should be safe to list all before returning the list, instead of dynamically load the list. | ||
| String nextToken = null; | ||
| List<TableIdentifier> results = new ArrayList<>(); | ||
| while (true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to be afraid of while(true) break, I would prefer this was written in a different style. One quick change here which wouldn't be that different would be a "do while" which would let you keep basically everything the same but you could drop the "break".
Ideally I like to avoid "break" if at all possible and especially in loops, but ymmv.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will use a do...while syntax for this.
| try { | ||
| dropTable(newTableOps(identifier), identifier, purge); | ||
| return true; | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any possibility we might want to log the contents of this Exception? Feel's like we may want to surface some classes of errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I'd definitely like to log the contents of this exception. It would greatly help debug issues like whether or not the table drop failed due to invalid permissions, possibly the wrong role is being assumed at this point in time, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I definitely forgot to log here. Will be more detailed on this part.
| this.conf = conf; | ||
| this.glue = glue; | ||
| this.catalogId = catalogId; | ||
| this.warehousePath = warehousePath; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Can we add Preconditions.checkNotNull on anything that is nullable here so we can avoid nulll checks later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warehousePath is checked by validateWarehousePath in the constructor, and catalogId can be null. I can add the precondition for conf and glue.
| * and DeleteTable or BatchDeleteTable, to delete any resources that belong to the database. | ||
| * @param namespace a namespace. {@link Namespace} | ||
| * @return always true | ||
| * @throws NamespaceNotEmptyException never thrown |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little nervous about this implementation not following the pattern of the other catalogs and refusing to drop if there is something in the namespace. Downstream code may be relying on this to prevent destructive operations ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, this is definitely a different behavior between Glue and Hive. I can do a ListTable before dropping.
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
Outdated
Show resolved
Hide resolved
| persistGlueTable(isUpdate, parameters); | ||
| exceptionThrown = false; | ||
| } catch (ConcurrentModificationException e) { | ||
| throw new CommitFailedException(e, "Glue detected concurrent update to table %s", tableName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More error messages which should be in the form Cannot x because y. I think the concurrent update to table error should also include the information
""" * For users that need high frequency and high concurrency update,
* please consider requesting rate increase for Glue UpdateTable API,
* or use an external lock system such as DynamoDB lock."""
Which an end user will usually not see
aws/src/main/java/org/apache/iceberg/aws/glue/GlueToIcebergConverter.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java
Outdated
Show resolved
Hide resolved
|
Looking good! Supporting more catalogs is great for the project! |
| .catalogId(catalogId) | ||
| .databaseName(IcebergToGlueConverter.toDatabaseName(namespace)) | ||
| .nextToken(nextToken) | ||
| .build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might need a try ... catch block here as the call to getTables can throw iirc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not catch any exception because glue client only throw RuntimeException, and it seems to be okay in other catalog implementations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It depends on the exception. If the namespace is missing, then we have an exception for that so users can handle it. We would want to translate the AWS exception to the Iceberg one. But for connection issues, it's okay to throw the original exception.
Since namespaceExists is called above, I'm guessing that case is already handled, though that doesn't account for concurrent deletes.
| try { | ||
| dropTable(newTableOps(identifier), identifier, purge); | ||
| return true; | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I'd definitely like to log the contents of this exception. It would greatly help debug issues like whether or not the table drop failed due to invalid permissions, possibly the wrong role is being assumed at this point in time, etc.
| glue.createDatabase(CreateDatabaseRequest.builder() | ||
| .catalogId(catalogId) | ||
| .databaseInput(IcebergToGlueConverter.toDatabaseInput(namespace, metadata)) | ||
| .build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this operation can throw as well. Since the BaseMetastoreCatalog doesn't declare thi with a checked exception, you might want to document what it can throw that you know of or wrap in try...catch (though arguably that could be up the calling code).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on my observation of other catalogs, we are not handling runtime exceptions, and aws clients only throw runtime exception. That is why I do not try catch anything here. I can probably add a check for AlreadyExistsException.
| public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException { | ||
| Map<String, String> parameter = Maps.newHashMap(); | ||
| parameter.putAll(loadNamespaceMetadata(namespace)); | ||
| parameter.putAll(properties); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I'm just totally missing the obvious but I don't see where parameter is being used. Loks like it's built and then the database is updated directly via propeties.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used in persistGlueTable in doCommit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. But to me, it seems like the locally declared parameter isn't used in this function.
Are you sure you didn't intend to use the merged parameter map instead of properties here when calling IcebergToGlueConverter.toDatabaseInput(namespace, properties)? https://github.com/apache/iceberg/pull/1633/files#diff-51a208168318cb652ad5beb86d3433d714aca916f9816c4cffcc88e273daf6afR287?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conversely, the updated local metadata map is used in removeProperties instead of the passed in properties. So I still feel like potentially this variable is unused / was potentially meant to be passed into the IcebergToGlueConverter.toDatabaseInput instead of passing in the properties map.
Here's the usage in removeProperties that follows a similar pattern:
- Build an updated table properties map.
- Use the updated map when passing to the iceberg to glue converted for
databaseInput
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah sorry that one was actually my typo, thank you for pointing out. Somehow I was reading the wrong line when doing the reply...
I have pushed the latest code that has the fix and also with the integration test against actual Glue service that detected those bugs.
| Namespace.of("db2") | ||
| ), | ||
| glueCatalog.listNamespaces() | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really see much value in this test given how heavily mocked it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% agree, I am thinking about also publish the actual Glue tests, but I have to also mark them as Ignored because it needs AWS credentials to run.
aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalogInteg.java
Outdated
Show resolved
Hide resolved
aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalogInteg.java
Outdated
Show resolved
Hide resolved
| Preconditions.checkArgument(warehousePath != null && warehousePath.length() > 0, | ||
| "Cannot initialize GlueCatalog because warehousePath must not be null"); | ||
| Preconditions.checkArgument(warehousePath.charAt(warehousePath.length() - 1) != '/', | ||
| "Cannot initialize GlueCatalog because warehousePath %s must not end with /", warehousePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just remove the trailing /?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah that's actually better, let me do that
| * All-arg constructor | ||
| * @param conf Hadoop config, passed in for users of HadoopIO | ||
| * @param glue Glue client | ||
| * @param catalogId Glue catalog ID, which is the AWS account ID. When null, it uses the account of the Glue client. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can Glue only have one catalog per account? Saying that the catalog ID "is" the AWS account ID seems to imply that.
Assuming that the account ID is a default catalog ID, I think that this should be more clear that the account ID is used by default, and that is the ID for the account's default catalog.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a very confusing concept within Glue... technically speaking each AWS account has one AWS Glue Data Catalog per AWS region. However, the GlueClient is always initialized with an AWS region, so the account ID is the only identifier needed to contact a different catalog.
As a result, the catalogId in Glue always mean the AWS account ID, and that is what the user would like to configure outside the client. I can rewrite the doc to be more clear on this.
| this.skipArchive = skipArchive; | ||
| this.databaseName = IcebergToGlueConverter.getDatabaseName(tableIdentifier); | ||
| this.tableName = IcebergToGlueConverter.getTableName(tableIdentifier); | ||
| this.fullName = String.format("glue.%s.%s.%s", catalogId, databaseName, tableName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The full name here should be the name of the catalog, then the db and table name. This view is good for debugging (and may be what you want for toString) but the name returned for the table should be the name used to load it. So if I've called my Glue catalog prodglue, then it would be something like prodglue.rdblue.test_table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I am basically using glue.<catalogId> as the name, that is why I am writing it like this. Sounds like the name concept is used here differently, let me add this additional field then.
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
Outdated
Show resolved
Hide resolved
| public void testCreateBadName() { | ||
| AssertHelpers.assertThrows("should not create namespace with bad name", | ||
| NoSuchNamespaceException.class, | ||
| () -> glueCatalog.createNamespace(Namespace.of("db-1"))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this should throw NoSuchNamespaceException. What about ValidationException?
Also, tests should generally check the error message as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this also test a namespace with more than 1 part?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added. More details are directly tested in unit test of IcebergToGlueConverter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes ValidationException sounds better for the situations, I will update.
| namespaces.add(namespace); | ||
| glueCatalog.createNamespace(Namespace.of(namespace)); | ||
| Database database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database(); | ||
| Assert.assertEquals(namespace, database.name()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I usually like to check operations like create by checking the opposite state first, so asserting that the namespace does not exist using the glue client.
I also think it is a best practice to provide context in most assertions, like assertEquals("Random name should match loaded when directly", namespace, database.name()).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add messages to assertions.
| s3.headObject(HeadObjectRequest.builder().bucket(testBucketName).key(key).build()); | ||
| Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)); | ||
| Assert.assertEquals(partitionSpec, table.spec()); | ||
| Assert.assertEquals(schema.toString(), table.schema().toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a bit more whitespace to break up sections of this test? It seems a bit dense.
aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogTableTest.java
Outdated
Show resolved
Hide resolved
| AssertHelpers.assertThrows("should not have table", | ||
| NoSuchTableException.class, | ||
| () -> glueCatalog.renameTable( | ||
| TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, tableName + "-3"))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is a rename rollback? Why does this test fail? The table looks like it should exist.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry the message is vague, and the test here is not very intuitive, I will update.
Because there is no actual rename operation in Glue, rename means deleting an old table and creating a new table in Glue pointing to the same metadata. If the creation process fails, it should "rollback" and keep the old table.
| * So for streaming use case with lots of commits, it is recommended to set this value to true. | ||
| * @param fileIO file IO to use. | ||
| */ | ||
| public GlueCatalog(GlueClient glue, String catalogName, String catalogId, String warehousePath, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be public? It makes sense to have it for tests, but I would probably avoid public constructors that we need to support later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since all this does is call initialize, maybe we don't even need it for tests. That method could be package-private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will just have a package private constructor taking the glue client, and always call the initialize method then, which should simply the situation here a lot.
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
Outdated
Show resolved
Hide resolved
| name, | ||
| properties.get(CatalogProperties.WAREHOUSE_LOCATION), | ||
| new AwsProperties(properties), | ||
| fileIOImpl == null ? new S3FileIO() : CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a blocker, but I wonder if we should choose the FileIO for a table based on the table location. Any s3 URI would use S3FileIO, but other URIs would still use the default HadoopFileIO.
Another thing I've been considering is using a delegating FileIO that checks whether S3FileIO is available and loads it. If it is available, then it is used for all s3 paths, but HadoopFileIO is used for the others. This may be over-complicated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I am also thinking about a delegating fileIO, but in another PR. I can switch default back to HadoopFileIO if that is more inclusive of all file path types.
| Table glueTable = null; | ||
| try { | ||
| Optional<Table> glueTableOptional = getGlueTable(); | ||
| if (glueTableOptional.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is used in Hive because when running in Hive, HiveMetaHook is used to create the Iceberg table and by the time it runs the commit, Hive has already created the metadata in the HiveMetaStore.
This shouldn't be needed for Glue because we don't expect any other process to create the Glue table. Concurrent table creation should always call create, which may fail with an AlreadyExistsException.
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
Outdated
Show resolved
Hide resolved
| "Cannot commit %s because Glue detected concurrent update", tableName()); | ||
| } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) { | ||
| throw new AlreadyExistsException(e, | ||
| "Cannot commit %s because its Glue table already exists when trying to create one", tableName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this block know that create was called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because createTable throws AlreadyExistsException, updateTable throws ConcurrentModificationException
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
Outdated
Show resolved
Hide resolved
| Map<String, String> properties = isUpdate ? Maps.newHashMap(glueTable.parameters()) : Maps.newHashMap(); | ||
| properties.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); | ||
| properties.put(METADATA_LOCATION_PROP, newMetadataLocation); | ||
| if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If currentMetadataLocation is an empty string, then there is a problem. Empty strings shouldn't be handles like null strings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I am not very sure if this is a potential edge case to hit, so I am basically following the same way as HiveTableOperations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That looks wrong to me, but if that's the currently released behavior then let's keep it.
| import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest; | ||
| import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse; | ||
|
|
||
| public class GlueCatalogTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are okay, but it still doesn't seem like this is testing much. Most of the tests are responding to Mockito.any(GetDatabaseRequest.class) or similar, so it doesn't really test what gets passed to Glue. Better mocking would be great, but I'm starting to think that it may just be easier to maintain an in-memory catalog and a Glue client implementation that updates it. I think that would work better to catch simple problems.
|
Thanks @jackye1995! I think this is looking close. I had a few comments on the implementation in Also, what is the plan for running the integration tests for this? I'm assuming that they pass when configured correctly. How can I run them and do you think we can make them part of CI? |
|
@rdblue Thanks for the comments, I have been debating with myself about the rename order, and I have changed the dropTable to be the last operation. For the commit part, I have also simplified the logic based on the suggestions. For the integration test part, one can run it by setting AWS credentials and a test bucket using environment variable |
|
Merged! Thanks @jackye1995! |
GlueCatalogandGlueTableOperationsimplementationsS3FileIOand add more tests:integrationTesttask to run integration tests for theawsmodule.