Skip to content

Commit

Permalink
Changes post rebasing remote changes
Browse files Browse the repository at this point in the history
  • Loading branch information
kroushan-nit committed Dec 27, 2024
1 parent 5daece8 commit 47f49fa
Show file tree
Hide file tree
Showing 14 changed files with 126 additions and 63 deletions.
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<junit.version>5.9.0</junit.version>
<lombok.version>1.18.30</lombok.version>
<lombok-maven-plugin.version>1.18.20.0</lombok-maven-plugin.version>
<hadoop.version>3.4.0</hadoop.version>
<hadoop.version>3.3.6</hadoop.version>
<hudi.version>0.14.0</hudi.version>
<aws.version>2.28.22</aws.version>
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
Expand Down Expand Up @@ -373,9 +373,9 @@
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>1.12.328</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bundle</artifactId>
<version>${aws.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class ConversionConfig {
SyncMode syncMode) {
this.sourceTable = sourceTable;
this.targetTables = targetTables;
this.targetCatalogs = targetCatalogs;
Preconditions.checkArgument(
targetTables != null && !targetTables.isEmpty(),
"Please provide at-least one format to sync");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ private <TABLE> CatalogSyncStatus syncCatalog(
CatalogSyncClient<TABLE> catalogSyncClient,
CatalogTableIdentifier tableIdentifier,
InternalTable table) {
log.info(
"Running catalog sync for table {} with format {} using catalogSync {}",
table.getBasePath(),
table.getTableFormat(),
catalogSyncClient.getClass().getName());
if (!catalogSyncClient.hasDatabase(tableIdentifier)) {
catalogSyncClient.createDatabase(tableIdentifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,23 @@ class TestHierarchicalTableIdentifier {
@Test
void testToString() {
HierarchicalTableIdentifier catalogTableIdentifier =
new HierarchicalTableIdentifier("catalogName.databaseName.tableName");
ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
"catalogName.databaseName.tableName");
assertEquals("catalogName.databaseName.tableName", catalogTableIdentifier.getId());
}

@Test
void testConstructorForHierarchicalTableIdentifier() {
Assertions.assertDoesNotThrow(
() -> new HierarchicalTableIdentifier("catalogName.databaseName.tableName"));
Assertions.assertDoesNotThrow(() -> new HierarchicalTableIdentifier("databaseName.tableName"));
() ->
ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
"catalogName.databaseName.tableName"));
Assertions.assertDoesNotThrow(
() ->
ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
"databaseName.tableName"));
Assertions.assertThrows(
IllegalArgumentException.class, () -> new HierarchicalTableIdentifier("tableName"));
IllegalArgumentException.class,
() -> ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier("tableName"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public CatalogSyncClient createCatalogSyncClient(
return ReflectionUtils.createInstanceOfClass(
targetCatalogConfig.getCatalogSyncClientImpl(),
targetCatalogConfig,
tableFormat,
configuration);
configuration,
tableFormat);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;

public class CatalogUtils {

public static HierarchicalTableIdentifier castToHierarchicalTableIdentifier(
CatalogTableIdentifier tableIdentifier) {
if (tableIdentifier instanceof HierarchicalTableIdentifier) {
return (HierarchicalTableIdentifier) tableIdentifier;
}
throw new IllegalArgumentException("Invalid tableIdentifier implementation");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

package org.apache.xtable.catalog.glue;

import static org.apache.xtable.catalog.CatalogUtils.castToHierarchicalTableIdentifier;

import java.util.Locale;
import java.util.Properties;

import javax.security.auth.login.Configuration;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;

Expand All @@ -29,6 +33,7 @@
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
import org.apache.xtable.spi.extractor.CatalogConversionSource;

import software.amazon.awssdk.services.glue.GlueClient;
Expand All @@ -41,7 +46,8 @@ public class GlueCatalogConversionSource implements CatalogConversionSource {
private final GlueClient glueClient;
private final GlueCatalogConfig glueCatalogConfig;

public GlueCatalogConversionSource(ExternalCatalogConfig catalogConfig) {
public GlueCatalogConversionSource(
ExternalCatalogConfig catalogConfig, Configuration configuration) {
this.glueCatalogConfig = GlueCatalogConfig.of(catalogConfig.getCatalogProperties());
this.glueClient = new DefaultGlueClientFactory(glueCatalogConfig).getGlueClient();
}
Expand All @@ -54,13 +60,14 @@ public GlueCatalogConversionSource(GlueCatalogConfig glueCatalogConfig, GlueClie

@Override
public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier);
try {
GetTableResponse response =
glueClient.getTable(
GetTableRequest.builder()
.catalogId(glueCatalogConfig.getCatalogId())
.databaseName(tableIdentifier.getDatabaseName())
.name(tableIdentifier.getTableName())
.databaseName(tblIdentifier.getDatabaseName())
.name(tblIdentifier.getTableName())
.build());
Table table = response.table();
if (table == null) {
Expand All @@ -70,7 +77,7 @@ public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
String tableFormat = TableFormatUtils.getTableFormat(table.parameters());
if (Strings.isNullOrEmpty(tableFormat)) {
throw new IllegalStateException(
String.format("TableFormat is null or empty for table: %s", tableIdentifier));
String.format("TableFormat is null or empty for table: %s", tableIdentifier.getId()));
}
tableFormat = tableFormat.toUpperCase(Locale.ENGLISH);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.xtable.catalog.glue;

import static org.apache.xtable.catalog.CatalogUtils.castToHierarchicalTableIdentifier;

import java.time.ZonedDateTime;

import lombok.extern.log4j.Log4j2;
Expand All @@ -31,6 +33,8 @@
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
import org.apache.xtable.spi.sync.CatalogSyncClient;

import software.amazon.awssdk.services.glue.GlueClient;
Expand Down Expand Up @@ -96,7 +100,8 @@ public String getStorageLocation(Table table) {
}

@Override
public boolean hasDatabase(String databaseName) {
public boolean hasDatabase(CatalogTableIdentifier tableIdentifier) {
String databaseName = castToHierarchicalTableIdentifier(tableIdentifier).getDatabaseName();
try {
return glueClient
.getDatabase(
Expand All @@ -114,7 +119,8 @@ public boolean hasDatabase(String databaseName) {
}

@Override
public void createDatabase(String databaseName) {
public void createDatabase(CatalogTableIdentifier tableIdentifier) {
String databaseName = castToHierarchicalTableIdentifier(tableIdentifier).getDatabaseName();
try {
glueClient.createDatabase(
CreateDatabaseRequest.builder()
Expand All @@ -132,52 +138,55 @@ public void createDatabase(String databaseName) {

@Override
public Table getTable(CatalogTableIdentifier tableIdentifier) {
HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier);
try {
GetTableResponse response =
glueClient.getTable(
GetTableRequest.builder()
.catalogId(glueCatalogConfig.getCatalogId())
.databaseName(tableIdentifier.getDatabaseName())
.name(tableIdentifier.getTableName())
.databaseName(tblIdentifier.getDatabaseName())
.name(tblIdentifier.getTableName())
.build());
return response.table();
} catch (EntityNotFoundException e) {
return null;
} catch (Exception e) {
throw new CatalogSyncException("Failed to get table: " + tableIdentifier, e);
throw new CatalogSyncException("Failed to get table: " + tblIdentifier.getId(), e);
}
}

@Override
public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {
HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier);
TableInput tableInput = tableBuilder.getCreateTableRequest(table, tableIdentifier);
try {
glueClient.createTable(
CreateTableRequest.builder()
.catalogId(glueCatalogConfig.getCatalogId())
.databaseName(tableIdentifier.getDatabaseName())
.databaseName(tblIdentifier.getDatabaseName())
.tableInput(tableInput)
.build());
} catch (Exception e) {
throw new CatalogSyncException("Failed to create table: " + tableIdentifier, e);
throw new CatalogSyncException("Failed to create table: " + tblIdentifier.getId(), e);
}
}

@Override
public void refreshTable(
InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) {
HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier);
TableInput tableInput =
tableBuilder.getUpdateTableRequest(table, catalogTable, tableIdentifier);
try {
glueClient.updateTable(
UpdateTableRequest.builder()
.catalogId(glueCatalogConfig.getCatalogId())
.databaseName(tableIdentifier.getDatabaseName())
.databaseName(tblIdentifier.getDatabaseName())
.skipArchive(true)
.tableInput(tableInput)
.build());
} catch (Exception e) {
throw new CatalogSyncException("Failed to refresh table: " + tableIdentifier, e);
throw new CatalogSyncException("Failed to refresh table: " + tblIdentifier.getId(), e);
}
}

Expand All @@ -191,15 +200,16 @@ public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tab

@Override
public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {
HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier);
try {
glueClient.deleteTable(
DeleteTableRequest.builder()
.catalogId(glueCatalogConfig.getCatalogId())
.databaseName(tableIdentifier.getDatabaseName())
.name(tableIdentifier.getTableName())
.databaseName(tblIdentifier.getDatabaseName())
.name(tblIdentifier.getTableName())
.build());
} catch (Exception e) {
throw new CatalogSyncException("Failed to drop table: " + tableIdentifier, e);
throw new CatalogSyncException("Failed to drop table: " + tableIdentifier.getId(), e);
}
}

Expand All @@ -217,13 +227,11 @@ public void close() throws Exception {
*/
private void validateTempTableCreation(
InternalTable table, CatalogTableIdentifier tableIdentifier) {
HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier);
String tempTableName =
tableIdentifier.getTableName() + TEMP_SUFFIX + ZonedDateTime.now().toEpochSecond();
CatalogTableIdentifier tempTableIdentifier =
CatalogTableIdentifier.builder()
.tableName(tempTableName)
.databaseName(tableIdentifier.getDatabaseName())
.build();
tblIdentifier.getTableName() + TEMP_SUFFIX + ZonedDateTime.now().toEpochSecond();
ThreePartHierarchicalTableIdentifier tempTableIdentifier =
new ThreePartHierarchicalTableIdentifier(tblIdentifier.getDatabaseName(), tempTableName);
createTable(table, tempTableIdentifier);
dropTable(table, tempTableIdentifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.xtable.catalog.CatalogUtils.castToHierarchicalTableIdentifier;
import static org.apache.xtable.catalog.glue.GlueCatalogSyncClient.GLUE_EXTERNAL_TABLE_TYPE;

import java.util.HashMap;
Expand All @@ -37,6 +38,7 @@
import org.apache.xtable.catalog.glue.GlueSchemaExtractor;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
import org.apache.xtable.model.storage.TableFormat;

import software.amazon.awssdk.services.glue.model.StorageDescriptor;
Expand Down Expand Up @@ -64,9 +66,10 @@ public IcebergGlueCatalogTableBuilder(Configuration configuration) {
@Override
public TableInput getCreateTableRequest(
InternalTable table, CatalogTableIdentifier tableIdentifier) {
HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier);
BaseTable fsTable = loadTableFromFs(table.getBasePath());
return TableInput.builder()
.name(tableIdentifier.getTableName())
.name(tblIdentifier.getTableName())
.tableType(GLUE_EXTERNAL_TABLE_TYPE)
.parameters(getTableParameters(fsTable))
.storageDescriptor(
Expand All @@ -80,14 +83,15 @@ public TableInput getCreateTableRequest(
@Override
public TableInput getUpdateTableRequest(
InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) {
HierarchicalTableIdentifier tblIdentifier = castToHierarchicalTableIdentifier(tableIdentifier);
BaseTable icebergTable = loadTableFromFs(table.getBasePath());
Map<String, String> parameters = new HashMap<>(catalogTable.parameters());
parameters.put(PREVIOUS_METADATA_LOCATION_PROP, parameters.get(METADATA_LOCATION_PROP));
parameters.put(METADATA_LOCATION_PROP, getMetadataFileLocation(icebergTable));
parameters.putAll(icebergTable.properties());

return TableInput.builder()
.name(tableIdentifier.getTableName())
.name(tblIdentifier.getTableName())
.tableType(GLUE_EXTERNAL_TABLE_TYPE)
.parameters(parameters)
.storageDescriptor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.CatalogType;
import org.apache.xtable.model.storage.TableFormat;
Expand Down Expand Up @@ -72,11 +72,8 @@ public class GlueCatalogSyncTestBase {
.tableFormat(TableFormat.HUDI)
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
.build();
protected static final CatalogTableIdentifier TEST_CATALOG_TABLE_IDENTIFIER =
CatalogTableIdentifier.builder()
.databaseName(TEST_GLUE_DATABASE)
.tableName(TEST_GLUE_TABLE)
.build();
protected static final ThreePartHierarchicalTableIdentifier TEST_CATALOG_TABLE_IDENTIFIER =
new ThreePartHierarchicalTableIdentifier(TEST_GLUE_DATABASE, TEST_GLUE_TABLE);
protected static final ExternalCatalogConfig catalogConfig =
ExternalCatalogConfig.builder()
.catalogId(TEST_CATALOG_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
import org.apache.xtable.model.storage.TableFormat;

import software.amazon.awssdk.services.glue.GlueClient;
Expand All @@ -59,8 +59,8 @@ public class TestGlueCatalogConversionSource {
private static final String GLUE_TABLE = "glue_tbl";
private static final String TABLE_BASE_PATH = "/var/data/table";
private static final String GLUE_CATALOG_ID = "aws-account-id";
private static final CatalogTableIdentifier tableIdentifier =
CatalogTableIdentifier.builder().databaseName(GLUE_DB).tableName(GLUE_TABLE).build();
private static final ThreePartHierarchicalTableIdentifier tableIdentifier =
new ThreePartHierarchicalTableIdentifier(GLUE_DB, GLUE_TABLE);
private static final GetTableRequest getTableRequest =
GetTableRequest.builder()
.catalogId(GLUE_CATALOG_ID)
Expand Down
Loading

0 comments on commit 47f49fa

Please sign in to comment.