@@ -52,6 +52,8 @@ object HoodieProcedures {
    mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
    mapBuilder.put(ShowHoodieLogFileMetadataProcedure.NAME, ShowHoodieLogFileMetadataProcedure.builder)
    mapBuilder.put(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
    mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
    mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
    mapBuilder.build
  }
}
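With the two stats procedures registered by name above, they become invocable through Spark SQL's CALL syntax. A minimal sketch, assuming an existing Hudi table (hudi_tbl is a hypothetical name):

// hypothetical table name; any Hudi table visible to the session works
spark.sql("call stats_wa(table => 'hudi_tbl')").show()
spark.sql("call stats_filesizes(table => 'hudi_tbl')").show()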
org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command.procedures

import com.codahale.metrics.{Histogram, Snapshot, UniformReservoir}
import com.google.common.collect.{Lists, Maps}
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.Row
import org.apache.spark.sql.hudi.command.procedures.StatsFileSizeProcedure.MAX_FILES
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.Supplier
import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapConverter}

class StatsFileSizeProcedure extends BaseProcedure with ProcedureBuilder {

  override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "partition_path", DataTypes.StringType, ""),
    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
  )

  override def outputType: StructType = StructType(Array[StructField](
    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("min", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("10th", DataTypes.DoubleType, nullable = true, Metadata.empty),
    StructField("50th", DataTypes.DoubleType, nullable = true, Metadata.empty),
    StructField("avg", DataTypes.DoubleType, nullable = true, Metadata.empty),
    StructField("95th", DataTypes.DoubleType, nullable = true, Metadata.empty),
    StructField("max", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("num_files", DataTypes.IntegerType, nullable = true, Metadata.empty),
    StructField("stddev", DataTypes.DoubleType, nullable = true, Metadata.empty)
  ))

  override def call(args: ProcedureArgs): Seq[Row] = {
    checkArgs(parameters, args)
    val table = getArgValueOrDefault(args, parameters(0))
    val globRegex = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[String]
    val limit: Int = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[Int]
    val basePath = getBasePath(table)
    val fs = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build.getFs
    // Glob all data files under the (optional) partition path, skipping the .hoodie meta folder.
    val globPath = String.format("%s/%s/*", basePath, globRegex)
    val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))

    // One histogram per commit, keyed by the instant time encoded in the file name,
    // plus a global histogram over all matched files.
    val globalHistogram = new Histogram(new UniformReservoir(MAX_FILES))
    val commitHistogramMap: java.util.Map[String, Histogram] = Maps.newHashMap()
    statuses.asScala.foreach { status =>
      val instantTime = FSUtils.getCommitTime(status.getPath.getName)
      val len = status.getLen
      commitHistogramMap.putIfAbsent(instantTime, new Histogram(new UniformReservoir(MAX_FILES)))
      commitHistogramMap.get(instantTime).update(len)
      globalHistogram.update(len)
    }
    val rows: java.util.List[Row] = Lists.newArrayList()
    commitHistogramMap.asScala.foreach { case (instantTime, histogram) =>
      rows.add(printFileSizeHistogram(instantTime, histogram.getSnapshot))
    }
    rows.add(printFileSizeHistogram("ALL", globalHistogram.getSnapshot))
    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
  }

  // Builds one output row from a histogram snapshot (file sizes are in bytes).
  def printFileSizeHistogram(instantTime: String, snapshot: Snapshot): Row = {
    Row(
      instantTime,
      snapshot.getMin,
      snapshot.getValue(0.1),
      snapshot.getMedian,
      snapshot.getMean,
      snapshot.get95thPercentile,
      snapshot.getMax,
      snapshot.size,
      snapshot.getStdDev
    )
  }

  override def build: Procedure = new StatsFileSizeProcedure
}

object StatsFileSizeProcedure {
  val MAX_FILES = 1000000
  val NAME = "stats_filesizes"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new StatsFileSizeProcedure()
  }
}
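A hedged usage sketch for stats_filesizes, mirroring the test at the end of this change: partition_path is expanded as a glob under the table's base path, and limit caps the number of returned rows. Sizes come from the file lengths on storage, in bytes (hudi_tbl is a hypothetical table name):

// returns one row per commit (min/10th/50th/avg/95th/max/stddev of file sizes)
// plus a final "ALL" row aggregated over every matched file
val fileSizes = spark.sql(
  "call stats_filesizes(table => 'hudi_tbl', partition_path => '/*', limit => 10)")
fileSizes.show()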
org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.Lists
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.text.DecimalFormat
import java.util.function.Supplier
import scala.collection.JavaConverters.asScalaIteratorConverter

class StatsWriteAmplificationProcedure extends BaseProcedure with ProcedureBuilder {

  override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
  )

  override def outputType: StructType = StructType(Array[StructField](
    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("total_upserted", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("write_amplification_factor", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  override def call(args: ProcedureArgs): Seq[Row] = {
    checkArgs(parameters, args)
    val table = getArgValueOrDefault(args, parameters(0))
    val limit: Int = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[Int]
    val basePath = getBasePath(table)
    val client = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val activeTimeline = client.getActiveTimeline
    val timeline = activeTimeline.getCommitTimeline.filterCompletedInstants()

    val rows: java.util.List[Row] = Lists.newArrayList()
    val df = new DecimalFormat("#.00")
    var totalRecordsUpserted = 0L
    var totalRecordsWritten = 0L
    timeline.getInstants.iterator.asScala.foreach { instant =>
      // Write amplification factor for one commit: total records written / records updated.
      var waf = "0"
      val commit = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instant).get(), classOf[HoodieCommitMetadata])
      if (commit.fetchTotalUpdateRecordsWritten() > 0) {
        waf = df.format(commit.fetchTotalRecordsWritten().toFloat / commit.fetchTotalUpdateRecordsWritten())
      }
      rows.add(Row(instant.getTimestamp, commit.fetchTotalUpdateRecordsWritten(), commit.fetchTotalRecordsWritten(), waf))
      totalRecordsUpserted += commit.fetchTotalUpdateRecordsWritten()
      totalRecordsWritten += commit.fetchTotalRecordsWritten()
    }
    // Summary row across all completed commits.
    var waf = "0"
    if (totalRecordsUpserted > 0) {
      waf = df.format(totalRecordsWritten.toFloat / totalRecordsUpserted)
    }
    rows.add(Row("Total", totalRecordsUpserted, totalRecordsWritten, waf))
    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
  }

  override def build: Procedure = new StatsWriteAmplificationProcedure
}

object StatsWriteAmplificationProcedure {
  val NAME = "stats_wa"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new StatsWriteAmplificationProcedure()
  }
}
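A hedged usage sketch for stats_wa: for each completed commit on the timeline, write_amplification_factor is total records written divided by records updated, formatted to two decimal places, and a trailing "Total" row aggregates across all commits (hudi_tbl is again a hypothetical table name):

// columns: commit_time, total_upserted, total_written, write_amplification_factor
val wa = spark.sql("call stats_wa(table => 'hudi_tbl', limit => 10)")
wa.show()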
org/apache/spark/sql/hudi/procedure/TestStatsProcedure.scala
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.hudi.procedure

import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase

class TestStatsProcedure extends HoodieSparkSqlTestBase {

  test("Test Call stats_wa Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
      // create a partitioned Hudi table
      spark.sql(
        s"""
           |create table $tableName (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | partitioned by (ts)
           | location '$tablePath'
           | tblproperties (
           | primaryKey = 'id',
           | preCombineField = 'ts'
           | )
         """.stripMargin)
      // insert and update data so the timeline has three completed commits
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
      spark.sql(s"update $tableName set name = 'b1', price = 100 where id = 1")

      // check required fields
      checkExceptionContain(s"""call stats_wa(limit => 10)""")(
        s"Argument: table is required")

      // collect result for the table: three commit rows plus the "Total" row
      val result = spark.sql(s"""call stats_wa(table => '$tableName')""").collect()
      assertResult(4) {
        result.length
      }
    }
  }

  test("Test Call stats_filesizes Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
      // create a partitioned Hudi table
      spark.sql(
        s"""
           |create table $tableName (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | partitioned by (ts)
           | location '$tablePath'
           | tblproperties (
           | primaryKey = 'id',
           | preCombineField = 'ts'
           | )
         """.stripMargin)
      // insert data so two commits produce data files
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

      // check required fields
      checkExceptionContain(s"""call stats_filesizes(limit => 10)""")(
        s"Argument: table is required")

      // collect result for the table: two commit rows plus the "ALL" row
      val result = spark.sql(
        s"""call stats_filesizes(table => '$tableName', partition_path => '/*')""").collect()
      assertResult(3) {
        result.length
      }
    }
  }
}