diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4f1d79bff1e96..b81e00b05940e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -318,7 +318,7 @@ private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTa
             createInstantTime);
       }).forEach(status -> {
         HoodieWriteStat writeStat = new HoodieWriteStat();
-        writeStat.setPath(partition + Path.SEPARATOR + status.getPath().getName());
+        writeStat.setPath((partition.isEmpty() ? "" : partition + Path.SEPARATOR) + status.getPath().getName());
         writeStat.setPartitionPath(partition);
         writeStat.setTotalWriteBytes(status.getLen());
         commitMetadata.addWriteStat(partition, writeStat);
@@ -374,9 +374,10 @@ private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMet
             .collect(Collectors.toList());
 
         if (p.getRight().length > filesInDir.size()) {
-          // Is a partition. Add all data files to result.
           String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), p.getLeft());
-          partitionToFileStatus.put(partitionName, filesInDir);
+          // For a non-partitioned table the base path itself is the partition, so exclude the .hoodie metafolder
+          partitionToFileStatus.put(partitionName, filesInDir.stream()
+              .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
         } else {
           // Add sub-dirs to the queue
           pathsToList.addAll(Arrays.stream(p.getRight())
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 44be4d160c3da..a06eeb188d75a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -700,4 +700,49 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       }
     }
   }
+
+  test("test non-partitioned table with metadata table support") {
+    List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType =>
+      initSparkContext("testNonPartitionTableWithMetaTable")
+      val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+      val basePath = path.toAbsolutePath.toString
+      try {
+        val df = spark.range(0, 10).toDF("keyid")
+          .withColumn("col3", expr("keyid"))
+          .withColumn("age", expr("keyid + 1000"))
+
+        df.write.format("hudi")
+          .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, tableType)
+          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key, "col3")
+          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key, "keyid")
+          .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key, "")
+          .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+          .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "insert")
+          .option("hoodie.insert.shuffle.parallelism", "1")
+          .option("hoodie.metadata.enable", "true")
+          .option(HoodieWriteConfig.TABLE_NAME.key, "hoodie_test")
+          .mode(SaveMode.Overwrite).save(basePath)
+        // upsert the same records again
+        val df_update = spark.range(0, 10).toDF("keyid")
+          .withColumn("col3", expr("keyid"))
+          .withColumn("age", expr("keyid + 2000"))
+        df_update.write.format("hudi")
+          .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, tableType)
+          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key, "col3")
+          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key, "keyid")
+          .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key, "")
+          .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+          .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "upsert")
+          .option("hoodie.upsert.shuffle.parallelism", "1")
+          .option("hoodie.metadata.enable", "true")
+          .option(HoodieWriteConfig.TABLE_NAME.key, "hoodie_test")
+          .mode(SaveMode.Append).save(basePath)
+        assert(spark.read.format("hudi").load(basePath).count() == 10)
+        assert(spark.read.format("hudi").load(basePath).where("age >= 2000").count() == 10)
+      } finally {
+        spark.stop()
+        FileUtils.deleteDirectory(path.toFile)
+      }
+    }
+  }
 }