Skip to content

Commit

Permalink
Add tests and some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
kroushan-nit committed Dec 26, 2024
1 parent b4f74e0 commit 71ed9f0
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

package org.apache.xtable.catalog;

import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.xtable.catalog.Constants.PROP_SPARK_SQL_SOURCES_PROVIDER;

import java.util.Map;

import org.apache.iceberg.TableProperties;

import com.google.common.base.Strings;

import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.storage.TableFormat;

Expand All @@ -32,6 +37,7 @@ public static String getTableDataLocation(
switch (tableFormat) {
case TableFormat.ICEBERG:
return getIcebergDataLocation(tableLocation, properties);
case TableFormat.DELTA:
case TableFormat.HUDI:
return tableLocation;
default:
Expand All @@ -54,4 +60,17 @@ private static String getIcebergDataLocation(
}
return dataLocation;
}

// Get table format name from table properties
public static String getTableFormat(Map<String, String> properties) {
// - In case of ICEBERG, table_type param will give the table format
// - In case of DELTA, table_type or spark.sql.sources.provider param will give the table
// format
// - In case of HUDI, spark.sql.sources.provider param will give the table format
String tableFormat = properties.get(TABLE_TYPE_PROP);
if (Strings.isNullOrEmpty(tableFormat)) {
tableFormat = properties.get(PROP_SPARK_SQL_SOURCES_PROVIDER);
}
return tableFormat;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.xtable.catalog.glue;

import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;

import java.util.Locale;
import java.util.Properties;

Expand All @@ -36,6 +34,7 @@
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.GetTableRequest;
import software.amazon.awssdk.services.glue.model.GetTableResponse;
import software.amazon.awssdk.services.glue.model.GlueException;
import software.amazon.awssdk.services.glue.model.Table;

public class GlueCatalogConversionSource implements CatalogConversionSource {
Expand Down Expand Up @@ -65,12 +64,13 @@ public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
.build());
Table table = response.table();
if (table == null) {
throw new IllegalStateException(String.format("table: %s not found", tableIdentifier));
throw new IllegalStateException(String.format("table: %s is null", tableIdentifier));
}

String tableFormat = table.parameters().get(TABLE_TYPE_PROP);
if (!Strings.isNullOrEmpty(tableFormat)) {
throw new IllegalStateException("TableFormat must not be null or empty");
String tableFormat = TableFormatUtils.getTableFormat(table.parameters());
if (Strings.isNullOrEmpty(tableFormat)) {
throw new IllegalStateException(
String.format("TableFormat is null or empty for table: %s", tableIdentifier));
}
tableFormat = tableFormat.toUpperCase(Locale.ENGLISH);

Expand All @@ -87,7 +87,7 @@ public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
.formatName(tableFormat)
.additionalProperties(tableProperties)
.build();
} catch (Exception e) {
} catch (GlueException e) {
throw new CatalogSyncException("Failed to get table: " + tableIdentifier, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,25 @@

package org.apache.xtable.catalog;

import static org.apache.xtable.catalog.Constants.PROP_SPARK_SQL_SOURCES_PROVIDER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import org.apache.iceberg.TableProperties;

import org.apache.xtable.model.storage.TableFormat;

public class TestTableFormatUtils {
class TestTableFormatUtils {

@Test
void testGetTableDataLocation_Hudi() {
// For Hudi, data location should always be tableLocation
void testGetTableDataLocation_HudiDelta() {
// For Hudi and Delta, data location should be tableLocation
String tableLocation = "base-path";
assertEquals(
tableLocation,
Expand All @@ -49,7 +53,7 @@ void testGetTableDataLocation_Hudi() {
@Test
void testGetTableDataLocation_Iceberg() {
// For Iceberg, data location will be WRITE_DATA_LOCATION / OBJECT_STORE_PATH param or
// tableLocation/data
// "tableLocation/data"
String tableLocation = "base-path";

// no params is set
Expand All @@ -76,4 +80,21 @@ void testGetTableDataLocation_Iceberg() {
tableLocation,
Collections.singletonMap(TableProperties.OBJECT_STORE_PATH, objectStorePath)));
}

@Test
void testGetTableFormat() {
Map<String, String> params = new HashMap<>();

// table format is null when table type param in not present
assertNull(TableFormatUtils.getTableFormat(params));

// "table_type" is set
params.put("table_type", TableFormat.ICEBERG);
assertEquals(TableFormat.ICEBERG, TableFormatUtils.getTableFormat(params));

params.clear();
// "spark.sql.sources.provider" is set
params.put(PROP_SPARK_SQL_SOURCES_PROVIDER, TableFormat.DELTA);
assertEquals(TableFormat.DELTA, TableFormatUtils.getTableFormat(params));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
import software.amazon.awssdk.services.glue.model.GetTableRequest;
import software.amazon.awssdk.services.glue.model.GlueException;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class GlueCatalogSyncTestBase {
.catalogProperties(Collections.emptyMap())
.build();
protected static final TableInput TEST_TABLE_INPUT = TableInput.builder().build();
protected static final GlueException TEST_GLUE_EXCEPTION =
(GlueException) GlueException.builder().message("something went wrong").build();

protected GetDatabaseRequest getDbRequest(String dbName) {
return GetDatabaseRequest.builder().catalogId(TEST_GLUE_CATALOG_ID).name(dbName).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog.glue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.storage.TableFormat;

import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
import software.amazon.awssdk.services.glue.model.GetTableRequest;
import software.amazon.awssdk.services.glue.model.GetTableResponse;
import software.amazon.awssdk.services.glue.model.GlueException;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.Table;

@ExtendWith(MockitoExtension.class)
public class TestGlueCatalogConversionSource {

@Mock private GlueCatalogConfig mockCatalogConfig;
@Mock private GlueClient mockGlueClient;
private GlueCatalogConversionSource catalogConversionSource;
private static final String GLUE_DB = "glue_db";
private static final String GLUE_TABLE = "glue_tbl";
private static final String TABLE_BASE_PATH = "/var/data/table";
private static final String GLUE_CATALOG_ID = "aws-account-id";
private static final CatalogTableIdentifier tableIdentifier =
CatalogTableIdentifier.builder().databaseName(GLUE_DB).tableName(GLUE_TABLE).build();
private static final GetTableRequest getTableRequest =
GetTableRequest.builder()
.catalogId(GLUE_CATALOG_ID)
.databaseName(GLUE_DB)
.name(GLUE_TABLE)
.build();

@BeforeEach
void init() {
when(mockCatalogConfig.getCatalogId()).thenReturn(GLUE_CATALOG_ID);
catalogConversionSource = new GlueCatalogConversionSource(mockCatalogConfig, mockGlueClient);
}

@Test
void testGetSourceTable_errorGettingTableFromGlue() {
// error getting table from glue
when(mockGlueClient.getTable(getTableRequest))
.thenThrow(GlueException.builder().message("something went wrong").build());
assertThrows(
CatalogSyncException.class, () -> catalogConversionSource.getSourceTable(tableIdentifier));

verify(mockGlueClient, times(1)).getTable(getTableRequest);
}

@Test
void testGetSourceTable_tableNotFoundInGlue() {
// table not found in glue
when(mockGlueClient.getTable(getTableRequest))
.thenThrow(EntityNotFoundException.builder().message("table not found").build());
assertThrows(
CatalogSyncException.class, () -> catalogConversionSource.getSourceTable(tableIdentifier));

verify(mockGlueClient, times(1)).getTable(getTableRequest);
}

@Test
void testGetSourceTable_tableFormatNotPresent() {
// table format not present in table properties
when(mockGlueClient.getTable(getTableRequest))
.thenReturn(GetTableResponse.builder().table(Table.builder().build()).build());
IllegalStateException exception =
assertThrows(
IllegalStateException.class,
() -> catalogConversionSource.getSourceTable(tableIdentifier));
assertEquals(
"TableFormat is null or empty for table: glue_db.glue_tbl", exception.getMessage());

verify(mockGlueClient, times(1)).getTable(getTableRequest);
}

@ParameterizedTest
@CsvSource(value = {"ICEBERG", "HUDI", "DELTA"})
void testGetSourceTable(String tableFormat) {
StorageDescriptor sd = StorageDescriptor.builder().location(TABLE_BASE_PATH).build();
Map<String, String> tableParams = new HashMap<>();
if (tableFormat.equals(TableFormat.ICEBERG)) {
tableParams.put("write.data.path", String.format("%s/iceberg", TABLE_BASE_PATH));
tableParams.put("table_type", tableFormat);
} else {
tableParams.put("spark.sql.sources.provider", tableFormat);
}

String dataPath =
tableFormat.equals(TableFormat.ICEBERG)
? String.format("%s/iceberg", TABLE_BASE_PATH)
: TABLE_BASE_PATH;
SourceTable expected =
newSourceTable(GLUE_TABLE, TABLE_BASE_PATH, dataPath, tableFormat, tableParams);
when(mockGlueClient.getTable(getTableRequest))
.thenReturn(
GetTableResponse.builder()
.table(newGlueTable(GLUE_DB, GLUE_TABLE, tableParams, sd))
.build());
SourceTable output = catalogConversionSource.getSourceTable(tableIdentifier);
assertEquals(expected, output);

verify(mockGlueClient, times(1)).getTable(getTableRequest);
}

private Table newGlueTable(
String dbName, String tableName, Map<String, String> params, StorageDescriptor sd) {
return Table.builder()
.databaseName(dbName)
.name(tableName)
.parameters(params)
.storageDescriptor(sd)
.build();
}

private SourceTable newSourceTable(
String tblName,
String basePath,
String dataPath,
String tblFormat,
Map<String, String> params) {
Properties tblProperties = new Properties();
tblProperties.putAll(params);
return SourceTable.builder()
.name(tblName)
.basePath(basePath)
.dataPath(dataPath)
.formatName(tblFormat)
.additionalProperties(tblProperties)
.build();
}
}
Loading

0 comments on commit 71ed9f0

Please sign in to comment.