diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergConfigUtil.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergConfigUtil.java
new file mode 100644
index 0000000000..b6f8893299
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergConfigUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.SerializationUtil;
+
+class HiveIcebergConfigUtil {
+
+  private static final String TABLE_SCHEMA_MAP = "iceberg.mr.table.schema.map";
+
+  private HiveIcebergConfigUtil() {
+  }
+
+  /**
+   * Copies the schema provided by schemaSupplier into the configuration.
+   *
+   * While copying, the schema is added to a map (table name -> schema JSON). This map is serialized and set as the
+   * value for the key TABLE_SCHEMA_MAP.
+   */
+  static void copySchemaToConf(Supplier<Schema> schemaSupplier, Configuration configuration, Properties tblProperties) {
+    String tableName = tblProperties.getProperty(Catalogs.NAME);
+    Map<String, String> tableToSchema =
+        Optional.ofNullable(configuration.get(TABLE_SCHEMA_MAP))
+            .map(x -> (HashMap<String, String>) SerializationUtil.deserializeFromBase64(x))
+            .orElseGet(HashMap::new);
+    if (!tableToSchema.containsKey(tableName)) {
+      tableToSchema.put(tableName, SchemaParser.toJson(schemaSupplier.get()));
+    }
+    configuration.set(TABLE_SCHEMA_MAP, SerializationUtil.serializeToBase64(tableToSchema));
+  }
+
+  /**
+   * Gets the schema from the configuration.
+   *
+   * tblProperties is consulted to get the table name. The table name is looked up in the serialized map stored under
+   * the TABLE_SCHEMA_MAP configuration key; the schema JSON found there is parsed and returned.
+   */
+  static Optional<Schema> getSchemaFromConf(@Nullable Configuration configuration, Properties tblProperties) {
+    String tableName = tblProperties.getProperty(Catalogs.NAME);
+    return Optional.ofNullable(configuration)
+        .map(c -> c.get(TABLE_SCHEMA_MAP))
+        .map(x -> (HashMap<String, String>) SerializationUtil.deserializeFromBase64(x))
+        .map(map -> map.get(tableName))
+        .map(SchemaParser::fromJson);
+  }
+}
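Note: a minimal usage sketch, illustrative only and not part of the patch, of how the two helpers above round-trip a schema through the job Configuration. Here "table" is a hypothetical org.apache.iceberg.Table handle and "default.events" a hypothetical table name; imports are as in the file above.

    Configuration conf = new Configuration();
    Properties props = new Properties();
    props.setProperty(Catalogs.NAME, "default.events");  // hypothetical table name

    // Writer side: stores the (table name -> schema JSON) map, Base64-serialized, in the conf.
    HiveIcebergConfigUtil.copySchemaToConf(table::schema, conf, props);

    // Reader side: recovers the schema for the same table name; empty if it was never stored.
    Optional<Schema> schema = HiveIcebergConfigUtil.getSchemaFromConf(conf, props);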
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 26307b1e8b..0a831ff179 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -56,6 +56,7 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
           Configuration.class, ExprNodeGenericFuncDesc.class)
       .orNoop()
       .buildStatic();
+  static final String SPLIT_LOCATION = "iceberg.hive.split.location";
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
@@ -72,7 +73,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
       }
     }
 
-    String location = job.get(InputFormatConfig.TABLE_LOCATION);
+    String location = job.get(SPLIT_LOCATION);
     return Arrays.stream(super.getSplits(job, numSplits))
         .map(split -> new HiveIcebergSplit((IcebergSplit) split, location))
         .toArray(InputSplit[]::new);
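Note on the new key: SPLIT_LOCATION decouples the location attached to Hive splits from InputFormatConfig.TABLE_LOCATION. A sketch of the producer/consumer pairing (both lines are taken from hunks in this patch):

    // Producer (HiveIcebergStorageHandler#configureInputJobProperties): the Hive-facing
    // location from the table properties, not the Iceberg table location, is published.
    map.put(HiveIcebergInputFormat.SPLIT_LOCATION, props.getProperty(Catalogs.LOCATION));

    // Consumer (HiveIcebergInputFormat#getSplits): the same key is read back when
    // wrapping each IcebergSplit into a HiveIcebergSplit.
    String location = job.get(HiveIcebergInputFormat.SPLIT_LOCATION);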
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 8c8f91ec15..dd710b033d 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.util.Optional;
 import java.util.Properties;
 import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
@@ -51,7 +52,12 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr
     // the resulting properties are serialized and distributed to the executors
 
     Schema tableSchema;
-    if (configuration.get(InputFormatConfig.TABLE_SCHEMA) != null) {
+    // LinkedIn's Hive doesn't call configureInputJobProperties() before initializing the SerDe. This is a workaround
+    // to capture the configs set by configureJobConf().
+    Optional<Schema> configSchema = HiveIcebergConfigUtil.getSchemaFromConf(configuration, serDeProperties);
+    if (configSchema.isPresent()) {
+      tableSchema = configSchema.get();
+    } else if (configuration.get(InputFormatConfig.TABLE_SCHEMA) != null) {
       tableSchema = SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA));
     } else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
       tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index b4a5d3ff00..fd32de895e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -77,6 +77,7 @@ public void configureInputJobProperties(TableDesc tableDesc, Map<String, String>
     map.put(InputFormatConfig.TABLE_IDENTIFIER, props.getProperty(Catalogs.NAME));
     map.put(InputFormatConfig.TABLE_LOCATION, table.location());
     map.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema()));
+    map.put(HiveIcebergInputFormat.SPLIT_LOCATION, props.getProperty(Catalogs.LOCATION));
   }
 
   @Override
@@ -97,7 +98,11 @@ public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String>
 
   @Override
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
-
+    HiveIcebergConfigUtil.copySchemaToConf(
+        () -> Catalogs.loadTable(conf, tableDesc.getProperties()).schema(),
+        jobConf,
+        tableDesc.getProperties()
+    );
   }
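For clarity, the call order the workaround assumes, written as a hypothetical driver sequence (on LinkedIn's Hive fork these calls are made internally by the query engine, not by user code):

    JobConf jobConf = new JobConf();
    // 1. configureJobConf() now stashes the table schema into the conf, keyed by
    //    table name, via HiveIcebergConfigUtil.copySchemaToConf().
    storageHandler.configureJobConf(tableDesc, jobConf);

    // 2. HiveIcebergSerDe.initialize() can then recover the schema from the conf
    //    even though configureInputJobProperties() was never invoked beforehand.
    serDe.initialize(jobConf, tableDesc.getProperties());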
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalogAndLinkedinMetadata.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalogAndLinkedinMetadata.java
new file mode 100644
index 0000000000..4ea19a47c7
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalogAndLinkedinMetadata.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.rules.TemporaryFolder;
+
+public class TestHiveIcebergStorageHandlerWithHiveCatalogAndLinkedinMetadata
+    extends TestHiveIcebergStorageHandlerWithHiveCatalog {
+
+  private HiveCatalog hiveCatalog;
+  private TemporaryFolder temporaryFolder;
+
+  @Override
+  public TestTables testTables(Configuration conf, TemporaryFolder temp) {
+    hiveCatalog = HiveCatalogs.loadCatalog(conf);
+    temporaryFolder = temp;
+    return super.testTables(conf, temp);
+  }
+
+  @Override
+  protected Table createIcebergTable(String tableName, Schema schema, List<Record> records) throws IOException {
+    // This code is derived from TestTables. There was no easy way to alter the table location without changing a
+    // bunch of interfaces; this code achieves the same outcome.
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default." + tableName);
+    Table table = hiveCatalog.buildTable(tableIdentifier, schema)
+        .withPartitionSpec(PartitionSpec.unpartitioned())
+        .withLocation(getLocationWithoutURI(tableIdentifier))
+        .withProperties(
+            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name(),
+                TableProperties.ENGINE_HIVE_ENABLED, "true"))
+        .create();
+
+    if (!records.isEmpty()) {
+      GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, temporaryFolder);
+      table
+          .newAppend()
+          .appendFile(appender.writeFile(null, records))
+          .commit();
+    }
+    return table;
+  }
+
+  private String getLocationWithoutURI(TableIdentifier tableIdentifier) {
+    try {
+      String location = DynMethods.builder("defaultWarehouseLocation")
+          .hiddenImpl(HiveCatalog.class, TableIdentifier.class)
+          .build()
+          .invoke(hiveCatalog, tableIdentifier);
+      return new URI(location).getPath();
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
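For reference, what getLocationWithoutURI() strips, shown with hypothetical values: java.net.URI#getPath drops the scheme and authority, leaving only the path component, which is the point of the helper.

    String location = "hdfs://namenode:8020/warehouse/default/events";  // hypothetical
    String path = new URI(location).getPath();                          // "/warehouse/default/events"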