@@ -318,7 +318,7 @@ private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTa
createInstantTime);
}).forEach(status -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
-writeStat.setPath(partition + Path.SEPARATOR + status.getPath().getName());
+writeStat.setPath((partition.isEmpty() ? "" : partition + Path.SEPARATOR) + status.getPath().getName());
writeStat.setPartitionPath(partition);
writeStat.setTotalWriteBytes(status.getLen());
commitMetadata.addWriteStat(partition, writeStat);
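The old concatenation always prepends the partition plus a separator, so for a non-partitioned table (whose relative partition path is the empty string) the write stat records a path with a spurious leading separator. A minimal sketch of the before/after behaviour, using a hypothetical file name and "/" standing in for Path.SEPARATOR:

public class WriteStatPathSketch {
  public static void main(String[] args) {
    // Hypothetical inputs: an empty relative partition path (non-partitioned table)
    // and an example data file name; "/" stands in for org.apache.hadoop.fs.Path.SEPARATOR.
    String partition = "";
    String fileName = "abc123_0-0-0_20210101000000.parquet";
    String separator = "/";

    // Before: the separator is always prepended, so a non-partitioned table
    // records "/abc123..." instead of a clean relative path.
    String before = partition + separator + fileName;

    // After: the partition prefix is skipped entirely when the partition is empty.
    String after = (partition.isEmpty() ? "" : partition + separator) + fileName;

    System.out.println(before); // /abc123_0-0-0_20210101000000.parquet
    System.out.println(after);  // abc123_0-0-0_20210101000000.parquet
  }
}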
@@ -374,9 +374,10 @@ private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMet
.collect(Collectors.toList());

if (p.getRight().length > filesInDir.size()) {
-// Is a partition. Add all data files to result.
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), p.getLeft());
-partitionToFileStatus.put(partitionName, filesInDir);
+// For a non-partitioned table, the base path listing includes the .hoodie metafolder; exclude it here.
+partitionToFileStatus.put(partitionName, filesInDir.stream()
+    .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
} else {
// Add sub-dirs to the queue
pathsToList.addAll(Arrays.stream(p.getRight())
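For a non-partitioned table the base path itself is the only "partition", so listing it returns the .hoodie metafolder alongside the data files; without the filter added above, the metadata table would register .hoodie as if it were a data file. A self-contained sketch of the same filtering idea on plain names (".hoodie" stands in for HoodieTableMetaClient.METAFOLDER_NAME, and the file names are hypothetical):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MetafolderFilterSketch {
  // Stand-in for HoodieTableMetaClient.METAFOLDER_NAME.
  private static final String METAFOLDER_NAME = ".hoodie";

  public static void main(String[] args) {
    // Hypothetical listing of a non-partitioned table's base path:
    // the metafolder sits next to the data files.
    List<String> filesInDir = Arrays.asList(".hoodie", "abc_20210101.parquet", "def_20210101.parquet");

    // Same filter as in the change above, applied to names instead of FileStatus objects.
    List<String> dataFiles = filesInDir.stream()
        .filter(name -> !name.equals(METAFOLDER_NAME))
        .collect(Collectors.toList());

    System.out.println(dataFiles); // [abc_20210101.parquet, def_20210101.parquet]
  }
}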
@@ -700,4 +700,50 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
}
}

test("test Non partition table with metatable support") {
List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType =>
initSparkContext("testNonPartitionTableWithMetaTable")
initSparkContext("test_schema_evolution")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
val basePath = path.toAbsolutePath.toString
try {
val df = spark.range(0, 10).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("age", expr("keyid + 1000"))

df.write.format("hudi")
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, tableType)
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key, "col3")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key, "keyid")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key, "")
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "insert")
.option("hoodie.insert.shuffle.parallelism", "1")
.option("hoodie.metadata.enable", "true")
.option(HoodieWriteConfig.TABLE_NAME.key, "hoodie_test")
.mode(SaveMode.Overwrite).save(basePath)
// upsert same record again
val df_update = spark.range(0, 10).toDF("keyid")
.withColumn("col3", expr("keyid"))
.withColumn("age", expr("keyid + 2000"))
df_update.write.format("hudi")
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, tableType)
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key, "col3")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key, "keyid")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key, "")
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "upsert")
.option("hoodie.upsert.shuffle.parallelism", "1")
.option("hoodie.metadata.enable", "true")
.option(HoodieWriteConfig.TABLE_NAME.key, "hoodie_test")
.mode(SaveMode.Append).save(basePath)
assert(spark.read.format("hudi").load(basePath).count() == 10)
assert(spark.read.format("hudi").load(basePath).where("age >= 2000").count() == 10)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
}
}