diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 0844c7361bfc2..df878d7181c16 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -62,10 +62,12 @@ object AvroConversionUtils { * @param rootCatalystType Catalyst [[StructType]] to be transformed into * @return converter accepting Avro payload and transforming it into a Catalyst one (in the form of [[InternalRow]]) */ - def createAvroToInternalRowConverter(rootAvroType: Schema, rootCatalystType: StructType): GenericRecord => Option[InternalRow] = - record => sparkAdapter.createAvroDeserializer(rootAvroType, rootCatalystType) + def createAvroToInternalRowConverter(rootAvroType: Schema, rootCatalystType: StructType): GenericRecord => Option[InternalRow] = { + val deserializer = sparkAdapter.createAvroDeserializer(rootAvroType, rootCatalystType) + record => deserializer .deserialize(record) .map(_.asInstanceOf[InternalRow]) + } /** * Creates converter to transform Catalyst payload into Avro one @@ -76,7 +78,8 @@ object AvroConversionUtils { * @return converter accepting Catalyst payload (in the form of [[InternalRow]]) and transforming it into an Avro one */ def createInternalRowToAvroConverter(rootCatalystType: StructType, rootAvroType: Schema, nullable: Boolean): InternalRow => GenericRecord = { - row => sparkAdapter.createAvroSerializer(rootCatalystType, rootAvroType, nullable) + val serializer = sparkAdapter.createAvroSerializer(rootCatalystType, rootAvroType, nullable) + row => serializer .serialize(row) .asInstanceOf[GenericRecord] } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmark.scala new file mode 100644 index 0000000000000..6d4317a8135e1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmark.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.hudi.benchmark + + +import java.io.{OutputStream, PrintStream} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ +import scala.util.Try + +import org.apache.commons.io.output.TeeOutputStream +import org.apache.commons.lang3.SystemUtils + +import org.apache.spark.util.Utils + +/** + * Reference from spark. + * Utility class to benchmark components. An example of how to use this is: + * val benchmark = new Benchmark("My Benchmark", valuesPerIteration) + * benchmark.addCase("V1")() + * benchmark.addCase("V2")() + * benchmark.run + * This will output the average time to run each function and the rate of each function. + * + * The benchmark function takes one argument that is the iteration that's being run. + * + * @param name name of this benchmark. + * @param valuesPerIteration number of values used in the test case, used to compute rows/s. + * @param minNumIters the min number of iterations that will be run per case, not counting warm-up. + * @param warmupTime amount of time to spend running dummy case iterations for JIT warm-up. + * @param minTime further iterations will be run for each case until this time is used up. + * @param outputPerIteration if true, the timing for each run will be printed to stdout. + * @param output optional output stream to write benchmark results to + */ +class HoodieBenchmark( + name: String, + valuesPerIteration: Long, + minNumIters: Int = 2, + warmupTime: FiniteDuration = 2.seconds, + minTime: FiniteDuration = 2.seconds, + outputPerIteration: Boolean = false, + output: Option[OutputStream] = None) { + import HoodieBenchmark._ + val benchmarks = mutable.ArrayBuffer.empty[HoodieBenchmark.Case] + + val out = if (output.isDefined) { + new PrintStream(new TeeOutputStream(System.out, output.get)) + } else { + System.out + } + + /** + * Adds a case to run when run() is called. The given function will be run for several + * iterations to collect timing statistics. + * + * @param name of the benchmark case + * @param numIters if non-zero, forces exactly this many iterations to be run + */ + def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = { + addTimerCase(name, numIters) { timer => + timer.startTiming() + f(timer.iteration) + timer.stopTiming() + } + } + + /** + * Adds a case with manual timing control. When the function is run, timing does not start + * until timer.startTiming() is called within the given function. The corresponding + * timer.stopTiming() method must be called before the function returns. + * + * @param name of the benchmark case + * @param numIters if non-zero, forces exactly this many iterations to be run + */ + def addTimerCase(name: String, numIters: Int = 0)(f: HoodieBenchmark.Timer => Unit): Unit = { + benchmarks += HoodieBenchmark.Case(name, f, numIters) + } + + /** + * Runs the benchmark and outputs the results to stdout. This should be copied and added as + * a comment with the benchmark. Although the results vary from machine to machine, it should + * provide some baseline. + */ + def run(): Unit = { + require(benchmarks.nonEmpty) + // scalastyle:off + println("Running benchmark: " + name) + + val results = benchmarks.map { c => + println(" Running case: " + c.name) + measure(valuesPerIteration, c.numIters)(c.fn) + } + println + + val firstBest = results.head.bestMs + // The results are going to be processor specific so it is useful to include that. + out.println(HoodieBenchmark.getJVMOSInfo()) + out.println(HoodieBenchmark.getProcessorName()) + val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max)) + out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n", + name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)", "Per Row(ns)", "Relative") + out.println("-" * (nameLen + 80)) + results.zip(benchmarks).foreach { case (result, benchmark) => + out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n", + benchmark.name, + "%5.0f" format result.bestMs, + "%4.0f" format result.avgMs, + "%5.0f" format result.stdevMs, + "%10.1f" format result.bestRate, + "%6.1f" format (1000 / result.bestRate), + "%3.1fX" format (firstBest / result.bestMs)) + } + out.println + // scalastyle:on + } + + /** + * Runs a single function `f` for iters, returning the average time the function took and + * the rate of the function. + */ + def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = { + System.gc() // ensures garbage from previous cases don't impact this one + val warmupDeadline = warmupTime.fromNow + while (!warmupDeadline.isOverdue) { + f(new HoodieBenchmark.Timer(-1)) + } + val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters + val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos + val runTimes = ArrayBuffer[Long]() + var totalTime = 0L + var i = 0 + while (i < minIters || totalTime < minDuration) { + val timer = new HoodieBenchmark.Timer(i) + f(timer) + val runTime = timer.totalTime() + runTimes += runTime + totalTime += runTime + + if (outputPerIteration) { + // scalastyle:off + println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds") + // scalastyle:on + } + i += 1 + } + // scalastyle:off + println(s" Stopped after $i iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms") + // scalastyle:on + assert(runTimes.nonEmpty) + val best = runTimes.min + val avg = runTimes.sum / runTimes.size + val stdev = if (runTimes.size > 1) { + math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1)) + } else 0 + Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0) + } +} + +object HoodieBenchmark { + + /** + * Object available to benchmark code to control timing e.g. to exclude set-up time. + * + * @param iteration specifies this is the nth iteration of running the benchmark case + */ + class Timer(val iteration: Int) { + private var accumulatedTime: Long = 0L + private var timeStart: Long = 0L + + def startTiming(): Unit = { + assert(timeStart == 0L, "Already started timing.") + timeStart = System.nanoTime + } + + def stopTiming(): Unit = { + assert(timeStart != 0L, "Have not started timing.") + accumulatedTime += System.nanoTime - timeStart + timeStart = 0L + } + + def totalTime(): Long = { + assert(timeStart == 0L, "Have not stopped timing.") + accumulatedTime + } + } + + case class Case(name: String, fn: Timer => Unit, numIters: Int) + case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double) + + /** + * This should return a user helpful processor information. Getting at this depends on the OS. + * This should return something like "Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz" + */ + def getProcessorName(): String = { + val cpu = if (SystemUtils.IS_OS_MAC_OSX) { + Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string")) + .stripLineEnd + } else if (SystemUtils.IS_OS_LINUX) { + Try { + val grepPath = Utils.executeAndGetOutput(Seq("which", "grep")).stripLineEnd + Utils.executeAndGetOutput(Seq(grepPath, "-m", "1", "model name", "/proc/cpuinfo")) + .stripLineEnd.replaceFirst("model name[\\s*]:[\\s*]", "") + }.getOrElse("Unknown processor") + } else { + System.getenv("PROCESSOR_IDENTIFIER") + } + cpu + } + + /** + * This should return a user helpful JVM & OS information. + * This should return something like + * "OpenJDK 64-Bit Server VM 1.8.0_65-b17 on Linux 4.1.13-100.fc21.x86_64" + */ + def getJVMOSInfo(): String = { + val vmName = System.getProperty("java.vm.name") + val runtimeVersion = System.getProperty("java.runtime.version") + val osName = System.getProperty("os.name") + val osVersion = System.getProperty("os.version") + s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}" + } +} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarkBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarkBase.scala new file mode 100644 index 0000000000000..ff4f0bc4ccb9e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarkBase.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.hudi.benchmark + +import java.io.{File, FileOutputStream, OutputStream} + +/** + * Reference from spark. + * A base class for generate benchmark results to a file. + * For JDK9+, JDK major version number is added to the file names to distinguish the results. + */ +abstract class HoodieBenchmarkBase { + var output: Option[OutputStream] = None + + /** + * Main process of the whole benchmark. + * Implementations of this method are supposed to use the wrapper method `runBenchmark` + * for each benchmark scenario. + */ + def runBenchmarkSuite(mainArgs: Array[String]): Unit + + final def runBenchmark(benchmarkName: String)(func: => Any): Unit = { + val separator = "=" * 96 + val testHeader = (separator + '\n' + benchmarkName + '\n' + separator + '\n' + '\n').getBytes + output.foreach(_.write(testHeader)) + func + output.foreach(_.write('\n')) + } + + def main(args: Array[String]): Unit = { + // turning this on so the behavior between running benchmark via `spark-submit` or SBT will + // be consistent, also allow users to turn on/off certain behavior such as + // `spark.sql.codegen.factoryMode` + val regenerateBenchmarkFiles: Boolean = System.getenv("SPARK_GENERATE_BENCHMARK_FILES") == "1" + if (regenerateBenchmarkFiles) { + val version = System.getProperty("java.version").split("\\D+")(0).toInt + val jdkString = if (version > 8) s"-jdk$version" else "" + val resultFileName = + s"${this.getClass.getSimpleName.replace("$", "")}jdkStringsuffix-results.txt" + val prefix = HoodieBenchmarks.currentProjectRoot.map(_ + "/").getOrElse("") + val dir = new File(s"${prefix}benchmarks/") + if (!dir.exists()) { + // scalastyle:off println + println(s"Creating ${dir.getAbsolutePath} for benchmark results.") + // scalastyle:on println + dir.mkdirs() + } + val file = new File(dir, resultFileName) + if (!file.exists()) { + file.createNewFile() + } + output = Some(new FileOutputStream(file)) + } + + runBenchmarkSuite(args) + + output.foreach { o => + if (o != null) { + o.close() + } + } + + afterAll() + } + + def suffix: String = "" + + /** + * Any shutdown code to ensure a clean shutdown + */ + def afterAll(): Unit = {} +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarks.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarks.scala new file mode 100644 index 0000000000000..872991002f8b1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarks.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.hudi.benchmark + +import java.io.File +import java.lang.reflect.Modifier +import java.nio.file.{FileSystems, Paths} +import java.util.Locale +import scala.collection.JavaConverters._ +import scala.util.Try +import org.apache.hbase.thirdparty.com.google.common.reflect.ClassPath + +/** + * Reference from spark. + * Run all benchmarks. To run this benchmark, you should build Spark with either Maven or SBT. + * After that, you can run as below: + * + * {{{ + * 1. with spark-submit + * bin/spark-submit --class + * --jars , + * + * 2. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class + * --jars , + * + * Results will be written to all corresponding files under "benchmarks/". + * Notice that it detects the sub-project's directories from jar's paths so the provided jars + * should be properly placed under target (Maven build) or target/scala-* (SBT) when you + * generate the files. + * }}} + * + * You can use a command as below to find all the test jars. + * Make sure to do not select duplicated jars created by different versions of builds or tools. + * {{{ + * find . -name '*-SNAPSHOT-tests.jar' | paste -sd ',' - + * }}} + * + * The example below runs all benchmarks and generates the results: + * {{{ + * SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \ + * org.apache.spark.benchmark.Benchmarks --jars \ + * "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \ + * "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \ + * "*" + * }}} + * + * The example below runs all benchmarks under "org.apache.spark.sql.execution.datasources" + * {{{ + * bin/spark-submit --class \ + * org.apache.spark.benchmark.Benchmarks --jars \ + * "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \ + * "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \ + * "org.apache.spark.sql.execution.datasources.*" + * }}} + */ + +object HoodieBenchmarks { + var currentProjectRoot: Option[String] = None + + def main(args: Array[String]): Unit = { + val isFailFast = sys.env.get( + "SPARK_BENCHMARK_FAILFAST").map(_.toLowerCase(Locale.ROOT).trim.toBoolean).getOrElse(true) + val numOfSplits = sys.env.get( + "SPARK_BENCHMARK_NUM_SPLITS").map(_.toLowerCase(Locale.ROOT).trim.toInt).getOrElse(1) + val currentSplit = sys.env.get( + "SPARK_BENCHMARK_CUR_SPLIT").map(_.toLowerCase(Locale.ROOT).trim.toInt - 1).getOrElse(0) + var numBenchmark = 0 + + var isBenchmarkFound = false + val benchmarkClasses = ClassPath.from( + Thread.currentThread.getContextClassLoader + ).getTopLevelClassesRecursive("org.apache.spark").asScala.toArray + val matcher = FileSystems.getDefault.getPathMatcher(s"glob:${args.head}") + + benchmarkClasses.foreach { info => + lazy val clazz = info.load + lazy val runBenchmark = clazz.getMethod("main", classOf[Array[String]]) + // isAssignableFrom seems not working with the reflected class from Guava's + // getTopLevelClassesRecursive. + require(args.length > 0, "Benchmark class to run should be specified.") + if ( + info.getName.endsWith("Benchmark") && + // TODO(SPARK-34927): Support TPCDSQueryBenchmark in Benchmarks + !info.getName.endsWith("TPCDSQueryBenchmark") && + matcher.matches(Paths.get(info.getName)) && + Try(runBenchmark).isSuccess && // Does this has a main method? + !Modifier.isAbstract(clazz.getModifiers) // Is this a regular class? + ) { + numBenchmark += 1 + if (numBenchmark % numOfSplits == currentSplit) { + isBenchmarkFound = true + + val targetDirOrProjDir = + new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI) + .getParentFile.getParentFile + + // The root path to be referred in each benchmark. + currentProjectRoot = Some { + if (targetDirOrProjDir.getName == "target") { + // SBT build + targetDirOrProjDir.getParentFile.getCanonicalPath + } else { + // Maven build + targetDirOrProjDir.getCanonicalPath + } + } + + // scalastyle:off println + println(s"Running ${clazz.getName}:") + // scalastyle:on println + // Force GC to minimize the side effect. + System.gc() + try { + runBenchmark.invoke(null, args.tail.toArray) + } catch { + case e: Throwable if !isFailFast => + // scalastyle:off println + println(s"${clazz.getName} failed with the exception below:") + // scalastyle:on println + e.printStackTrace() + } + } + } + } + + if (!isBenchmarkFound) throw new RuntimeException("No benchmark found to run.") + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala new file mode 100644 index 0000000000000..5e092bdb51c36 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.avro.generic.GenericRecord +import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils} +import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase} +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.{DataFrame, SparkSession} + +/** + * Benchmark to measure Avro SerDer performance. + */ +object AvroSerDerBenchmark extends HoodieBenchmarkBase { + protected val spark: SparkSession = getSparkSession + + def getSparkSession: SparkSession = SparkSession + .builder() + .master("local[1]") + .config("spark.driver.memory", "8G") + .appName(this.getClass.getCanonicalName) + .getOrCreate() + + def getDataFrame(numbers: Long): DataFrame = { + spark.range(0, numbers).toDF("id") + .withColumn("c1", lit("AvroSerDerBenchmark")) + .withColumn("c2", lit(12.99d)) + .withColumn("c3", lit(1)) + } + + /** + * Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Windows 10 10.0 + * Intel64 Family 6 Model 94 Stepping 3, GenuineIntel + * perf avro serializer for hoodie: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative + * ------------------------------------------------------------------------------------------------------------------------ + * serialize internalRow to avro Record 6391 6683 413 7.8 127.8 1.0X + */ + private def avroSerializerBenchmark: Unit = { + val benchmark = new HoodieBenchmark(s"perf avro serializer for hoodie", 50000000) + benchmark.addCase("serialize internalRow to avro Record") { _ => + val df = getDataFrame(50000000) + val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my") + spark.sparkContext.getConf.registerAvroSchemas(avroSchema) + HoodieSparkUtils.createRdd(df,"record", "my", Some(avroSchema)).foreach(f => f) + } + benchmark.run() + } + + /** + * Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Windows 10 10.0 + * Intel64 Family 6 Model 94 Stepping 3, GenuineIntel + * perf avro deserializer for hoodie: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative + * ------------------------------------------------------------------------------------------------------------------------ + * deserialize avro Record to internalRow 1340 1360 27 7.5 134.0 1.0X + */ + private def avroDeserializerBenchmark: Unit = { + val benchmark = new HoodieBenchmark(s"perf avro deserializer for hoodie", 10000000) + val df = getDataFrame(10000000) + val sparkSchema = df.schema + val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my") + val testRdd = HoodieSparkUtils.createRdd(df,"record", "my", Some(avroSchema)) + testRdd.cache() + testRdd.foreach(f => f) + spark.sparkContext.getConf.registerAvroSchemas(avroSchema) + benchmark.addCase("deserialize avro Record to internalRow") { _ => + testRdd.mapPartitions { iter => + val schema = AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, "record", "my") + val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(schema, sparkSchema) + iter.map(record => avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get) + }.foreach(f => f) + } + benchmark.run() + } + + override def afterAll(): Unit = { + spark.stop() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + avroSerializerBenchmark + avroDeserializerBenchmark + } +}