AWS: support force register table in GlueCatalog #6742
|
Thanks for picking this up from me. Pinging a few people for review: @amogh-jahagirdar @singhpk234 @rajarshisarkar @aajisaka @JonasJ-ap |
| } | ||
|
|
||
| @Override | ||
| public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) { |
I think we might want to have a feature flag like glue.register-table.create-new-metadata (for lack of a better name) in AwsProperties to distinguish the two behaviors: either create a new metadata file or not. If the flag is true (the default), it can call the base class method directly.
| .tableInput(tableInput) | ||
| .build()); | ||
| } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) { | ||
| glue.updateTable(UpdateTableRequest.builder() |
Similarly, I think we should at least make this a feature flag like glue.register-table.replace-if-exists.
We could further argue if we should make this an API feature or not, but that is subject to debate. Any thoughts?
| TableInput tableInput = TableInput.builder() | ||
| .name(IcebergToGlueConverter.getTableName(identifier, awsProperties.glueCatalogSkipNameValidation())) | ||
| .tableType(GlueTableOperations.GLUE_EXTERNAL_TABLE_TYPE) | ||
| .parameters(tableParameters) |
I think here we still need to get the metadata and use the IcebergToGlueConverter to get the merged schema for display.
#6591 already fixed this part for Glue too right? |
great, did not notice that one! In that case I think the only missing feature is just replace |
There is also related on going work |
|
Nice, I anticipated similar concerns as in that thread, that's why I'd like to just put it up and see how the community reacts to this. I think the conversation there was around the fact that What Theo is trying to achieve (based on my understanding) is a use case of continuous registration, where a user sends a notification of the latest metadata location, and then the metadata location is updated for an existing table in Glue. This is developed for migration use cases of, for example, In this use case, an atomic switch of metadata location is a requirement, compared to a drop + register case. And it can clearly be achieved through calling I think it has benefit in OSS, as people naturally think of using Any thoughts? @RussellSpitzer @yabola @flyrain @szehon-ho @rdblue |
|
@jackye1995 Thanks for pinging me. I agree with you: there is no requirement in a recovery use case, but this can be a requirement in automatic switch of metadata location. But I am not sure if it is custom logic. Looking forward to other people's perspectives. If it makes sense, I can continue to complete my PR. |
@jackye1995: Totally agree that we need to support cross catalog migration. |
|
I briefly read the project you linked, that's a very cool CLI! But we are not really trying to build a new migration project out of it, the ask is much simpler. What I want to get out of the discussion is really which way we go out of the following:
|
| .databaseName(IcebergToGlueConverter.getDatabaseName(identifier, | ||
| awsProperties.glueCatalogSkipNameValidation())) | ||
| .tableInput(tableInput) | ||
| .build()); |
this should first get the last table version to avoid commit conflict
I think the Glue catalog already handles this internally?
If we call glue.getTable to get the table version first and then call glue.updateTable with nextVersionId, it will cause a ConcurrentModificationException.
Yes this is what I mean. It needs to explicitly pass in the version number of the current version to ensure atomic update.
Ack, will update
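The version-checked update requested in this thread can be sketched without the Glue SDK. The following is a minimal in-memory illustration, assuming a catalog whose update call rejects a stale version; `VersionedCatalogSketch` and all of its methods are hypothetical names for illustration, not the GlueCatalog or AWS SDK API.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

/** In-memory stand-in for a catalog with version-checked updates (hypothetical, not the Glue SDK). */
final class VersionedCatalogSketch {
  /** One stored table entry: a version counter plus the metadata location it points at. */
  static final class Entry {
    final long versionId;
    final String metadataLocation;

    Entry(long versionId, String metadataLocation) {
      this.versionId = versionId;
      this.metadataLocation = metadataLocation;
    }
  }

  private final Map<String, Entry> tables = new HashMap<>();

  /** Creates a new entry at version 0; fails if the name is already taken. */
  synchronized void create(String name, String metadataLocation) {
    if (tables.containsKey(name)) {
      throw new IllegalStateException("Table already exists: " + name);
    }
    tables.put(name, new Entry(0, metadataLocation));
  }

  synchronized Entry get(String name) {
    Entry entry = tables.get(name);
    if (entry == null) {
      throw new NoSuchElementException("Table not found: " + name);
    }
    return entry;
  }

  /** Succeeds only if the caller's expected version is still current; otherwise another writer won. */
  synchronized void update(String name, long expectedVersionId, String metadataLocation) {
    Entry current = get(name);
    if (current.versionId != expectedVersionId) {
      throw new ConcurrentModificationException("Stale version for " + name);
    }
    tables.put(name, new Entry(current.versionId + 1, metadataLocation));
  }

  /** Create the table, or replace its metadata location using the version read just before the update. */
  void registerOrReplace(String name, String metadataLocation) {
    try {
      create(name, metadataLocation);
    } catch (IllegalStateException alreadyExists) {
      Entry current = get(name);
      update(name, current.versionId, metadataLocation);
    }
  }
}
```

If another writer commits between the read and the update, the update fails instead of silently overwriting, which is the atomicity property the reviewer asks for.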
| glue.createTable( | ||
| CreateTableRequest.builder().databaseName(databaseName).tableInput(tableInput).build()); | ||
| } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) { | ||
| if (awsProperties.glueCatalogForceRegisterTable()) { |
I think we can do something better to keep the original behavior when the flag is off:
if (!awsProperties.glueCatalogForceRegisterTable()) {
return super.registerTable(.....);
}
// the replace-based logic below
...
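A fuller version of the guard suggested above might look like the sketch below. It is only an illustration under simplifying assumptions: `BaseCatalogSketch` and `ForceRegisterCatalogSketch` are hypothetical stand-ins, tables are reduced to a map from identifier to metadata location, and the replace branch ignores versioning.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical base catalog: registering over an existing table is rejected. */
class BaseCatalogSketch {
  protected final Map<String, String> tables = new HashMap<>();

  String registerTable(String identifier, String metadataLocation) {
    if (tables.containsKey(identifier)) {
      throw new IllegalStateException("Table already exists: " + identifier);
    }
    tables.put(identifier, metadataLocation);
    return metadataLocation;
  }
}

/** Hypothetical subclass: the flag decides between the original and the replace-based behavior. */
final class ForceRegisterCatalogSketch extends BaseCatalogSketch {
  private final boolean forceRegister;

  ForceRegisterCatalogSketch(boolean forceRegister) {
    this.forceRegister = forceRegister;
  }

  @Override
  String registerTable(String identifier, String metadataLocation) {
    if (!forceRegister) {
      // Flag off: keep the original behavior untouched by delegating to the base class.
      return super.registerTable(identifier, metadataLocation);
    }
    // Flag on: replace-based logic; an existing entry is overwritten.
    tables.put(identifier, metadataLocation);
    return metadataLocation;
  }
}
```

Putting the early return first keeps the default code path identical to the base class, so existing users see no behavior change unless they opt in.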
|
Resolved comments, pinging people for review @jackye1995 @yabola @amogh-jahagirdar @singhpk234 @rajarshisarkar @aajisaka @JonasJ-ap |
amogh-jahagirdar
left a comment
Overall looking good, I think there's some fixes we need to do in exception handling and the tests we are adding.
| AssertHelpers.assertThrows( | ||
| "should fail to rename", | ||
| ValidationException.class, | ||
| "Input Glue table is not an iceberg table", | ||
| () -> | ||
| glueCatalog.renameTable( | ||
| TableIdentifier.of(namespace, tableName), | ||
| TableIdentifier.of(namespace, newTableName))); |
Why is this assertion removed? It looks like it's for rename
This assertion was removed because of the logic change in renameTable, which now lets the rename use the previous table's Iceberg properties (metadata location). With that change, the related integration test would always fail at this assertion.
| Table table = glueCatalog.loadTable(identifier); | ||
| Table table = glueCatalogWithForceRegisterTable.loadTable(identifier); | ||
| String metadataLocation = ((BaseTable) table).operations().current().metadataFileLocation(); | ||
| Assertions.assertThatThrownBy(() -> glueCatalog.registerTable(identifier, metadataLocation)) | ||
| .isInstanceOf(AlreadyExistsException.class); | ||
| Assertions.assertThat(glueCatalog.dropTable(identifier, true)).isTrue(); | ||
| Assertions.assertThat(glueCatalog.dropNamespace(Namespace.of(namespace))).isTrue(); | ||
| Assertions.assertThat(glueCatalogWithForceRegisterTable.dropTable(identifier, false)).isTrue(); | ||
| Table registeredTable = | ||
| glueCatalogWithForceRegisterTable.registerTable(identifier, metadataLocation); | ||
| Assertions.assertThat(registeredTable).isNotNull(); | ||
|
|
||
| GetTableResponse response = | ||
| glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build()); | ||
| Assert.assertEquals( | ||
| "external table type is set after register", | ||
| "EXTERNAL_TABLE", | ||
| response.table().tableType()); | ||
| String actualMetadataLocation = | ||
| response.table().parameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); | ||
| Assert.assertEquals( | ||
| "metadata location should be updated with registerTable call", | ||
| metadataLocation, | ||
| actualMetadataLocation); | ||
|
|
||
| // commit new transaction, should create a new metadata file | ||
| DataFile dataFile = | ||
| DataFiles.builder(partitionSpec) | ||
| .withPath("/path/to/data-a.parquet") | ||
| .withFileSizeInBytes(10) | ||
| .withRecordCount(1) | ||
| .build(); | ||
| table.newAppend().appendFile(dataFile).commit(); | ||
|
|
||
| metadataLocation = ((BaseTable) table).operations().current().metadataFileLocation(); | ||
| // update metadata location | ||
| glueCatalogWithForceRegisterTable.registerTable(identifier, metadataLocation); | ||
| response = | ||
| glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build()); | ||
| String updatedMetadataLocation = | ||
| response.table().parameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); | ||
| Assert.assertEquals( | ||
| "metadata location should be updated with registerTable call", | ||
| metadataLocation, | ||
| updatedMetadataLocation); | ||
| Assert.assertEquals("Table Version should be updated", "2", response.table().versionId()); |
I think we'll want separate tests for testRegisterWhenTableExists: one with force registration enabled and one without. We still want to validate that the case where force is false works as expected, but this change seems to remove the validation of the previous case. Let me know if I missed something when reading the code!
Yes that's a good call out, I have separated into two tests in the new revision commits, thanks!
| throw new NoSuchNamespaceException( | ||
| e, "Namespace %s is not found in Glue", identifier.namespace()); |
The exception handling seems off, can't EntityNotFoundException also be thrown when the table is not found?
This exception handling is meant for catching exceptions for glue.createTable calls only, for any exception during the getTable & updateTable calls we should have it throw exceptions as expected, thoughts?
Also I think the EntityNotFoundException won't be thrown for table not found in the case of Table AlreadyExistsException.
| this.catalogProperties = ImmutableMap.copyOf(properties); | ||
| this.catalogProperties = new HashMap<>(); | ||
| catalogProperties.putAll(properties); |
Making CatalogProperties non-immutable in order to effectively side-load information into the AWS client factory post-initialization is a dangerous precedent to set. In addition, it doesn't seem to actually accomplish anything besides allowing the GlueCatalog to suddenly switch regions post-initialization, which is likely to introduce some dangerous side effects.
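One possible alternative to mutating the catalog's own property map, shown only as a sketch: derive a separate read-only copy with the extra setting and hand that to whatever needs it. `ImmutablePropertiesSketch` and `withOverride` are hypothetical names, and this is not what the PR or the reviewer proposed verbatim.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: derive a new read-only map instead of mutating shared catalog properties. */
final class ImmutablePropertiesSketch {
  private ImmutablePropertiesSketch() {}

  /** Returns an unmodifiable copy of base with one key overridden; base itself is left untouched. */
  static Map<String, String> withOverride(Map<String, String> base, String key, String value) {
    Map<String, String> copy = new HashMap<>(base);
    copy.put(key, value);
    return Collections.unmodifiableMap(copy);
  }
}
```

Because the original map never changes, a catalog initialized with it cannot switch regions after the fact.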
|
|
||
| String factoryImpl = | ||
| PropertyUtil.propertyAsString(catalogProperties, AwsProperties.CLIENT_FACTORY, null); | ||
| if (factoryImpl != null && factoryImpl.equals(AssumeRoleAwsClientFactory.class.getName())) { |
It hurts extensibility to make logic specific to a particular implementation class. If, for example, a customer needs to extend AssumeRoleAwsClientFactory to add some functionality for their own use, this logic will break as the class name will no longer match.
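The extensibility concern can be illustrated with a small, self-contained comparison of a class-name equality check against a type-based check. The factory classes below are hypothetical placeholders, not the real AssumeRoleAwsClientFactory, and the type-based check is one possible remedy rather than something the reviewer prescribed.

```java
/** Compares a class-name equality check with a type-based check (placeholder classes only). */
final class FactoryTypeCheckSketch {
  /** Placeholder for a factory implementation that the catalog special-cases. */
  static class AssumeRoleFactory {}

  /** Placeholder for a user-defined extension of that factory. */
  static class CustomAssumeRoleFactory extends AssumeRoleFactory {}

  private FactoryTypeCheckSketch() {}

  /** Name equality: true only for the exact class, so subclasses are missed. */
  static boolean matchesByName(String implClassName) {
    return AssumeRoleFactory.class.getName().equals(implClassName);
  }

  /** Type check: true for the class and for anything that extends it. */
  static boolean matchesByType(String implClassName) {
    try {
      Class<?> impl = Class.forName(implClassName);
      return AssumeRoleFactory.class.isAssignableFrom(impl);
    } catch (ClassNotFoundException e) {
      return false;
    }
  }
}
```

With the name-based check, configuring the subclass silently skips the special-case logic; the type-based check still recognizes it.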
| String catalogFileIORegion = awsProperties.getGlueCatalogFileIORegion(); | ||
| if (catalogFileIORegion != null) { | ||
| catalogProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_REGION, catalogFileIORegion); | ||
| } |
Can't this logic and the associated parameter be removed and replaced with just setting AwsProperties.CLIENT_ASSUME_ROLE_REGION directly? This change is not in any way scoped to this call or AWS service, it effectively creates a case where what region the GlueCatalog is calling can suddenly change post-initialization after the first time registerTable is called.
| public static final boolean GLUE_CATALOG_FORCE_REGISTER_TABLE_DEFAULT = false; | ||
|
|
||
| /** Configure the Glue Catalog S3 FileIO Region to allow cross region s3 access */ | ||
| public static final String GLUE_CATALOG_FILE_IO_REGION = "glue.catalog-file-io-region"; |
So we already have client.region, which sets the region for all services. I am going to be introducing a change in the near future that will allow setting the region per service for the default AWS Client Factory (and we can extend it to the Assume Role client factory as well), so we probably want a more general per-service naming scheme. Based on how existing parameters are formatted, where glue. and s3. are already established prefixes, most likely something like: glue.region, s3.region, kms.region, etc...
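A lookup under the naming scheme proposed in this comment could work roughly as follows. This is a sketch only: the per-service keys such as glue.region and s3.region are the reviewer's proposal rather than properties confirmed to exist, and `RegionResolutionSketch` and `regionFor` are hypothetical names.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical resolver: a per-service region key wins, otherwise fall back to client.region. */
final class RegionResolutionSketch {
  static final String CLIENT_REGION = "client.region";

  private RegionResolutionSketch() {}

  /** Looks up "<servicePrefix>.region" first (for example "glue.region"), then the shared key. */
  static Optional<String> regionFor(Map<String, String> properties, String servicePrefix) {
    String serviceRegion = properties.get(servicePrefix + ".region");
    if (serviceRegion != null) {
      return Optional.of(serviceRegion);
    }
    return Optional.ofNullable(properties.get(CLIENT_REGION));
  }
}
```

Resolving the region once at client creation, from properties that do not change, avoids the post-initialization region switch raised in the earlier comments.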
| TableOperations ops = newTableOps(identifier); | ||
| InputFile metadataFile = ops.io().newInputFile(metadataFileLocation); | ||
| TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile); | ||
|
|
||
| Map<String, String> tableParameters = | ||
| ImmutableMap.of( | ||
| BaseMetastoreTableOperations.TABLE_TYPE_PROP, | ||
| BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toLowerCase(Locale.ENGLISH), | ||
| BaseMetastoreTableOperations.METADATA_LOCATION_PROP, | ||
| metadataFileLocation); | ||
|
|
||
| String databaseName = | ||
| IcebergToGlueConverter.getDatabaseName( | ||
| identifier, awsProperties.glueCatalogSkipNameValidation()); | ||
| String tableName = | ||
| IcebergToGlueConverter.getTableName( | ||
| identifier, awsProperties.glueCatalogSkipNameValidation()); | ||
|
|
||
| TableInput tableInput = | ||
| TableInput.builder() | ||
| .applyMutation( | ||
| builder -> | ||
| IcebergToGlueConverter.setTableInputInformation( | ||
| builder, metadata, tableParameters)) | ||
| .name(tableName) | ||
| .tableType(GlueTableOperations.GLUE_EXTERNAL_TABLE_TYPE) | ||
| .parameters(tableParameters) | ||
| .build(); | ||
|
|
||
| try { | ||
| glue.createTable( | ||
| CreateTableRequest.builder().databaseName(databaseName).tableInput(tableInput).build()); | ||
| } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) { | ||
| GetTableResponse response = | ||
| glue.getTable( | ||
| GetTableRequest.builder().databaseName(databaseName).name(tableName).build()); | ||
| String versionId = response.table().versionId(); | ||
| glue.updateTable( | ||
| UpdateTableRequest.builder() | ||
| .databaseName(databaseName) | ||
| .tableInput(tableInput) | ||
| .versionId(versionId) | ||
| .build()); | ||
| } catch (EntityNotFoundException e) { |
So this logic is a combination of a fork of the logic in BaseMetastoreCatalog.registerTable and GlueTableOperations.persistGlueTable. As GlueTableOperations also has access to AwsProperties, I would recommend refactoring the logic in GlueTableOperations so that persistGlueTable can conditionally fall back to its update mode as that improves code reuse.
If the concern is the creation of an extra metadata file, it looks like GlueTableOperations already checks whether it needs to during the writeNewMetadataIfRequired function and considering tableMetadata is always set for registerTable, it will never choose to write a new metadata file anyways.
|
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions. |
|
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
Add support for registerTable in GlueCatalog.
Customizations:
Reference: #4099