-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Support bulk table migration from one catalog to another #5492
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Hopefully later we can call this API from python CLI and make the migration more easier. |
| ExecutorService executorService = null; | ||
| if (maxConcurrentMigrates > 0) { | ||
| executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll probably want to add the option to allow the user to pass in an executor service, like is done in several other actions etc.
If this is just a temporary utility -- or I should say isn't intended to be used from Flink ever (which was the motivator for allowing passing in executor services) -- then it might not be an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just like other CatalogUtil API, this migrate API was intended to work by not depending on any engines.
If we need engine-specific implementation, we can have a spark action or flink action later on.
|
|
||
| private static void validate( | ||
| Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates) { | ||
| Preconditions.checkArgument( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there should be tests for all those validation checks
| // register the table to the target catalog | ||
| TableOperations ops = | ||
| ((HasTableOperations) sourceCatalog.loadTable(tableIdentifier)).operations(); | ||
| targetCatalog.registerTable(tableIdentifier, ops.current().metadataFileLocation()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what would happen if tables would be modified in parallel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to educate the users. Updated the doc.
Or do you mean adding a flag at the source catalog to throw an error for new requests when migration is in progress?
93b1680 to
08beb3f
Compare
| * would be dropped from the source catalog. | ||
| * | ||
| * <p>Users must make sure that no in-progress commits on the tables of source catalog during | ||
| * migration. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a major issue. How many people are going to call this without first shutting down all of the running processes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. Many users don't read java doc first :)
What is your opinion on having a flag in catalog, when that is set, Catalog can throw an error that migration is in progress and discard in-progress commits?
| Catalog sourceCatalog, | ||
| Catalog targetCatalog, | ||
| int maxConcurrentMigrations, | ||
| boolean deleteEntriesFromSourceCatalog) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We generally avoid adding boolean arguments to public methods. It's usually better to use different verbs, like migrate (with delete) and register (copy).
| private static void migrate( | ||
| Catalog sourceCatalog, | ||
| Catalog targetCatalog, | ||
| TableIdentifier tableIdentifier, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This uses a different argument order than migrateTables. I generally prefer consistency.
| boolean deleteEntriesFromSourceCatalog) { | ||
| // register the table to the target catalog | ||
| TableOperations ops = | ||
| ((HasTableOperations) sourceCatalog.loadTable(tableIdentifier)).operations(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's usually better to use BaseTable than to use HasTableOperations. That way you're not trying to move a metadata table.
|
Will handle it as a separate project as discussed. |
To support the migration of tables from one catalog to another catalog seamlessly, we have added [#5037 , #4946].
But still, bulk migration is hard as the API user needs to write some extra code as
registerTable()expects the table Identifier and metadata location of each table.Hence, adding a
CatalogUtilAPI that can help in bulk migration of tables from one catalog to another.