Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 73 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

package org.apache.iceberg;

import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
Expand All @@ -33,6 +37,12 @@

public abstract class BaseMetastoreCatalog implements Catalog {
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreCatalog.class);
private Map<String, String> catalogProps = Collections.emptyMap();

@Override
public void initialize(String name, Map<String, String> properties) {
  // Capture the catalog configuration; the table-default./table-override. entries in this
  // map are later consumed by tableDefaultProperties()/tableOverrideProperties() when
  // building table property maps. NOTE(review): the map is kept by reference — assumes
  // callers do not mutate it after initialization; TODO confirm.
  catalogProps = properties;
}

@Override
public Table loadTable(TableIdentifier identifier) {
Expand Down Expand Up @@ -158,7 +168,7 @@ public Table create() {
}

String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
Map<String, String> properties = propertiesBuilder.build();
Map<String, String> properties = updateTableProperties(propertiesBuilder.build());
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);

try {
Expand All @@ -178,7 +188,7 @@ public Transaction createTransaction() {
}

String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
Map<String, String> properties = propertiesBuilder.build();
Map<String, String> properties = updateTableProperties(propertiesBuilder.build());
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);
return Transactions.createTableTransaction(identifier.toString(), ops, metadata);
}
Expand All @@ -205,7 +215,8 @@ private Transaction newReplaceTableTransaction(boolean orCreate) {
metadata = ops.current().buildReplacement(schema, spec, sortOrder, baseLocation, propertiesBuilder.build());
} else {
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, propertiesBuilder.build());
Map<String, String> properties = updateTableProperties(propertiesBuilder.build());
metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, properties);
}

if (orCreate) {
Expand Down Expand Up @@ -238,4 +249,63 @@ protected static String fullTableName(String catalogName, TableIdentifier identi

return sb.toString();
}

/**
 * Creates a table through the metastore table builder.
 *
 * @param identifier table identifier to create
 * @param schema table schema
 * @param spec partition spec
 * @param location table base location, or null to use the catalog's default warehouse location
 * @param properties table properties, may be null
 * @return the created {@link Table}
 */
@Override
public Table createTable(
    TableIdentifier identifier,
    Schema schema,
    PartitionSpec spec,
    String location,
    Map<String, String> properties) {
  // Do not merge catalog-level defaults/overrides here: the builder's create() already
  // routes the final property map through updateTableProperties(), so applying the merge
  // a second time is redundant (the merge is idempotent, but doing it twice is wasted work).
  return buildTable(identifier, schema)
      .withPartitionSpec(spec)
      .withLocation(location)
      .withProperties(properties)
      .create();
}

/**
 * Get default table properties set at Catalog level through catalog properties.
 *
 * <p>Catalog property keys are matched case-insensitively against
 * {@code CatalogProperties.TABLE_DEFAULT_PREFIX} and returned with that prefix removed.
 *
 * @return default table properties specified in catalog properties
 */
private Map<String, String> tableDefaultProperties() {
  Map<String, String> props = catalogProps == null ? Collections.emptyMap() : catalogProps;

  return props.entrySet().stream()
      .filter(e -> e.getKey().toLowerCase(Locale.ROOT).startsWith(CatalogProperties.TABLE_DEFAULT_PREFIX))
      // Strip the prefix positionally with substring rather than String.replace: replace is
      // case-sensitive (a key matched case-insensitively above would keep its prefix) and it
      // rewrites every occurrence of the prefix in the key, not just the leading one.
      .collect(Collectors.toMap(
          e -> e.getKey().substring(CatalogProperties.TABLE_DEFAULT_PREFIX.length()),
          Map.Entry::getValue,
          // case-variant duplicates collapse to one key; keep the later entry instead of
          // letting Collectors.toMap throw IllegalStateException
          (first, second) -> second));
}

/**
 * Get table properties that are enforced at Catalog level through catalog properties.
 *
 * <p>Catalog property keys are matched case-insensitively against
 * {@code CatalogProperties.TABLE_OVERRIDE_PREFIX} and returned with that prefix removed.
 *
 * @return override table properties enforced through catalog properties
 */
private Map<String, String> tableOverrideProperties() {
  Map<String, String> props = catalogProps == null ? Collections.emptyMap() : catalogProps;

  return props.entrySet().stream()
      .filter(e -> e.getKey().toLowerCase(Locale.ROOT).startsWith(CatalogProperties.TABLE_OVERRIDE_PREFIX))
      // Strip the prefix positionally with substring rather than String.replace: replace is
      // case-sensitive (a key matched case-insensitively above would keep its prefix) and it
      // rewrites every occurrence of the prefix in the key, not just the leading one.
      .collect(Collectors.toMap(
          e -> e.getKey().substring(CatalogProperties.TABLE_OVERRIDE_PREFIX.length()),
          Map.Entry::getValue,
          // case-variant duplicates collapse to one key; keep the later entry instead of
          // letting Collectors.toMap throw IllegalStateException
          (first, second) -> second));
}

/**
 * Return updated table properties with table property defaults and enforcements set at Catalog
 * level through catalog properties.
 *
 * @param tableProperties properties supplied for the table, may be null
 * @return updated table properties with defaults and enforcements set at Catalog level
 */
private Map<String, String> updateTableProperties(Map<String, String> tableProperties) {
  Map<String, String> tableProps = tableProperties == null ? Collections.emptyMap() : tableProperties;

  // Merge order matters: on a key collision the later stream wins, so table-supplied
  // properties override catalog defaults, and catalog-level overrides trump everything.
  return Stream.of(tableDefaultProperties(), tableProps, tableOverrideProperties())
      .flatMap(map -> map.entrySet().stream())
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> second));
}
}
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ private CatalogProperties() {
public static final String CATALOG_IMPL = "catalog-impl";
public static final String FILE_IO_IMPL = "io-impl";
public static final String WAREHOUSE_LOCATION = "warehouse";
public static final String TABLE_DEFAULT_PREFIX = "table-default.";
public static final String TABLE_OVERRIDE_PREFIX = "table-override.";
Comment on lines +32 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit / open-question: How do people feel about these names? I'd kind of like them to have properties or something in them, but this might be overkill.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good with this.


/**
* Controls whether the catalog will cache table entries upon load.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public HadoopCatalog() {

@Override
public void initialize(String name, Map<String, String> properties) {
super.initialize(name, properties);

String inputWarehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
Preconditions.checkArgument(inputWarehouseLocation != null && !inputWarehouseLocation.equals(""),
"Cannot instantiate hadoop catalog. No location provided for warehouse (Set warehouse config)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
Expand Down Expand Up @@ -170,10 +172,15 @@ void rewriteMetadataAsGzipWithOldExtension() throws IOException {
}

/** Creates an initialized Hadoop catalog with no extra catalog properties. */
protected HadoopCatalog hadoopCatalog() throws IOException {
  Map<String, String> noExtraProps = Collections.emptyMap();
  return hadoopCatalog(noExtraProps);
}

protected HadoopCatalog hadoopCatalog(Map<String, String> catalogProperties) throws IOException {
HadoopCatalog hadoopCatalog = new HadoopCatalog();
hadoopCatalog.setConf(new Configuration());
hadoopCatalog.initialize("hadoop",
ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, temp.newFolder().getAbsolutePath()));
ImmutableMap.<String, String>builder().putAll(catalogProperties).put(CatalogProperties.WAREHOUSE_LOCATION,
temp.newFolder().getAbsolutePath()).build());
return hadoopCatalog;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -547,4 +548,60 @@ private static void addVersionsToTable(Table table) {
table.newAppend().appendFile(dataFile1).commit();
table.newAppend().appendFile(dataFile2).commit();
}

@Test
public void testTablePropsDefaultsWithoutConflict() throws IOException {
  TableIdentifier identifier = TableIdentifier.of("db", "ns1", "ns2", "tbl");
  Map<String, String> catalogProps = ImmutableMap.of(
      "table-default.key3", "value3",
      "table-override.key4", "value4");

  // key1/key2 come from the table, key3 from the catalog default, key4 from the catalog
  // override; no keys collide, so all four values should be present unchanged.
  Table table = hadoopCatalog(catalogProps).buildTable(identifier, SCHEMA)
      .withPartitionSpec(SPEC)
      .withProperties(null)
      .withProperty("key1", "value1")
      .withProperty("key2", "value2")
      .create();

  Map<String, String> props = table.properties();
  Assert.assertEquals("value1", props.get("key1"));
  Assert.assertEquals("value2", props.get("key2"));
  Assert.assertEquals("value3", props.get("key3"));
  Assert.assertEquals("value4", props.get("key4"));
}


@Test
public void testTablePropsOverrideCatalogDefaultProps() throws IOException {
  TableIdentifier identifier = TableIdentifier.of("db", "ns1", "ns2", "tbl");
  Map<String, String> catalogProps = ImmutableMap.of("table-default.key3", "value3");

  // key3 is only a catalog *default*, so the explicit table property must win.
  Table table = hadoopCatalog(catalogProps).buildTable(identifier, SCHEMA)
      .withPartitionSpec(SPEC)
      .withProperties(null)
      .withProperty("key1", "value1")
      .withProperty("key2", "value2")
      .withProperty("key3", "value31")
      .create();

  Map<String, String> props = table.properties();
  Assert.assertEquals("value1", props.get("key1"));
  Assert.assertEquals("value2", props.get("key2"));
  Assert.assertEquals("value31", props.get("key3"));
}

@Test
public void testEnforcedCatalogPropsOverrideTableDefaults() throws IOException {
  TableIdentifier identifier = TableIdentifier.of("db", "ns1", "ns2", "tbl");
  Map<String, String> catalogProps = ImmutableMap.of("table-override.key3", "value3");

  // key3 is a catalog *override*, so the enforced value must beat the table-supplied one.
  Table table = hadoopCatalog(catalogProps).buildTable(identifier, SCHEMA)
      .withPartitionSpec(SPEC)
      .withProperties(null)
      .withProperty("key1", "value1")
      .withProperty("key2", "value2")
      .withProperty("key3", "value31")
      .create();

  Map<String, String> props = table.properties();
  Assert.assertEquals("value1", props.get("key1"));
  Assert.assertEquals("value2", props.get("key2"));
  Assert.assertEquals("value3", props.get("key3"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public HiveCatalog() {

@Override
public void initialize(String inputName, Map<String, String> properties) {
super.initialize(name, properties);

this.name = inputName;
if (conf == null) {
LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
Expand Down Expand Up @@ -468,4 +469,84 @@ public void testUUIDinTableProperties() throws Exception {
catalog.dropTable(tableIdentifier);
}
}

@Test
public void testTablePropsDefaultsWithoutConflict() {
  Schema schema = new Schema(
      required(1, "id", Types.IntegerType.get(), "unique ID")
  );
  TableIdentifier identifier = TableIdentifier.of(DB_NAME, "tbl");

  ImmutableMap<String, String> catalogProps = ImmutableMap.of(
      "table-default.key3", "value3",
      "table-override.key4", "value4");
  HiveCatalog hiveCatalog = (HiveCatalog) CatalogUtil.loadCatalog(HiveCatalog.class.getName(),
      CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, catalogProps, hiveConf);

  try {
    // No key collisions: table, default, and override properties must all survive.
    Table table = hiveCatalog.buildTable(identifier, schema)
        .withProperty("key1", "value1")
        .withProperty("key2", "value2")
        .create();

    Map<String, String> props = table.properties();
    Assert.assertEquals("value1", props.get("key1"));
    Assert.assertEquals("value2", props.get("key2"));
    Assert.assertEquals("value3", props.get("key3"));
    Assert.assertEquals("value4", props.get("key4"));
  } finally {
    hiveCatalog.dropTable(identifier);
  }
}

@Test
public void testTablePropsOverrideCatalogDefaultProps() {
  Schema schema = new Schema(
      required(1, "id", Types.IntegerType.get(), "unique ID")
  );
  TableIdentifier identifier = TableIdentifier.of(DB_NAME, "tbl");

  ImmutableMap<String, String> catalogProps = ImmutableMap.of("table-default.key3", "value3");
  HiveCatalog hiveCatalog = (HiveCatalog) CatalogUtil.loadCatalog(HiveCatalog.class.getName(),
      CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, catalogProps, hiveConf);

  try {
    // key3 is only a catalog *default*, so the explicit table property must win.
    Table table = hiveCatalog.buildTable(identifier, schema)
        .withProperty("key1", "value1")
        .withProperty("key2", "value2")
        .withProperty("key3", "value31")
        .create();

    Map<String, String> props = table.properties();
    Assert.assertEquals("value1", props.get("key1"));
    Assert.assertEquals("value2", props.get("key2"));
    Assert.assertEquals("value31", props.get("key3"));
  } finally {
    hiveCatalog.dropTable(identifier);
  }
}

@Test
public void testEnforcedCatalogPropsOverrideTableDefaults() {
  Schema schema = new Schema(
      required(1, "id", Types.IntegerType.get(), "unique ID")
  );
  TableIdentifier identifier = TableIdentifier.of(DB_NAME, "tbl");

  ImmutableMap<String, String> catalogProps = ImmutableMap.of(
      "table-override.key3", "value3");
  HiveCatalog hiveCatalog = (HiveCatalog) CatalogUtil.loadCatalog(HiveCatalog.class.getName(),
      CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, catalogProps, hiveConf);

  try {
    // key3 is a catalog *override*, so the enforced value must beat the table-supplied one.
    Table table = hiveCatalog.buildTable(identifier, schema)
        .withProperty("key1", "value1")
        .withProperty("key2", "value2")
        .withProperty("key3", "value31")
        .create();

    Map<String, String> props = table.properties();
    Assert.assertEquals("value1", props.get("key1"));
    Assert.assertEquals("value2", props.get("key2"));
    Assert.assertEquals("value3", props.get("key3"));
  } finally {
    hiveCatalog.dropTable(identifier);
  }
}
}