diff --git a/build.gradle b/build.gradle index d6a9403074d9..2b6f281778e8 100644 --- a/build.gradle +++ b/build.gradle @@ -542,7 +542,7 @@ project(':iceberg-mr') { test { // testJoinTables / testScanTable - maxHeapSize '1500m' + maxHeapSize '2500m' } } @@ -608,7 +608,7 @@ if (jdkVersion == '8') { test { // testJoinTables / testScanTable - maxHeapSize '1500m' + maxHeapSize '2500m' } } } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java index 8f73df355b7b..135fcdd8cce1 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -63,6 +64,18 @@ public static Schema convert(List fieldSchemas) { return HiveSchemaConverter.convert(names, typeInfos); } + /** + * Converts the Hive partition columns to Iceberg identity partition specification. + * @param schema The Iceberg schema + * @param fieldSchemas The partition column specification + * @return The Iceberg partition specification + */ + public static PartitionSpec spec(Schema schema, List fieldSchemas) { + PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); + fieldSchemas.forEach(fieldSchema -> builder.identity(fieldSchema.getName())); + return builder.build(); + } + /** * Converts the Hive list of column names and column types to an Iceberg schema. * @param names The list of the Hive column names diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 4f969e45d68e..011178a99903 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -19,10 +19,13 @@ package org.apache.iceberg.mr.hive; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.CatalogUtil; @@ -93,11 +96,16 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) // If the table does not exist collect data for table creation // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the // Iceberg schema and specification generated by the code - // - Partitioned Hive tables are currently not allowed Schema schema = schema(catalogProperties, hmsTable); PartitionSpec spec = spec(schema, catalogProperties, hmsTable); + // If there are partition keys specified remove them from the HMS table and add them to the column list + if (hmsTable.isSetPartitionKeys()) { + hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys()); + hmsTable.setPartitionKeysIsSet(false); + } + catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema)); catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec)); @@ -197,20 +205,32 @@ private static Schema schema(Properties properties, org.apache.hadoop.hive.metas if (properties.getProperty(InputFormatConfig.TABLE_SCHEMA) != null) { return SchemaParser.fromJson(properties.getProperty(InputFormatConfig.TABLE_SCHEMA)); } else { - return HiveSchemaUtil.convert(hmsTable.getSd().getCols()); + if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) { + // Add partitioning columns to the original column list before creating the Iceberg Schema + List cols = new ArrayList<>(hmsTable.getSd().getCols()); + cols.addAll(hmsTable.getPartitionKeys()); + return HiveSchemaUtil.convert(cols); + } else { + return HiveSchemaUtil.convert(hmsTable.getSd().getCols()); + } } } private static PartitionSpec spec(Schema schema, Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable) { - Preconditions.checkArgument(hmsTable.getPartitionKeys() == null || hmsTable.getPartitionKeys().isEmpty(), - "Partitioned Hive tables are currently not supported"); - if (properties.getProperty(InputFormatConfig.PARTITION_SPEC) != null) { + Preconditions.checkArgument(!hmsTable.isSetPartitionKeys() || hmsTable.getPartitionKeys().isEmpty(), + "Provide only one of the following: Hive partition specification, or the " + + InputFormatConfig.PARTITION_SPEC + " property"); return PartitionSpecParser.fromJson(schema, properties.getProperty(InputFormatConfig.PARTITION_SPEC)); } else { - return PartitionSpec.unpartitioned(); + if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) { + // If the table is partitioned then generate the identity partition definitions for the Iceberg table + return HiveSchemaUtil.spec(schema, hmsTable.getPartitionKeys()); + } else { + return PartitionSpec.unpartitioned(); + } } } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java index e109984e4dd7..56f93e9a5bdb 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java @@ -184,6 +184,22 @@ public static List valuesForTestRecord(Record record) { ); } + /** + * Converts a list of Object arrays to a list of Iceberg records. + * @param schema The schema of the Iceberg record + * @param rows The data of the records + * @return The list of the converted records + */ + public static List valueForRow(Schema schema, List rows) { + return rows.stream().map(row -> { + Record record = GenericRecord.create(schema); + for (int i = 0; i < row.length; ++i) { + record.set(i, row[i]); + } + return record; + }).collect(Collectors.toList()); + } + /** * Check if 2 Iceberg records are the same or not. Compares OffsetDateTimes only by the Intant they represent. * @param expected The expected record @@ -219,13 +235,24 @@ public static void validateData(Table table, List expected, int sortBy) iterable.forEach(records::add); } + validateData(expected, records, sortBy); + } + + /** + * Validates whether the 2 sets of records are the same. The results should be sorted by a unique key so we do + * not end up with flaky tests. + * @param expected The expected list of Records (The list will be sorted) + * @param actual The actual list of Records (The list will be sorted) + * @param sortBy The column position by which we will sort + */ + public static void validateData(List expected, List actual, int sortBy) { // Sort based on the specified column expected.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy))); - records.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy))); + actual.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy))); - Assert.assertEquals(expected.size(), records.size()); + Assert.assertEquals(expected.size(), actual.size()); for (int i = 0; i < expected.size(); ++i) { - assertEquals(expected.get(i), records.get(i)); + assertEquals(expected.get(i), actual.get(i)); } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java index 4a87ede341c4..0a75cd6c990f 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -41,6 +40,7 @@ import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.StructLike; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TestHelpers.Row; import org.apache.iceberg.catalog.TableIdentifier; @@ -52,7 +52,9 @@ import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.TestHelper; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.thrift.TException; @@ -645,66 +647,75 @@ public void selectSameColumnTwice() throws IOException { @Test public void testCreateTableWithColumnSpecification() throws IOException { TableIdentifier identifier = TableIdentifier.of("default", "customers"); - - shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT, first_name STRING, last_name STRING)" + + Map> data = new HashMap<>(1); + data.put(null, CUSTOMER_RECORDS); + String createSql = "CREATE EXTERNAL TABLE " + identifier + + " (customer_id BIGINT, first_name STRING, last_name STRING)" + " STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + - testTables.locationForCreateTableSQL(identifier)); - - // Check the Iceberg table data - org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); - Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); - Assert.assertEquals(SPEC, icebergTable.spec()); - - testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, null, CUSTOMER_RECORDS); + testTables.locationForCreateTableSQL(identifier); + runCreateAndReadTest(identifier, createSql, CUSTOMER_SCHEMA, PartitionSpec.unpartitioned(), data); + } - List descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC"); + @Test + public void testCreateTableWithColumnSpecificationPartitioned() throws IOException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("last_name").build(); + Map> data = ImmutableMap.of( + Row.of("Brown"), Collections.singletonList(CUSTOMER_RECORDS.get(0)), + Row.of("Green"), Collections.singletonList(CUSTOMER_RECORDS.get(1)), + Row.of("Pink"), Collections.singletonList(CUSTOMER_RECORDS.get(2))); + String createSql = "CREATE EXTERNAL TABLE " + identifier + + " (customer_id BIGINT, first_name STRING) PARTITIONED BY (last_name STRING) " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier); + runCreateAndReadTest(identifier, createSql, CUSTOMER_SCHEMA, spec, data); + } - Assert.assertEquals(3, descRows.size()); - Assert.assertArrayEquals(new Object[] {2L, "Trudy", "Pink"}, descRows.get(0)); - Assert.assertArrayEquals(new Object[] {1L, "Bob", "Green"}, descRows.get(1)); - Assert.assertArrayEquals(new Object[] {0L, "Alice", "Brown"}, descRows.get(2)); + @Test + public void testCreatePartitionedTableByProperty() throws IOException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("last_name").build(); + Map> data = ImmutableMap.of( + Row.of("Brown"), Collections.singletonList(CUSTOMER_RECORDS.get(0)), + Row.of("Green"), Collections.singletonList(CUSTOMER_RECORDS.get(1)), + Row.of("Pink"), Collections.singletonList(CUSTOMER_RECORDS.get(2))); + String createSql = "CREATE EXTERNAL TABLE " + identifier + + " STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " + + "'" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"; + runCreateAndReadTest(identifier, createSql, CUSTOMER_SCHEMA, spec, data); } @Test - public void testCreateTableWithColumnSpecificationPartitioned() { + public void testCreatePartitionedTableWithPropertiesAndWithColumnSpecification() { + PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("last_name").build(); + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, - "currently not supported", () -> { + "Provide only one of the following", () -> { shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT) " + "PARTITIONED BY (first_name STRING) " + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + - testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers"))); + testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")) + + " TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + + PartitionSpecParser.toJson(spec) + "')"); } ); } @Test - public void testCreatePartitionedTable() throws IOException { + public void testCreateTableWithColumnSpecificationMultilevelPartitioned() throws IOException { TableIdentifier identifier = TableIdentifier.of("default", "customers"); - PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("first_name").build(); - - shell.executeStatement("CREATE EXTERNAL TABLE customers " + + PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("first_name").identity("last_name").build(); + Map> data = ImmutableMap.of( + Row.of("Alice", "Brown"), Collections.singletonList(CUSTOMER_RECORDS.get(0)), + Row.of("Bob", "Green"), Collections.singletonList(CUSTOMER_RECORDS.get(1)), + Row.of("Trudy", "Pink"), Collections.singletonList(CUSTOMER_RECORDS.get(2))); + String createSql = "CREATE EXTERNAL TABLE " + identifier + " (customer_id BIGINT) " + + "PARTITIONED BY (first_name STRING, last_name STRING) " + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + - testTables.locationForCreateTableSQL(identifier) + - "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " + - "'" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); - - org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); - Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); - Assert.assertEquals(spec, icebergTable.spec()); - - testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Alice"), - Arrays.asList(CUSTOMER_RECORDS.get(0))); - testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Bob"), - Arrays.asList(CUSTOMER_RECORDS.get(1))); - testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Trudy"), - Arrays.asList(CUSTOMER_RECORDS.get(2))); - - List descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC"); - - Assert.assertEquals(3, descRows.size()); - Assert.assertArrayEquals(new Object[] {2L, "Trudy", "Pink"}, descRows.get(0)); - Assert.assertArrayEquals(new Object[] {1L, "Bob", "Green"}, descRows.get(1)); - Assert.assertArrayEquals(new Object[] {0L, "Alice", "Brown"}, descRows.get(2)); + testTables.locationForCreateTableSQL(identifier); + runCreateAndReadTest(identifier, createSql, CUSTOMER_SCHEMA, spec, data); } @Test @@ -1027,4 +1038,25 @@ public void testStructOfStructsInTable() throws IOException { Assert.assertEquals(expectedInnerStruct.getField("value"), queryResult.get(0)[1]); } } + + private void runCreateAndReadTest(TableIdentifier identifier, String createSQL, Schema expectedSchema, + PartitionSpec expectedSpec, Map> data) throws IOException { + shell.executeStatement(createSQL); + + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); + Assert.assertEquals(expectedSchema.asStruct(), icebergTable.schema().asStruct()); + Assert.assertEquals(expectedSpec, icebergTable.spec()); + + List expected = Lists.newArrayList(); + for (StructLike partition : data.keySet()) { + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, partition, data.get(partition)); + expected.addAll(data.get(partition)); + } + + List descRows = shell.executeStatement("SELECT * FROM " + identifier.toString() + + " ORDER BY " + expectedSchema.columns().get(0).name() + " DESC"); + List records = HiveIcebergTestUtils.valueForRow(icebergTable.schema(), descRows); + + HiveIcebergTestUtils.validateData(expected, records, 0); + } }