Skip to content

Commit

Permalink
Introduce catalogSyncClientImpl and catalogConversionSourceImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
vinishjail97 committed Dec 19, 2024
1 parent ced6288 commit 6e09f5d
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
import lombok.Value;

/**
* Defines the configuration for an external catalog, user needs to populate at-least one of
* catalogType or catalogImpl
* Defines the configuration for an external catalog, user needs to populate at-least one of {@link
* ExternalCatalogConfig#catalogType} or {@link ExternalCatalogConfig#catalogSyncClientImpl}
*/
@Value
@Builder
public class ExternalCatalogConfig {
/** The name of the catalog, it also acts as a unique identifier for each catalog */
/**
* A user-defined unique identifier for the catalog, allows user to sync table to multiple
* catalogs of the same name/type eg: HMS catalog with url1, HMS catalog with url2
*/
@NonNull String catalogId;

/**
Expand All @@ -42,13 +45,21 @@ public class ExternalCatalogConfig {
String catalogType;

/**
* (Optional) A fully qualified class name that implements the interfaces for CatalogSyncClient,
* it can be used if the implementation for catalogType doesn't exist in XTable.
* (Optional) A fully qualified class name that implements the interface for {@link
* org.apache.xtable.spi.sync.CatalogSyncClient}, it can be used if the implementation for
* catalogType doesn't exist in XTable.
*/
String catalogSyncClientImpl;

/**
* (Optional) A fully qualified class name that implements the interface for {@link
* org.apache.xtable.spi.extractor.CatalogConversionSource} it can be used if the implementation
* for catalogType doesn't exist in XTable.
*/
String catalogImpl;
String catalogConversionSourceImpl;

/**
* The properties for each catalog, used for providing any custom behaviour during catalog sync
*/
@NonNull @Builder.Default Map<String, String> catalogOptions = Collections.emptyMap();
@NonNull @Builder.Default Map<String, String> catalogProperties = Collections.emptyMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static CatalogConversionFactory getInstance() {
public static CatalogConversionSource createCatalogConversionSource(
ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {
return ReflectionUtils.createInstanceOfClass(
sourceCatalogConfig.getCatalogImpl(), sourceCatalogConfig, configuration);
sourceCatalogConfig.getCatalogConversionSourceImpl(), sourceCatalogConfig, configuration);
}

/**
Expand All @@ -53,8 +53,11 @@ public static CatalogConversionSource createCatalogConversionSource(
* @param configuration hadoop configuration
*/
public CatalogSyncClient createCatalogSyncClient(
ExternalCatalogConfig targetCatalogConfig, Configuration configuration) {
ExternalCatalogConfig targetCatalogConfig, String tableFormat, Configuration configuration) {
return ReflectionUtils.createInstanceOfClass(
targetCatalogConfig.getCatalogImpl(), targetCatalogConfig, configuration);
targetCatalogConfig.getCatalogSyncClientImpl(),
targetCatalogConfig,
tableFormat,
configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ public class ExternalCatalogConfigFactory {
public static ExternalCatalogConfig fromCatalogType(
String catalogType, String catalogId, Map<String, String> properties) {
// TODO: Choose existing implementation based on catalogType.
String catalogImpl = "";
String catalogSyncClientImpl = "";
String catalogConversionSourceImpl = "";
return ExternalCatalogConfig.builder()
.catalogImpl(catalogImpl)
.catalogType(catalogType)
.catalogSyncClientImpl(catalogSyncClientImpl)
.catalogConversionSourceImpl(catalogConversionSourceImpl)
.catalogId(catalogId)
.catalogOptions(properties)
.catalogProperties(properties)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ public Map<String, SyncResult> syncTableAcrossCatalogs(
TargetCatalogConfig::getCatalogTableIdentifier,
targetCatalog ->
catalogConversionFactory.createCatalogSyncClient(
targetCatalog.getCatalogConfig(), conf)));
targetCatalog.getCatalogConfig(),
targetTable.getFormatName(),
conf)));
catalogSyncResults.put(
targetTable.getFormatName(),
syncCatalogsForTargetTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.spi.extractor.CatalogConversionSource;
import org.apache.xtable.spi.sync.CatalogSyncClient;
import org.apache.xtable.testutil.ITTestUtils.TestCatalogImpl;
import org.apache.xtable.testutil.ITTestUtils.TestCatalogConversionSourceImpl;
import org.apache.xtable.testutil.ITTestUtils.TestCatalogSyncImpl;

class TestCatalogConversionFactory {

Expand All @@ -39,12 +40,14 @@ void createSourceForConfig() {
ExternalCatalogConfig sourceCatalog =
ExternalCatalogConfig.builder()
.catalogId("catalogId")
.catalogImpl(TestCatalogImpl.class.getName())
.catalogOptions(Collections.emptyMap())
.catalogConversionSourceImpl(TestCatalogConversionSourceImpl.class.getName())
.catalogProperties(Collections.emptyMap())
.build();
CatalogConversionSource catalogConversionSource =
CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, new Configuration());
assertEquals(catalogConversionSource.getClass().getName(), TestCatalogImpl.class.getName());
assertEquals(
catalogConversionSource.getClass().getName(),
TestCatalogConversionSourceImpl.class.getName());
}

@Test
Expand All @@ -54,8 +57,8 @@ void createForCatalog() {
.catalogConfig(
ExternalCatalogConfig.builder()
.catalogId("catalogId")
.catalogImpl(TestCatalogImpl.class.getName())
.catalogOptions(Collections.emptyMap())
.catalogSyncClientImpl(TestCatalogSyncImpl.class.getName())
.catalogProperties(Collections.emptyMap())
.build())
.catalogTableIdentifier(
CatalogTableIdentifier.builder()
Expand All @@ -65,7 +68,8 @@ void createForCatalog() {
.build();
CatalogSyncClient catalogSyncClient =
CatalogConversionFactory.getInstance()
.createCatalogSyncClient(targetCatalogConfig.getCatalogConfig(), new Configuration());
assertEquals(catalogSyncClient.getClass().getName(), TestCatalogImpl.class.getName());
.createCatalogSyncClient(
targetCatalogConfig.getCatalogConfig(), "TABLE_FORMAT", new Configuration());
assertEquals(catalogSyncClient.getClass().getName(), TestCatalogSyncImpl.class.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,10 @@ void testNoTableFormatConversionWithMultipleCatalogSync() {
.thenReturn(tableFormatSyncResults);
// Mocks for catalogSync.
when(mockCatalogConversionFactory.createCatalogSyncClient(
targetCatalogs.get(0).getCatalogConfig(), mockConf))
eq(targetCatalogs.get(0).getCatalogConfig()), any(), eq(mockConf)))
.thenReturn(mockCatalogSyncClient1);
when(mockCatalogConversionFactory.createCatalogSyncClient(
targetCatalogs.get(1).getCatalogConfig(), mockConf))
eq(targetCatalogs.get(1).getCatalogConfig()), any(), eq(mockConf)))
.thenReturn(mockCatalogSyncClient2);
when(catalogSync.syncTable(
eq(
Expand Down Expand Up @@ -590,8 +590,8 @@ private TargetCatalogConfig getTargetCatalog(String suffix) {
.catalogConfig(
ExternalCatalogConfig.builder()
.catalogId("catalogId-" + suffix)
.catalogImpl("catalogImpl-" + suffix)
.catalogOptions(Collections.emptyMap())
.catalogSyncClientImpl("catalogImpl-" + suffix)
.catalogProperties(Collections.emptyMap())
.build())
.catalogTableIdentifier(
CatalogTableIdentifier.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,10 @@ public static void validateTable(
Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields());
}

public static class TestCatalogImpl implements CatalogConversionSource, CatalogSyncClient {
public static class TestCatalogSyncImpl implements CatalogSyncClient {

public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {}

@Override
public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
return SourceTable.builder()
.name("source_table_name")
.basePath("file://base_path/v1/")
.formatName("ICEBERG")
.build();
}
public TestCatalogSyncImpl(
ExternalCatalogConfig catalogConfig, String tableFormat, Configuration hadoopConf) {}

@Override
public String getCatalogId() {
Expand Down Expand Up @@ -103,4 +95,18 @@ public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifie
@Override
public void close() throws Exception {}
}

public static class TestCatalogConversionSourceImpl implements CatalogConversionSource {
public TestCatalogConversionSourceImpl(
ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {}

@Override
public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
return SourceTable.builder()
.name("source_table_name")
.basePath("file://base_path/v1/")
.formatName("ICEBERG")
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import org.apache.xtable.catalog.CatalogConversionFactory;
import org.apache.xtable.catalog.ExternalCatalogConfigFactory;
import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;
Expand Down Expand Up @@ -132,12 +130,12 @@ public static void main(String[] args) throws Exception {
RunSync.TableFormatConverters tableFormatConverters =
loadTableFormatConversionConfigs(customConfig);

Map<String, DatasetConfig.Catalog> catalogsByName =
Map<String, ExternalCatalogConfig> catalogsById =
datasetConfig.getTargetCatalogs().stream()
.collect(Collectors.toMap(DatasetConfig.Catalog::getCatalogId, Function.identity()));
ExternalCatalogConfig sourceCatalogConfig = getCatalogConfig(datasetConfig.getSourceCatalog());
.collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, Function.identity()));
CatalogConversionSource catalogConversionSource =
CatalogConversionFactory.createCatalogConversionSource(sourceCatalogConfig, hadoopConf);
CatalogConversionFactory.createCatalogConversionSource(
datasetConfig.getSourceCatalog(), hadoopConf);
ConversionController conversionController = new ConversionController(hadoopConf);
for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) {
SourceTable sourceTable = null;
Expand Down Expand Up @@ -181,9 +179,7 @@ public static void main(String[] args) throws Exception {
TargetCatalogConfig.builder()
.catalogTableIdentifier(
targetCatalogTableIdentifier.getCatalogTableIdentifier())
.catalogConfig(
getCatalogConfig(
catalogsByName.get(targetCatalogTableIdentifier.getCatalogId())))
.catalogConfig(catalogsById.get(targetCatalogTableIdentifier.getCatalogId()))
.build());
}
ConversionConfig conversionConfig =
Expand All @@ -208,19 +204,6 @@ public static void main(String[] args) throws Exception {
}
}

static ExternalCatalogConfig getCatalogConfig(DatasetConfig.Catalog catalog) {
if (!StringUtils.isEmpty(catalog.getCatalogType())) {
return ExternalCatalogConfigFactory.fromCatalogType(
catalog.getCatalogType(), catalog.getCatalogId(), catalog.getCatalogProperties());
} else {
return ExternalCatalogConfig.builder()
.catalogId(catalog.getCatalogId())
.catalogImpl(catalog.getCatalogImpl())
.catalogOptions(catalog.getCatalogProperties())
.build();
}
}

static Map<String, ConversionSourceProvider> getConversionSourceProviders(
List<String> tableFormats,
RunSync.TableFormatConverters tableFormatConverters,
Expand Down Expand Up @@ -252,36 +235,17 @@ public static class DatasetConfig {
* Configuration of the source catalog from which XTable will read. It must contain all the
* necessary connection and access details for describing and listing tables
*/
private Catalog sourceCatalog;
private ExternalCatalogConfig sourceCatalog;
/**
* Defines configuration one or more target catalogs, to which XTable will write or update
* tables. Unlike the source, these catalogs must be writable
*/
private List<Catalog> targetCatalogs;
private List<ExternalCatalogConfig> targetCatalogs;
/** A list of datasets that specify how a source table maps to one or more target tables. */
private List<Dataset> datasets;

/** Configuration for catalog. */
@Data
public static class Catalog {
/** A user defined unique identifier for the catalog. */
private String catalogId;
/**
* The type of the source catalog. This might be a specific type understood by XTable, such as
* Hive, Glue etc.
*/
private String catalogType;
/**
* (Optional) A fully qualified class name that implements the interfaces for
* CatalogSyncClient, it can be used if the implementation for catalogType doesn't exist in
* XTable. This is an optional field.
*/
private String catalogImpl;
/**
* A collection of configs used to configure access or connection properties for the catalog.
*/
private Map<String, String> catalogProperties;
}
ExternalCatalogConfig catalogConfig;

@Data
public static class Dataset {
Expand All @@ -306,8 +270,8 @@ public static class SourceTableIdentifier {
@Data
public static class TargetTableIdentifier {
/**
* The user defined unique identifier of the target {@link Catalog} where the table will be
* created or updated
* The user defined unique identifier of the target catalog where the table will be created or
* updated
*/
String catalogId;
/**
Expand Down
9 changes: 5 additions & 4 deletions xtable-utilities/src/test/resources/catalogConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,27 @@
#
sourceCatalog:
catalogId: "source-1"
catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
catalogConversionSourceImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogConversionSourceImpl"
catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
catalogProperties:
key01: "value1"
key02: "value2"
key03: "value3"
targetCatalogs:
- catalogId: "target-1"
catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
catalogProperties:
key11: "value1"
key12: "value2"
key13: "value3"
- catalogId: "target-2"
catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
catalogProperties:
key21: "value1"
key22: "value2"
key23: "value3"
- catalogId: "target-3"
catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
catalogProperties:
key31: "value1"
key32: "value2"
Expand Down

0 comments on commit 6e09f5d

Please sign in to comment.