From 4741d1db8e8e44f40c47cc96e8462867e6cdb47d Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Sun, 10 Feb 2019 07:51:04 +0000
Subject: [PATCH 1/3] Basic benchmarks for Iceberg Spark Data Source

---
 build.gradle                                  |  21 +-
 gradle.properties                             |   2 +
 .../iceberg/spark/benchmark/base/Action.java  |  25 +++
 .../spark/benchmark/base/SparkBenchmark.java  | 182 ++++++++++++++++++
 .../SparkParquetFlatDataBenchmark.java        |  59 ++++++
 .../SparkParquetFlatDataFilterBenchmark.java  | 121 ++++++++++++
 .../SparkParquetFlatDataReadBenchmark.java    | 152 +++++++++++++++
 .../SparkParquetFlatDataWriteBenchmark.java   |  89 +++++++++
 .../SparkParquetNestedDataBenchmark.java      |  58 ++++++
 ...SparkParquetNestedDataFilterBenchmark.java | 120 ++++++++++++
 .../SparkParquetNestedDataReadBenchmark.java  | 153 +++++++++++++++
 .../SparkParquetNestedDataWriteBenchmark.java |  89 +++++++++
 12 files changed, 1070 insertions(+), 1 deletion(-)
 create mode 100644 gradle.properties
 create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/Action.java
 create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/SparkBenchmark.java
 create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataBenchmark.java
 create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataFilterBenchmark.java
 create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataReadBenchmark.java
 create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataWriteBenchmark.java
 create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataBenchmark.java
 create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataFilterBenchmark.java
 create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataReadBenchmark.java
 create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataWriteBenchmark.java

diff --git a/build.gradle b/build.gradle
index 51d438234d69..cf2cc8834c62 100644
--- a/build.gradle
+++ b/build.gradle
@@ -21,7 +21,8 @@ buildscript {
   repositories {
     jcenter()
     gradlePluginPortal()
-    maven { url "http://palantir.bintray.com/releases" }
+    maven { url "http://palantir.bintray.com/releases" }
+    maven { url "https://plugins.gradle.org/m2/" }
   }
   dependencies {
     classpath 'com.github.jengelman.gradle.plugins:shadow:5.0.0'
@@ -30,6 +31,7 @@ buildscript {
     classpath 'com.palantir.baseline:gradle-baseline-java:0.55.0'
     classpath 'com.diffplug.spotless:spotless-plugin-gradle:3.14.0'
     classpath 'gradle.plugin.org.inferred:gradle-processors:2.1.0'
+    classpath 'me.champeau.gradle:jmh-gradle-plugin:0.4.8'
   }
 }
 
@@ -85,6 +87,8 @@ subprojects {
     scalaVersion = '2.11'
     sparkVersion = '2.4.0'
     caffeineVersion = "2.7.0"
+
+    jmhVersion = '1.21'
   }
 
   sourceCompatibility = '1.8'
@@ -160,6 +164,21 @@ configure(baselineProjects) {
   }
 }
 
+def jmhProjects = [ project("iceberg-spark") ]
+
+configure(jmhProjects) {
+  apply plugin: 'me.champeau.gradle.jmh'
+
+  jmh {
+    jmhVersion = project.jmhVersion
+    failOnError = true
+    forceGC = true
+    includeTests = true
+    humanOutputFile = file(jmhOutputPath)
+    include = [jmhIncludeRegex]
+  }
+}
+
 project(':iceberg-api') {
   dependencies {
     testCompile "org.apache.avro:avro:$avroVersion"
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 000000000000..f2ff982d9caf
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,2 @@ +jmhOutputPath=build/reports/jmh/human-readable-output.txt +jmhIncludeRegex=.* diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/Action.java b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/Action.java new file mode 100644 index 000000000000..64adbef8d308 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/Action.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.benchmark.base; + +@FunctionalInterface +public interface Action { + void invoke(); +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/SparkBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/SparkBenchmark.java new file mode 100644 index 000000000000..290bc985fdbc --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/SparkBenchmark.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
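The jmh task above is driven by the two properties in gradle.properties: jmhIncludeRegex selects benchmarks and jmhOutputPath collects the human-readable report. For running the same suite without Gradle, JMH also offers a programmatic runner; a minimal sketch follows, where the class name and the hard-coded values are illustrative and not part of this patch.

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class BenchmarkRunnerSketch {
      public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
            .include(".*")                                         // jmhIncludeRegex=.*
            .forks(1)                                              // matches @Fork(1) on the benchmarks
            .shouldFailOnError(true)                               // failOnError = true
            .shouldDoGC(true)                                      // forceGC = true
            .output("build/reports/jmh/human-readable-output.txt") // jmhOutputPath
            .build();
        new Runner(options).run();
      }
    }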
+ */ + +package org.apache.iceberg.spark.benchmark.base; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION; + +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.SingleShotTime) +public abstract class SparkBenchmark { + + private final Configuration hadoopConf = initHadoopConf(); + private final Table table = initTable(); + private SparkSession spark; + + protected abstract Configuration initHadoopConf(); + + protected final Configuration hadoopConf() { + return hadoopConf; + } + + protected abstract Table initTable(); + + protected final Table table() { + return table; + } + + protected final SparkSession spark() { + return spark; + } + + protected String newTableLocation() { + String tmpDir = hadoopConf.get("hadoop.tmp.dir"); + Path tablePath = new Path(tmpDir, "spark-iceberg-table-" + UUID.randomUUID()); + return tablePath.toString(); + } + + protected String dataLocation() { + Map properties = table.properties(); + return properties.getOrDefault(WRITE_NEW_DATA_LOCATION, String.format("%s/data", table.location())); + } + + protected void cleanupFiles() throws IOException { + try (FileSystem fileSystem = FileSystem.get(hadoopConf)) { + Path dataPath = new Path(dataLocation()); + fileSystem.delete(dataPath, true); + Path tablePath = new Path(table.location()); + fileSystem.delete(tablePath, true); + } + } + + protected void setupSpark() { + spark = SparkSession.builder() + .config("spark.ui.enabled", false) + .master("local") + .getOrCreate(); + Configuration sparkHadoopConf = spark.sparkContext().hadoopConfiguration(); + hadoopConf.forEach(entry -> sparkHadoopConf.set(entry.getKey(), entry.getValue())); + } + + protected void tearDownSpark() { + spark.stop(); + } + + protected void materialize(Dataset ds) { + ds.queryExecution().toRdd().toJavaRDD().foreach(record -> { }); + } + + protected void appendAsFile(Dataset ds) { + // ensure the schema is precise (including nullability) + StructType sparkSchema = SparkSchemaUtil.convert(table.schema()); + spark.createDataFrame(ds.rdd(), sparkSchema) + .coalesce(1) + .write() + .format("iceberg") + .mode(SaveMode.Append) + .save(table.location()); + } + + protected void withSQLConf(Map conf, Action action) { + SQLConf sqlConf = SQLConf.get(); + + Map currentConfValues = new HashMap<>(); + conf.keySet().forEach(confKey -> { + if (sqlConf.contains(confKey)) { + String currentConfValue = sqlConf.getConfString(confKey); + currentConfValues.put(confKey, currentConfValue); + } + }); + + conf.forEach((confKey, confValue) -> { + if 
(SQLConf.staticConfKeys().contains(confKey)) { + throw new RuntimeException("Cannot modify the value of a static config: " + confKey); + } + sqlConf.setConfString(confKey, confValue); + }); + + try { + action.invoke(); + } finally { + conf.forEach((confKey, confValue) -> { + if (currentConfValues.containsKey(confKey)) { + sqlConf.setConfString(confKey, currentConfValues.get(confKey)); + } else { + sqlConf.unsetConf(confKey); + } + }); + } + } + + protected void withTableProperties(Map props, Action action) { + Map tableProps = table.properties(); + Map currentPropValues = new HashMap<>(); + props.keySet().forEach(propKey -> { + if (tableProps.containsKey(propKey)) { + String currentPropValue = tableProps.get(propKey); + currentPropValues.put(propKey, currentPropValue); + } + }); + + UpdateProperties updateProperties = table.updateProperties(); + props.forEach(updateProperties::set); + updateProperties.commit(); + + try { + action.invoke(); + } finally { + UpdateProperties restoreProperties = table.updateProperties(); + props.forEach((propKey, propValue) -> { + if (currentPropValues.containsKey(propKey)) { + restoreProperties.set(propKey, currentPropValues.get(propKey)); + } else { + restoreProperties.remove(propKey); + } + }); + restoreProperties.commit(); + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataBenchmark.java new file mode 100644 index 000000000000..ccd41d9c116c --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataBenchmark.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
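withSQLConf and withTableProperties above share one pattern: record the previous values, apply the overrides, run the action, then restore or unset everything in a finally block so that a failing benchmark cannot leak configuration into the next run. A self-contained sketch of that pattern, with plain maps standing in for SQLConf and table properties:

    import java.util.HashMap;
    import java.util.Map;

    public final class ScopedOverridesSketch {
      private ScopedOverridesSketch() {}

      public static void withOverrides(Map<String, String> settings,
                                       Map<String, String> overrides,
                                       Runnable action) {
        // remember only the keys that had a value before
        Map<String, String> previous = new HashMap<>();
        overrides.keySet().forEach(key -> {
          if (settings.containsKey(key)) {
            previous.put(key, settings.get(key));
          }
        });

        settings.putAll(overrides);
        try {
          action.run();
        } finally {
          // restore prior values, unset keys that did not exist
          overrides.keySet().forEach(key -> {
            if (previous.containsKey(key)) {
              settings.put(key, previous.get(key));
            } else {
              settings.remove(key);
            }
          });
        }
      }

      public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("codec", "gzip");
        Map<String, String> overrides = new HashMap<>();
        overrides.put("codec", "snappy");
        overrides.put("vectorized", "true");
        withOverrides(conf, overrides, () -> System.out.println(conf)); // overrides visible here
        System.out.println(conf);                                       // original settings restored
      }
    }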
+ */ + +package org.apache.iceberg.spark.benchmark.parquet; + +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.ConfigProperties; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.spark.benchmark.base.SparkBenchmark; +import org.apache.iceberg.types.Types; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public abstract class SparkParquetFlatDataBenchmark extends SparkBenchmark { + + @Override + protected Configuration initHadoopConf() { + Configuration conf = new Configuration(); + conf.set(ConfigProperties.COMPRESS_METADATA, "true"); + return conf; + } + + @Override + protected final Table initTable() { + Schema schema = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(5, "decimalCol", Types.DecimalType.of(20, 5)), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); + HadoopTables tables = new HadoopTables(hadoopConf()); + return tables.create(schema, partitionSpec, Maps.newHashMap(), newTableLocation()); + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataFilterBenchmark.java new file mode 100644 index 000000000000..4448c65b8cc8 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataFilterBenchmark.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
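appendAsFile in the base class converts the table schema with SparkSchemaUtil.convert before writing so that required columns stay non-nullable in the DataFrame. A small sketch of that conversion for two of the columns defined above:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.spark.SparkSchemaUtil;
    import org.apache.iceberg.types.Types;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    import static org.apache.iceberg.types.Types.NestedField.optional;
    import static org.apache.iceberg.types.Types.NestedField.required;

    public class SchemaConversionSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            required(1, "longCol", Types.LongType.get()),
            optional(8, "stringCol", Types.StringType.get()));

        StructType sparkType = SparkSchemaUtil.convert(schema);
        for (StructField field : sparkType.fields()) {
          // prints longCol nullable=false, stringCol nullable=true
          System.out.println(field.name() + " nullable=" + field.nullable());
        }
      }
    }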
+ */
+
+package org.apache.iceberg.spark.benchmark.parquet;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+/**
+ * A benchmark that evaluates the file skipping capabilities in the Spark data source for Iceberg.
+ *
+ * This class uses a dataset with a flat schema, where the records are clustered according to the
+ * column used in the filter predicate.
+ *
+ * The performance is compared to the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ * <code>
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=SparkParquetFlatDataFilterBenchmark
+ *       -PjmhOutputPath=benchmark/parquet-flat-data-filter-benchmark-result.txt
+ * </code>
+ */
+public class SparkParquetFlatDataFilterBenchmark extends SparkParquetFlatDataBenchmark {
+
+  private static final String FILTER_COND = "dateCol == date_add(current_date(), 1)";
+  private static final int NUM_FILES = 500;
+  private static final int NUM_ROWS = 10000;
+
+  @Setup
+  public void setup() {
+    setupSpark();
+    appendData();
+  }
+
+  @TearDown
+  public void tearDown() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterIceberg() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithFilterFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND);
+      materialize(df);
+    });
+  }
+
+  private void appendData() {
+    for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumnRenamed("id", "longCol")
+          .withColumn("intCol", expr("CAST(longCol AS INT)"))
+          .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+          .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+          .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+          .withColumn("dateCol", date_add(current_date(), fileNum))
+          .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+          .withColumn("stringCol", expr("CAST(dateCol AS STRING)"));
+      appendAsFile(df);
+    }
+  }
+}
diff --git
a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataReadBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataReadBenchmark.java new file mode 100644 index 000000000000..16972d79ddad --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataReadBenchmark.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.benchmark.parquet; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.internal.SQLConf; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +/** + * A benchmark that evaluates the performance of reading Parquet data with a flat schema + * using Iceberg and the built-in file source in Spark. 
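The filter benchmark above measures planning-time file skipping: Iceberg evaluates the dateCol predicate against per-file column stats and never opens non-matching files. A sketch of the same pruning through the scan API; treating the date literal as days since epoch is an assumption of this sketch:

    import java.io.IOException;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.io.CloseableIterable;

    public class ScanPlanningSketch {
      // counts the files whose stats may match dateCol == daysSinceEpoch; with the
      // benchmark's clustering this should be 1 of the 500 files written
      static long matchingFiles(Table table, int daysSinceEpoch) throws IOException {
        long count = 0;
        try (CloseableIterable<FileScanTask> tasks = table.newScan()
            .filter(Expressions.equal("dateCol", daysSinceEpoch))
            .planFiles()) {
          for (FileScanTask ignored : tasks) {
            count += 1;
          }
        }
        return count;
      }
    }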
+ * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=SparkParquetFlatDataReadBenchmark + * -PjmhOutputPath=benchmark/parquet-flat-data-read-benchmark-result.txt + * + */ +public class SparkParquetFlatDataReadBenchmark extends SparkParquetFlatDataBenchmark { + + private static final int NUM_FILES = 10; + private static final int NUM_ROWS = 1000000; + + @Setup + public void setup() { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDown() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readIceberg() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readFileSourceVectorized() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset df = spark().read().parquet(dataLocation()); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readFileSourceNonVectorized() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset df = spark().read().parquet(dataLocation()); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithProjectionIceberg() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation).select("longCol"); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithProjectionFileSourceVectorized() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset df = spark().read().parquet(dataLocation()).select("longCol"); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithProjectionFileSourceNonVectorized() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset df = spark().read().parquet(dataLocation()).select("longCol"); + materialize(df); + }); + } + + private void appendData() { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + Dataset df = spark().range(NUM_ROWS) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))")) + .withColumn("dateCol", date_add(current_date(), fileNum)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); + appendAsFile(df); + } + } +} diff 
--git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataWriteBenchmark.java new file mode 100644 index 000000000000..11ede522fd28 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataWriteBenchmark.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.benchmark.parquet; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.internal.SQLConf; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +import static org.apache.spark.sql.functions.expr; + +/** + * A benchmark that evaluates the performance of writing Parquet data with a flat schema + * using Iceberg and the built-in file source in Spark. 
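In the read benchmarks above, both engines pin the open-file cost to 128 MB, which makes every file at least as heavy as a full target-size split, so each planner schedules roughly one task per file and the comparison stays fair. A toy illustration of that bin-packing arithmetic, with made-up sizes:

    public class SplitPackingSketch {
      public static void main(String[] args) {
        long targetSplitSize = 128L * 1024 * 1024;
        long openFileCost = 128L * 1024 * 1024;
        long[] fileSizes = {12_000_000L, 30_000_000L, 5_000_000L};

        int splits = 0;
        long current = 0;
        for (long size : fileSizes) {
          long weight = Math.max(size, openFileCost); // every file charged at least the open cost
          if (current == 0 || current + weight > targetSplitSize) {
            splits += 1;                              // start a new split
            current = 0;
          }
          current += weight;
        }
        System.out.println("planned splits: " + splits); // 3 -> one split per file
      }
    }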
+ * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=SparkParquetFlatDataWriteBenchmark + * -PjmhOutputPath=benchmark/parquet-flat-data-write-benchmark-result.txt + * + */ +public class SparkParquetFlatDataWriteBenchmark extends SparkParquetFlatDataBenchmark { + + private static final int NUM_ROWS = 5000000; + + @Setup + public void setup() { + setupSpark(); + } + + @TearDown + public void tearDown() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void writeIceberg() { + String tableLocation = table().location(); + benchmarkData().write().format("iceberg").mode(SaveMode.Append).save(tableLocation); + } + + @Benchmark + @Threads(1) + public void writeFileSource() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip"); + withSQLConf(conf, () -> benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation())); + } + + private Dataset benchmarkData() { + return spark().range(NUM_ROWS) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))")) + .withColumn("dateCol", expr("DATE_ADD(CURRENT_DATE(), (longCol % 20))")) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")) + .coalesce(1); + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataBenchmark.java new file mode 100644 index 000000000000..cb6a8781a6d3 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataBenchmark.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
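The write benchmark above pins the file source to gzip, which lines up with Iceberg's default Parquet codec; on the Iceberg side the equivalent knob is the write.parquet.compression-codec table property. A sketch, with the codec value illustrative:

    import org.apache.iceberg.Table;

    public class CompressionCodecSketch {
      // mirrors spark.sql.parquet.compression.codec for the built-in source
      static void setParquetCodec(Table table, String codec) {
        table.updateProperties()
            .set("write.parquet.compression-codec", codec) // e.g. "gzip"
            .commit();
      }
    }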
+ */ + +package org.apache.iceberg.spark.benchmark.parquet; + +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.ConfigProperties; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.spark.benchmark.base.SparkBenchmark; +import org.apache.iceberg.types.Types; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class SparkParquetNestedDataBenchmark extends SparkBenchmark { + + @Override + protected Configuration initHadoopConf() { + Configuration conf = new Configuration(); + conf.set(ConfigProperties.COMPRESS_METADATA, "true"); + return conf; + } + + @Override + protected final Table initTable() { + Schema schema = new Schema( + required(0, "id", Types.LongType.get()), + optional(4, "nested", Types.StructType.of( + required(1, "col1", Types.StringType.get()), + required(2, "col2", Types.DoubleType.get()), + required(3, "col3", Types.LongType.get()) + )) + ); + PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); + HadoopTables tables = new HadoopTables(hadoopConf()); + return tables.create(schema, partitionSpec, Maps.newHashMap(), newTableLocation()); + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataFilterBenchmark.java new file mode 100644 index 000000000000..b438c1199e38 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataFilterBenchmark.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.benchmark.parquet; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.internal.SQLConf; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.spark.sql.functions.expr; +import static org.apache.spark.sql.functions.lit; +import static org.apache.spark.sql.functions.struct; + +/** + * A benchmark that evaluates the file skipping capabilities in the Spark data source for Iceberg. + * + * This class uses a dataset with nested data, where the records are clustered according to the + * column used in the filter predicate. 
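Clustering by the filtered column matters because every data file then stores a single nested.col3 value, so per-file min/max stats collapse to a point and any other literal prunes the file. A simplified bounds check in the spirit of Iceberg's metrics evaluator, not the actual implementation:

    public class BoundsCheckSketch {
      // a file may contain the value only if it lies within [min, max]
      static boolean mightContain(long min, long max, long value) {
        return value >= min && value <= max;
      }

      public static void main(String[] args) {
        // file 7 stores only col3 == 7, so min == max == 7
        System.out.println(mightContain(7, 7, 0)); // false -> file skipped
        System.out.println(mightContain(7, 7, 7)); // true  -> file must be read
      }
    }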
+ * + * The performance is compared to the built-in file source in Spark. + * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=SparkParquetNestedDataFilterBenchmark + * -PjmhOutputPath=benchmark/parquet-nested-data-filter-benchmark-result.txt + * + */ +public class SparkParquetNestedDataFilterBenchmark extends SparkParquetNestedDataBenchmark { + + private static final String FILTER_COND = "nested.col3 == 0"; + private static final int NUM_FILES = 500; + private static final int NUM_ROWS = 10000; + + @Setup + public void setup() { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDown() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readWithFilterIceberg() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation).filter(FILTER_COND); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithFilterFileSourceVectorized() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset df = spark().read().parquet(dataLocation()).filter(FILTER_COND); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithFilterFileSourceNonVectorized() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset df = spark().read().parquet(dataLocation()).filter(FILTER_COND); + materialize(df); + }); + } + + private void appendData() { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + Dataset df = spark().range(NUM_ROWS) + .withColumn( + "nested", + struct( + expr("CAST(id AS string) AS col1"), + expr("CAST(id AS double) AS col2"), + lit(fileNum).cast("long").as("col3") + )); + appendAsFile(df); + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataReadBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataReadBenchmark.java new file mode 100644 index 000000000000..4e2cdb5de3f2 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataReadBenchmark.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.benchmark.parquet; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.internal.SQLConf; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.spark.sql.functions.expr; +import static org.apache.spark.sql.functions.lit; +import static org.apache.spark.sql.functions.struct; + +/** + * A benchmark that evaluates the performance of reading nested Parquet data using Iceberg + * and the built-in file source in Spark. + * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=SparkParquetNestedDataReadBenchmark + * -PjmhOutputPath=benchmark/parquet-nested-data-read-benchmark-result.txt + * + */ +public class SparkParquetNestedDataReadBenchmark extends SparkParquetNestedDataBenchmark { + + private static final int NUM_FILES = 10; + private static final int NUM_ROWS = 1000000; + + @Setup + public void setup() { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDown() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readIceberg() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readFileSourceVectorized() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset df = spark().read().parquet(dataLocation()); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readFileSourceNonVectorized() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset df = spark().read().parquet(dataLocation()); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithProjectionIceberg() { + Map tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset df = spark().read().format("iceberg").load(tableLocation).selectExpr("nested.col3"); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithProjectionFileSourceVectorized() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + conf.put(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), "true"); + withSQLConf(conf, () -> { + Dataset df = spark().read().parquet(dataLocation()).selectExpr("nested.col3"); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithProjectionFileSourceNonVectorized() { + Map conf = Maps.newHashMap(); + 
conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + conf.put(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), "true"); + withSQLConf(conf, () -> { + Dataset df = spark().read().parquet(dataLocation()).selectExpr("nested.col3"); + materialize(df); + }); + } + + private void appendData() { + for (int fileNum = 0; fileNum < NUM_FILES; fileNum++) { + Dataset df = spark().range(NUM_ROWS) + .withColumn( + "nested", + struct( + expr("CAST(id AS string) AS col1"), + expr("CAST(id AS double) AS col2"), + lit(fileNum).cast("long").as("col3") + )); + appendAsFile(df); + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataWriteBenchmark.java new file mode 100644 index 000000000000..3c3e549c1fcb --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataWriteBenchmark.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.benchmark.parquet; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.internal.SQLConf; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +import static org.apache.spark.sql.functions.expr; +import static org.apache.spark.sql.functions.struct; + +/** + * A benchmark that evaluates the performance of writing nested Parquet data using Iceberg + * and the built-in file source in Spark. 
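The projection variants in the nested read benchmark above toggle spark.sql.optimizer.nestedSchemaPruning.enabled so that the built-in source can narrow the Parquet read schema to nested.col3 instead of materializing the whole struct. A sketch for inspecting the effect; the input path is illustrative:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class NestedPruningSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").getOrCreate();
        spark.conf().set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true");

        Dataset<Row> df = spark.read().parquet("/tmp/nested-data").selectExpr("nested.col3");
        // with pruning on, the physical plan's read schema lists only nested.col3;
        // with it off, the full struct is read and projected afterwards
        df.explain(true);

        spark.stop();
      }
    }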
+ * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=SparkParquetNestedDataWriteBenchmark + * -PjmhOutputPath=benchmark/parquet-nested-data-write-benchmark-result.txt + * + */ +public class SparkParquetNestedDataWriteBenchmark extends SparkParquetNestedDataBenchmark { + + private static final int NUM_ROWS = 5000000; + + @Setup + public void setup() { + setupSpark(); + } + + @TearDown + public void tearDown() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void writeIceberg() { + String tableLocation = table().location(); + benchmarkData().write().format("iceberg").mode(SaveMode.Append).save(tableLocation); + } + + @Benchmark + @Threads(1) + public void writeFileSource() { + Map conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip"); + withSQLConf(conf, () -> benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation())); + } + + private Dataset benchmarkData() { + return spark().range(NUM_ROWS) + .withColumn( + "nested", + struct( + expr("CAST(id AS string) AS col1"), + expr("CAST(id AS double) AS col2"), + expr("id AS col3") + )) + .coalesce(1); + } +} From 08ac8a16ae16b3bd3ad9dfd2bc306fc5beb25f6a Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Fri, 24 May 2019 22:30:37 +0100 Subject: [PATCH 2/3] Add benchmarks for Parquet readers/writers & restructure code --- .../iceberg/spark/SparkBenchmarkUtil.java | 57 +++++ .../SparkParquetReadersFlatDataBenchmark.java | 215 ++++++++++++++++++ ...parkParquetReadersNestedDataBenchmark.java | 215 ++++++++++++++++++ .../SparkParquetWritersFlatDataBenchmark.java | 123 ++++++++++ ...parkParquetWritersNestedDataBenchmark.java | 122 ++++++++++ .../{benchmark/base => source}/Action.java | 2 +- .../IcebergSourceBenchmark.java} | 4 +- .../IcebergSourceFlatDataBenchmark.java} | 5 +- .../IcebergSourceNestedDataBenchmark.java} | 5 +- ...SourceFlatParquetDataFilterBenchmark.java} | 9 +- ...rgSourceFlatParquetDataReadBenchmark.java} | 9 +- ...gSourceFlatParquetDataWriteBenchmark.java} | 9 +- ...urceNestedParquetDataFilterBenchmark.java} | 9 +- ...SourceNestedParquetDataReadBenchmark.java} | 9 +- ...ourceNestedParquetDataWriteBenchmark.java} | 9 +- 15 files changed, 769 insertions(+), 33 deletions(-) create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/SparkBenchmarkUtil.java create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java create mode 100644 spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java rename spark/src/jmh/java/org/apache/iceberg/spark/{benchmark/base => source}/Action.java (94%) rename spark/src/jmh/java/org/apache/iceberg/spark/{benchmark/base/SparkBenchmark.java => source/IcebergSourceBenchmark.java} (98%) rename spark/src/jmh/java/org/apache/iceberg/spark/{benchmark/parquet/SparkParquetFlatDataBenchmark.java => source/IcebergSourceFlatDataBenchmark.java} (92%) rename spark/src/jmh/java/org/apache/iceberg/spark/{benchmark/parquet/SparkParquetNestedDataBenchmark.java => source/IcebergSourceNestedDataBenchmark.java} (91%) rename spark/src/jmh/java/org/apache/iceberg/spark/{benchmark/parquet/SparkParquetFlatDataFilterBenchmark.java => 
source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java} (91%) rename spark/src/jmh/java/org/apache/iceberg/spark/{benchmark/parquet/SparkParquetFlatDataReadBenchmark.java => source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java} (93%) rename spark/src/jmh/java/org/apache/iceberg/spark/{benchmark/parquet/SparkParquetFlatDataWriteBenchmark.java => source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java} (88%) rename spark/src/jmh/java/org/apache/iceberg/spark/{benchmark/parquet/SparkParquetNestedDataFilterBenchmark.java => source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java} (90%) rename spark/src/jmh/java/org/apache/iceberg/spark/{benchmark/parquet/SparkParquetNestedDataReadBenchmark.java => source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java} (92%) rename spark/src/jmh/java/org/apache/iceberg/spark/{benchmark/parquet/SparkParquetNestedDataWriteBenchmark.java => source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java} (86%) diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/SparkBenchmarkUtil.java b/spark/src/jmh/java/org/apache/iceberg/spark/SparkBenchmarkUtil.java new file mode 100644 index 000000000000..6399bc7e5899 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/SparkBenchmarkUtil.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark; + +import com.google.common.collect.Lists; +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.expressions.Attribute; +import org.apache.spark.sql.catalyst.expressions.AttributeReference; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.types.StructType; +import scala.collection.JavaConverters; + +public class SparkBenchmarkUtil { + + private SparkBenchmarkUtil() {} + + public static UnsafeProjection projection(Schema expectedSchema, Schema actualSchema) { + StructType struct = SparkSchemaUtil.convert(actualSchema); + + List refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava(); + List attrs = Lists.newArrayListWithExpectedSize(struct.fields().length); + List exprs = Lists.newArrayListWithExpectedSize(struct.fields().length); + + for (AttributeReference ref : refs) { + attrs.add(ref.toAttribute()); + } + + for (Types.NestedField field : expectedSchema.columns()) { + int indexInIterSchema = struct.fieldIndex(field.name()); + exprs.add(refs.get(indexInIterSchema)); + } + + return UnsafeProjection.create( + JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(), + JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq()); + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java new file mode 100644 index 000000000000..f170a9251365 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
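SparkBenchmarkUtil.projection above builds an UnsafeProjection that copies rows laid out in the actual (file) schema into the expected schema's column order. A usage sketch with illustrative schemas:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.spark.SparkBenchmarkUtil;
    import org.apache.iceberg.types.Types;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;

    import static org.apache.iceberg.types.Types.NestedField.required;

    public class ProjectionSketch {
      // a sketch only: a real caller would build the projection once and reuse it
      static InternalRow reorder(InternalRow row) {
        Schema fileSchema = new Schema(
            required(1, "id", Types.LongType.get()),
            required(2, "data", Types.StringType.get()));
        Schema expected = new Schema(           // same columns, different order
            required(2, "data", Types.StringType.get()),
            required(1, "id", Types.LongType.get()));

        UnsafeProjection projection = SparkBenchmarkUtil.projection(expected, fileSchema);
        return projection.apply(row);           // copies (id, data) into (data, id)
      }
    }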
+ */ + +package org.apache.iceberg.spark.data.parquet; + +import com.google.common.collect.Iterables; +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.iceberg.spark.data.SparkParquetReaders; +import org.apache.iceberg.spark.SparkBenchmarkUtil; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport; +import org.apache.spark.sql.types.StructType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +/** + * A benchmark that evaluates the performance of reading Parquet data with a flat schema using + * Iceberg and Spark Parquet readers. 
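In the reader benchmarks below, APPLY_PROJECTION.bind(projection)::invoke is a reflective stand-in for calling UnsafeProjection.apply directly; the transform is equivalent to this sketch:

    import com.google.common.collect.Iterables;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;

    public class UnsafeTransformSketch {
      static Iterable<InternalRow> toUnsafe(Iterable<InternalRow> rows, UnsafeProjection projection) {
        // each projected row reuses the projection's buffer, which matches the
        // single-threaded, consume-immediately loops in the benchmarks
        return Iterables.transform(rows, projection::apply);
      }
    }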
+ * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=SparkParquetReadersFlatDataBenchmark + * -PjmhOutputPath=benchmark/spark-parquet-readers-flat-data-benchmark-result.txt + * + */ +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.SingleShotTime) +public class SparkParquetReadersFlatDataBenchmark { + + private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply") + .impl(UnsafeProjection.class, InternalRow.class) + .build(); + private static final Schema SCHEMA = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(5, "decimalCol", Types.DecimalType.of(20, 5)), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + private static final Schema PROJECTED_SCHEMA = new Schema( + required(1, "longCol", Types.LongType.get()), + optional(5, "decimalCol", Types.DecimalType.of(20, 5)), + optional(8, "stringCol", Types.StringType.get())); + private static final int NUM_RECORDS = 10000000; + private File dataFile; + + @Setup + public void setup() throws IOException { + dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet"); + List records = RandomData.generateList(SCHEMA, NUM_RECORDS, 0L); + try (FileAppender writer = Parquet.write(Files.localOutput(dataFile)) + .schema(SCHEMA) + .named("benchmark") + .build()) { + writer.addAll(records); + } + } + + @TearDown + public void tearDown() { + if (dataFile != null) { + dataFile.delete(); + } + } + + @Benchmark + @Threads(1) + public void readUsingIcebergReader(Blackhole blackHole) throws IOException { + try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) + .project(SCHEMA) + .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) + .build()) { + + for (InternalRow row : rows) { + blackHole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readUsingIcebergReaderUnsafe(Blackhole blackHole) throws IOException { + try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) + .project(SCHEMA) + .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) + .build()) { + + Iterable unsafeRows = Iterables.transform( + rows, + APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke); + + for (InternalRow row : unsafeRows) { + blackHole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readUsingSparkReader(Blackhole blackHole) throws IOException { + StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); + try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) + .project(SCHEMA) + .readSupport(new ParquetReadSupport()) + .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json()) + .set("spark.sql.parquet.binaryAsString", "false") + .set("spark.sql.parquet.int96AsTimestamp", "false") + .callInit() + .build()) { + + for (InternalRow row : rows) { + blackHole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingIcebergReader(Blackhole blackHole) throws IOException { + try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) + 
.build()) { + + for (InternalRow row : rows) { + blackHole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackHole) throws IOException { + try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) + .build()) { + + Iterable unsafeRows = Iterables.transform( + rows, + APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke); + + for (InternalRow row : unsafeRows) { + blackHole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingSparkReader(Blackhole blackHole) throws IOException { + StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA); + try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .readSupport(new ParquetReadSupport()) + .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json()) + .set("spark.sql.parquet.binaryAsString", "false") + .set("spark.sql.parquet.int96AsTimestamp", "false") + .callInit() + .build()) { + + for (InternalRow row : rows) { + blackHole.consume(row); + } + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java new file mode 100644 index 000000000000..0cb16d73619d --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.spark.data.parquet;
+
+import com.google.common.collect.Iterables;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.spark.SparkBenchmarkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of reading nested Parquet data using
+ * Iceberg and Spark Parquet readers.
+ *
+ * To run this benchmark:
+ *
+ * ./gradlew :iceberg-spark:jmh
+ *     -PjmhIncludeRegex=SparkParquetReadersNestedDataBenchmark
+ *     -PjmhOutputPath=benchmark/spark-parquet-readers-nested-data-benchmark-result.txt
+ *
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetReadersNestedDataBenchmark {
+
+  private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
+      .impl(UnsafeProjection.class, InternalRow.class)
+      .build();
+  private static final Schema SCHEMA = new Schema(
+      required(0, "id", Types.LongType.get()),
+      optional(4, "nested", Types.StructType.of(
+          required(1, "col1", Types.StringType.get()),
+          required(2, "col2", Types.DoubleType.get()),
+          required(3, "col3", Types.LongType.get())
+      ))
+  );
+  private static final Schema PROJECTED_SCHEMA = new Schema(
+      optional(4, "nested", Types.StructType.of(
+          required(1, "col1", Types.StringType.get())
+      ))
+  );
+  private static final int NUM_RECORDS = 10000000;
+  private File dataFile;
+
+  @Setup
+  public void setup() throws IOException {
+    dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet");
+    List<GenericData.Record> records = RandomData.generateList(SCHEMA, NUM_RECORDS, 0L);
+    try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(dataFile))
+        .schema(SCHEMA)
+        .named("benchmark")
+        .build()) {
+      writer.addAll(records);
+    }
+  }
+
+  @TearDown
+  public void tearDown() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackHole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingIcebergReaderUnsafe(Blackhole blackHole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
+        .build()) {
+
+      Iterable<InternalRow> unsafeRows = Iterables.transform(
+          rows,
+          APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke);
+
+      for (InternalRow row : unsafeRows) {
+        blackHole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readUsingSparkReader(Blackhole blackHole) throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(SCHEMA)
+        .readSupport(new ParquetReadSupport())
+        .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .callInit()
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackHole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingIcebergReader(Blackhole blackHole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackHole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackHole) throws IOException {
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
+        .build()) {
+
+      Iterable<InternalRow> unsafeRows = Iterables.transform(
+          rows,
+          APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke);
+
+      for (InternalRow row : unsafeRows) {
+        blackHole.consume(row);
+      }
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionUsingSparkReader(Blackhole blackHole) throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA);
+    try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile))
+        .project(PROJECTED_SCHEMA)
+        .readSupport(new ParquetReadSupport())
+        .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .callInit()
+        .build()) {
+
+      for (InternalRow row : rows) {
+        blackHole.consume(row);
+      }
+    }
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
new file mode 100644
index 000000000000..af1e940bde78
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data.parquet; + +import java.io.File; +import java.io.IOException; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.iceberg.spark.data.SparkParquetWriters; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; +import org.apache.spark.sql.types.StructType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +/** + * A benchmark that evaluates the performance of writing Parquet data with a flat schema using + * Iceberg and Spark Parquet writers. 
+ *
+ * To run this benchmark:
+ *
+ * ./gradlew :iceberg-spark:jmh
+ *     -PjmhIncludeRegex=SparkParquetWritersFlatDataBenchmark
+ *     -PjmhOutputPath=benchmark/spark-parquet-writers-flat-data-benchmark-result.txt
+ *
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetWritersFlatDataBenchmark {
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "longCol", Types.LongType.get()),
+      required(2, "intCol", Types.IntegerType.get()),
+      required(3, "floatCol", Types.FloatType.get()),
+      optional(4, "doubleCol", Types.DoubleType.get()),
+      optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+      optional(6, "dateCol", Types.DateType.get()),
+      optional(7, "timestampCol", Types.TimestampType.withZone()),
+      optional(8, "stringCol", Types.StringType.get()));
+  private static final int NUM_RECORDS = 1000000;
+  private Iterable<InternalRow> rows;
+  private File dataFile;
+
+  @Setup
+  public void setup() throws IOException {
+    rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L);
+    dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet");
+  }
+
+  @TearDown
+  public void tearDown() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingIcebergWriter() throws IOException {
+    try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(SCHEMA, msgType))
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingSparkWriter() throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+        .writeSupport(new ParquetWriteSupport())
+        .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+        .set("spark.sql.parquet.writeLegacyFormat", "false")
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
new file mode 100644
index 000000000000..9d3126c7216e
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A benchmark that evaluates the performance of writing nested Parquet data using
+ * Iceberg and Spark Parquet writers.
+ *
+ * To run this benchmark:
+ *
+ * ./gradlew :iceberg-spark:jmh
+ *     -PjmhIncludeRegex=SparkParquetWritersNestedDataBenchmark
+ *     -PjmhOutputPath=benchmark/spark-parquet-writers-nested-data-benchmark-result.txt
+ *
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class SparkParquetWritersNestedDataBenchmark {
+
+  private static final Schema SCHEMA = new Schema(
+      required(0, "id", Types.LongType.get()),
+      optional(4, "nested", Types.StructType.of(
+          required(1, "col1", Types.StringType.get()),
+          required(2, "col2", Types.DoubleType.get()),
+          required(3, "col3", Types.LongType.get())
+      ))
+  );
+  private static final int NUM_RECORDS = 1000000;
+  private Iterable<InternalRow> rows;
+  private File dataFile;
+
+  @Setup
+  public void setup() throws IOException {
+    rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L);
+    dataFile = File.createTempFile("parquet-nested-data-benchmark", ".parquet");
+  }
+
+  @TearDown
+  public void tearDown() {
+    if (dataFile != null) {
+      dataFile.delete();
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingIcebergWriter() throws IOException {
+    try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(SCHEMA, msgType))
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeUsingSparkWriter() throws IOException {
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
+        .writeSupport(new ParquetWriteSupport())
+        .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+        .set("spark.sql.parquet.writeLegacyFormat", "false")
+        .set("spark.sql.parquet.binaryAsString", "false")
+        .set("spark.sql.parquet.int96AsTimestamp", "false")
+        .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+        .schema(SCHEMA)
+        .build()) {
+
+      writer.addAll(rows);
+    }
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/Action.java
b/spark/src/jmh/java/org/apache/iceberg/spark/source/Action.java similarity index 94% rename from spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/Action.java rename to spark/src/jmh/java/org/apache/iceberg/spark/source/Action.java index 64adbef8d308..1820a801b2fb 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/Action.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/Action.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iceberg.spark.benchmark.base; +package org.apache.iceberg.spark.source; @FunctionalInterface public interface Action { diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/SparkBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java similarity index 98% rename from spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/SparkBenchmark.java rename to spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java index 290bc985fdbc..1648ee709c98 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/base/SparkBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iceberg.spark.benchmark.base; +package org.apache.iceberg.spark.source; import java.io.IOException; import java.util.HashMap; @@ -50,7 +50,7 @@ @Warmup(iterations = 3) @Measurement(iterations = 5) @BenchmarkMode(Mode.SingleShotTime) -public abstract class SparkBenchmark { +public abstract class IcebergSourceBenchmark { private final Configuration hadoopConf = initHadoopConf(); private final Table table = initTable(); diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java similarity index 92% rename from spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataBenchmark.java rename to spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java index ccd41d9c116c..72346a72748c 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.iceberg.spark.benchmark.parquet; +package org.apache.iceberg.spark.source; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; @@ -26,13 +26,12 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.spark.benchmark.base.SparkBenchmark; import org.apache.iceberg.types.Types; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; -public abstract class SparkParquetFlatDataBenchmark extends SparkBenchmark { +public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchmark { @Override protected Configuration initHadoopConf() { diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java similarity index 91% rename from spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataBenchmark.java rename to spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java index cb6a8781a6d3..bc542579f417 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iceberg.spark.benchmark.parquet; +package org.apache.iceberg.spark.source; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; @@ -26,13 +26,12 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.spark.benchmark.base.SparkBenchmark; import org.apache.iceberg.types.Types; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; -public class SparkParquetNestedDataBenchmark extends SparkBenchmark { +public abstract class IcebergSourceNestedDataBenchmark extends IcebergSourceBenchmark { @Override protected Configuration initHadoopConf() { diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java similarity index 91% rename from spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataFilterBenchmark.java rename to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java index 4448c65b8cc8..192f90122b89 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataFilterBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java @@ -17,11 +17,12 @@ * under the License. 
*/ -package org.apache.iceberg.spark.benchmark.parquet; +package org.apache.iceberg.spark.source.parquet; import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; +import org.apache.iceberg.spark.source.IcebergSourceFlatDataBenchmark; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.internal.SQLConf; @@ -46,11 +47,11 @@ * To run this benchmark: * * ./gradlew :iceberg-spark:jmh - * -PjmhIncludeRegex=SparkParquetFlatDataFilterBenchmark - * -PjmhOutputPath=benchmark/parquet-flat-data-filter-benchmark-result.txt + * -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt * */ -public class SparkParquetFlatDataFilterBenchmark extends SparkParquetFlatDataBenchmark { +public class IcebergSourceFlatParquetDataFilterBenchmark extends IcebergSourceFlatDataBenchmark { private static final String FILTER_COND = "dateCol == date_add(current_date(), 1)"; private static final int NUM_FILES = 500; diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataReadBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java similarity index 93% rename from spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataReadBenchmark.java rename to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java index 16972d79ddad..2cc4a3d7c87b 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataReadBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java @@ -17,11 +17,12 @@ * under the License. 
*/ -package org.apache.iceberg.spark.benchmark.parquet; +package org.apache.iceberg.spark.source.parquet; import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; +import org.apache.iceberg.spark.source.IcebergSourceFlatDataBenchmark; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.internal.SQLConf; @@ -42,11 +43,11 @@ * To run this benchmark: * * ./gradlew :iceberg-spark:jmh - * -PjmhIncludeRegex=SparkParquetFlatDataReadBenchmark - * -PjmhOutputPath=benchmark/parquet-flat-data-read-benchmark-result.txt + * -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt * */ -public class SparkParquetFlatDataReadBenchmark extends SparkParquetFlatDataBenchmark { +public class IcebergSourceFlatParquetDataReadBenchmark extends IcebergSourceFlatDataBenchmark { private static final int NUM_FILES = 10; private static final int NUM_ROWS = 1000000; diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java similarity index 88% rename from spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataWriteBenchmark.java rename to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java index 11ede522fd28..7c83abc5260d 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetFlatDataWriteBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java @@ -17,11 +17,12 @@ * under the License. 
*/ -package org.apache.iceberg.spark.benchmark.parquet; +package org.apache.iceberg.spark.source.parquet; import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; +import org.apache.iceberg.spark.source.IcebergSourceFlatDataBenchmark; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; @@ -40,11 +41,11 @@ * To run this benchmark: * * ./gradlew :iceberg-spark:jmh - * -PjmhIncludeRegex=SparkParquetFlatDataWriteBenchmark - * -PjmhOutputPath=benchmark/parquet-flat-data-write-benchmark-result.txt + * -PjmhIncludeRegex=IcebergSourceFlatParquetDataWriteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-write-benchmark-result.txt * */ -public class SparkParquetFlatDataWriteBenchmark extends SparkParquetFlatDataBenchmark { +public class IcebergSourceFlatParquetDataWriteBenchmark extends IcebergSourceFlatDataBenchmark { private static final int NUM_ROWS = 5000000; diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java similarity index 90% rename from spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataFilterBenchmark.java rename to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java index b438c1199e38..c53c61e53523 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataFilterBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java @@ -17,11 +17,12 @@ * under the License. */ -package org.apache.iceberg.spark.benchmark.parquet; +package org.apache.iceberg.spark.source.parquet; import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; +import org.apache.iceberg.spark.source.IcebergSourceNestedDataBenchmark; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.internal.SQLConf; @@ -46,11 +47,11 @@ * To run this benchmark: * * ./gradlew :iceberg-spark:jmh - * -PjmhIncludeRegex=SparkParquetNestedDataFilterBenchmark - * -PjmhOutputPath=benchmark/parquet-nested-data-filter-benchmark-result.txt + * -PjmhIncludeRegex=IcebergSourceNestedParquetDataFilterBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-filter-benchmark-result.txt * */ -public class SparkParquetNestedDataFilterBenchmark extends SparkParquetNestedDataBenchmark { +public class IcebergSourceNestedParquetDataFilterBenchmark extends IcebergSourceNestedDataBenchmark { private static final String FILTER_COND = "nested.col3 == 0"; private static final int NUM_FILES = 500; diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataReadBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java similarity index 92% rename from spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataReadBenchmark.java rename to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java index 4e2cdb5de3f2..8d308ad3f651 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataReadBenchmark.java +++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java @@ -17,11 +17,12 @@ * under the License. */ -package org.apache.iceberg.spark.benchmark.parquet; +package org.apache.iceberg.spark.source.parquet; import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; +import org.apache.iceberg.spark.source.IcebergSourceNestedDataBenchmark; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.internal.SQLConf; @@ -42,11 +43,11 @@ * To run this benchmark: * * ./gradlew :iceberg-spark:jmh - * -PjmhIncludeRegex=SparkParquetNestedDataReadBenchmark - * -PjmhOutputPath=benchmark/parquet-nested-data-read-benchmark-result.txt + * -PjmhIncludeRegex=IcebergSourceNestedParquetDataReadBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-read-benchmark-result.txt * */ -public class SparkParquetNestedDataReadBenchmark extends SparkParquetNestedDataBenchmark { +public class IcebergSourceNestedParquetDataReadBenchmark extends IcebergSourceNestedDataBenchmark { private static final int NUM_FILES = 10; private static final int NUM_ROWS = 1000000; diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java similarity index 86% rename from spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataWriteBenchmark.java rename to spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java index 3c3e549c1fcb..a8189d9e7004 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/benchmark/parquet/SparkParquetNestedDataWriteBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java @@ -17,11 +17,12 @@ * under the License. 
*/ -package org.apache.iceberg.spark.benchmark.parquet; +package org.apache.iceberg.spark.source.parquet; import com.google.common.collect.Maps; import java.io.IOException; import java.util.Map; +import org.apache.iceberg.spark.source.IcebergSourceNestedDataBenchmark; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; @@ -41,11 +42,11 @@ * To run this benchmark: * * ./gradlew :iceberg-spark:jmh - * -PjmhIncludeRegex=SparkParquetNestedDataWriteBenchmark - * -PjmhOutputPath=benchmark/parquet-nested-data-write-benchmark-result.txt + * -PjmhIncludeRegex=IcebergSourceNestedParquetDataWriteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-write-benchmark-result.txt * */ -public class SparkParquetNestedDataWriteBenchmark extends SparkParquetNestedDataBenchmark { +public class IcebergSourceNestedParquetDataWriteBenchmark extends IcebergSourceNestedDataBenchmark { private static final int NUM_ROWS = 5000000; From f09ee06c4704c8dc42eae2c8007cf455d0246e9a Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Thu, 6 Jun 2019 00:33:02 +0100 Subject: [PATCH 3/3] Minor style fixes --- build.gradle | 1 - .../SparkParquetReadersFlatDataBenchmark.java | 26 +++++++-------- ...parkParquetReadersNestedDataBenchmark.java | 32 +++++++++---------- .../SparkParquetWritersFlatDataBenchmark.java | 4 +-- ...parkParquetWritersNestedDataBenchmark.java | 4 +-- ...gSourceFlatParquetDataFilterBenchmark.java | 4 +-- ...ergSourceFlatParquetDataReadBenchmark.java | 4 +-- ...rgSourceFlatParquetDataWriteBenchmark.java | 4 +-- ...ourceNestedParquetDataFilterBenchmark.java | 4 +-- ...gSourceNestedParquetDataReadBenchmark.java | 4 +-- ...SourceNestedParquetDataWriteBenchmark.java | 4 +-- 11 files changed, 45 insertions(+), 46 deletions(-) diff --git a/build.gradle b/build.gradle index cf2cc8834c62..70de24ff427b 100644 --- a/build.gradle +++ b/build.gradle @@ -87,7 +87,6 @@ subprojects { scalaVersion = '2.11' sparkVersion = '2.4.0' caffeineVersion = "2.7.0" - jmhVersion = '1.21' } diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java index f170a9251365..92fbdee7d97e 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java @@ -30,10 +30,10 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.spark.SparkBenchmarkUtil; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.RandomData; import org.apache.iceberg.spark.data.SparkParquetReaders; -import org.apache.iceberg.spark.SparkBenchmarkUtil; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; @@ -93,7 +93,7 @@ public class SparkParquetReadersFlatDataBenchmark { private File dataFile; @Setup - public void setup() throws IOException { + public void setupBenchmark() throws IOException { dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet"); List records = RandomData.generateList(SCHEMA, NUM_RECORDS, 0L); try (FileAppender writer = Parquet.write(Files.localOutput(dataFile)) @@ -105,7 +105,7 @@ public void setup() throws IOException 
{ } @TearDown - public void tearDown() { + public void tearDownBenchmark() { if (dataFile != null) { dataFile.delete(); } @@ -127,7 +127,7 @@ public void readUsingIcebergReader(Blackhole blackHole) throws IOException { @Benchmark @Threads(1) - public void readUsingIcebergReaderUnsafe(Blackhole blackHole) throws IOException { + public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException { try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(SCHEMA) .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) @@ -138,14 +138,14 @@ public void readUsingIcebergReaderUnsafe(Blackhole blackHole) throws IOException APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke); for (InternalRow row : unsafeRows) { - blackHole.consume(row); + blackhole.consume(row); } } } @Benchmark @Threads(1) - public void readUsingSparkReader(Blackhole blackHole) throws IOException { + public void readUsingSparkReader(Blackhole blackhole) throws IOException { StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(SCHEMA) @@ -157,28 +157,28 @@ public void readUsingSparkReader(Blackhole blackHole) throws IOException { .build()) { for (InternalRow row : rows) { - blackHole.consume(row); + blackhole.consume(row); } } } @Benchmark @Threads(1) - public void readWithProjectionUsingIcebergReader(Blackhole blackHole) throws IOException { + public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) .build()) { for (InternalRow row : rows) { - blackHole.consume(row); + blackhole.consume(row); } } } @Benchmark @Threads(1) - public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackHole) throws IOException { + public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException { try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) @@ -189,14 +189,14 @@ public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackHole) thro APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke); for (InternalRow row : unsafeRows) { - blackHole.consume(row); + blackhole.consume(row); } } } @Benchmark @Threads(1) - public void readWithProjectionUsingSparkReader(Blackhole blackHole) throws IOException { + public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOException { StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA); try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) @@ -208,7 +208,7 @@ public void readWithProjectionUsingSparkReader(Blackhole blackHole) throws IOExc .build()) { for (InternalRow row : rows) { - blackHole.consume(row); + blackhole.consume(row); } } } diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java index 0cb16d73619d..199d884da1b0 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java +++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java @@ -30,10 +30,10 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.spark.SparkBenchmarkUtil; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.RandomData; import org.apache.iceberg.spark.data.SparkParquetReaders; -import org.apache.iceberg.spark.SparkBenchmarkUtil; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; @@ -93,8 +93,8 @@ public class SparkParquetReadersNestedDataBenchmark { private File dataFile; @Setup - public void setup() throws IOException { - dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet"); + public void setupBenchmark() throws IOException { + dataFile = File.createTempFile("parquet-nested-data-benchmark", ".parquet"); List records = RandomData.generateList(SCHEMA, NUM_RECORDS, 0L); try (FileAppender writer = Parquet.write(Files.localOutput(dataFile)) .schema(SCHEMA) @@ -105,7 +105,7 @@ public void setup() throws IOException { } @TearDown - public void tearDown() { + public void tearDownBenchmark() { if (dataFile != null) { dataFile.delete(); } @@ -113,21 +113,21 @@ public void tearDown() { @Benchmark @Threads(1) - public void readUsingIcebergReader(Blackhole blackHole) throws IOException { + public void readUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(SCHEMA) .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) .build()) { for (InternalRow row : rows) { - blackHole.consume(row); + blackhole.consume(row); } } } @Benchmark @Threads(1) - public void readUsingIcebergReaderUnsafe(Blackhole blackHole) throws IOException { + public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException { try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(SCHEMA) .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) @@ -138,14 +138,14 @@ public void readUsingIcebergReaderUnsafe(Blackhole blackHole) throws IOException APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke); for (InternalRow row : unsafeRows) { - blackHole.consume(row); + blackhole.consume(row); } } } @Benchmark @Threads(1) - public void readUsingSparkReader(Blackhole blackHole) throws IOException { + public void readUsingSparkReader(Blackhole blackhole) throws IOException { StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(SCHEMA) @@ -157,28 +157,28 @@ public void readUsingSparkReader(Blackhole blackHole) throws IOException { .build()) { for (InternalRow row : rows) { - blackHole.consume(row); + blackhole.consume(row); } } } @Benchmark @Threads(1) - public void readWithProjectionUsingIcebergReader(Blackhole blackHole) throws IOException { + public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) .build()) { for (InternalRow row : rows) { - blackHole.consume(row); + blackhole.consume(row); } } } @Benchmark @Threads(1) - public void 
readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackHole) throws IOException { + public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException { try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) @@ -189,14 +189,14 @@ public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackHole) thro APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke); for (InternalRow row : unsafeRows) { - blackHole.consume(row); + blackhole.consume(row); } } } @Benchmark @Threads(1) - public void readWithProjectionUsingSparkReader(Blackhole blackHole) throws IOException { + public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOException { StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA); try (CloseableIterable rows = Parquet.read(Files.localInput(dataFile)) .project(PROJECTED_SCHEMA) @@ -208,7 +208,7 @@ public void readWithProjectionUsingSparkReader(Blackhole blackHole) throws IOExc .build()) { for (InternalRow row : rows) { - blackHole.consume(row); + blackhole.consume(row); } } } diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java index af1e940bde78..8cb5b07020e5 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java @@ -79,13 +79,13 @@ public class SparkParquetWritersFlatDataBenchmark { private File dataFile; @Setup - public void setup() throws IOException { + public void setupBenchmark() throws IOException { rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L); dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet"); } @TearDown - public void tearDown() { + public void tearDownBenchmark() { if (dataFile != null) { dataFile.delete(); } diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java index 9d3126c7216e..dd395f519916 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java @@ -78,13 +78,13 @@ public class SparkParquetWritersNestedDataBenchmark { private File dataFile; @Setup - public void setup() throws IOException { + public void setupBenchmark() throws IOException { rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L); dataFile = File.createTempFile("parquet-nested-data-benchmark", ".parquet"); } @TearDown - public void tearDown() { + public void tearDownBenchmark() { if (dataFile != null) { dataFile.delete(); } diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java index 192f90122b89..1c9164593eaa 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java +++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java @@ -58,13 +58,13 @@ public class IcebergSourceFlatParquetDataFilterBenchmark extends IcebergSourceFl private static final int NUM_ROWS = 10000; @Setup - public void setup() { + public void setupBenchmark() { setupSpark(); appendData(); } @TearDown - public void tearDown() throws IOException { + public void tearDownBenchmark() throws IOException { tearDownSpark(); cleanupFiles(); } diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java index 2cc4a3d7c87b..37a63369981c 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java @@ -53,13 +53,13 @@ public class IcebergSourceFlatParquetDataReadBenchmark extends IcebergSourceFlat private static final int NUM_ROWS = 1000000; @Setup - public void setup() { + public void setupBenchmark() { setupSpark(); appendData(); } @TearDown - public void tearDown() throws IOException { + public void tearDownBenchmark() throws IOException { tearDownSpark(); cleanupFiles(); } diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java index 7c83abc5260d..ab62f53e7008 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java @@ -50,12 +50,12 @@ public class IcebergSourceFlatParquetDataWriteBenchmark extends IcebergSourceFla private static final int NUM_ROWS = 5000000; @Setup - public void setup() { + public void setupBenchmark() { setupSpark(); } @TearDown - public void tearDown() throws IOException { + public void tearDownBenchmark() throws IOException { tearDownSpark(); cleanupFiles(); } diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java index c53c61e53523..c12f3ee668a2 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java @@ -58,13 +58,13 @@ public class IcebergSourceNestedParquetDataFilterBenchmark extends IcebergSource private static final int NUM_ROWS = 10000; @Setup - public void setup() { + public void setupBenchmark() { setupSpark(); appendData(); } @TearDown - public void tearDown() throws IOException { + public void tearDownBenchmark() throws IOException { tearDownSpark(); cleanupFiles(); } diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java index 8d308ad3f651..acba2e5d330d 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java +++ 
b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java @@ -53,13 +53,13 @@ public class IcebergSourceNestedParquetDataReadBenchmark extends IcebergSourceNe private static final int NUM_ROWS = 1000000; @Setup - public void setup() { + public void setupBenchmark() { setupSpark(); appendData(); } @TearDown - public void tearDown() throws IOException { + public void tearDownBenchmark() throws IOException { tearDownSpark(); cleanupFiles(); } diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java index a8189d9e7004..0bd98610da99 100644 --- a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java @@ -51,12 +51,12 @@ public class IcebergSourceNestedParquetDataWriteBenchmark extends IcebergSourceN private static final int NUM_ROWS = 5000000; @Setup - public void setup() { + public void setupBenchmark() { setupSpark(); } @TearDown - public void tearDown() throws IOException { + public void tearDownBenchmark() throws IOException { tearDownSpark(); cleanupFiles(); }
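Every benchmark in this series follows the same JMH lifecycle: one forked JVM runs three warm-up and five measured single-shot iterations, state annotated with @State(Scope.Benchmark) is prepared once in @Setup and released in @TearDown, and every produced value is handed to a Blackhole so the JIT cannot eliminate the measured work as dead code. The sketch below distills that shared skeleton into a self-contained class; the class name and the long[] payload are illustrative stand-ins, not part of these patches.

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@Fork(1)
@State(Scope.Benchmark)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.SingleShotTime)
public class LifecycleSketchBenchmark {

  private long[] data;

  @Setup
  public void setup() {
    // Expensive preparation runs once per fork, outside the measured region,
    // just as the benchmarks above generate their input data in @Setup.
    data = new long[1_000_000];
    for (int i = 0; i < data.length; i++) {
      data[i] = i;
    }
  }

  @TearDown
  public void tearDown() {
    // Mirrors the temp-file and Spark-session cleanup in the benchmarks above.
    data = null;
  }

  @Benchmark
  @Threads(1)
  public void consumeAll(Blackhole blackhole) {
    // Consuming every element defeats dead-code elimination, which is why
    // the readers above pass each InternalRow to the Blackhole.
    for (long value : data) {
      blackhole.consume(value);
    }
  }
}

Placed under spark/src/jmh, such a class would be run the same way as the benchmarks above, for example: ./gradlew :iceberg-spark:jmh -PjmhIncludeRegex=LifecycleSketchBenchmark.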