diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 713cd5d7da291..f14d96139b7ab 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -81,6 +81,7 @@ object HoodieProcedures {
       ,(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
       ,(HiveSyncProcedure.NAME, HiveSyncProcedure.builder)
       ,(CopyToTempView.NAME, CopyToTempView.builder)
+      ,(ShowCommitExtraMetadataProcedure.NAME, ShowCommitExtraMetadataProcedure.builder)
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
new file mode 100644
index 0000000000000..1a8f4dd9e44d4
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+
+class ShowCommitExtraMetadataProcedure() extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 100),
+    ProcedureParameter.optional(2, "instant_time", DataTypes.StringType, None),
+    ProcedureParameter.optional(3, "metadata_key", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("metadata_key", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("metadata_value", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+    val instantTime = getArgValueOrDefault(args, PARAMETERS(2))
+    val metadataKey = getArgValueOrDefault(args, PARAMETERS(3))
+
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
+    val basePath = hoodieCatalogTable.tableLocation
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val activeTimeline = metaClient.getActiveTimeline
+    val timeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
+
+    val hoodieInstantOption: Option[HoodieInstant] = if (instantTime.isEmpty) {
+      getCommitForLastInstant(timeline)
+    } else {
+      getCommitForInstant(timeline, instantTime.get.asInstanceOf[String])
+    }
+
+    if (hoodieInstantOption.isEmpty) {
+      throw new HoodieException(s"Commit $instantTime not found in timeline $timeline.")
+    }
+
+    val commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption)
+
+    if (commitMetadataOptional.isEmpty) {
+      throw new HoodieException(s"Commit metadata for commit $instantTime not found in timeline $timeline.")
+    }
+
+    val meta = commitMetadataOptional.get
+    val timestamp: String = hoodieInstantOption.get.getTimestamp
+    val action: String = hoodieInstantOption.get.getAction
+    val metadatas: util.Map[String, String] = if (metadataKey.isEmpty) {
+      meta.getExtraMetadata
+    } else {
+      meta.getExtraMetadata.filter(r => r._1.equals(metadataKey.get.asInstanceOf[String].trim))
+    }
+
+    val rows = new util.ArrayList[Row]
+    metadatas.foreach(r => rows.add(Row(timestamp, action, r._1, r._2)))
+    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ShowCommitExtraMetadataProcedure()
+
+  private def getCommitForLastInstant(timeline: HoodieTimeline): Option[HoodieInstant] = {
+    val instantOptional = timeline.getReverseOrderedInstants.findFirst
+    if (instantOptional.isPresent) {
+      Option.apply(instantOptional.get())
+    } else {
+      Option.empty
+    }
+  }
+
+  private def getCommitForInstant(timeline: HoodieTimeline, instantTime: String): Option[HoodieInstant] = {
+    val instants: util.List[HoodieInstant] = util.Arrays.asList(
+      new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
+      new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
+      new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime))
+
+    val hoodieInstant: Option[HoodieInstant] = instants.find((i: HoodieInstant) => timeline.containsInstant(i))
+    hoodieInstant
+  }
+
+  private def getHoodieCommitMetadata(timeline: HoodieTimeline, hoodieInstant: Option[HoodieInstant]): Option[HoodieCommitMetadata] = {
+    if (hoodieInstant.isDefined) {
+      if (hoodieInstant.get.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) {
+        Option(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
+          classOf[HoodieReplaceCommitMetadata]))
+      } else {
+        Option(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
+          classOf[HoodieCommitMetadata]))
+      }
+    } else {
+      Option.empty
+    }
+  }
+}
+
+object ShowCommitExtraMetadataProcedure {
+  val NAME = "show_commit_extra_metadata"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowCommitExtraMetadataProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
index 2840b2243430d..03cf26800df96 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
@@ -61,9 +61,7 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
       // collect archived commits for table
       val endTs = commits(0).get(0).toString
       val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', end_ts => '$endTs')""").collect()
-      assertResult(4) {
-        archivedCommits.length
-      }
+      assertResult(4){archivedCommits.length}
     }
   }
 
@@ -109,9 +107,7 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
       // collect archived commits for table
       val endTs = commits(0).get(0).toString
       val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', end_ts => '$endTs')""").collect()
-      assertResult(4) {
-        archivedCommits.length
-      }
+      assertResult(4){archivedCommits.length}
     }
   }
 
@@ -288,4 +284,50 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
       assertResult(1){result.length}
     }
   }
+
+  test("Test Call show_commit_extra_metadata Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // Check required fields
checkExceptionContain(s"""call show_commit_extra_metadata()""")( + s"arguments is empty") + + // collect commits for table + val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect() + assertResult(2){commits.length} + + val instant_time = commits(0).get(0).toString + // get specify instantTime's extraMetadatas + val metadatas1 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName', instant_time => '$instant_time')""").collect() + assertResult(true){metadatas1.length > 0} + + // get last instantTime's extraMetadatas + val metadatas2 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName')""").collect() + assertResult(true){metadatas2.length > 0} + + // get last instantTime's extraMetadatas and filter extraMetadatas with metadata_key + val metadatas3 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName', metadata_key => 'schema')""").collect() + assertResult(1){metadatas3.length} + } + } }