diff --git a/core/src/main/java/org/apache/iceberg/CatalogMigrateUtil.java b/core/src/main/java/org/apache/iceberg/CatalogMigrateUtil.java new file mode 100644 index 000000000000..4fc9e7340432 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/CatalogMigrateUtil.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CatalogMigrateUtil { + private static final Logger LOG = LoggerFactory.getLogger(CatalogMigrateUtil.class); + + private CatalogMigrateUtil() {} + + /** + * Migrates tables from one catalog(source catalog) to another catalog(target catalog). After + * successful migration, deletes the table entry from source catalog(not applicable for + * HadoopCatalog). + * + *

Supports bulk migrations with a multi-thread execution. + * + *

Users must make sure that no in-progress commits on the tables of source catalog during + * migration. + * + * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be + * migrated. If not specified, all the tables would be migrated + * @param sourceCatalog Source {@link Catalog} from which the tables are chosen + * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated + * @param maxThreadPoolSize Size of the thread pool used for migrate tables (If set to 0, no + * thread pool is used) + * @return Collection of table identifiers for successfully migrated tables + */ + public static Collection migrateTables( + List tableIdentifiers, + Catalog sourceCatalog, + Catalog targetCatalog, + int maxThreadPoolSize) { + return migrateTables(tableIdentifiers, sourceCatalog, targetCatalog, maxThreadPoolSize, true); + } + + /** + * Register tables from one catalog(source catalog) to another catalog(target catalog). User has + * to take care of deleting the tables from source catalog after registration. + * + *

Supports bulk registration with a multi-thread execution. + * + *

Users must make sure that no in-progress commits on the tables of source catalog during + * registration. + * + * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be + * registered. If not specified, all the tables would be registered + * @param sourceCatalog Source {@link Catalog} from which the tables are chosen + * @param targetCatalog Target {@link Catalog} to which the tables need to be registered + * @param maxThreadPoolSize Size of the thread pool used for registering tables (If set to 0, no + * thread pool is used) + * @return Collection of table identifiers for successfully registered tables + */ + public static Collection registerTables( + List tableIdentifiers, + Catalog sourceCatalog, + Catalog targetCatalog, + int maxThreadPoolSize) { + return migrateTables(tableIdentifiers, sourceCatalog, targetCatalog, maxThreadPoolSize, false); + } + + private static Collection migrateTables( + List tableIdentifiers, + Catalog sourceCatalog, + Catalog targetCatalog, + int maxThreadPoolSize, + boolean deleteEntriesFromSourceCatalog) { + validate(sourceCatalog, targetCatalog, maxThreadPoolSize); + + List identifiers; + if (tableIdentifiers == null || tableIdentifiers.isEmpty()) { + // fetch all the table identifiers from all the namespaces. + List namespaces = + (sourceCatalog instanceof SupportsNamespaces) + ? ((SupportsNamespaces) sourceCatalog).listNamespaces() + : ImmutableList.of(Namespace.empty()); + identifiers = + namespaces.stream() + .flatMap(namespace -> sourceCatalog.listTables(namespace).stream()) + .collect(Collectors.toList()); + } else { + identifiers = tableIdentifiers; + } + + ExecutorService executorService = null; + if (maxThreadPoolSize > 0) { + executorService = ThreadPools.newWorkerPool("migrate-tables", maxThreadPoolSize); + } + + try { + Collection migratedTableIdentifiers = new ConcurrentLinkedQueue<>(); + Tasks.foreach(identifiers.stream().filter(Objects::nonNull)) + .retry(3) + .stopRetryOn(NoSuchTableException.class, NoSuchNamespaceException.class) + .suppressFailureWhenFinished() + .executeWith(executorService) + .onFailure( + (tableIdentifier, exc) -> + LOG.warn("Unable to migrate table {}", tableIdentifier, exc)) + .run( + tableIdentifier -> { + migrate( + tableIdentifier, sourceCatalog, targetCatalog, deleteEntriesFromSourceCatalog); + migratedTableIdentifiers.add(tableIdentifier); + }); + return migratedTableIdentifiers; + } finally { + if (executorService != null) { + executorService.shutdown(); + } + } + } + + private static void validate( + Catalog sourceCatalog, Catalog targetCatalog, int maxThreadPoolSize) { + Preconditions.checkArgument( + maxThreadPoolSize >= 0, + "maxThreadPoolSize should have value >= 0, value: " + maxThreadPoolSize); + Preconditions.checkArgument(sourceCatalog != null, "Invalid source catalog: null"); + Preconditions.checkArgument(targetCatalog != null, "Invalid target catalog: null"); + Preconditions.checkArgument( + !targetCatalog.equals(sourceCatalog), "target catalog is same as source catalog"); + } + + private static void migrate( + TableIdentifier tableIdentifier, + Catalog sourceCatalog, + Catalog targetCatalog, + boolean deleteEntriesFromSourceCatalog) { + // register the table to the target catalog + TableOperations ops = ((BaseTable) sourceCatalog.loadTable(tableIdentifier)).operations(); + targetCatalog.registerTable(tableIdentifier, ops.current().metadataFileLocation()); + + if (deleteEntriesFromSourceCatalog && !(sourceCatalog instanceof HadoopCatalog)) { + // HadoopCatalog dropTable will delete the table files completely even when purge is false. + // So, skip dropTable for HadoopCatalog. + sourceCatalog.dropTable(tableIdentifier, false); + } + } +} diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index 231f6144e500..a79d12b72f0c 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -32,14 +32,18 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.CachingCatalog; +import org.apache.iceberg.CatalogMigrateUtil; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DataFile; @@ -65,8 +69,10 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.transforms.Transforms; @@ -772,4 +778,58 @@ public void testDatabaseLocationWithSlashInWarehouseDir() { Assert.assertEquals("s3://bucket/database.db", database.getLocationUri()); } + + @Test + public void testMigrateTablesFromHadoopCatalogToHiveCatalog() throws IOException { + Map hadoopCatalogProperties = + ImmutableMap.of( + "type", "hadoop", + "catalogImpl", "org.apache.iceberg.hadoop.HadoopCatalog", + "warehouse", temp.newFolder().getAbsolutePath()); + HadoopCatalog hadoopCatalog = + (HadoopCatalog) + CatalogUtil.buildIcebergCatalog("hadoop", hadoopCatalogProperties, new Configuration()); + + List identifiers = createTables(hadoopCatalog, 10); + + // migrate specific tables based on identifiers + migrateAndValidate(hadoopCatalog, identifiers.subList(5, 9), identifiers.subList(5, 9)); + + // migrate all the tables by without specifying the identifiers + migrateAndValidate(hadoopCatalog, identifiers, null); + } + + private void migrateAndValidate( + HadoopCatalog hadoopCatalog, + List expectedIdentifiers, + List sourceIdentifiers) { + Collection migratedIdentifiers = + CatalogMigrateUtil.migrateTables(sourceIdentifiers, hadoopCatalog, catalog, 3); + + org.assertj.core.api.Assertions.assertThat(expectedIdentifiers) + .containsExactlyInAnyOrderElementsOf(migratedIdentifiers); + migratedIdentifiers.forEach( + tableIdentifier -> { + // test by accessing the migrated tables from target catalog + Assert.assertNotNull(catalog.loadTable(tableIdentifier)); + // drop table from target catalog after validation + Assert.assertTrue(catalog.dropTable(tableIdentifier, false)); + }); + } + + private static List createTables(Catalog catalog, int numOfTables) { + Schema schema = + new Schema( + required(1, "id", Types.IntegerType.get(), "unique ID"), + required(2, "data", Types.StringType.get())); + List identifiers = Lists.newArrayList(); + IntStream.range(0, numOfTables) + .forEach( + index -> { + TableIdentifier identifier = TableIdentifier.of(DB_NAME, "tbl" + index); + catalog.createTable(identifier, schema); + identifiers.add(identifier); + }); + return identifiers; + } }