diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java
new file mode 100644
index 0000000000..261ee795e9
--- /dev/null
+++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive.legacy;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+
+public class HiveTypeToIcebergType extends HiveTypeUtil.HiveSchemaVisitor<Type> {
+  private int nextId = 1;
+
+  @Override
+  public Type struct(StructTypeInfo struct, List<String> names, List<Type> fieldResults) {
+    List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(fieldResults.size());
+    for (int i = 0; i < names.size(); i++) {
+      fields.add(Types.NestedField.optional(allocateId(), names.get(i), fieldResults.get(i)));
+    }
+    return Types.StructType.of(fields);
+  }
+
+  @Override
+  public Type map(MapTypeInfo map, Type keyResult, Type valueResult) {
+    return Types.MapType.ofOptional(allocateId(), allocateId(), keyResult, valueResult);
+  }
+
+  @Override
+  public Type list(ListTypeInfo list, Type elementResult) {
+    return Types.ListType.ofOptional(allocateId(), elementResult);
+  }
+
+  @Override
+  public Type primitive(PrimitiveTypeInfo primitive) {
+    switch (primitive.getPrimitiveCategory()) {
+      case FLOAT:
+        return Types.FloatType.get();
+      case DOUBLE:
+        return Types.DoubleType.get();
+      case BOOLEAN:
+        return Types.BooleanType.get();
+      case BYTE:
+      case SHORT:
+      case INT:
+        return Types.IntegerType.get();
+      case LONG:
+        return Types.LongType.get();
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        return Types.StringType.get();
+      case BINARY:
+        return Types.BinaryType.get();
+      case DECIMAL:
+        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitive;
+        return Types.DecimalType.of(decimalTypeInfo.precision(), decimalTypeInfo.scale());
+      case TIMESTAMP:
+        return Types.TimestampType.withoutZone();
+      case DATE:
+        return Types.DateType.get();
+      default:
+        throw new UnsupportedOperationException("Unsupported primitive type " + primitive);
+    }
+  }
+
+  private int allocateId() {
+    int current = nextId;
+    nextId += 1;
+    return current;
+  }
+}
diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeUtil.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeUtil.java
new file mode 100644
index 0000000000..a7b4848b4f
--- /dev/null
+++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive.legacy;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+public class HiveTypeUtil {
+  private HiveTypeUtil() {
+  }
+
+  public static <T> T visit(TypeInfo typeInfo, HiveSchemaVisitor<T> visitor) {
+    switch (typeInfo.getCategory()) {
+      case STRUCT:
+        final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        List<String> names = structTypeInfo.getAllStructFieldNames();
+        List<T> results = Lists.newArrayListWithExpectedSize(names.size());
+        for (String name : names) {
+          results.add(visit(structTypeInfo.getStructFieldTypeInfo(name), visitor));
+        }
+        return visitor.struct(structTypeInfo, names, results);
+
+      case UNION:
+        throw new UnsupportedOperationException("Union data type not supported : " + typeInfo);
+
+      case LIST:
+        ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+        return visitor.list(listTypeInfo, visit(listTypeInfo.getListElementTypeInfo(), visitor));
+
+      case MAP:
+        final MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+        return visitor.map(mapTypeInfo,
+            visit(mapTypeInfo.getMapKeyTypeInfo(), visitor),
+            visit(mapTypeInfo.getMapValueTypeInfo(), visitor));
+
+      default:
+        final PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+        return visitor.primitive(primitiveTypeInfo);
+    }
+  }
+
+  public static class HiveSchemaVisitor<T> {
+    public T struct(StructTypeInfo struct, List<String> names, List<T> fieldResults) {
+      return null;
+    }
+
+    public T list(ListTypeInfo list, T elementResult) {
+      return null;
+    }
+
+    public T map(MapTypeInfo map, T keyResult, T valueResult) {
+      return null;
+    }
+
+    public T primitive(PrimitiveTypeInfo primitive) {
+      return null;
+    }
+  }
+}
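For illustration, here is one way the new visitor pair could be driven to turn a Hive type string into an Iceberg type. This is a sketch, not part of the patch: the wrapper class name and the type string are invented for the example, while HiveTypeUtil, HiveTypeToIcebergType, and the Hive/Iceberg classes they reference come from this patch and its dependencies.

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.hive.legacy.HiveTypeToIcebergType;
import org.apache.iceberg.hive.legacy.HiveTypeUtil;
import org.apache.iceberg.types.Type;

public class HiveTypeConversionExample {
  public static void main(String[] args) {
    // Parse a Hive type string into a TypeInfo tree (the string here is made up for the example).
    TypeInfo hiveType = TypeInfoUtils.getTypeInfoFromTypeString(
        "struct<id:bigint,amount:decimal(10,2),tags:array<string>>");
    // Walk the TypeInfo with the new visitor; fields come back optional, with visitor-assigned ids.
    Type icebergType = HiveTypeUtil.visit(hiveType, new HiveTypeToIcebergType());
    System.out.println(icebergType);
  }
}

The icebergType(String) helper added to LegacyHiveTableUtils further below applies the same pattern, but only to primitive partition column types.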
diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java
index 52b01b698c..2c95d33f7f 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java
@@ -112,10 +112,11 @@ Iterable<Iterable<DataFile>> getFilesByFilter(Expression expression) {
       matchingDirectories = getDirectoryInfosByFilter(expression);
     }
 
-    Iterable<Iterable<DataFile>> filesPerDirectory = Iterables.transform(matchingDirectories, directory -> {
-      return Iterables.transform(FileSystemUtils.listFiles(directory.location(), conf),
-          file -> createDataFile(file, current().spec(), directory.partitionData(), directory.format()));
-    });
+    Iterable<Iterable<DataFile>> filesPerDirectory = Iterables.transform(
+        matchingDirectories,
+        directory -> Iterables.transform(
+            FileSystemUtils.listFiles(directory.location(), conf),
+            file -> createDataFile(file, current().spec(), directory.partitionData(), directory.format())));
 
     // Note that we return an Iterable of Iterables here so that the TableScan can process iterables of individual
     // directories in parallel hence resulting in a parallel file listing
@@ -167,7 +168,7 @@ private List<DirectoryInfo> getDirectoryInfosByFilter(Expression expression) {
             client -> client.listPartitionsByFilter(databaseName, tableName, partitionFilterString, (short) -1));
       }
 
-      return LegacyHiveTableUtils.toDirectoryInfos(partitions);
+      return LegacyHiveTableUtils.toDirectoryInfos(partitions, current().spec());
     } catch (TException e) {
       String errMsg = String.format("Failed to get partition info for %s.%s + expression %s from metastore",
           databaseName, tableName, expression);
diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java
index 7e375caecb..c4c0e0ba4d 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java
@@ -73,8 +73,12 @@ public CloseableIterable<FileScanTask> planFiles() {
     String specString = PartitionSpecParser.toJson(spec);
     ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter(), isCaseSensitive());
 
-    Iterable<Iterable<FileScanTask>> tasks = Iterables.transform(hiveOps.getFilesByFilter(filter()), fileIterable ->
-        Iterables.transform(fileIterable, file -> new BaseFileScanTask(file, schemaString, specString, residuals)));
+    Iterable<Iterable<FileScanTask>> tasks = Iterables.transform(
+        hiveOps.getFilesByFilter(filter()),
+        fileIterable ->
+            Iterables.transform(
+                fileIterable,
+                file -> new BaseFileScanTask(file, schemaString, specString, residuals)));
 
     return new ParallelIterable<>(tasks, ThreadPools.getWorkerPool());
   }
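The two hunks above keep the listing lazy: getFilesByFilter returns an Iterable of per-directory Iterables, and ParallelIterable pulls from the inner iterables on the worker pool, so directories are listed in parallel. A minimal sketch of that shape follows; the directory and file names are made up and stand in for the real file-system listing, while Iterables, ParallelIterable, and ThreadPools are the same classes the patch uses.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;

public class ParallelListingSketch {
  public static void main(String[] args) {
    Iterable<String> directories = ImmutableList.of("pcol=ds/pIntCol=1", "pcol=ds/pIntCol=2");
    // Lazy: nothing is "listed" until something iterates the inner iterables.
    Iterable<Iterable<String>> filesPerDirectory = Iterables.transform(
        directories,
        dir -> Iterables.transform(ImmutableList.of("A.avro", "B.avro"), file -> dir + "/" + file));
    // ParallelIterable drains the inner iterables on the shared worker pool;
    // output order is therefore not deterministic.
    for (String file : new ParallelIterable<>(filesPerDirectory, ThreadPools.getWorkerPool())) {
      System.out.println(file);
    }
  }
}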
diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java
index f2fcc31db7..382cb3a922 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java
@@ -19,8 +19,7 @@
 
 package org.apache.iceberg.hive.legacy;
 
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
+import com.google.common.collect.Lists;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,11 +27,15 @@
 import java.util.stream.Collectors;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,41 +43,49 @@
 
 class LegacyHiveTableUtils {
 
-  private LegacyHiveTableUtils() {}
+  private LegacyHiveTableUtils() {
+  }
 
   private static final Logger LOG = LoggerFactory.getLogger(LegacyHiveTableUtils.class);
 
   static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
     Map<String, String> props = getTableProperties(table);
     String schemaStr = props.get("avro.schema.literal");
-    Schema schema;
     if (schemaStr == null) {
       LOG.warn("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. The schema will not" +
-          " have case sensitivity and nullability information", table.getDbName(), table.getTableName());
+              " have case sensitivity and nullability information", table.getDbName(), table.getTableName());
       // TODO: Add support for tables without avro.schema.literal
       throw new UnsupportedOperationException("Reading tables without avro.schema.literal not implemented yet");
-    } else {
-      schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr));
     }
-    List<String> partCols = table.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList());
-    return addPartitionColumnsIfRequired(schema, partCols);
+    Schema schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr));
+    Types.StructType dataStructType = schema.asStruct();
+    List<Types.NestedField> fields = Lists.newArrayList(dataStructType.fields());
+
+    Schema partitionSchema = partitionSchema(table.getPartitionKeys(), schema);
+    Types.StructType partitionStructType = partitionSchema.asStruct();
+    fields.addAll(partitionStructType.fields());
+    return new Schema(fields);
   }
 
-  private static Schema addPartitionColumnsIfRequired(Schema schema, List<String> partitionColumns) {
-    List<Types.NestedField> fields = new ArrayList<>(schema.columns());
+  private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema) {
     AtomicInteger fieldId = new AtomicInteger(10000);
-    partitionColumns.stream().forEachOrdered(column -> {
-      Types.NestedField field = schema.findField(column);
-      if (field == null) {
-        // TODO: Support partition fields with non-string types
-        fields.add(Types.NestedField.required(fieldId.incrementAndGet(), column, Types.StringType.get()));
-      } else {
-        Preconditions.checkArgument(field.type().equals(Types.StringType.get()),
-            "Tables with non-string partition columns not supported yet");
+    List<Types.NestedField> partitionFields = Lists.newArrayList();
+    partitionKeys.forEach(f -> {
+      Types.NestedField field = dataSchema.findField(f.getName());
+      if (field != null) {
+        throw new IllegalStateException(String.format("Partition field %s also present in data", field.name()));
       }
+      partitionFields.add(
+          Types.NestedField.optional(
+              fieldId.incrementAndGet(), f.getName(), icebergType(f.getType()), f.getComment()));
     });
-    return new Schema(fields);
+    return new Schema(partitionFields);
+  }
+
+  private static Type icebergType(String hiveTypeString) {
+    PrimitiveTypeInfo primitiveTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo(hiveTypeString);
+    return HiveTypeUtil.visit(primitiveTypeInfo, new HiveTypeToIcebergType());
   }
 
   static Map<String, String> getTableProperties(org.apache.hadoop.hive.metastore.api.Table table) {
@@ -87,32 +98,28 @@ static Map<String, String> getTableProperties(org.apache.hadoop.hive.metastore.a
 
   static PartitionSpec getPartitionSpec(org.apache.hadoop.hive.metastore.api.Table table, Schema schema) {
     PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-
-    table.getPartitionKeys().forEach(fieldSchema -> {
-      // TODO: Support partition fields with non-string types
-      Preconditions.checkArgument(fieldSchema.getType().equals("string"),
-          "Tables with non-string partition columns not supported yet");
-      builder.identity(fieldSchema.getName());
-    });
-
+    table.getPartitionKeys().forEach(fieldSchema -> builder.identity(fieldSchema.getName()));
     return builder.build();
   }
 
   static DirectoryInfo toDirectoryInfo(org.apache.hadoop.hive.metastore.api.Table table) {
     return new DirectoryInfo(table.getSd().getLocation(),
-        serdeToFileFormat(table.getSd().getSerdeInfo().getSerializationLib()), null);
+            serdeToFileFormat(table.getSd().getSerdeInfo().getSerializationLib()), null);
   }
 
-  static List<DirectoryInfo> toDirectoryInfos(List<Partition> partitions) {
-    return partitions.stream().map(p -> {
-      return new DirectoryInfo(p.getSd().getLocation(),
-          serdeToFileFormat(p.getSd().getSerdeInfo().getSerializationLib()), buildPartitionStructLike(p.getValues()));
-    }).collect(Collectors.toList());
+  static List<DirectoryInfo> toDirectoryInfos(List<Partition> partitions, PartitionSpec spec) {
+    return partitions.stream().map(
+        p -> new DirectoryInfo(
+            p.getSd().getLocation(),
+            serdeToFileFormat(
+                p.getSd().getSerdeInfo().getSerializationLib()),
+            buildPartitionStructLike(p.getValues(), spec))
+    ).collect(Collectors.toList());
   }
 
-  private static StructLike buildPartitionStructLike(List<String> partitionValues) {
+  private static StructLike buildPartitionStructLike(List<String> partitionValues, PartitionSpec spec) {
+    List<Types.NestedField> fields = spec.partitionType().fields();
     return new StructLike() {
-
       @Override
       public int size() {
         return partitionValues.size();
@@ -120,7 +127,10 @@ public int size() {
 
       @Override
       public <T> T get(int pos, Class<T> javaClass) {
-        return javaClass.cast(partitionValues.get(pos));
+        final Object partitionValue = Conversions.fromPartitionString(
+            fields.get(pos).type(),
+            partitionValues.get(pos));
+        return javaClass.cast(partitionValue);
       }
 
       @Override
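The new buildPartitionStructLike relies on Conversions.fromPartitionString to turn the metastore's string-typed partition values into objects matching the partition spec's field types. A small illustration of that conversion follows; the class name and the sample values ("ds", "1") are made up for the example, while Conversions and Types are the Iceberg classes used by the patch.

import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

public class PartitionValueConversionExample {
  public static void main(String[] args) {
    // Hive hands back partition values as strings, e.g. ("ds", "1") for (pcol, pIntCol).
    Object stringValue = Conversions.fromPartitionString(Types.StringType.get(), "ds");
    Object intValue = Conversions.fromPartitionString(Types.IntegerType.get(), "1");
    // Prints "String ds" and "Integer 1": the int partition value is now a real Integer.
    System.out.println(stringValue.getClass().getSimpleName() + " " + stringValue);
    System.out.println(intValue.getClass().getSimpleName() + " " + intValue);
  }
}

This is what lets the new pIntCol partition column in the tests below behave as an Iceberg integer rather than a string.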
diff --git a/hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
index 4f7a5b0158..8e9852d612 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
@@ -111,7 +111,7 @@ private HiveConf newHiveConf(int port) {
     HiveConf newHiveConf = new HiveConf(new Configuration(), TestHiveMetastore.class);
     newHiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
     newHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + hiveLocalDir.getAbsolutePath());
-    newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
+    newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "true");
     newHiveConf.set("iceberg.hive.client-pool-size", "2");
     return newHiveConf;
   }
diff --git a/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java
index 277ab75f1a..f0d9f1a7cb 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java
@@ -21,6 +21,7 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -60,12 +62,13 @@
 
 public class TestLegacyHiveTableScan extends HiveMetastoreTest {
-
   private static final List<FieldSchema> DATA_COLUMNS = ImmutableList.of(
       new FieldSchema("strCol", "string", ""),
       new FieldSchema("intCol", "int", ""));
   private static final List<FieldSchema> PARTITION_COLUMNS = ImmutableList.of(
-      new FieldSchema("pcol", "string", ""));
+      new FieldSchema("pcol", "string", ""),
+      new FieldSchema("pIntCol", "int", ""));
+
   private static HiveCatalog legacyCatalog;
   private static Path dbPath;
 
@@ -93,44 +96,58 @@ public void testHiveScanUnpartitioned() throws Exception {
   public void testHiveScanSinglePartition() throws Exception {
     String tableName = "single_partition";
     Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
-    addPartition(table, ImmutableList.of("1"), AVRO, "A", "B");
-    filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=1/B", AVRO), hiveScan(table));
+    addPartition(table, ImmutableList.of("ds", 1), AVRO, "A", "B");
+    filesMatch(ImmutableMap.of("pcol=ds/pIntCol=1/B", AVRO, "pcol=ds/pIntCol=1/A", AVRO), hiveScan(table));
   }
 
   @Test
   public void testHiveScanMultiPartition() throws Exception {
     String tableName = "multi_partition";
     Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
-    addPartition(table, ImmutableList.of("1"), AVRO, "A");
-    addPartition(table, ImmutableList.of("2"), AVRO, "B");
-    filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=2/B", AVRO), hiveScan(table));
+    addPartition(table, ImmutableList.of("ds", 1), AVRO, "A");
+    addPartition(table, ImmutableList.of("ds", 2), AVRO, "B");
+    filesMatch(ImmutableMap.of("pcol=ds/pIntCol=2/B", AVRO, "pcol=ds/pIntCol=1/A", AVRO), hiveScan(table));
   }
 
   @Test
   public void testHiveScanMultiPartitionWithFilter() throws Exception {
     String tableName = "multi_partition_with_filter";
     Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
-    addPartition(table, ImmutableList.of("1"), AVRO, "A");
-    addPartition(table, ImmutableList.of("2"), AVRO, "B");
-    filesMatch(ImmutableMap.of("pcol=2/B", AVRO), hiveScan(table, Expressions.equal("pcol", "2")));
+    addPartition(table, ImmutableList.of("ds", 1), AVRO, "A");
+    addPartition(table, ImmutableList.of("ds", 2), AVRO, "B");
+    filesMatch(
+        ImmutableMap.of("pcol=ds/pIntCol=1/A", AVRO, "pcol=ds/pIntCol=2/B", AVRO),
+        hiveScan(table, Expressions.equal("pcol", "ds")));
+  }
+
+  @Test
+  public void testHiveScanNonStringPartitionQuery() throws Exception {
+    String tableName = "multi_partition_with_filter_on_non_string_partition_cols";
+    Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
+    AssertHelpers.assertThrows(
+        "Filtering on non string partition is not supported by ORM layer and we can enable direct sql only on mysql",
+        RuntimeException.class, "Failed to get partition info",
+        () -> hiveScan(table, Expressions.and(Expressions.equal("pcol", "ds"), Expressions.equal("pIntCol", "1"))));
   }
 
   @Test
   public void testHiveScanMultiPartitionWithNonPartitionFilter() throws Exception {
     String tableName = "multi_partition_with_non_partition_filter";
     Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
-    addPartition(table, ImmutableList.of("1"), AVRO, "A");
-    addPartition(table, ImmutableList.of("2"), AVRO, "B");
-    filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=2/B", AVRO), hiveScan(table, Expressions.equal("intCol", 1)));
+    addPartition(table, ImmutableList.of("ds", 1), AVRO, "A");
+    addPartition(table, ImmutableList.of("ds", 2), AVRO, "B");
ImmutableMap.of("pcol=ds/pIntCol=1/A", AVRO, "pcol=ds/pIntCol=2/B", AVRO), + hiveScan(table, Expressions.equal("intCol", 1))); } @Test public void testHiveScanHybridTable() throws Exception { String tableName = "hybrid_table"; Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS); - addPartition(table, ImmutableList.of("1"), AVRO, "A"); - addPartition(table, ImmutableList.of("2"), ORC, "B"); - filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=2/B", ORC), hiveScan(table)); + addPartition(table, ImmutableList.of("ds", 1), AVRO, "A"); + addPartition(table, ImmutableList.of("ds", 2), ORC, "B"); + filesMatch(ImmutableMap.of("pcol=ds/pIntCol=1/A", AVRO, "pcol=ds/pIntCol=2/B", ORC), hiveScan(table)); } private static Table createTable(String tableName, List columns, List partitionColumns) @@ -191,7 +208,7 @@ private static Path location(Table table) { return Paths.get(table.getSd().getLocation()); } - private static Path location(Table table, List partitionValues) { + private static Path location(Table table, List partitionValues) { Path partitionLocation = location(table); for (int i = 0; i < table.getPartitionKeysSize(); i++) { partitionLocation = partitionLocation.resolve( @@ -208,13 +225,13 @@ private void addFiles(Table table, FileFormat format, String... fileNames) throw } } - private void addPartition(Table table, List partitionValues, FileFormat format, String... fileNames) + private void addPartition(Table table, List partitionValues, FileFormat format, String... fileNames) throws Exception { Path partitionLocation = location(table, partitionValues); Files.createDirectories(partitionLocation); long currentTimeMillis = System.currentTimeMillis(); metastoreClient.add_partition(new Partition( - partitionValues, + Lists.transform(partitionValues, Object::toString), table.getDbName(), table.getTableName(), (int) currentTimeMillis / 1000, @@ -237,9 +254,11 @@ private Map hiveScan(Table table, Expression filter) { CloseableIterable fileScanTasks = legacyCatalog .loadTable(TableIdentifier.of(table.getDbName(), table.getTableName())) .newScan().filter(filter).planFiles(); - return StreamSupport.stream(fileScanTasks.spliterator(), false).collect(Collectors.toMap( - f -> tableLocation.relativize(Paths.get(URI.create(f.file().path().toString()))).toString().split("\\.")[0], - f -> f.file().format())); + return StreamSupport + .stream(fileScanTasks.spliterator(), false) + .collect(Collectors.toMap( + f -> tableLocation.relativize(Paths.get(URI.create(f.file().path().toString()))).toString().split("\\.")[0], + f -> f.file().format())); } private static void filesMatch(Map expected, Map actual) {