Core : Catalog Tables Migration API #5297
Changes from all commits
@@ -20,10 +20,16 @@
```java
package org.apache.iceberg;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
```

@@ -33,6 +39,7 @@
```java
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
```
@@ -346,4 +353,63 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
```java
    setConf.invoke(conf);
  }

  /**
   * Migrates tables from one catalog (the source catalog) to another (the target catalog).
   * Each table is also dropped from the source catalog once its migration succeeds.
   *
   * @param tableIdentifiers a list of identifiers for the tables to migrate;
   *                         if null or empty, all tables in the source catalog are migrated
   * @param sourceCatalogProperties source catalog properties
   * @param targetCatalogProperties target catalog properties
   * @param sourceHadoopConfig source catalog Hadoop configuration
   * @param targetHadoopConfig target catalog Hadoop configuration
   * @return list of table identifiers for successfully migrated tables
   */
  public static List<TableIdentifier> migrateTables(List<TableIdentifier> tableIdentifiers,
      Map<String, String> sourceCatalogProperties, Map<String, String> targetCatalogProperties,
      Object sourceHadoopConfig, Object targetHadoopConfig) {
    if (tableIdentifiers != null) {
```
Member: Let's leave out the catalog instantiation and configuration here completely. I suspect that users have at least one of these catalogs already handy - and setting up "the same" catalog twice is superfluous.
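As a rough illustration of that suggestion, the signature could shrink to accept already-configured catalogs. This is only a sketch of the idea, not part of the patch; the overload shape is an assumption:

```java
// Hypothetical overload (not in this patch): callers pass catalogs they have
// already instantiated, so property maps and Hadoop configuration disappear
// from the migration API entirely.
public static List<TableIdentifier> migrateTables(List<TableIdentifier> tableIdentifiers,
    Catalog sourceCatalog, Catalog targetCatalog) {
  List<TableIdentifier> migrated = new ArrayList<>();
  for (TableIdentifier identifier : tableIdentifiers) {
    Table table = sourceCatalog.loadTable(identifier);
    // Same register-by-metadata-location approach as the patch below
    String metadataLocation =
        ((HasTableOperations) table).operations().current().metadataFileLocation();
    targetCatalog.registerTable(identifier, metadataLocation);
    sourceCatalog.dropTable(identifier, false /* do not purge data files */);
    migrated.add(identifier);
  }
  return migrated;
}
```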
```java
      tableIdentifiers.forEach(tableIdentifier -> Preconditions.checkArgument(
          tableIdentifier != null, "Invalid identifier: %s", tableIdentifier));
    }

    Catalog sourceCatalog;
    try {
      sourceCatalog = loadCatalog(sourceCatalogProperties.get("catalogImpl"),
          sourceCatalogProperties.get("catalogName"), sourceCatalogProperties, sourceHadoopConfig);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(String.format(
          "Cannot initialize Source Catalog implementation %s: %s",
          sourceCatalogProperties.get("catalogImpl"), e.getMessage()), e);
    }

    Catalog targetCatalog;
    try {
      targetCatalog = loadCatalog(targetCatalogProperties.get("catalogImpl"),
          targetCatalogProperties.get("catalogName"), targetCatalogProperties, targetHadoopConfig);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(String.format(
          "Cannot initialize Target Catalog implementation %s: %s",
          targetCatalogProperties.get("catalogImpl"), e.getMessage()), e);
    }

    List<TableIdentifier> allIdentifiers = tableIdentifiers;
```
Member: I think this code should probably live in …
```java
    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
      List<Namespace> namespaces = (sourceCatalog instanceof SupportsNamespaces) ?
          ((SupportsNamespaces) sourceCatalog).listNamespaces() : ImmutableList.of(Namespace.empty());
      allIdentifiers = namespaces.stream().flatMap(ns ->
          sourceCatalog.listTables(ns).stream()).collect(Collectors.toList());
    }
```

Member: I suspect this will run for a very long time, like when there are a lot of tables.
```java
    List<TableIdentifier> migratedTableIdentifiers = new ArrayList<>();
    allIdentifiers.forEach(tableIdentifier -> {
```

Member: I suspect this will run for a very long time, like when there are a lot of tables. Not sure whether it is actually possible to properly handle the case when …
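For the runtime concern, one option would be to parallelize the per-table work using Iceberg's existing org.apache.iceberg.util.Tasks utility. The sketch below is an assumption-laden illustration: the pool size and the migrateOne helper are invented for the example.

```java
// Sketch only: run migrations in parallel and make per-table failures explicit.
// Assumes imports of java.util.concurrent.ExecutorService / Executors and
// org.apache.iceberg.util.Tasks; migrateOne is a hypothetical helper wrapping
// the load/register/drop steps shown in the loop body below.
ExecutorService migrationPool = Executors.newFixedThreadPool(8);
Tasks.foreach(allIdentifiers)
    .executeWith(migrationPool)
    .stopOnFailure()                         // abort on the first failed table
    .onFailure((identifier, exc) ->
        System.err.printf("Failed to migrate %s: %s%n", identifier, exc.getMessage()))
    .run(identifier -> migrateOne(sourceCatalog, targetCatalog, identifier));
migrationPool.shutdown();
```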
```java
      final Table icebergTable = sourceCatalog.loadTable(tableIdentifier);
      TableOperations ops = ((HasTableOperations) icebergTable).operations();
      String metadataLocation = ops.current().metadataFileLocation();
      targetCatalog.registerTable(tableIdentifier, metadataLocation);
      migratedTableIdentifiers.add(tableIdentifier);
      if (!(sourceCatalogProperties.get("catalogImpl").equals("org.apache.iceberg.hadoop.HadoopCatalog"))) {
        // Hadoop dropTable deletes the table completely even if purge is false; to be fixed in a follow-up PR
        sourceCatalog.dropTable(tableIdentifier, false);
      }
    });

    return migratedTableIdentifiers;
  }
}
```
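For context, calling the new API end to end might look like the sketch below, assuming the method lands in CatalogUtil as this diff places it. The catalogImpl and catalogName keys are the ones the method reads; the concrete catalog implementations and the warehouse path are illustrative assumptions only.

```java
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class MigrationExample {
  public static void main(String[] args) {
    // Source: a Hadoop catalog; target: a Hive catalog (illustrative values only).
    Map<String, String> sourceProps = ImmutableMap.of(
        "catalogImpl", "org.apache.iceberg.hadoop.HadoopCatalog",
        "catalogName", "source",
        "warehouse", "hdfs://namenode:8020/warehouse");
    Map<String, String> targetProps = ImmutableMap.of(
        "catalogImpl", "org.apache.iceberg.hive.HiveCatalog",
        "catalogName", "target");
    Configuration conf = new Configuration();

    // Passing null migrates every table the source catalog can list.
    List<TableIdentifier> migrated =
        CatalogUtil.migrateTables(null, sourceProps, targetProps, conf, conf);
    migrated.forEach(id -> System.out.println("Migrated " + id));
  }
}
```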
@@ -37,10 +37,12 @@
```java
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
```

@@ -58,6 +60,7 @@
```java
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
```

@@ -639,4 +642,29 @@ public void testConversions() {
```java
    Assert.assertEquals(ns, JdbcUtil.stringToNamespace(nsString));
  }
```
Member: Should these tests better live in …
```java
  @Test
  public void testRegisterTable() {
    TableIdentifier identifier = TableIdentifier.of("a", "t1");
    catalog.createTable(identifier, SCHEMA);
    Table registeringTable = catalog.loadTable(identifier);
    catalog.dropTable(identifier, false);
    TableOperations ops = ((HasTableOperations) registeringTable).operations();
    String metadataLocation = ((JdbcTableOperations) ops).currentMetadataLocation();
    Assertions.assertThat(catalog.registerTable(identifier, metadataLocation)).isNotNull();
    Assertions.assertThat(catalog.loadTable(identifier)).isNotNull();
    Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
  }

  @Test
  public void testRegisterExistingTable() {
    TableIdentifier identifier = TableIdentifier.of("a", "t1");
    catalog.createTable(identifier, SCHEMA);
    Table registeringTable = catalog.loadTable(identifier);
    TableOperations ops = ((HasTableOperations) registeringTable).operations();
    String metadataLocation = ((JdbcTableOperations) ops).currentMetadataLocation();
    Assertions.assertThatThrownBy(() -> catalog.registerTable(identifier, metadataLocation))
        .isInstanceOf(AlreadyExistsException.class)
        .hasMessage("Table already exists: a.t1");
    Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
  }
}
```
This pair of tests is repeated (in a very similar way) across multiple catalogs. Can those be centralized somewhere? CatalogTests maybe?
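One hedged way to centralize them would be a shared test in CatalogTests that reads the metadata location through HasTableOperations instead of a catalog-specific cast. This is a sketch only, assuming the base class's catalog() accessor and SCHEMA fixture:

```java
// Hypothetical shared test for CatalogTests; C is the catalog type parameter
// of the base class.
@Test
public void testRegisterTable() {
  C catalog = catalog();
  TableIdentifier identifier = TableIdentifier.of("a", "t1");
  catalog.createTable(identifier, SCHEMA);
  Table original = catalog.loadTable(identifier);
  // Read the metadata location generically, without casting to a
  // catalog-specific TableOperations implementation such as JdbcTableOperations.
  String metadataLocation =
      ((HasTableOperations) original).operations().current().metadataFileLocation();
  catalog.dropTable(identifier, false /* keep metadata and data files */);

  Assertions.assertThat(catalog.registerTable(identifier, metadataLocation)).isNotNull();
  Assertions.assertThat(catalog.loadTable(identifier)).isNotNull();
  Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
}
```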