diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index 66f05e00de25..e274ad857875 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -42,6 +42,7 @@ import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -361,6 +362,25 @@ public void addPartitionToPartitioned() {
         sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName));
   }
 
+  @Test
+  public void addDataPartitionedByDateToPartitioned() {
+    createDatePartitionedFileTable("parquet");
+
+    String createIceberg =
+        "CREATE TABLE %s (id Integer, name String, dept String, date Date) USING iceberg PARTITIONED BY (date)";
+
+    sql(createIceberg, tableName);
+
+    Object result = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`', map('date', '2021-01-01'))",
+        catalogName, tableName, fileTableDir.getAbsolutePath());
+
+    Assert.assertEquals(2L, result);
+
+    assertEquals("Iceberg table contains correct data",
+        sql("SELECT id, name, dept, date FROM %s WHERE date = '2021-01-01' ORDER BY id", sourceTableName),
+        sql("SELECT id, name, dept, date FROM %s ORDER BY id", tableName));
+  }
+
   @Test
   public void addFilteredPartitionsToPartitioned() {
     createCompositePartitionedTable("parquet");
@@ -779,6 +799,25 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() {
           unpartitionedDF.col("dept"), unpartitionedDF.col("name").as("naMe"));
 
+  private static final StructField[] dateStruct = {
+      new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
+      new StructField("name", DataTypes.StringType, true, Metadata.empty()),
+      new StructField("dept", DataTypes.StringType, true, Metadata.empty()),
+      new StructField("ts", DataTypes.DateType, true, Metadata.empty())
+  };
+
+  private static java.sql.Date toDate(String value) {
+    return new java.sql.Date(DateTime.parse(value).getMillis());
+  }
+
+  private static final Dataset<Row> dateDF =
+      spark.createDataFrame(
+          ImmutableList.of(
+              RowFactory.create(1, "John Doe", "hr", toDate("2021-01-01")),
+              RowFactory.create(2, "Jane Doe", "hr", toDate("2021-01-01")),
+              RowFactory.create(3, "Matt Doe", "hr", toDate("2021-01-02")),
+              RowFactory.create(4, "Will Doe", "facilities", toDate("2021-01-02"))),
+          new StructType(dateStruct)).repartition(2);
+
   private void createUnpartitionedFileTable(String format) {
     String createParquet =
@@ -852,4 +891,13 @@ private void createPartitionedHiveTable() {
     partitionedDF.write().insertInto(sourceTableName);
     partitionedDF.write().insertInto(sourceTableName);
   }
+
+  private void createDatePartitionedFileTable(String format) {
+    String createParquet = "CREATE TABLE %s (id Integer, name String, dept String, date Date) USING %s " +
+        "PARTITIONED BY (date) LOCATION '%s'";
+
+    sql(createParquet, sourceTableName, format, fileTableDir.getAbsolutePath());
+
+    dateDF.write().insertInto(sourceTableName);
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index 04ac3384ee6e..a55c11be944c 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
@@ -48,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
 import org.apache.iceberg.spark.source.SparkTable;
@@ -77,6 +79,7 @@ import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.execution.datasources.FileStatusCache;
 import org.apache.spark.sql.execution.datasources.InMemoryFileIndex;
+import org.apache.spark.sql.execution.datasources.PartitionDirectory;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.types.IntegerType;
 import org.apache.spark.sql.types.LongType;
@@ -745,9 +748,11 @@ public static TableIdentifier identifierToTableIdentifier(Identifier identifier)
    * @param spark a Spark session
    * @param rootPath a table identifier
    * @param format format of the file
+   * @param partitionFilter partitionFilter of the file
    * @return all table's partitions
    */
-  public static List<SparkPartition> getPartitions(SparkSession spark, Path rootPath, String format) {
+  public static List<SparkPartition> getPartitions(SparkSession spark, Path rootPath, String format,
+                                                   Map<String, String> partitionFilter) {
     FileStatusCache fileStatusCache = FileStatusCache.getOrCreate(spark);
 
     Map<String, String> emptyMap = Collections.emptyMap();
@@ -768,9 +773,23 @@ public static List getPartitions(SparkSession spark, Path rootPa
     org.apache.spark.sql.execution.datasources.PartitionSpec spec = fileIndex.partitionSpec();
     StructType schema = spec.partitionColumns();
+    if (schema.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
+    List<Expression> filterExpressions =
+        SparkUtil.partitionMapToExpression(schema, partitionFilter);
+    Seq<Expression> scalaPartitionFilters =
+        JavaConverters.asScalaBufferConverter(filterExpressions).asScala().toSeq();
+
+    List<Expression> dataFilters = Lists.newArrayList();
+    Seq<Expression> scalaDataFilters =
+        JavaConverters.asScalaBufferConverter(dataFilters).asScala().toSeq();
+
+    Seq<PartitionDirectory> filteredPartitions = fileIndex.listFiles(scalaPartitionFilters, scalaDataFilters);
 
     return JavaConverters
-        .seqAsJavaListConverter(spec.partitions())
+        .seqAsJavaListConverter(filteredPartitions)
         .asJava()
         .stream()
         .map(partition -> {
@@ -781,7 +800,11 @@ public static List getPartitions(SparkSession spark, Path rootPa
             Object value = CatalystTypeConverters.convertToScala(catalystValue, field.dataType());
             values.put(field.name(), String.valueOf(value));
           });
-          return new SparkPartition(values, partition.path().toString(), format);
+
+          FileStatus fileStatus =
+              JavaConverters.seqAsJavaListConverter(partition.files()).asJava().get(0);
+
+          return new SparkPartition(values, fileStatus.getPath().getParent().toString(), format);
         }).collect(Collectors.toList());
   }
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 06f74d4fda06..7754aa406123 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -19,7 +19,10 @@ package org.apache.iceberg.spark;
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
+import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -31,6 +34,7 @@ import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.UnknownTransform;
 import org.apache.iceberg.types.TypeUtil;
@@ -38,7 +42,15 @@ import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.RuntimeConfig;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.BoundReference;
+import org.apache.spark.sql.catalyst.expressions.EqualTo;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.Literal;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.util.SerializableConfiguration;
+import org.joda.time.DateTime;
 
 public class SparkUtil {
@@ -179,4 +191,63 @@ public static Configuration hadoopConfCatalogOverrides(SparkSession spark, Strin
   private static String hadoopConfPrefixForCatalog(String catalogName) {
     return String.format(SPARK_CATALOG_HADOOP_CONF_OVERRIDE_FMT_STR, catalogName);
   }
+
+  /**
+   * Get a List of Spark filter Expression.
+   *
+   * @param schema table schema
+   * @param filters filters in the format of a Map, where key is one of the table column name,
+   *                and value is the specific value to be filtered on the column.
+   * @return a List of filters in the format of Spark Expression.
+   */
+  public static List<Expression> partitionMapToExpression(StructType schema,
+                                                          Map<String, String> filters) {
+    List<Expression> filterExpressions = Lists.newArrayList();
+    for (Map.Entry<String, String> entry : filters.entrySet()) {
+      try {
+        int index = schema.fieldIndex(entry.getKey());
+        DataType dataType = schema.fields()[index].dataType();
+        BoundReference ref = new BoundReference(index, dataType, true);
+        switch (dataType.typeName()) {
+          case "integer":
+            filterExpressions.add(new EqualTo(ref,
+                Literal.create(Integer.parseInt(entry.getValue()), DataTypes.IntegerType)));
+            break;
+          case "string":
+            filterExpressions.add(new EqualTo(ref, Literal.create(entry.getValue(), DataTypes.StringType)));
+            break;
+          case "short":
+            filterExpressions.add(new EqualTo(ref,
+                Literal.create(Short.parseShort(entry.getValue()), DataTypes.ShortType)));
+            break;
+          case "long":
+            filterExpressions.add(new EqualTo(ref,
+                Literal.create(Long.parseLong(entry.getValue()), DataTypes.LongType)));
+            break;
+          case "float":
+            filterExpressions.add(new EqualTo(ref,
+                Literal.create(Float.parseFloat(entry.getValue()), DataTypes.FloatType)));
+            break;
+          case "double":
+            filterExpressions.add(new EqualTo(ref,
+                Literal.create(Double.parseDouble(entry.getValue()), DataTypes.DoubleType)));
+            break;
+          case "date":
+            filterExpressions.add(new EqualTo(ref,
+                Literal.create(new Date(DateTime.parse(entry.getValue()).getMillis()), DataTypes.DateType)));
+            break;
+          case "timestamp":
+            filterExpressions.add(new EqualTo(ref,
+                Literal.create(new Timestamp(DateTime.parse(entry.getValue()).getMillis()), DataTypes.TimestampType)));
+            break;
+          default:
+            throw new IllegalStateException("Unexpected data type in partition filters: " + dataType);
+        }
+      } catch (IllegalArgumentException e) {
+        // ignore if filter is not on table columns
+      }
+    }
+
+    return filterExpressions;
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
index 328578674988..7e2253004210 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
@@ -159,7 +159,8 @@ private static void ensureNameMappingPresent(Table table) {
   private void importFileTable(Table table, Path tableLocation, String format,
                                Map<String, String> partitionFilter, boolean checkDuplicateFiles) {
     // List Partitions via Spark InMemory file search interface
-    List<SparkPartition> partitions = Spark3Util.getPartitions(spark(), tableLocation, format);
+    List<SparkPartition> partitions =
+        Spark3Util.getPartitions(spark(), tableLocation, format, partitionFilter);
 
     if (table.spec().isUnpartitioned()) {
       Preconditions.checkArgument(partitions.isEmpty(), "Cannot add partitioned files to an unpartitioned table");
@@ -171,12 +172,8 @@ private void importFileTable(Table table, Path tableLocation, String format, Map
       importPartitions(table, ImmutableList.of(partition), checkDuplicateFiles);
     } else {
       Preconditions.checkArgument(!partitions.isEmpty(),
-          "Cannot find any partitions in table %s", partitions);
-      List<SparkPartition> filteredPartitions = SparkTableUtil.filterPartitions(partitions, partitionFilter);
-      Preconditions.checkArgument(!filteredPartitions.isEmpty(),
Partition filter is %s", - MAP_JOINER.join(partitionFilter)); - importPartitions(table, filteredPartitions, checkDuplicateFiles); + "Cannot find any matching partitions in table %s", partitions); + importPartitions(table, partitions, checkDuplicateFiles); } }