@@ -44,16 +44,22 @@ class DeleteMetadataTableProcedure extends BaseProcedure with ProcedureBuilder w
     super.checkArgs(PARAMETERS, args)
 
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
-    val basePath = getBasePath(tableName)
-    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val tableNames = tableName.get.asInstanceOf[String].split(",")
+    var metadataPaths = ""
+    for (tb <- tableNames) {
+      val basePath = getBasePath(Option.apply(tb))
+      val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
 
-    try {
-      val metadataTableBasePath = deleteMetadataTable(metaClient, new HoodieSparkEngineContext(jsc), false)
-      Seq(Row(s"Deleted Metadata Table at '$metadataTableBasePath'"))
-    } catch {
-      case e: FileNotFoundException =>
-        Seq(Row("File not found: " + e.getMessage))
+      try {
+        val metadataTableBasePath = deleteMetadataTable(metaClient, new HoodieSparkEngineContext(jsc), false)
+        // Collect every deleted metadata path; guard against a leading comma on the first entry.
+        metadataPaths = if (metadataPaths.isEmpty) metadataTableBasePath else s"$metadataPaths,$metadataTableBasePath"
+      } catch {
+        case e: FileNotFoundException =>
+          Seq(Row("File not found: " + e.getMessage))
+      }
     }
+    Seq(Row(s"Deleted Metadata Table at '$metadataPaths'"))
   }
 
   override def build = new DeleteMetadataTableProcedure()
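
For reference, a minimal sketch of how the reworked procedure can be exercised from Spark SQL, mirroring the new test below ('tbl1' and 'tbl2' are hypothetical table names; the comma-separated table argument is what the split(",") above consumes):

    // Assumes an active SparkSession with Hudi's SQL procedures available.
    // The procedure now returns one summary row covering every listed table.
    val rows = spark.sql("call delete_metadata_table(table => 'tbl1,tbl2')").collect()
    assert(rows.length == 1)
    println(rows.head.getString(0)) // e.g. Deleted Metadata Table at '<path1>,<path2>'
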
@@ -55,6 +55,64 @@ class TestMetadataProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
test("Test Call create_metadata_table then create_metadata_table with mutiltables") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

val tableName_1 = generateTableName
// create table
spark.sql(
s"""
|create table $tableName_1 (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName_1'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

val tables = s"$tableName,$tableName_1"

// The first step is delete the metadata
val ret = spark.sql(s"""call delete_metadata_table(table => '$tables')""").collect()
assertResult(1) {
ret.length
}

// The second step is create the metadata
val createResult = spark.sql(s"""call create_metadata_table(table => '$tableName')""").collect()
assertResult(1) {
createResult.length
}
}
}

test("Test Call init_metadata_table Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
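
As an optional follow-up check, the test could also assert on the filesystem that each table's metadata directory is actually gone; a hedged sketch, assuming the standard Hudi layout of <basePath>/.hoodie/metadata and a placeholder base path:

    import org.apache.hadoop.fs.Path
    // '/path/to/table' is a placeholder; in the test above it would be s"${tmp.getCanonicalPath}/$tableName".
    val basePath = "/path/to/table"
    val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
    // The metadata table lives under the dedicated .hoodie/metadata directory.
    assert(!fs.exists(new Path(basePath, ".hoodie/metadata")))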