diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergConfigUtil.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergConfigUtil.java new file mode 100644 index 0000000000..b9c12a78d9 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergConfigUtil.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.mr.SerializationUtil; + + +class HiveIcebergConfigUtil { + + private static final String NAME = "name"; + private static final String TABLE_SCHEMA_MAP = "iceberg.mr.table.schema.map"; + + + private HiveIcebergConfigUtil() { + } + + /** + * Copies schema provided by schemaSupplier into the configuration. + * + * While copying, the schema is added into a map(tablename -> schema). This map is serialized and set as the + * value for the key TABLE_SCHEMA_MAP. 
+ */ + static void copySchemaToConf(Supplier<Schema> schemaSupplier, Configuration configuration, Properties tblProperties) { + String tableName = tblProperties.getProperty(NAME); + Map<String, String> tableToSchema = + Optional.ofNullable(configuration.get(TABLE_SCHEMA_MAP)) + .map(x -> (HashMap<String, String>) SerializationUtil.deserializeFromBase64(x)) + .orElse(new HashMap<>()); + if (!tableToSchema.containsKey(tableName)) { + tableToSchema.put(tableName, SchemaParser.toJson(schemaSupplier.get())); + } + configuration.set(TABLE_SCHEMA_MAP, SerializationUtil.serializeToBase64(tableToSchema)); + } + + /** + * Gets schema from the configuration. + * + * tblProperties is consulted to get the tablename. This tablename is looked up in the serialized map present in + * TABLE_SCHEMA_MAP configuration. The returned json schema is then parsed and returned. + */ + static Optional<Schema> getSchemaFromConf(@Nullable Configuration configuration, Properties tblProperties) { + String tableName = tblProperties.getProperty(NAME); + return Optional.ofNullable(configuration) + .map(c -> c.get(TABLE_SCHEMA_MAP)) + .map(x -> (HashMap<String, String>) SerializationUtil.deserializeFromBase64(x)) + .map(map -> map.get(tableName)) + .map(SchemaParser::fromJson); + } + +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java index 26307b1e8b..0a831ff179 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java @@ -56,6 +56,7 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record> Configuration.class, ExprNodeGenericFuncDesc.class) .orNoop() .buildStatic(); + static final String SPLIT_LOCATION = "iceberg.hive.split.location"; @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { @@ -72,7 +73,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { } } - String
location = job.get(InputFormatConfig.TABLE_LOCATION); + String location = job.get(SPLIT_LOCATION); return Arrays.stream(super.getSplits(job, numSplits)) .map(split -> new HiveIcebergSplit((IcebergSplit) split, location)) .toArray(InputSplit[]::new); diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java index b46847ecf0..eb439ec61a 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java @@ -28,10 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; import org.apache.iceberg.Schema; -import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.Table; import org.apache.iceberg.mr.Catalogs; -import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector; import org.apache.iceberg.mr.mapred.Container; @@ -41,13 +38,9 @@ public class HiveIcebergSerDe extends AbstractSerDe { @Override public void initialize(@Nullable Configuration configuration, Properties serDeProperties) throws SerDeException { - Schema tableSchema; - if (configuration.get(InputFormatConfig.TABLE_SCHEMA) != null) { - tableSchema = SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA)); - } else { - Table table = Catalogs.loadTable(configuration, serDeProperties); - tableSchema = table.schema(); - } + Schema tableSchema = + HiveIcebergConfigUtil.getSchemaFromConf(configuration, serDeProperties) + .orElseGet(() -> Catalogs.loadTable(configuration, serDeProperties).schema()); try { this.inspector = IcebergObjectInspector.create(tableSchema); } catch (Exception e) { diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index d6c4c2feab..94e8e25473 100644 --- 
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -43,6 +43,7 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler { private static final String NAME = "name"; + private static final String LOCATION = "location"; private Configuration conf; @@ -79,6 +80,7 @@ public void configureInputJobProperties(TableDesc tableDesc, Map map.put(InputFormatConfig.TABLE_IDENTIFIER, props.getProperty(NAME)); map.put(InputFormatConfig.TABLE_LOCATION, table.location()); map.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema())); + map.put(HiveIcebergInputFormat.SPLIT_LOCATION, props.getProperty(LOCATION)); } @Override @@ -93,7 +95,11 @@ public void configureTableJobProperties(TableDesc tableDesc, Map @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { - + HiveIcebergConfigUtil.copySchemaToConf( + () -> Catalogs.loadTable(conf, tableDesc.getProperties()).schema(), + jobConf, + tableDesc.getProperties() + ); } @Override diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalogAndLinkedinMetadata.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalogAndLinkedinMetadata.java new file mode 100644 index 0000000000..789fae04ab --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalogAndLinkedinMetadata.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.HiveCatalogs; +import org.apache.iceberg.mr.TestHelper; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.rules.TemporaryFolder; + +public class TestHiveIcebergStorageHandlerWithHiveCatalogAndLinkedinMetadata + extends TestHiveIcebergStorageHandlerWithHiveCatalog { + + private HiveCatalog hiveCatalog; + private TemporaryFolder temporaryFolder; + private final FileFormat fileFormat = FileFormat.PARQUET; + + @Override + public TestTables testTables(Configuration conf, TemporaryFolder temp) { + hiveCatalog = HiveCatalogs.loadCatalog(conf); + temporaryFolder = temp; + return super.testTables(conf, temp); + } + + @Override + protected Table createIcebergTable(String tableName, Schema schema, List<Record> records) throws IOException { + // This code is derived from TestTables. There was no easy way to alter table location without changing + // a bunch of interfaces. With this code the same outcome is achieved. 
+ TableIdentifier tableIdentifier = TableIdentifier.parse("default." + tableName); + Table table = hiveCatalog.createTable( + tableIdentifier, + schema, + PartitionSpec.unpartitioned(), + getLocationWithoutURI(tableIdentifier), + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name())); + + if (!records.isEmpty()) { + table + .newAppend() + .appendFile(TestHelper.writeFile(table, null, records, fileFormat, temporaryFolder.newFile())) + .commit(); + } + return table; + } + + private String getLocationWithoutURI(TableIdentifier tableIdentifier) { + try { + String location = DynMethods.builder("defaultWarehouseLocation") + .hiddenImpl(HiveCatalog.class, TableIdentifier.class) + .build() + .invoke(hiveCatalog, tableIdentifier); + return new URI(location).getPath(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } +}