Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 8 additions & 43 deletions mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,14 @@
* <p>In case the value of catalog type is null, iceberg.catalog.<code>catalogName</code>
* .catalog-impl config is used to determine the catalog implementation class.
*
* <p>If catalog name is null, get the catalog type from {@link InputFormatConfig#CATALOG
* iceberg.mr.catalog} config:
* <p>If catalog name is null, get the catalog type from {@link CatalogUtil#ICEBERG_CATALOG_TYPE
* catalog type} config:
*
* <ul>
* <li>hive: HiveCatalog
* <li>location: HadoopTables
* <li>hadoop: HadoopCatalog
* </ul>
*
* <p>In case the value of catalog type is null, {@link InputFormatConfig#CATALOG_LOADER_CLASS
* iceberg.mr.catalog.loader.class} is used to determine the catalog implementation class.
*
* <p>Note: null catalog name mode is only supported for backwards compatibility. Using this mode is
* NOT RECOMMENDED.
*/
public final class Catalogs {

Expand Down Expand Up @@ -259,41 +253,12 @@ static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) {
private static Map<String, String> getCatalogProperties(
Configuration conf, String catalogName, String catalogType) {
String keyPrefix = InputFormatConfig.CATALOG_CONFIG_PREFIX + catalogName;
Map<String, String> catalogProperties =
Streams.stream(conf.iterator())
.filter(e -> e.getKey().startsWith(keyPrefix))
.collect(
Collectors.toMap(
e -> e.getKey().substring(keyPrefix.length() + 1), Map.Entry::getValue));
return addCatalogPropertiesIfMissing(conf, catalogType, catalogProperties);
}

/**
* This method is used for backward-compatible catalog configuration. Collect all the catalog
* specific configuration from the global hive configuration. Note: this should be removed when
* the old catalog configuration is deprecated.
*
* @param conf global hive configuration
* @param catalogType type of the catalog
* @param catalogProperties pre-populated catalog properties
* @return complete map of catalog properties
*/
private static Map<String, String> addCatalogPropertiesIfMissing(
Configuration conf, String catalogType, Map<String, String> catalogProperties) {
if (catalogType != null) {
catalogProperties.putIfAbsent(CatalogUtil.ICEBERG_CATALOG_TYPE, catalogType);
}

String legacyCatalogImpl = conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
if (legacyCatalogImpl != null) {
catalogProperties.putIfAbsent(CatalogProperties.CATALOG_IMPL, legacyCatalogImpl);
}

String legacyWarehouseLocation = conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
if (legacyWarehouseLocation != null) {
catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION, legacyWarehouseLocation);
}
return catalogProperties;
return Streams.stream(conf.iterator())
.filter(e -> e.getKey().startsWith(keyPrefix))
.collect(
Collectors.toMap(
e -> e.getKey().substring(keyPrefix.length() + 1), Map.Entry::getValue));
}

/**
Expand All @@ -317,7 +282,7 @@ private static String getCatalogType(Configuration conf, String catalogName) {
return catalogType;
}
} else {
String catalogType = conf.get(InputFormatConfig.CATALOG);
String catalogType = conf.get(CatalogUtil.ICEBERG_CATALOG_TYPE);
if (catalogType != null && catalogType.equals(LOCATION)) {
return NO_CATALOG_TYPE;
} else {
Expand Down
22 changes: 0 additions & 22 deletions mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,6 @@ private InputFormatConfig() {}
public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
public static final String LOCALITY = "iceberg.mr.locality";

/**
* @deprecated please use {@link #catalogPropertyConfigKey(String, String)} with config key {@link
* org.apache.iceberg.CatalogUtil#ICEBERG_CATALOG_TYPE} to specify the type of a catalog.
*/
@Deprecated public static final String CATALOG = "iceberg.mr.catalog";

/**
* @deprecated please use {@link #catalogPropertyConfigKey(String, String)} with config key {@link
* org.apache.iceberg.CatalogProperties#WAREHOUSE_LOCATION} to specify the warehouse location
* of a catalog.
*/
@Deprecated
public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION =
"iceberg.mr.catalog.hadoop.warehouse.location";

/**
* @deprecated please use {@link #catalogPropertyConfigKey(String, String)} with config key {@link
* org.apache.iceberg.CatalogProperties#CATALOG_IMPL} to specify the implementation of a
* catalog.
*/
@Deprecated public static final String CATALOG_LOADER_CLASS = "iceberg.mr.catalog.loader.class";

public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";

Expand Down
90 changes: 2 additions & 88 deletions mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void before() {

@Test
public void testLoadTableFromLocation() throws IOException {
conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
conf.set(CatalogUtil.ICEBERG_CATALOG_TYPE, Catalogs.LOCATION);
AssertHelpers.assertThrows(
"Should complain about table location not set",
IllegalArgumentException.class,
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testCreateDropTableToLocation() throws IOException {
"schema not set",
() -> Catalogs.createTable(conf, missingSchema));

conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
conf.set(CatalogUtil.ICEBERG_CATALOG_TYPE, Catalogs.LOCATION);
Properties missingLocation = new Properties();
missingLocation.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA));
AssertHelpers.assertThrows(
Expand Down Expand Up @@ -211,60 +211,6 @@ public void testCreateDropTableToCatalog() throws IOException {
() -> Catalogs.loadTable(conf, dropProperties));
}

@Test
public void testLegacyLoadCatalogDefault() {
Optional<Catalog> defaultCatalog = Catalogs.loadCatalog(conf, null);
Assert.assertTrue(defaultCatalog.isPresent());
Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class);
Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties()));
}

@Test
public void testLegacyLoadCatalogHive() {
conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, null);
Assert.assertTrue(hiveCatalog.isPresent());
Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class);
Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties()));
}

@Test
public void testLegacyLoadCatalogHadoop() {
conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation");
Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, null);
Assert.assertTrue(hadoopCatalog.isPresent());
Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties()));
}

@Test
public void testLegacyLoadCatalogCustom() {
conf.set(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalog.class.getName());
conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation");
Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, null);
Assert.assertTrue(customHadoopCatalog.isPresent());
Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class);
Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties()));
}

@Test
public void testLegacyLoadCatalogLocation() {
conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent());
Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties()));
}

@Test
public void testLegacyLoadCatalogUnknown() {
conf.set(InputFormatConfig.CATALOG, "fooType");
AssertHelpers.assertThrows(
"should complain about catalog not supported",
UnsupportedOperationException.class,
"Unknown catalog type",
() -> Catalogs.loadCatalog(conf, null));
}

@Test
public void testLoadCatalogDefault() {
String catalogName = "barCatalog";
Expand All @@ -290,21 +236,6 @@ public void testLoadCatalogHive() {
Assert.assertTrue(Catalogs.hiveCatalog(conf, properties));
}

@Test
public void testLegacyLoadCustomCatalogWithHiveCatalogTypeSet() {
String catalogName = "barCatalog";
conf.set(
InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE),
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
conf.set(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalog.class.getName());
conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation");
AssertHelpers.assertThrows(
"Should complain about both configs being set",
IllegalArgumentException.class,
"both type and catalog-impl are set",
() -> Catalogs.loadCatalog(conf, catalogName));
}

@Test
public void testLoadCatalogHadoop() {
String catalogName = "barCatalog";
Expand All @@ -325,23 +256,6 @@ public void testLoadCatalogHadoop() {
Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
}

@Test
public void testLoadCatalogHadoopWithLegacyWarehouseLocation() {
String catalogName = "barCatalog";
conf.set(
InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE),
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation");
Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
Assert.assertTrue(hadoopCatalog.isPresent());
Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
Assert.assertEquals(
"HadoopCatalog{name=barCatalog, location=/tmp/mylocation}", hadoopCatalog.get().toString());
Properties properties = new Properties();
properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
}

@Test
public void testLoadCatalogCustom() {
String catalogName = "barCatalog";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class TestIcebergInputFormats {
@Before
public void before() throws IOException {
conf = new Configuration();
conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
conf.set(CatalogUtil.ICEBERG_CATALOG_TYPE, Catalogs.LOCATION);
HadoopTables tables = new HadoopTables(conf);

File location = temp.newFolder(testInputFormat.name(), fileFormat.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -64,7 +65,7 @@ public static Object[][] parameters() {
@Before
@Override
public void writeTestDataFile() throws IOException {
conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
conf.set(CatalogUtil.ICEBERG_CATALOG_TYPE, Catalogs.LOCATION);
super.writeTestDataFile();
}

Expand Down