build.gradle (4 changes: 2 additions & 2 deletions)
@@ -542,7 +542,7 @@ project(':iceberg-mr') {

  test {
    // testJoinTables / testScanTable
-   maxHeapSize '1500m'
+   maxHeapSize '2500m'

Contributor: Why was this needed? Additional tasks because of partitioning?

Contributor (Author): Yeah. The extra tasks eat more memory :(

}
}

@@ -608,7 +608,7 @@ if (jdkVersion == '8') {

  test {
    // testJoinTables / testScanTable
-   maxHeapSize '1500m'
+   maxHeapSize '2500m'
}
}
}
HiveSchemaUtil.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -63,6 +64,18 @@ public static Schema convert(List<FieldSchema> fieldSchemas) {
return HiveSchemaConverter.convert(names, typeInfos);
}

+  /**
+   * Converts the Hive partition columns to Iceberg identity partition specification.
+   * @param schema The Iceberg schema
+   * @param fieldSchemas The partition column specification
+   * @return The Iceberg partition specification
+   */
+  public static PartitionSpec spec(Schema schema, List<FieldSchema> fieldSchemas) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+    fieldSchemas.forEach(fieldSchema -> builder.identity(fieldSchema.getName()));
+    return builder.build();
+  }

/**
* Converts the Hive list of column names and column types to an Iceberg schema.
* @param names The list of the Hive column names
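The new `HiveSchemaUtil.spec` helper turns each Hive partition column into an identity partition field on the Iceberg schema. A minimal sketch of the expected behavior, using an illustrative schema shaped like the customers table from the tests further down (this snippet is not part of the PR):

```java
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
// plus an import for the HiveSchemaUtil class touched in this PR

class HiveSchemaUtilSpecSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "customer_id", Types.LongType.get()),
        Types.NestedField.optional(2, "first_name", Types.StringType.get()),
        Types.NestedField.optional(3, "last_name", Types.StringType.get()));

    // Hive partition columns as the metastore hands them over
    List<FieldSchema> partitionKeys =
        Collections.singletonList(new FieldSchema("last_name", "string", null));

    // Every partition column becomes an identity partition field
    PartitionSpec spec = HiveSchemaUtil.spec(schema, partitionKeys);

    // Equivalent to building the spec by hand
    System.out.println(spec.equals(
        PartitionSpec.builderFor(schema).identity("last_name").build())); // true
  }
}
```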
HiveIcebergMetaHook.java
@@ -19,10 +19,13 @@

package org.apache.iceberg.mr.hive;

+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogUtil;
@@ -93,11 +96,16 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)
    // If the table does not exist collect data for table creation
    // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the
    // Iceberg schema and specification generated by the code
-   // - Partitioned Hive tables are currently not allowed

    Schema schema = schema(catalogProperties, hmsTable);
    PartitionSpec spec = spec(schema, catalogProperties, hmsTable);

+   // If there are partition keys specified remove them from the HMS table and add them to the column list
+   if (hmsTable.isSetPartitionKeys()) {
+     hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys());
+     hmsTable.setPartitionKeysIsSet(false);
+   }
+
    catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
    catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
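Since the Iceberg partition spec now carries the partitioning information, the hook stores the table unpartitioned on the HMS side and widens the column list instead. A self-contained sketch of just this relocation step, with an illustrative table layout (not code from the PR):

```java
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

class PartitionKeyRelocationSketch {
  public static void main(String[] args) {
    // Roughly what arrives for:
    // CREATE EXTERNAL TABLE customers (customer_id BIGINT, first_name STRING)
    //   PARTITIONED BY (last_name STRING) ...
    StorageDescriptor sd = new StorageDescriptor();
    sd.setCols(new ArrayList<>(Arrays.asList(
        new FieldSchema("customer_id", "bigint", null),
        new FieldSchema("first_name", "string", null))));
    Table hmsTable = new Table();
    hmsTable.setSd(sd);
    hmsTable.setPartitionKeys(new ArrayList<>(Arrays.asList(
        new FieldSchema("last_name", "string", null))));

    // The relocation step from the diff above
    if (hmsTable.isSetPartitionKeys()) {
      hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys());
      hmsTable.setPartitionKeysIsSet(false);
    }

    System.out.println(hmsTable.getSd().getCols().size()); // 3: last_name is a plain column now
    System.out.println(hmsTable.isSetPartitionKeys());     // false: HMS sees an unpartitioned table
  }
}
```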

@@ -197,20 +205,32 @@ private static Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
    if (properties.getProperty(InputFormatConfig.TABLE_SCHEMA) != null) {
      return SchemaParser.fromJson(properties.getProperty(InputFormatConfig.TABLE_SCHEMA));
    } else {
-     return HiveSchemaUtil.convert(hmsTable.getSd().getCols());
+     if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {

Contributor: When would isSetPartitionKeys() be true and getPartitionKeys() empty?

Contributor (Author): Theoretically it could be set to an empty list. Checked this way to keep on the safe side.
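For reference, a tiny illustration of that corner case with the Thrift-generated Table (hypothetical snippet, not part of the change):

```java
org.apache.hadoop.hive.metastore.api.Table hmsTable =
    new org.apache.hadoop.hive.metastore.api.Table();

hmsTable.isSetPartitionKeys();          // false: the field was never assigned
hmsTable.setPartitionKeys(new java.util.ArrayList<>());
hmsTable.isSetPartitionKeys();          // true: the field was assigned...
hmsTable.getPartitionKeys().isEmpty();  // ...but the list is empty, so the combined
                                        // check still treats the table as unpartitioned
```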

+       // Add partitioning columns to the original column list before creating the Iceberg Schema
+       List<FieldSchema> cols = new ArrayList<>(hmsTable.getSd().getCols());
+       cols.addAll(hmsTable.getPartitionKeys());
+       return HiveSchemaUtil.convert(cols);
+     } else {
+       return HiveSchemaUtil.convert(hmsTable.getSd().getCols());
+     }
}
}

private static PartitionSpec spec(Schema schema, Properties properties,
org.apache.hadoop.hive.metastore.api.Table hmsTable) {

-   Preconditions.checkArgument(hmsTable.getPartitionKeys() == null || hmsTable.getPartitionKeys().isEmpty(),
-       "Partitioned Hive tables are currently not supported");

    if (properties.getProperty(InputFormatConfig.PARTITION_SPEC) != null) {
+     Preconditions.checkArgument(!hmsTable.isSetPartitionKeys() || hmsTable.getPartitionKeys().isEmpty(),
+         "Provide only one of the following: Hive partition specification, or the " +
+         InputFormatConfig.PARTITION_SPEC + " property");
      return PartitionSpecParser.fromJson(schema, properties.getProperty(InputFormatConfig.PARTITION_SPEC));
    } else {
-     return PartitionSpec.unpartitioned();
+     if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
+       // If the table is partitioned then generate the identity partition definitions for the Iceberg table
+       return HiveSchemaUtil.spec(schema, hmsTable.getPartitionKeys());
+     } else {
+       return PartitionSpec.unpartitioned();
+     }
}
}
}
HiveIcebergTestUtils.java
@@ -184,6 +184,22 @@ public static List<Object> valuesForTestRecord(Record record) {
);
}

+  /**
+   * Converts a list of Object arrays to a list of Iceberg records.
+   * @param schema The schema of the Iceberg record
+   * @param rows The data of the records
+   * @return The list of the converted records
+   */
+  public static List<Record> valueForRow(Schema schema, List<Object[]> rows) {
+    return rows.stream().map(row -> {
+      Record record = GenericRecord.create(schema);
+      for (int i = 0; i < row.length; ++i) {
+        record.set(i, row[i]);
+      }
+      return record;
+    }).collect(Collectors.toList());
+  }

/**
* Check if 2 Iceberg records are the same or not. Compares OffsetDateTimes only by the Instant they represent.
* @param expected The expected record
@@ -219,13 +235,24 @@ public static void validateData(Table table, List<Record> expected, int sortBy)
iterable.forEach(records::add);
}

+    validateData(expected, records, sortBy);
+  }
+
+  /**
+   * Validates whether the 2 sets of records are the same. The results should be sorted by a unique key so we do
+   * not end up with flaky tests.
+   * @param expected The expected list of Records (The list will be sorted)
+   * @param actual The actual list of Records (The list will be sorted)
+   * @param sortBy The column position by which we will sort
+   */
+  public static void validateData(List<Record> expected, List<Record> actual, int sortBy) {
    // Sort based on the specified column
    expected.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy)));
-   records.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy)));
+   actual.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy)));

-   Assert.assertEquals(expected.size(), records.size());
+   Assert.assertEquals(expected.size(), actual.size());
    for (int i = 0; i < expected.size(); ++i) {
-     assertEquals(expected.get(i), records.get(i));
+     assertEquals(expected.get(i), actual.get(i));
    }
  }

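Taken together, the two helpers let a test feed Hive query output straight into the record comparison. A rough usage sketch, assuming the `shell`, `table` and `expected` fixtures of the surrounding test classes:

```java
// Fetch rows through Hive, convert them to Iceberg records, then compare.
// Column 0 (customer_id in these tests) is the unique sort key that keeps
// the assertion order stable.
List<Object[]> rows = shell.executeStatement("SELECT * FROM default.customers");
List<Record> actual = HiveIcebergTestUtils.valueForRow(table.schema(), rows);
HiveIcebergTestUtils.validateData(expected, actual, 0);
```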
HiveIcebergStorageHandler test class
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -41,6 +40,7 @@
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers.Row;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -52,7 +52,9 @@
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.thrift.TException;
@@ -645,66 +647,75 @@ public void selectSameColumnTwice() throws IOException {
  @Test
  public void testCreateTableWithColumnSpecification() throws IOException {
    TableIdentifier identifier = TableIdentifier.of("default", "customers");

-   shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT, first_name STRING, last_name STRING)" +
+   Map<StructLike, List<Record>> data = new HashMap<>(1);
+   data.put(null, CUSTOMER_RECORDS);
+   String createSql = "CREATE EXTERNAL TABLE " + identifier +
+       " (customer_id BIGINT, first_name STRING, last_name STRING)" +
        " STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-       testTables.locationForCreateTableSQL(identifier));
-
-   // Check the Iceberg table data
-   org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
-   Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct());
-   Assert.assertEquals(SPEC, icebergTable.spec());
-
-   testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, null, CUSTOMER_RECORDS);
+       testTables.locationForCreateTableSQL(identifier);
+   runCreateAndReadTest(identifier, createSql, CUSTOMER_SCHEMA, PartitionSpec.unpartitioned(), data);
  }

-   List<Object[]> descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC");
+  @Test
+  public void testCreateTableWithColumnSpecificationPartitioned() throws IOException {
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+    PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("last_name").build();
+    Map<StructLike, List<Record>> data = ImmutableMap.of(
+        Row.of("Brown"), Collections.singletonList(CUSTOMER_RECORDS.get(0)),
+        Row.of("Green"), Collections.singletonList(CUSTOMER_RECORDS.get(1)),
+        Row.of("Pink"), Collections.singletonList(CUSTOMER_RECORDS.get(2)));
+    String createSql = "CREATE EXTERNAL TABLE " + identifier +
+        " (customer_id BIGINT, first_name STRING) PARTITIONED BY (last_name STRING) " +
+        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
+        testTables.locationForCreateTableSQL(identifier);
+    runCreateAndReadTest(identifier, createSql, CUSTOMER_SCHEMA, spec, data);
+  }

-   Assert.assertEquals(3, descRows.size());
-   Assert.assertArrayEquals(new Object[] {2L, "Trudy", "Pink"}, descRows.get(0));
-   Assert.assertArrayEquals(new Object[] {1L, "Bob", "Green"}, descRows.get(1));
-   Assert.assertArrayEquals(new Object[] {0L, "Alice", "Brown"}, descRows.get(2));
+  @Test
+  public void testCreatePartitionedTableByProperty() throws IOException {
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+    PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("last_name").build();
+    Map<StructLike, List<Record>> data = ImmutableMap.of(
+        Row.of("Brown"), Collections.singletonList(CUSTOMER_RECORDS.get(0)),
+        Row.of("Green"), Collections.singletonList(CUSTOMER_RECORDS.get(1)),
+        Row.of("Pink"), Collections.singletonList(CUSTOMER_RECORDS.get(2)));
+    String createSql = "CREATE EXTERNAL TABLE " + identifier +
+        " STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
+        testTables.locationForCreateTableSQL(identifier) +
+        "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " +
+        "'" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')";
+    runCreateAndReadTest(identifier, createSql, CUSTOMER_SCHEMA, spec, data);
+  }

  @Test
- public void testCreateTableWithColumnSpecificationPartitioned() {
+ public void testCreatePartitionedTableWithPropertiesAndWithColumnSpecification() {
+   PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("last_name").build();

    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
-       "currently not supported", () -> {
+       "Provide only one of the following", () -> {
          shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT) " +
              "PARTITIONED BY (first_name STRING) " +
              "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-             testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")));
+             testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")) +
+             " TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" +
+             PartitionSpecParser.toJson(spec) + "')");
        }
    );
  }

  @Test
- public void testCreatePartitionedTable() throws IOException {
+ public void testCreateTableWithColumnSpecificationMultilevelPartitioned() throws IOException {
    TableIdentifier identifier = TableIdentifier.of("default", "customers");
-   PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("first_name").build();
-
-   shell.executeStatement("CREATE EXTERNAL TABLE customers " +
+   PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("first_name").identity("last_name").build();
+   Map<StructLike, List<Record>> data = ImmutableMap.of(
+       Row.of("Alice", "Brown"), Collections.singletonList(CUSTOMER_RECORDS.get(0)),
+       Row.of("Bob", "Green"), Collections.singletonList(CUSTOMER_RECORDS.get(1)),
+       Row.of("Trudy", "Pink"), Collections.singletonList(CUSTOMER_RECORDS.get(2)));
+   String createSql = "CREATE EXTERNAL TABLE " + identifier + " (customer_id BIGINT) " +
+       "PARTITIONED BY (first_name STRING, last_name STRING) " +
        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-       testTables.locationForCreateTableSQL(identifier) +
-       "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " +
-       "'" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')");
-
-   org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
-   Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct());
-   Assert.assertEquals(spec, icebergTable.spec());
-
-   testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Alice"),
-       Arrays.asList(CUSTOMER_RECORDS.get(0)));
-   testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Bob"),
-       Arrays.asList(CUSTOMER_RECORDS.get(1)));
-   testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Trudy"),
-       Arrays.asList(CUSTOMER_RECORDS.get(2)));
-
-   List<Object[]> descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC");
-
-   Assert.assertEquals(3, descRows.size());
-   Assert.assertArrayEquals(new Object[] {2L, "Trudy", "Pink"}, descRows.get(0));
-   Assert.assertArrayEquals(new Object[] {1L, "Bob", "Green"}, descRows.get(1));
-   Assert.assertArrayEquals(new Object[] {0L, "Alice", "Brown"}, descRows.get(2));
+       testTables.locationForCreateTableSQL(identifier);
+   runCreateAndReadTest(identifier, createSql, CUSTOMER_SCHEMA, spec, data);
  }

@Test
@@ -1027,4 +1038,25 @@ public void testStructOfStructsInTable() throws IOException {
Assert.assertEquals(expectedInnerStruct.getField("value"), queryResult.get(0)[1]);
}
}

+  private void runCreateAndReadTest(TableIdentifier identifier, String createSQL, Schema expectedSchema,
+      PartitionSpec expectedSpec, Map<StructLike, List<Record>> data) throws IOException {
+    shell.executeStatement(createSQL);
+
+    org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
+    Assert.assertEquals(expectedSchema.asStruct(), icebergTable.schema().asStruct());
+    Assert.assertEquals(expectedSpec, icebergTable.spec());
+
+    List<Record> expected = Lists.newArrayList();
+    for (StructLike partition : data.keySet()) {
+      testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, partition, data.get(partition));
+      expected.addAll(data.get(partition));
+    }
+
+    List<Object[]> descRows = shell.executeStatement("SELECT * FROM " + identifier.toString() +
+        " ORDER BY " + expectedSchema.columns().get(0).name() + " DESC");
+    List<Record> records = HiveIcebergTestUtils.valueForRow(icebergTable.schema(), descRows);
+
+    HiveIcebergTestUtils.validateData(expected, records, 0);
+  }
}