diff --git a/pom.xml b/pom.xml index dac44eac..fa94f6f1 100644 --- a/pom.xml +++ b/pom.xml @@ -61,7 +61,7 @@ 5.9.0 1.18.30 1.18.20.0 - 3.4.0 + 3.3.6 0.14.0 2.28.22 3.3.1 @@ -373,9 +373,9 @@ runtime - com.amazonaws - aws-java-sdk-bundle - 1.12.328 + software.amazon.awssdk + bundle + ${aws.version} runtime diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 0d50537b..63e9d673 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -51,7 +51,6 @@ public class ConversionConfig { SyncMode syncMode) { this.sourceTable = sourceTable; this.targetTables = targetTables; - this.targetCatalogs = targetCatalogs; Preconditions.checkArgument( targetTables != null && !targetTables.isEmpty(), "Please provide at-least one format to sync"); diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java index e76d5906..e52c203b 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java @@ -83,6 +83,11 @@ private CatalogSyncStatus syncCatalog( CatalogSyncClient
catalogSyncClient, CatalogTableIdentifier tableIdentifier, InternalTable table) { + log.info( + "Running catalog sync for table {} with format {} using catalogSync {}", + table.getBasePath(), + table.getTableFormat(), + catalogSyncClient.getClass().getName()); if (!catalogSyncClient.hasDatabase(tableIdentifier)) { catalogSyncClient.createDatabase(tableIdentifier); } diff --git a/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestHierarchicalTableIdentifier.java b/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestHierarchicalTableIdentifier.java index 63cf1181..f773c279 100644 --- a/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestHierarchicalTableIdentifier.java +++ b/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestHierarchicalTableIdentifier.java @@ -28,16 +28,23 @@ class TestHierarchicalTableIdentifier { @Test void testToString() { HierarchicalTableIdentifier catalogTableIdentifier = - new HierarchicalTableIdentifier("catalogName.databaseName.tableName"); + ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier( + "catalogName.databaseName.tableName"); assertEquals("catalogName.databaseName.tableName", catalogTableIdentifier.getId()); } @Test void testConstructorForHierarchicalTableIdentifier() { Assertions.assertDoesNotThrow( - () -> new HierarchicalTableIdentifier("catalogName.databaseName.tableName")); - Assertions.assertDoesNotThrow(() -> new HierarchicalTableIdentifier("databaseName.tableName")); + () -> + ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier( + "catalogName.databaseName.tableName")); + Assertions.assertDoesNotThrow( + () -> + ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier( + "databaseName.tableName")); Assertions.assertThrows( - IllegalArgumentException.class, () -> new HierarchicalTableIdentifier("tableName")); + IllegalArgumentException.class, + () -> ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier("tableName")); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java index 37778eb8..7b4a0ba3 100644 --- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java @@ -57,7 +57,7 @@ public CatalogSyncClient createCatalogSyncClient( return ReflectionUtils.createInstanceOfClass( targetCatalogConfig.getCatalogSyncClientImpl(), targetCatalogConfig, - tableFormat, - configuration); + configuration, + tableFormat); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java new file mode 100644 index 00000000..7692824e --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; + +public class CatalogUtils { + + public static HierarchicalTableIdentifier castToHierarchicalTableIdentifier( + CatalogTableIdentifier tableIdentifier) { + if (tableIdentifier instanceof HierarchicalTableIdentifier) { + return (HierarchicalTableIdentifier) tableIdentifier; + } + throw new IllegalArgumentException("Invalid tableIdentifier implementation"); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogConversionSource.java index 7da4ff55..67f5b317 100644 --- a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogConversionSource.java @@ -18,9 +18,13 @@ package org.apache.xtable.catalog.glue; +import static org.apache.xtable.catalog.CatalogUtils.castToHierarchicalTableIdentifier; + import java.util.Locale; import java.util.Properties; +import javax.security.auth.login.Configuration; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -29,6 +33,7 @@ import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.exception.CatalogSyncException; import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; import org.apache.xtable.spi.extractor.CatalogConversionSource; import software.amazon.awssdk.services.glue.GlueClient; @@ -41,7 +46,8 @@ public class GlueCatalogConversionSource implements CatalogConversionSource { private final GlueClient glueClient; private final GlueCatalogConfig glueCatalogConfig; - public GlueCatalogConversionSource(ExternalCatalogConfig catalogConfig) { + public GlueCatalogConversionSource( + ExternalCatalogConfig catalogConfig, Configuration configuration) { this.glueCatalogConfig = GlueCatalogConfig.of(catalogConfig.getCatalogProperties()); this.glueClient = new DefaultGlueClientFactory(glueCatalogConfig).getGlueClient(); } @@ -54,13 +60,14 @@ public GlueCatalogConversionSource(GlueCatalogConfig glueCatalogConfig, GlueClie @Override public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); try { GetTableResponse response = glueClient.getTable( GetTableRequest.builder() .catalogId(glueCatalogConfig.getCatalogId()) - .databaseName(tableIdentifier.getDatabaseName()) - .name(tableIdentifier.getTableName()) + .databaseName(tblIdentifier.getDatabaseName()) + .name(tblIdentifier.getTableName()) .build()); Table table = response.table(); if (table == null) { @@ -70,7 +77,7 @@ public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { String tableFormat = TableFormatUtils.getTableFormat(table.parameters()); if (Strings.isNullOrEmpty(tableFormat)) { throw new IllegalStateException( - String.format("TableFormat is null or empty for table: %s", tableIdentifier)); + String.format("TableFormat is null or empty for table: %s", tableIdentifier.getId())); } tableFormat = tableFormat.toUpperCase(Locale.ENGLISH); diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogSyncClient.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogSyncClient.java index 0470dd10..1e565492 100644 --- a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogSyncClient.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/GlueCatalogSyncClient.java @@ -18,6 +18,8 @@ package org.apache.xtable.catalog.glue; +import static org.apache.xtable.catalog.CatalogUtils.castToHierarchicalTableIdentifier; + import java.time.ZonedDateTime; import lombok.extern.log4j.Log4j2; @@ -31,6 +33,8 @@ import org.apache.xtable.exception.CatalogSyncException; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; import org.apache.xtable.spi.sync.CatalogSyncClient; import software.amazon.awssdk.services.glue.GlueClient; @@ -96,7 +100,8 @@ public String getStorageLocation(Table table) { } @Override - public boolean hasDatabase(String databaseName) { + public boolean hasDatabase(CatalogTableIdentifier tableIdentifier) { + String databaseName = castToHierarchicalTableIdentifier(tableIdentifier).getDatabaseName(); try { return glueClient .getDatabase( @@ -114,7 +119,8 @@ public boolean hasDatabase(String databaseName) { } @Override - public void createDatabase(String databaseName) { + public void createDatabase(CatalogTableIdentifier tableIdentifier) { + String databaseName = castToHierarchicalTableIdentifier(tableIdentifier).getDatabaseName(); try { glueClient.createDatabase( CreateDatabaseRequest.builder() @@ -132,52 +138,55 @@ public void createDatabase(String databaseName) { @Override public Table getTable(CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); try { GetTableResponse response = glueClient.getTable( GetTableRequest.builder() .catalogId(glueCatalogConfig.getCatalogId()) - .databaseName(tableIdentifier.getDatabaseName()) - .name(tableIdentifier.getTableName()) + .databaseName(tblIdentifier.getDatabaseName()) + .name(tblIdentifier.getTableName()) .build()); return response.table(); } catch (EntityNotFoundException e) { return null; } catch (Exception e) { - throw new CatalogSyncException("Failed to get table: " + tableIdentifier, e); + throw new CatalogSyncException("Failed to get table: " + tblIdentifier.getId(), e); } } @Override public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); TableInput tableInput = tableBuilder.getCreateTableRequest(table, tableIdentifier); try { glueClient.createTable( CreateTableRequest.builder() .catalogId(glueCatalogConfig.getCatalogId()) - .databaseName(tableIdentifier.getDatabaseName()) + .databaseName(tblIdentifier.getDatabaseName()) .tableInput(tableInput) .build()); } catch (Exception e) { - throw new CatalogSyncException("Failed to create table: " + tableIdentifier, e); + throw new CatalogSyncException("Failed to create table: " + tblIdentifier.getId(), e); } } @Override public void refreshTable( InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); TableInput tableInput = tableBuilder.getUpdateTableRequest(table, catalogTable, tableIdentifier); try { glueClient.updateTable( UpdateTableRequest.builder() .catalogId(glueCatalogConfig.getCatalogId()) - .databaseName(tableIdentifier.getDatabaseName()) + .databaseName(tblIdentifier.getDatabaseName()) .skipArchive(true) .tableInput(tableInput) .build()); } catch (Exception e) { - throw new CatalogSyncException("Failed to refresh table: " + tableIdentifier, e); + throw new CatalogSyncException("Failed to refresh table: " + tblIdentifier.getId(), e); } } @@ -191,15 +200,16 @@ public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tab @Override public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); try { glueClient.deleteTable( DeleteTableRequest.builder() .catalogId(glueCatalogConfig.getCatalogId()) - .databaseName(tableIdentifier.getDatabaseName()) - .name(tableIdentifier.getTableName()) + .databaseName(tblIdentifier.getDatabaseName()) + .name(tblIdentifier.getTableName()) .build()); } catch (Exception e) { - throw new CatalogSyncException("Failed to drop table: " + tableIdentifier, e); + throw new CatalogSyncException("Failed to drop table: " + tableIdentifier.getId(), e); } } @@ -217,13 +227,11 @@ public void close() throws Exception { */ private void validateTempTableCreation( InternalTable table, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); String tempTableName = - tableIdentifier.getTableName() + TEMP_SUFFIX + ZonedDateTime.now().toEpochSecond(); - CatalogTableIdentifier tempTableIdentifier = - CatalogTableIdentifier.builder() - .tableName(tempTableName) - .databaseName(tableIdentifier.getDatabaseName()) - .build(); + tblIdentifier.getTableName() + TEMP_SUFFIX + ZonedDateTime.now().toEpochSecond(); + ThreePartHierarchicalTableIdentifier tempTableIdentifier = + new ThreePartHierarchicalTableIdentifier(tblIdentifier.getDatabaseName(), tempTableName); createTable(table, tempTableIdentifier); dropTable(table, tempTableIdentifier); } diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/table/IcebergGlueCatalogTableBuilder.java b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/table/IcebergGlueCatalogTableBuilder.java index cbfc5189..81b9449a 100644 --- a/xtable-core/src/main/java/org/apache/xtable/catalog/glue/table/IcebergGlueCatalogTableBuilder.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/glue/table/IcebergGlueCatalogTableBuilder.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; +import static org.apache.xtable.catalog.CatalogUtils.castToHierarchicalTableIdentifier; import static org.apache.xtable.catalog.glue.GlueCatalogSyncClient.GLUE_EXTERNAL_TABLE_TYPE; import java.util.HashMap; @@ -37,6 +38,7 @@ import org.apache.xtable.catalog.glue.GlueSchemaExtractor; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; import org.apache.xtable.model.storage.TableFormat; import software.amazon.awssdk.services.glue.model.StorageDescriptor; @@ -64,9 +66,10 @@ public IcebergGlueCatalogTableBuilder(Configuration configuration) { @Override public TableInput getCreateTableRequest( InternalTable table, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); BaseTable fsTable = loadTableFromFs(table.getBasePath()); return TableInput.builder() - .name(tableIdentifier.getTableName()) + .name(tblIdentifier.getTableName()) .tableType(GLUE_EXTERNAL_TABLE_TYPE) .parameters(getTableParameters(fsTable)) .storageDescriptor( @@ -80,6 +83,7 @@ public TableInput getCreateTableRequest( @Override public TableInput getUpdateTableRequest( InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier); BaseTable icebergTable = loadTableFromFs(table.getBasePath()); Map parameters = new HashMap<>(catalogTable.parameters()); parameters.put(PREVIOUS_METADATA_LOCATION_PROP, parameters.get(METADATA_LOCATION_PROP)); @@ -87,7 +91,7 @@ public TableInput getUpdateTableRequest( parameters.putAll(icebergTable.properties()); return TableInput.builder() - .name(tableIdentifier.getTableName()) + .name(tblIdentifier.getTableName()) .tableType(GLUE_EXTERNAL_TABLE_TYPE) .parameters(parameters) .storageDescriptor( diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/GlueCatalogSyncTestBase.java b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/GlueCatalogSyncTestBase.java index 7291dd6a..59b533f3 100644 --- a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/GlueCatalogSyncTestBase.java +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/GlueCatalogSyncTestBase.java @@ -28,7 +28,7 @@ import org.apache.xtable.conversion.ExternalCatalogConfig; import org.apache.xtable.model.InternalTable; -import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.storage.CatalogType; import org.apache.xtable.model.storage.TableFormat; @@ -72,11 +72,8 @@ public class GlueCatalogSyncTestBase { .tableFormat(TableFormat.HUDI) .readSchema(InternalSchema.builder().fields(Collections.emptyList()).build()) .build(); - protected static final CatalogTableIdentifier TEST_CATALOG_TABLE_IDENTIFIER = - CatalogTableIdentifier.builder() - .databaseName(TEST_GLUE_DATABASE) - .tableName(TEST_GLUE_TABLE) - .build(); + protected static final ThreePartHierarchicalTableIdentifier TEST_CATALOG_TABLE_IDENTIFIER = + new ThreePartHierarchicalTableIdentifier(TEST_GLUE_DATABASE, TEST_GLUE_TABLE); protected static final ExternalCatalogConfig catalogConfig = ExternalCatalogConfig.builder() .catalogId(TEST_CATALOG_NAME) diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogConversionSource.java index b658418b..1a1c99c2 100644 --- a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogConversionSource.java @@ -38,7 +38,7 @@ import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.exception.CatalogSyncException; -import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; import org.apache.xtable.model.storage.TableFormat; import software.amazon.awssdk.services.glue.GlueClient; @@ -59,8 +59,8 @@ public class TestGlueCatalogConversionSource { private static final String GLUE_TABLE = "glue_tbl"; private static final String TABLE_BASE_PATH = "/var/data/table"; private static final String GLUE_CATALOG_ID = "aws-account-id"; - private static final CatalogTableIdentifier tableIdentifier = - CatalogTableIdentifier.builder().databaseName(GLUE_DB).tableName(GLUE_TABLE).build(); + private static final ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier(GLUE_DB, GLUE_TABLE); private static final GetTableRequest getTableRequest = GetTableRequest.builder() .catalogId(GLUE_CATALOG_ID) diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogSyncClient.java b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogSyncClient.java index 9915f584..f09f50e7 100644 --- a/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogSyncClient.java +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/glue/TestGlueCatalogSyncClient.java @@ -46,7 +46,7 @@ import org.apache.xtable.catalog.CatalogTableBuilder; import org.apache.xtable.exception.CatalogSyncException; -import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; @@ -97,8 +97,7 @@ void testHasDatabase(boolean isDbPresent) { when(mockGlueClient.getDatabase(dbRequest)) .thenThrow(EntityNotFoundException.builder().message("db not found").build()); } - boolean output = - glueCatalogSyncClient.hasDatabase(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()); + boolean output = glueCatalogSyncClient.hasDatabase(TEST_CATALOG_TABLE_IDENTIFIER); if (isDbPresent) { assertTrue(output); } else { @@ -115,8 +114,7 @@ void testHasDatabaseFailure() { CatalogSyncException exception = assertThrows( CatalogSyncException.class, - () -> - glueCatalogSyncClient.hasDatabase(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())); + () -> glueCatalogSyncClient.hasDatabase(TEST_CATALOG_TABLE_IDENTIFIER)); assertEquals( String.format("Failed to get database: %s", TEST_GLUE_DATABASE), exception.getMessage()); verify(mockGlueClient, times(1)).getDatabase(dbRequest); @@ -185,16 +183,14 @@ void testCreateDatabase(boolean shouldFail) { CatalogSyncException exception = assertThrows( CatalogSyncException.class, - () -> - glueCatalogSyncClient.createDatabase( - TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())); + () -> glueCatalogSyncClient.createDatabase(TEST_CATALOG_TABLE_IDENTIFIER)); assertEquals( String.format("Failed to create database: %s", TEST_GLUE_DATABASE), exception.getMessage()); } else { when(mockGlueClient.createDatabase(dbRequest)) .thenReturn(CreateDatabaseResponse.builder().build()); - glueCatalogSyncClient.createDatabase(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()); + glueCatalogSyncClient.createDatabase(TEST_CATALOG_TABLE_IDENTIFIER); } verify(mockGlueClient, times(1)).createDatabase(dbRequest); } @@ -364,11 +360,9 @@ void testCreateOrReplaceTable() { TEST_CATALOG_TABLE_IDENTIFIER.getTableName() + "_temp" + ZonedDateTime.now().toEpochSecond(); - CatalogTableIdentifier tempTableIdentifier = - CatalogTableIdentifier.builder() - .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()) - .tableName(tempTableName) - .build(); + ThreePartHierarchicalTableIdentifier tempTableIdentifier = + new ThreePartHierarchicalTableIdentifier( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), tempTableName); TableInput tableInput = TableInput.builder().name(TEST_GLUE_TABLE).build(); TableInput tempTableInput = TableInput.builder().name(tempTableName).build(); CreateTableRequest origCreateTableRequest = diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml index 25d55973..6fabfde7 100644 --- a/xtable-utilities/pom.xml +++ b/xtable-utilities/pom.xml @@ -107,8 +107,8 @@ hadoop-aws - com.amazonaws - aws-java-sdk-bundle + software.amazon.awssdk + bundle diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index 2f60f33f..7317d265 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.function.Function; import java.util.stream.Collectors; @@ -58,6 +59,7 @@ import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.conversion.TargetCatalogConfig; import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.hudi.HudiSourceConfig; import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; @@ -139,15 +141,17 @@ public static void main(String[] args) throws Exception { datasetConfig.getTargetCatalogs().stream() .map(RunCatalogSync::populateCatalogImplementations) .collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, Function.identity())); - CatalogConversionSource catalogConversionSource = - CatalogConversionFactory.createCatalogConversionSource( - datasetConfig.getSourceCatalog(), hadoopConf); ConversionController conversionController = new ConversionController(hadoopConf); for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) { SourceTable sourceTable = null; if (dataset.getSourceCatalogTableIdentifier().getStorageIdentifier() != null) { StorageIdentifier storageIdentifier = dataset.getSourceCatalogTableIdentifier().getStorageIdentifier(); + Properties sourceProperties = new Properties(); + if (storageIdentifier.getPartitionSpec() != null) { + sourceProperties.put( + HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, storageIdentifier.getPartitionSpec()); + } sourceTable = SourceTable.builder() .name(storageIdentifier.getTableName()) @@ -158,8 +162,12 @@ public static void main(String[] args) throws Exception { : storageIdentifier.getNamespace().split("\\.")) .dataPath(storageIdentifier.getTableDataPath()) .formatName(storageIdentifier.getTableFormat()) + .additionalProperties(sourceProperties) .build(); } else { + CatalogConversionSource catalogConversionSource = + CatalogConversionFactory.createCatalogConversionSource( + datasetConfig.getSourceCatalog(), hadoopConf); sourceTable = catalogConversionSource.getSourceTable( getCatalogTableIdentifier( @@ -175,6 +183,7 @@ public static void main(String[] args) throws Exception { .basePath(sourceTable.getBasePath()) .namespace(sourceTable.getNamespace()) .formatName(targetCatalogTableIdentifier.getTableFormat()) + .additionalProperties(sourceTable.getAdditionalProperties()) .build(); targetTables.add(targetTable); if (!targetCatalogs.containsKey(targetTable)) {