diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 7f842a795fcab..7f29f4b86f871 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -52,6 +52,8 @@ object HoodieProcedures {
     mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
     mapBuilder.put(ShowHoodieLogFileMetadataProcedure.NAME, ShowHoodieLogFileMetadataProcedure.builder)
     mapBuilder.put(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
+    mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
+    mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala
new file mode 100644
index 0000000000000..e0ece6e086b1f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import com.codahale.metrics.{Histogram, Snapshot, UniformReservoir}
+import com.google.common.collect.{Lists, Maps}
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.hudi.command.procedures.StatsFileSizeProcedure.MAX_FILES
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapConverter}
+
+class StatsFileSizeProcedure extends BaseProcedure with ProcedureBuilder {
+
+  override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "partition_path", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
+  )
+
+  override def outputType: StructType = StructType(Array[StructField](
+    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("min", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("10th", DataTypes.DoubleType, nullable = true, Metadata.empty),
+    StructField("50th", DataTypes.DoubleType, nullable = true, Metadata.empty),
+    StructField("avg", DataTypes.DoubleType, nullable = true, Metadata.empty),
+    StructField("95th", DataTypes.DoubleType, nullable = true, Metadata.empty),
+    StructField("max", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("num_files", DataTypes.IntegerType, nullable = true, Metadata.empty),
+    StructField("stddev", DataTypes.DoubleType, nullable = true, Metadata.empty)
+  ))
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    checkArgs(parameters, args)
+    val table = getArgValueOrDefault(args, parameters(0))
+    val globRegex = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[String]
+    val limit: Int = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[Int]
+    val basePath = getBasePath(table)
+    val fs = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build.getFs
+    val globPath = String.format("%s/%s/*", basePath, globRegex)
+    val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))
+
+    val globalHistogram = new Histogram(new UniformReservoir(MAX_FILES))
+    val commitHistogramMap: java.util.Map[String, Histogram] = Maps.newHashMap()
+    statuses.asScala.foreach(
+      status => {
+        val instantTime = FSUtils.getCommitTime(status.getPath.getName)
+        val len = status.getLen
+        commitHistogramMap.putIfAbsent(instantTime, new Histogram(new UniformReservoir(MAX_FILES)))
+        commitHistogramMap.get(instantTime).update(len)
+        globalHistogram.update(len)
+      }
+    )
+    val rows: java.util.List[Row] = Lists.newArrayList()
+    commitHistogramMap.asScala.foreach {
+      case (instantTime, histogram) =>
+        val snapshot = histogram.getSnapshot
+        rows.add(printFileSizeHistogram(instantTime, snapshot))
+    }
+    val snapshot = globalHistogram.getSnapshot
+    rows.add(printFileSizeHistogram("ALL", snapshot))
+    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  def printFileSizeHistogram(instantTime: String, snapshot: Snapshot): Row = {
+    Row(
+      instantTime,
+      snapshot.getMin,
+      snapshot.getValue(0.1),
+      snapshot.getMedian,
+      snapshot.getMean,
+      snapshot.get95thPercentile,
+      snapshot.getMax,
+      snapshot.size,
+      snapshot.getStdDev
+    )
+  }
+
+  override def build: Procedure = new StatsFileSizeProcedure
+}
+
+object StatsFileSizeProcedure {
+  val MAX_FILES = 1000000
+  val NAME = "stats_filesizes"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new StatsFileSizeProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
new file mode 100644
index 0000000000000..bb7b9e8e043bf
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import com.google.common.collect.Lists
+import org.apache.hudi.common.model.HoodieCommitMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.text.DecimalFormat
+import java.util.function.Supplier
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+class StatsWriteAmplificationProcedure extends BaseProcedure with ProcedureBuilder {
+  override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
+  )
+
+  override def outputType: StructType = StructType(Array[StructField](
+    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("total_upserted", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("write_amplification_factor", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    checkArgs(parameters, args)
+    val table = getArgValueOrDefault(args, parameters(0))
+    val limit: Int = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[Int]
+    val basePath = getBasePath(table)
+    val client = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val activeTimeline = client.getActiveTimeline
+    val timeline = activeTimeline.getCommitTimeline.filterCompletedInstants()
+
+    val rows: java.util.List[Row] = Lists.newArrayList()
+    val df = new DecimalFormat("#.00")
+    var totalRecordsUpserted = 0L
+    var totalRecordsWritten = 0L
+    timeline.getInstants.iterator.asScala.foreach(
+      instantTime => {
+        var waf = "0"
+        val commit = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instantTime).get(), classOf[HoodieCommitMetadata])
+        if (commit.fetchTotalUpdateRecordsWritten() > 0) {
+          waf = df.format(commit.fetchTotalRecordsWritten().toFloat / commit.fetchTotalUpdateRecordsWritten())
+        }
+        rows.add(Row(instantTime.getTimestamp, commit.fetchTotalUpdateRecordsWritten, commit.fetchTotalRecordsWritten, waf))
+        totalRecordsUpserted = totalRecordsUpserted + commit.fetchTotalUpdateRecordsWritten()
+        totalRecordsWritten = totalRecordsWritten + commit.fetchTotalRecordsWritten()
+      }
+    )
+    var waf = "0"
+    if (totalRecordsUpserted > 0) {
+      waf = df.format(totalRecordsWritten.toFloat / totalRecordsUpserted)
+    }
+    rows.add(Row("Total", totalRecordsUpserted, totalRecordsWritten, waf))
+    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new StatsWriteAmplificationProcedure
+}
+
+object StatsWriteAmplificationProcedure {
+  val NAME = "stats_wa"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new StatsWriteAmplificationProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestStatsProcedure.scala
new file mode 100644
index 0000000000000..2da5392e9b6a9
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestStatsProcedure.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestStatsProcedure extends HoodieSparkSqlTestBase {
+  test("Test Call stats_wa Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"update $tableName set name = 'b1', price = 100 where id = 1")
+
+      // Check required fields
+      checkExceptionContain(s"""call stats_wa(limit => 10)""")(
+        s"Argument: table is required")
+
+      // collect result for table
+      val result = spark.sql(
+        s"""call stats_wa(table => '$tableName')""".stripMargin).collect()
+      assertResult(4) {
+        result.length
+      }
+    }
+  }
+
+  test("Test Call stats_filesizes Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // Check required fields
+      checkExceptionContain(s"""call stats_filesizes(limit => 10)""")(
+        s"Argument: table is required")
+
+      // collect result for table
+      val result = spark.sql(
+        s"""call stats_filesizes(table => '$tableName', partition_path => '/*')""".stripMargin).collect()
+      assertResult(3) {
+        result.length
+      }
+    }
+  }
+}
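Usage sketch (not part of the diff): a minimal example of calling the two new procedures from Spark SQL, mirroring the named-argument calls used in TestStatsProcedure. The table name 'trips' is a placeholder; the partition_path value is the same glob pattern the test passes and should be adjusted to the table's actual partition layout.

-- per-commit write amplification (total records written / total records upserted), plus a trailing "Total" row
call stats_wa(table => 'trips', limit => 10);

-- per-commit file-size distribution (min, 10th/50th/95th percentiles, avg, max, num_files, stddev), plus a trailing "ALL" row;
-- partition_path is a glob evaluated under the table base path
call stats_filesizes(table => 'trips', partition_path => '/*', limit => 10);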