HoodieProcedures.scala
@@ -48,6 +48,8 @@ object HoodieProcedures {
    mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
    mapBuilder.put(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
    mapBuilder.put(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
    mapBuilder.put(ShowAllFileSystemViewProcedure.NAME, ShowAllFileSystemViewProcedure.builder)
    mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
    mapBuilder.build
  }
}
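With the registrations above in place, both procedures become callable from Spark SQL with named arguments. A minimal sketch of an invocation, mirroring the tests added in this PR (the table name h0 is a placeholder):

// assumes a Hudi table named 'h0' already exists
spark.sql("call show_fsview_all(table => 'h0', path_regex => '*/', limit => 10)").show()
spark.sql("call show_fsview_latest(table => 'h0', partition_path => 'ts=1000', limit => 10)").show()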
ShowFileSystemViewProcedure.scala (new file)
@@ -0,0 +1,258 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.Lists
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieDefaultTimeline, HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.{Function, Supplier}
import java.util.stream.Collectors
import scala.collection.JavaConverters.{asJavaIteratorConverter, asScalaIteratorConverter}

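/**
 * Shows a Hudi table's file-system view. With showLatest = false this backs
 * show_fsview_all (every file slice of every file group in the matching
 * partitions); with showLatest = true it backs show_fsview_latest (only the
 * latest file slices of a single partition, optionally merged with pending
 * compaction).
 */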
class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS_ALL: Array[ProcedureParameter] = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
    ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, false),
    ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, false),
    ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false),
    ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
    ProcedureParameter.optional(6, "path_regex", DataTypes.StringType, "*/*/*")
  )

  private val OUTPUT_TYPE_ALL: StructType = StructType(Array[StructField](
    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("base_instant", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("data_file", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("data_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("num_delta_files", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_delta_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("delta_files", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  private val PARAMETERS_LATEST: Array[ProcedureParameter] = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
    ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, false),
    ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, false),
    ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false),
    ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
    ProcedureParameter.required(6, "partition_path", DataTypes.StringType, None),
    ProcedureParameter.optional(7, "merge", DataTypes.BooleanType, true)
  )

  private val OUTPUT_TYPE_LATEST: StructType = StructType(Array[StructField](
    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("base_instant", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("data_file", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("data_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("num_delta_files", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_delta_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("delta_size_compaction_scheduled", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("delta_size_compaction_unscheduled", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("delta_to_base_ratio_compaction_scheduled", DataTypes.DoubleType, nullable = true, Metadata.empty),
    StructField("delta_to_base_ratio_compaction_unscheduled", DataTypes.DoubleType, nullable = true, Metadata.empty),
    StructField("delta_files_compaction_scheduled", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("delta_files_compaction_unscheduled", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

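  /**
   * Builds a HoodieTableFileSystemView over the partitions matching globRegex,
   * using only the timeline instants selected by maxInstant, includeMaxInstant,
   * includeInflight and excludeCompaction.
   */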
  private def buildFileSystemView(table: Option[Any],
                                  globRegex: String,
                                  maxInstant: String,
                                  includeMaxInstant: Boolean,
                                  includeInflight: Boolean,
                                  excludeCompaction: Boolean): HoodieTableFileSystemView = {
    val basePath = getBasePath(table)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val fs = metaClient.getFs
    val globPath = String.format("%s/%s/*", basePath, globRegex)
    val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))
    var timeline: HoodieTimeline = if (excludeCompaction) {
      metaClient.getActiveTimeline.getCommitsTimeline
    } else {
      metaClient.getActiveTimeline.getWriteTimeline
    }
    if (!includeInflight) {
      timeline = timeline.filterCompletedInstants()
    }
    var instants = timeline.getInstants.iterator().asScala
    if (maxInstant.nonEmpty) {
      // Keep only instants up to max_instant; include_max decides whether
      // max_instant itself is kept.
      val predicate = if (includeMaxInstant) {
        HoodieTimeline.GREATER_THAN_OR_EQUALS
      } else {
        HoodieTimeline.GREATER_THAN
      }
      instants = instants.filter(instant => predicate.test(maxInstant, instant.getTimestamp))
    }

    val details = new Function[HoodieInstant, org.apache.hudi.common.util.Option[Array[Byte]]]
      with java.io.Serializable {
      override def apply(instant: HoodieInstant): util.Option[Array[Byte]] = {
        metaClient.getActiveTimeline.getInstantDetails(instant)
      }
    }
    val filteredTimeline = new HoodieDefaultTimeline(Lists.newArrayList(instants.asJava).stream(), details)
    new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new Array[FileStatus](0)))
  }

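  // One output row per file slice, across every file group in the view.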
  private def showAllFileSlices(fsView: HoodieTableFileSystemView): java.util.List[Row] = {
    val rows: java.util.List[Row] = Lists.newArrayList()
    fsView.getAllFileGroups.iterator().asScala.foreach(fg => {
      fg.getAllFileSlices.iterator().asScala.foreach(fs => {
        val fileId = fg.getFileGroupId.getFileId
        var baseFilePath = ""
        var baseFileSize = 0L
        if (fs.getBaseFile.isPresent) {
          baseFilePath = fs.getBaseFile.get.getPath
          baseFileSize = fs.getBaseFile.get.getFileSize
        }
        val numLogFiles = fs.getLogFiles.count()
        val sumLogFileSize = fs.getLogFiles.iterator().asScala.map(_.getFileSize).sum
        val logFiles = fs.getLogFiles.collect(Collectors.toList[HoodieLogFile]).toString

        rows.add(Row(fg.getPartitionPath, fileId, fs.getBaseInstantTime, baseFilePath, baseFileSize, numLogFiles,
          sumLogFileSize, logFiles))
      })
    })
    rows
  }

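  /**
   * One output row per latest file slice of `partition`. Log files whose base
   * commit time equals the slice's base instant are reported in the
   * "compaction scheduled" columns; the rest go to the "unscheduled" columns.
   */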
  private def showLatestFileSlices(fsView: HoodieTableFileSystemView,
                                   table: Option[Any],
                                   partition: String,
                                   maxInstant: String,
                                   merge: Boolean): java.util.List[Row] = {
    val fileSliceStream: java.util.stream.Stream[FileSlice] = if (!merge) {
      fsView.getLatestFileSlices(partition)
    } else {
      // When merging, default max_instant to the latest completed
      // commit/compaction instant on the active timeline.
      fsView.getLatestMergedFileSlicesBeforeOrOn(partition, if (maxInstant.isEmpty) {
        val basePath = getBasePath(table)
        val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
        metaClient.getActiveTimeline.filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp
      } else {
        maxInstant
      })
    }
    val rows: java.util.List[Row] = Lists.newArrayList()
    fileSliceStream.iterator().asScala.foreach { fs =>
      val fileId = fs.getFileId
      val baseInstantTime = fs.getBaseInstantTime
      var baseFilePath = ""
      var baseFileSize = 0L
      if (fs.getBaseFile.isPresent) {
        baseFilePath = fs.getBaseFile.get.getPath
        baseFileSize = fs.getBaseFile.get.getFileSize
      }
      val numLogFiles = fs.getLogFiles.count()
      val sumLogFileSize = fs.getLogFiles.iterator().asScala.map(_.getFileSize).sum
      val logFilesScheduledForCompactionTotalSize = fs.getLogFiles.iterator().asScala
        .filter(logFile => logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
        .map(_.getFileSize).sum
      val logFilesUnscheduledTotalSize = fs.getLogFiles.iterator().asScala
        .filter(logFile => !logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
        .map(_.getFileSize).sum
      // Ratios are -1 when there is no base file to compare against.
      val logSelectedForCompactionToBaseRatio = if (baseFileSize > 0) {
        logFilesScheduledForCompactionTotalSize / (baseFileSize * 1.0)
      } else {
        -1
      }
      val logUnscheduledToBaseRatio = if (baseFileSize > 0) {
        logFilesUnscheduledTotalSize / (baseFileSize * 1.0)
      } else {
        -1
      }
      val logFilesCommitTimeEqualInstantTime = fs.getLogFiles.iterator().asScala
        .filter(logFile => logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
        .mkString("[", ",", "]")
      val logFilesCommitTimeNonEqualInstantTime = fs.getLogFiles.iterator().asScala
        .filter(logFile => !logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
        .mkString("[", ",", "]")
      rows.add(Row(partition, fileId, baseInstantTime, baseFilePath, baseFileSize, numLogFiles, sumLogFileSize,
        logFilesScheduledForCompactionTotalSize, logFilesUnscheduledTotalSize, logSelectedForCompactionToBaseRatio,
        logUnscheduledToBaseRatio, logFilesCommitTimeEqualInstantTime, logFilesCommitTimeNonEqualInstantTime))
    }
    rows
  }

  override def parameters: Array[ProcedureParameter] = if (showLatest) {
    PARAMETERS_LATEST
  } else {
    PARAMETERS_ALL
  }

  override def outputType: StructType = if (showLatest) {
    OUTPUT_TYPE_LATEST
  } else {
    OUTPUT_TYPE_ALL
  }

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(parameters, args)
    val table = getArgValueOrDefault(args, parameters(0))
    val maxInstant = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[String]
    val includeMax = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[Boolean]
    val includeInflight = getArgValueOrDefault(args, parameters(3)).get.asInstanceOf[Boolean]
    val excludeCompaction = getArgValueOrDefault(args, parameters(4)).get.asInstanceOf[Boolean]
    val limit = getArgValueOrDefault(args, parameters(5)).get.asInstanceOf[Int]
    val rows: java.util.List[Row] = if (!showLatest) {
      val globRegex = getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
      val fsView = buildFileSystemView(table, globRegex, maxInstant, includeMax, includeInflight, excludeCompaction)
      showAllFileSlices(fsView)
    } else {
      val partitionPath = getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
      val merge = getArgValueOrDefault(args, parameters(7)).get.asInstanceOf[Boolean]
      val fsView = buildFileSystemView(table, partitionPath, maxInstant, includeMax, includeInflight, excludeCompaction)
      showLatestFileSlices(fsView, table, partitionPath, maxInstant, merge)
    }
    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
  }

  override def build: Procedure = new ShowFileSystemViewProcedure(showLatest)
}

object ShowAllFileSystemViewProcedure {
  val NAME = "show_fsview_all"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new ShowFileSystemViewProcedure(false)
  }
}

object ShowLatestFileSystemViewProcedure {
  val NAME = "show_fsview_latest"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new ShowFileSystemViewProcedure(true)
  }
}
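For reference, a hypothetical end-to-end invocation of each variant exercising its optional arguments (the table name h0 and the partition value are placeholders; all parameter names come from PARAMETERS_ALL / PARAMETERS_LATEST above):

// show_fsview_all walks every partition matching path_regex
spark.sql("call show_fsview_all(table => 'h0', exclude_compaction => true, limit => 20, path_regex => '*/')").show()
// show_fsview_latest reports the latest (optionally merged) slices of one partition
spark.sql("call show_fsview_latest(table => 'h0', partition_path => 'ts=1000', merge => false)").show()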
TestFsViewProcedure.scala (new file)
@@ -0,0 +1,95 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.procedure

import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase

class TestFsViewProcedure extends HoodieSparkSqlTestBase {
  test("Test Call show_fsview_all Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | partitioned by (ts)
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data into the table
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

      // check required arguments
      checkExceptionContain(s"""call show_fsview_all(limit => 10)""")(
        s"Argument: table is required")

      // collect result for the table
      val result = spark.sql(
        s"""call show_fsview_all(table => '$tableName', path_regex => '*/', limit => 10)""").collect()
      assertResult(2) {
        result.length
      }
    }
  }

test("Test Call show_fsview_latest Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| partitioned by (ts)
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

// Check required fields
checkExceptionContain(s"""call show_fsview_latest(limit => 10)""")(
s"Argument: table is required")

// collect result for table
val result = spark.sql(
s"""call show_fsview_latest(table => '$tableName', partition_path => 'ts=1000', limit => 10)""".stripMargin).collect()
assertResult(1) {
result.length
}
}
}
}