Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ private object HoodieMergeOnReadRDD {
val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(tablePath)
.withLogFilePaths(logFiles.map(logFile => getFilePath(logFile.getPath)).asJava)
.withLogFilePaths(logFiles.map(logFile => logFile.getPath.toString).asJava)
.withReaderSchema(logSchema)
.withLatestInstantTime(tableState.latestCommitTimestamp)
.withReadBlocksLazily(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode

class TestDeleteTable extends TestHoodieSqlBase {

test("Test Delete Table") {
Expand Down Expand Up @@ -198,4 +203,46 @@ class TestDeleteTable extends TestHoodieSqlBase {
}
}
}

// Exercise partition-level DELETE on a MOR table with a single partition
// column, both with and without URL-encoded partition paths.
Seq(false, true).foreach { urlencode =>
  test(s"Test Delete single-partition table' partitions, urlencode: $urlencode") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"

      import spark.implicits._
      // Two rows landing in two distinct `dt` partitions.
      val sourceDf = Seq(
        (1, "z3", "v1", "2021/10/01"),
        (2, "l4", "v1", "2021/10/02")
      ).toDF("id", "name", "ts", "dt")

      // Bootstrap the table on disk through the datasource writer.
      val writeOpts = Map(
        HoodieWriteConfig.TBL_NAME.key -> tableName,
        TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
        RECORDKEY_FIELD.key -> "id",
        PRECOMBINE_FIELD.key -> "ts",
        PARTITIONPATH_FIELD.key -> "dt",
        URL_ENCODE_PARTITIONING.key -> urlencode.toString,
        KEYGENERATOR_CLASS_NAME.key -> classOf[SimpleKeyGenerator].getName,
        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "1",
        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "1"
      )
      sourceDf.write.format("hudi")
        .options(writeOpts)
        .mode(SaveMode.Overwrite)
        .save(tablePath)

      // Register the on-disk table with the Spark catalog so SQL can see it.
      spark.sql(
        s"""
           |create table $tableName using hudi
           |location '$tablePath'
           |""".stripMargin)

      // Delete the 2021/10/01 partition; the double-quoted literal variant
      // exercises the url-encoded partition-path code path.
      val deleteStmt =
        if (urlencode) {
          s"""delete from $tableName where dt="2021/10/01""""
        } else {
          s"delete from $tableName where dt='2021/10/01'"
        }
      spark.sql(deleteStmt)

      // Only the 2021/10/02 partition should survive.
      checkAnswer(s"select dt from $tableName")(Seq("2021/10/02"))
    }
  }
}
}