diff --git a/build.gradle b/build.gradle index 51d438234d69..70de24ff427b 100644 --- a/build.gradle +++ b/build.gradle @@ -21,7 +21,8 @@ buildscript { repositories { jcenter() gradlePluginPortal() - maven { url "http://palantir.bintray.com/releases" } + maven { url "http://palantir.bintray.com/releases" } + maven { url "https://plugins.gradle.org/m2/" } } dependencies { classpath 'com.github.jengelman.gradle.plugins:shadow:5.0.0' @@ -30,6 +31,7 @@ buildscript { classpath 'com.palantir.baseline:gradle-baseline-java:0.55.0' classpath 'com.diffplug.spotless:spotless-plugin-gradle:3.14.0' classpath 'gradle.plugin.org.inferred:gradle-processors:2.1.0' + classpath 'me.champeau.gradle:jmh-gradle-plugin:0.4.8' } } @@ -85,6 +87,7 @@ subprojects { scalaVersion = '2.11' sparkVersion = '2.4.0' caffeineVersion = "2.7.0" + jmhVersion = '1.21' } sourceCompatibility = '1.8' @@ -160,6 +163,21 @@ configure(baselineProjects) { } } +def jmhProjects = [ project("iceberg-spark") ] + +configure(jmhProjects) { + apply plugin: 'me.champeau.gradle.jmh' + + jmh { + jmhVersion = project.jmhVersion + failOnError = true + forceGC = true + includeTests = true + humanOutputFile = file(jmhOutputPath) + include = [jmhIncludeRegex] + } +} + project(':iceberg-api') { dependencies { testCompile "org.apache.avro:avro:$avroVersion" diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 000000000000..f2ff982d9caf --- /dev/null +++ b/gradle.properties @@ -0,0 +1,2 @@ +jmhOutputPath=build/reports/jmh/human-readable-output.txt +jmhIncludeRegex=.* diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/SparkBenchmarkUtil.java b/spark/src/jmh/java/org/apache/iceberg/spark/SparkBenchmarkUtil.java new file mode 100644 index 000000000000..6399bc7e5899 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/SparkBenchmarkUtil.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.spark; + +import com.google.common.collect.Lists; +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.expressions.Attribute; +import org.apache.spark.sql.catalyst.expressions.AttributeReference; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.types.StructType; +import scala.collection.JavaConverters; + +public class SparkBenchmarkUtil { + + private SparkBenchmarkUtil() {} + +  public static UnsafeProjection projection(Schema expectedSchema, Schema actualSchema) { + StructType struct = SparkSchemaUtil.convert(actualSchema); + + List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava(); + List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length); + List<Expression> exprs = Lists.newArrayListWithExpectedSize(struct.fields().length); + + for (AttributeReference ref : refs) { + attrs.add(ref.toAttribute()); + } + + for (Types.NestedField field : expectedSchema.columns()) { + int indexInIterSchema = struct.fieldIndex(field.name()); + exprs.add(refs.get(indexInIterSchema)); + } + + return UnsafeProjection.create( + JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(), + JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq()); + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java new file mode 100644 index 000000000000..92fbdee7d97e --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.spark.data.parquet; + +import com.google.common.collect.Iterables; +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.spark.SparkBenchmarkUtil; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.iceberg.spark.data.SparkParquetReaders; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport; +import org.apache.spark.sql.types.StructType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +/** + * A benchmark that evaluates the performance of reading Parquet data with a flat schema using + * Iceberg and Spark Parquet readers. 
+ * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=SparkParquetReadersFlatDataBenchmark + * -PjmhOutputPath=benchmark/spark-parquet-readers-flat-data-benchmark-result.txt + * + */ +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.SingleShotTime) +public class SparkParquetReadersFlatDataBenchmark { + + private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply") + .impl(UnsafeProjection.class, InternalRow.class) + .build(); + private static final Schema SCHEMA = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(5, "decimalCol", Types.DecimalType.of(20, 5)), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + private static final Schema PROJECTED_SCHEMA = new Schema( + required(1, "longCol", Types.LongType.get()), + optional(5, "decimalCol", Types.DecimalType.of(20, 5)), + optional(8, "stringCol", Types.StringType.get())); + private static final int NUM_RECORDS = 10000000; + private File dataFile; + + @Setup + public void setupBenchmark() throws IOException { + dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet"); + List<GenericData.Record> records = RandomData.generateList(SCHEMA, NUM_RECORDS, 0L); + try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(dataFile)) + .schema(SCHEMA) + .named("benchmark") + .build()) { + writer.addAll(records); + } + } + + @TearDown + public void tearDownBenchmark() { + if (dataFile != null) { + dataFile.delete(); + } + } + + @Benchmark + @Threads(1) + public void readUsingIcebergReader(Blackhole blackhole) throws IOException { + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(SCHEMA) + .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException { + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(SCHEMA) + .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) + .build()) { + + Iterable<InternalRow> unsafeRows = Iterables.transform( + rows, + APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke); + + for (InternalRow row : unsafeRows) { + blackhole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readUsingSparkReader(Blackhole blackhole) throws IOException { + StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(SCHEMA) + .readSupport(new ParquetReadSupport()) + .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json()) + .set("spark.sql.parquet.binaryAsString", "false") + .set("spark.sql.parquet.int96AsTimestamp", "false") + .callInit() + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .createReaderFunc(type ->
SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException { + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) + .build()) { + + Iterable<InternalRow> unsafeRows = Iterables.transform( + rows, + APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke); + + for (InternalRow row : unsafeRows) { + blackhole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOException { + StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA); + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .readSupport(new ParquetReadSupport()) + .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json()) + .set("spark.sql.parquet.binaryAsString", "false") + .set("spark.sql.parquet.int96AsTimestamp", "false") + .callInit() + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java new file mode 100644 index 000000000000..199d884da1b0 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.spark.data.parquet; + +import com.google.common.collect.Iterables; +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.spark.SparkBenchmarkUtil; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.iceberg.spark.data.SparkParquetReaders; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport; +import org.apache.spark.sql.types.StructType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +/** + * A benchmark that evaluates the performance of reading nested Parquet data using + * Iceberg and Spark Parquet readers. + * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=SparkParquetReadersNestedDataBenchmark + * -PjmhOutputPath=benchmark/spark-parquet-readers-nested-data-benchmark-result.txt + * + */ +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.SingleShotTime) +public class SparkParquetReadersNestedDataBenchmark { + + private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply") + .impl(UnsafeProjection.class, InternalRow.class) + .build(); + private static final Schema SCHEMA = new Schema( + required(0, "id", Types.LongType.get()), + optional(4, "nested", Types.StructType.of( + required(1, "col1", Types.StringType.get()), + required(2, "col2", Types.DoubleType.get()), + required(3, "col3", Types.LongType.get()) + )) + ); + private static final Schema PROJECTED_SCHEMA = new Schema( + optional(4, "nested", Types.StructType.of( + required(1, "col1", Types.StringType.get()) + )) + ); + private static final int NUM_RECORDS = 10000000; + private File dataFile; + + @Setup + public void setupBenchmark() throws IOException { + dataFile = File.createTempFile("parquet-nested-data-benchmark", ".parquet"); + List<GenericData.Record> records = RandomData.generateList(SCHEMA, NUM_RECORDS, 0L); + try (FileAppender<GenericData.Record> writer = Parquet.write(Files.localOutput(dataFile)) + .schema(SCHEMA) + .named("benchmark") + .build()) { + writer.addAll(records); + } + } + + @TearDown + public void tearDownBenchmark() { + if (dataFile != null) { + dataFile.delete(); + } + } + + @Benchmark + @Threads(1) + public void readUsingIcebergReader(Blackhole blackhole) throws IOException { + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(SCHEMA) +
.createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException { + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(SCHEMA) + .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type)) + .build()) { + + Iterable<InternalRow> unsafeRows = Iterables.transform( + rows, + APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(SCHEMA, SCHEMA))::invoke); + + for (InternalRow row : unsafeRows) { + blackhole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readUsingSparkReader(Blackhole blackhole) throws IOException { + StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(SCHEMA) + .readSupport(new ParquetReadSupport()) + .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json()) + .set("spark.sql.parquet.binaryAsString", "false") + .set("spark.sql.parquet.int96AsTimestamp", "false") + .callInit() + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException { + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingIcebergReaderUnsafe(Blackhole blackhole) throws IOException { + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type)) + .build()) { + + Iterable<InternalRow> unsafeRows = Iterables.transform( + rows, + APPLY_PROJECTION.bind(SparkBenchmarkUtil.projection(PROJECTED_SCHEMA, PROJECTED_SCHEMA))::invoke); + + for (InternalRow row : unsafeRows) { + blackhole.consume(row); + } + } + } + + @Benchmark + @Threads(1) + public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOException { + StructType sparkSchema = SparkSchemaUtil.convert(PROJECTED_SCHEMA); + try (CloseableIterable<InternalRow> rows = Parquet.read(Files.localInput(dataFile)) + .project(PROJECTED_SCHEMA) + .readSupport(new ParquetReadSupport()) + .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json()) + .set("spark.sql.parquet.binaryAsString", "false") + .set("spark.sql.parquet.int96AsTimestamp", "false") + .callInit() + .build()) { + + for (InternalRow row : rows) { + blackhole.consume(row); + } + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java new file mode 100644 index 000000000000..8cb5b07020e5 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data.parquet; + +import java.io.File; +import java.io.IOException; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.iceberg.spark.data.SparkParquetWriters; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; +import org.apache.spark.sql.types.StructType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +/** + * A benchmark that evaluates the performance of writing Parquet data with a flat schema using + * Iceberg and Spark Parquet writers. 
+ * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=SparkParquetWritersFlatDataBenchmark + * -PjmhOutputPath=benchmark/spark-parquet-writers-flat-data-benchmark-result.txt + * + */ +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.SingleShotTime) +public class SparkParquetWritersFlatDataBenchmark { + + private static final Schema SCHEMA = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(5, "decimalCol", Types.DecimalType.of(20, 5)), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + private static final int NUM_RECORDS = 1000000; + private Iterable<InternalRow> rows; + private File dataFile; + + @Setup + public void setupBenchmark() throws IOException { + rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L); + dataFile = File.createTempFile("parquet-flat-data-benchmark", ".parquet"); + } + + @TearDown + public void tearDownBenchmark() { + if (dataFile != null) { + dataFile.delete(); + } + } + + @Benchmark + @Threads(1) + public void writeUsingIcebergWriter() throws IOException { + try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile)) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(SCHEMA, msgType)) + .schema(SCHEMA) + .build()) { + + writer.addAll(rows); + } + } + + @Benchmark + @Threads(1) + public void writeUsingSparkWriter() throws IOException { + StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); + try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile)) + .writeSupport(new ParquetWriteSupport()) + .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json()) + .set("spark.sql.parquet.writeLegacyFormat", "false") + .set("spark.sql.parquet.binaryAsString", "false") + .set("spark.sql.parquet.int96AsTimestamp", "false") + .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") + .schema(SCHEMA) + .build()) { + + writer.addAll(rows); + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java new file mode 100644 index 000000000000..dd395f519916 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.spark.data.parquet; + +import java.io.File; +import java.io.IOException; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.iceberg.spark.data.SparkParquetWriters; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport; +import org.apache.spark.sql.types.StructType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +/** + * A benchmark that evaluates the performance of writing nested Parquet data using + * Iceberg and Spark Parquet writers. + * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=SparkParquetWritersNestedDataBenchmark + * -PjmhOutputPath=benchmark/spark-parquet-writers-nested-data-benchmark-result.txt + * + */ +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.SingleShotTime) +public class SparkParquetWritersNestedDataBenchmark { + + private static final Schema SCHEMA = new Schema( + required(0, "id", Types.LongType.get()), + optional(4, "nested", Types.StructType.of( + required(1, "col1", Types.StringType.get()), + required(2, "col2", Types.DoubleType.get()), + required(3, "col3", Types.LongType.get()) + )) + ); + private static final int NUM_RECORDS = 1000000; + private Iterable<InternalRow> rows; + private File dataFile; + + @Setup + public void setupBenchmark() throws IOException { + rows = RandomData.generateSpark(SCHEMA, NUM_RECORDS, 0L); + dataFile = File.createTempFile("parquet-nested-data-benchmark", ".parquet"); + } + + @TearDown + public void tearDownBenchmark() { + if (dataFile != null) { + dataFile.delete(); + } + } + + @Benchmark + @Threads(1) + public void writeUsingIcebergWriter() throws IOException { + try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile)) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(SCHEMA, msgType)) + .schema(SCHEMA) + .build()) { + + writer.addAll(rows); + } + } + + @Benchmark + @Threads(1) + public void writeUsingSparkWriter() throws IOException { + StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA); + try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile)) + .writeSupport(new ParquetWriteSupport()) + .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json()) + .set("spark.sql.parquet.writeLegacyFormat", "false") + .set("spark.sql.parquet.binaryAsString", "false") + .set("spark.sql.parquet.int96AsTimestamp", "false") + .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS") + .schema(SCHEMA) + .build()) { + + writer.addAll(rows); + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/Action.java
b/spark/src/jmh/java/org/apache/iceberg/spark/source/Action.java new file mode 100644 index 000000000000..1820a801b2fb --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/Action.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +@FunctionalInterface +public interface Action { + void invoke(); +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java new file mode 100644 index 000000000000..1648ee709c98 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.Table; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION; + +@Fork(1) +@State(Scope.Benchmark) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.SingleShotTime) +public abstract class IcebergSourceBenchmark { + + private final Configuration hadoopConf = initHadoopConf(); + private final Table table = initTable(); + private SparkSession spark; + + protected abstract Configuration initHadoopConf(); + + protected final Configuration hadoopConf() { + return hadoopConf; + } + + protected abstract Table initTable(); + + protected final Table table() { + return table; + } + + protected final SparkSession spark() { + return spark; + } + + protected String newTableLocation() { + String tmpDir = hadoopConf.get("hadoop.tmp.dir"); + Path tablePath = new Path(tmpDir, "spark-iceberg-table-" + UUID.randomUUID()); + return tablePath.toString(); + } + + protected String dataLocation() { + Map<String, String> properties = table.properties(); + return properties.getOrDefault(WRITE_NEW_DATA_LOCATION, String.format("%s/data", table.location())); + } + + protected void cleanupFiles() throws IOException { + try (FileSystem fileSystem = FileSystem.get(hadoopConf)) { + Path dataPath = new Path(dataLocation()); + fileSystem.delete(dataPath, true); + Path tablePath = new Path(table.location()); + fileSystem.delete(tablePath, true); + } + } + + protected void setupSpark() { + spark = SparkSession.builder() + .config("spark.ui.enabled", false) + .master("local") + .getOrCreate(); + Configuration sparkHadoopConf = spark.sparkContext().hadoopConfiguration(); + hadoopConf.forEach(entry -> sparkHadoopConf.set(entry.getKey(), entry.getValue())); + } + + protected void tearDownSpark() { + spark.stop(); + } + + protected void materialize(Dataset<?> ds) { + ds.queryExecution().toRdd().toJavaRDD().foreach(record -> { }); + } + + protected void appendAsFile(Dataset<Row> ds) { + // ensure the schema is precise (including nullability) + StructType sparkSchema = SparkSchemaUtil.convert(table.schema()); + spark.createDataFrame(ds.rdd(), sparkSchema) + .coalesce(1) + .write() + .format("iceberg") + .mode(SaveMode.Append) + .save(table.location()); + } + + protected void withSQLConf(Map<String, String> conf, Action action) { + SQLConf sqlConf = SQLConf.get(); + + Map<String, String> currentConfValues = new HashMap<>(); + conf.keySet().forEach(confKey -> { + if (sqlConf.contains(confKey)) { + String currentConfValue = sqlConf.getConfString(confKey); + currentConfValues.put(confKey, currentConfValue); + } + }); + + conf.forEach((confKey, confValue) -> { + if
(SQLConf.staticConfKeys().contains(confKey)) { + throw new RuntimeException("Cannot modify the value of a static config: " + confKey); + } + sqlConf.setConfString(confKey, confValue); + }); + + try { + action.invoke(); + } finally { + conf.forEach((confKey, confValue) -> { + if (currentConfValues.containsKey(confKey)) { + sqlConf.setConfString(confKey, currentConfValues.get(confKey)); + } else { + sqlConf.unsetConf(confKey); + } + }); + } + } + + protected void withTableProperties(Map<String, String> props, Action action) { + Map<String, String> tableProps = table.properties(); + Map<String, String> currentPropValues = new HashMap<>(); + props.keySet().forEach(propKey -> { + if (tableProps.containsKey(propKey)) { + String currentPropValue = tableProps.get(propKey); + currentPropValues.put(propKey, currentPropValue); + } + }); + + UpdateProperties updateProperties = table.updateProperties(); + props.forEach(updateProperties::set); + updateProperties.commit(); + + try { + action.invoke(); + } finally { + UpdateProperties restoreProperties = table.updateProperties(); + props.forEach((propKey, propValue) -> { + if (currentPropValues.containsKey(propKey)) { + restoreProperties.set(propKey, currentPropValues.get(propKey)); + } else { + restoreProperties.remove(propKey); + } + }); + restoreProperties.commit(); + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java new file mode 100644 index 000000000000..72346a72748c --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.spark.source; + +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.ConfigProperties; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.types.Types; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchmark { + + @Override + protected Configuration initHadoopConf() { + Configuration conf = new Configuration(); + conf.set(ConfigProperties.COMPRESS_METADATA, "true"); + return conf; + } + + @Override + protected final Table initTable() { + Schema schema = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(5, "decimalCol", Types.DecimalType.of(20, 5)), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); + HadoopTables tables = new HadoopTables(hadoopConf()); + return tables.create(schema, partitionSpec, Maps.newHashMap(), newTableLocation()); + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java new file mode 100644 index 000000000000..bc542579f417 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.ConfigProperties; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.types.Types; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public abstract class IcebergSourceNestedDataBenchmark extends IcebergSourceBenchmark { + + @Override + protected Configuration initHadoopConf() { + Configuration conf = new Configuration(); + conf.set(ConfigProperties.COMPRESS_METADATA, "true"); + return conf; + } + + @Override + protected final Table initTable() { + Schema schema = new Schema( + required(0, "id", Types.LongType.get()), + optional(4, "nested", Types.StructType.of( + required(1, "col1", Types.StringType.get()), + required(2, "col2", Types.DoubleType.get()), + required(3, "col3", Types.LongType.get()) + )) + ); + PartitionSpec partitionSpec = PartitionSpec.unpartitioned(); + HadoopTables tables = new HadoopTables(hadoopConf()); + return tables.create(schema, partitionSpec, Maps.newHashMap(), newTableLocation()); + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java new file mode 100644 index 000000000000..1c9164593eaa --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source.parquet; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.spark.source.IcebergSourceFlatDataBenchmark; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.internal.SQLConf; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +/** + * A benchmark that evaluates the file skipping capabilities in the Spark data source for Iceberg. 
+ * + * This class uses a dataset with a flat schema, where the records are clustered according to the + * column used in the filter predicate. + * + * The performance is compared to the built-in file source in Spark. + * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt + * + */ +public class IcebergSourceFlatParquetDataFilterBenchmark extends IcebergSourceFlatDataBenchmark { + + private static final String FILTER_COND = "dateCol == date_add(current_date(), 1)"; + private static final int NUM_FILES = 500; + private static final int NUM_ROWS = 10000; + + @Setup + public void setupBenchmark() { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDownBenchmark() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readWithFilterIceberg() { + Map<String, String> tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter(FILTER_COND); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithFilterFileSourceVectorized() { + Map<String, String> conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithFilterFileSourceNonVectorized() { + Map<String, String> conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND); + materialize(df); + }); + } + + private void appendData() { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + Dataset<Row> df = spark().range(NUM_ROWS) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))")) + .withColumn("dateCol", date_add(current_date(), fileNum)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); + appendAsFile(df); + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java new file mode 100644 index 000000000000..37a63369981c --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source.parquet; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.spark.source.IcebergSourceFlatDataBenchmark; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.internal.SQLConf; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +/** + * A benchmark that evaluates the performance of reading Parquet data with a flat schema + * using Iceberg and the built-in file source in Spark. + * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt + * + */ +public class IcebergSourceFlatParquetDataReadBenchmark extends IcebergSourceFlatDataBenchmark { + + private static final int NUM_FILES = 10; + private static final int NUM_ROWS = 1000000; + + @Setup + public void setupBenchmark() { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDownBenchmark() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readIceberg() { + Map<String, String> tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset<Row> df = spark().read().format("iceberg").load(tableLocation); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readFileSourceVectorized() { + Map<String, String> conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset<Row> df = spark().read().parquet(dataLocation()); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readFileSourceNonVectorized() { + Map<String, String> conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset<Row> df = spark().read().parquet(dataLocation()); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithProjectionIceberg() { + Map<String, String> tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset<Row>
df = spark().read().format("iceberg").load(tableLocation).select("longCol"); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithProjectionFileSourceVectorized() { + Map<String, String> conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol"); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithProjectionFileSourceNonVectorized() { + Map<String, String> conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset<Row> df = spark().read().parquet(dataLocation()).select("longCol"); + materialize(df); + }); + } + + private void appendData() { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + Dataset<Row> df = spark().range(NUM_ROWS) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))")) + .withColumn("dateCol", date_add(current_date(), fileNum)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); + appendAsFile(df); + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java new file mode 100644 index 000000000000..ab62f53e7008 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.spark.source.parquet; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.spark.source.IcebergSourceFlatDataBenchmark; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.internal.SQLConf; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +import static org.apache.spark.sql.functions.expr; + +/** + * A benchmark that evaluates the performance of writing Parquet data with a flat schema + * using Iceberg and the built-in file source in Spark. + * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=IcebergSourceFlatParquetDataWriteBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-write-benchmark-result.txt + * + */ +public class IcebergSourceFlatParquetDataWriteBenchmark extends IcebergSourceFlatDataBenchmark { + + private static final int NUM_ROWS = 5000000; + + @Setup + public void setupBenchmark() { + setupSpark(); + } + + @TearDown + public void tearDownBenchmark() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void writeIceberg() { + String tableLocation = table().location(); + benchmarkData().write().format("iceberg").mode(SaveMode.Append).save(tableLocation); + } + + @Benchmark + @Threads(1) + public void writeFileSource() { + Map<String, String> conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip"); + withSQLConf(conf, () -> benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation())); + } + + private Dataset<Row> benchmarkData() { + return spark().range(NUM_ROWS) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))")) + .withColumn("dateCol", expr("DATE_ADD(CURRENT_DATE(), (longCol % 20))")) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")) + .coalesce(1); + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java new file mode 100644 index 000000000000..c12f3ee668a2 --- /dev/null +++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source.parquet; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.spark.source.IcebergSourceNestedDataBenchmark; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.internal.SQLConf; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.spark.sql.functions.expr; +import static org.apache.spark.sql.functions.lit; +import static org.apache.spark.sql.functions.struct; + +/** + * A benchmark that evaluates the file skipping capabilities in the Spark data source for Iceberg. + * + * This class uses a dataset with nested data, where the records are clustered according to the + * column used in the filter predicate. + * + * The performance is compared to the built-in file source in Spark. + * + * To run this benchmark: + * + * ./gradlew :iceberg-spark:jmh + * -PjmhIncludeRegex=IcebergSourceNestedParquetDataFilterBenchmark + * -PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-filter-benchmark-result.txt + * + */ +public class IcebergSourceNestedParquetDataFilterBenchmark extends IcebergSourceNestedDataBenchmark { + + private static final String FILTER_COND = "nested.col3 == 0"; + private static final int NUM_FILES = 500; + private static final int NUM_ROWS = 10000; + + @Setup + public void setupBenchmark() { + setupSpark(); + appendData(); + } + + @TearDown + public void tearDownBenchmark() throws IOException { + tearDownSpark(); + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void readWithFilterIceberg() { + Map<String, String> tableProperties = Maps.newHashMap(); + tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024)); + withTableProperties(tableProperties, () -> { + String tableLocation = table().location(); + Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter(FILTER_COND); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithFilterFileSourceVectorized() { + Map<String, String> conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND); + materialize(df); + }); + } + + @Benchmark + @Threads(1) + public void readWithFilterFileSourceNonVectorized() { + Map<String, String> conf = Maps.newHashMap(); + conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false"); + conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024)); + withSQLConf(conf, () -> { + Dataset<Row> df = spark().read().parquet(dataLocation()).filter(FILTER_COND); + materialize(df); + }); + } + + private void appendData() { + for (int fileNum = 1; fileNum <= NUM_FILES; fileNum++) { + Dataset<Row> df = spark().range(NUM_ROWS) + .withColumn( + "nested", + struct( + expr("CAST(id AS string) AS col1"), + expr("CAST(id AS double) AS col2"), + lit(fileNum).cast("long").as("col3") + )); + appendAsFile(df); + } + } +} diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java
new file mode 100644
index 000000000000..acba2e5d330d
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.spark.source.IcebergSourceNestedDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the performance of reading nested Parquet data using Iceberg
+ * and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ *
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=IcebergSourceNestedParquetDataReadBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-read-benchmark-result.txt
+ */
+public class IcebergSourceNestedParquetDataReadBenchmark extends IcebergSourceNestedDataBenchmark {
+
+  private static final int NUM_FILES = 10;
+  private static final int NUM_ROWS = 1000000;
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+    appendData();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readIceberg() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation());
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation());
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionIceberg() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).selectExpr("nested.col3");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionFileSourceVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "true");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    conf.put(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), "true");
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation()).selectExpr("nested.col3");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readWithProjectionFileSourceNonVectorized() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key(), "false");
+    conf.put(SQLConf.FILES_OPEN_COST_IN_BYTES().key(), Integer.toString(128 * 1024 * 1024));
+    conf.put(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), "true");
+    withSQLConf(conf, () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation()).selectExpr("nested.col3");
+      materialize(df);
+    });
+  }
+
+  private void appendData() {
+    for (int fileNum = 0; fileNum < NUM_FILES; fileNum++) {
+      Dataset<Row> df = spark().range(NUM_ROWS)
+          .withColumn(
+              "nested",
+              struct(
+                  expr("CAST(id AS string) AS col1"),
+                  expr("CAST(id AS double) AS col2"),
+                  lit(fileNum).cast("long").as("col3")
+              ));
+      appendAsFile(df);
+    }
+  }
+}
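Besides the Gradle invocation shown in each javadoc, JMH benchmarks like these can also be launched programmatically, which is convenient when iterating on a single method from an IDE. A minimal sketch using the standard JMH runner API (the main class and fork count here are illustrative, not part of this diff):

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class BenchmarkMain {
      public static void main(String[] args) throws RunnerException {
        // Select benchmarks by regex, mirroring the -PjmhIncludeRegex property.
        Options options = new OptionsBuilder()
            .include(IcebergSourceNestedParquetDataReadBenchmark.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(options).run();
      }
    }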
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java
new file mode 100644
index 000000000000..0bd98610da99
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.spark.source.IcebergSourceNestedDataBenchmark;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.internal.SQLConf;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.struct;
+
+/**
+ * A benchmark that evaluates the performance of writing nested Parquet data using Iceberg
+ * and the built-in file source in Spark.
+ *
+ * To run this benchmark:
+ *
+ *   ./gradlew :iceberg-spark:jmh
+ *       -PjmhIncludeRegex=IcebergSourceNestedParquetDataWriteBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-write-benchmark-result.txt
+ */
+public class IcebergSourceNestedParquetDataWriteBenchmark extends IcebergSourceNestedDataBenchmark {
+
+  private static final int NUM_ROWS = 5000000;
+
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+  }
+
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeIceberg() {
+    String tableLocation = table().location();
+    benchmarkData().write().format("iceberg").mode(SaveMode.Append).save(tableLocation);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void writeFileSource() {
+    Map<String, String> conf = Maps.newHashMap();
+    conf.put(SQLConf.PARQUET_COMPRESSION().key(), "gzip");
+    withSQLConf(conf, () -> benchmarkData().write().mode(SaveMode.Append).parquet(dataLocation()));
+  }
+
+  private Dataset<Row> benchmarkData() {
+    return spark().range(NUM_ROWS)
+        .withColumn(
+            "nested",
+            struct(
+                expr("CAST(id AS string) AS col1"),
+                expr("CAST(id AS double) AS col2"),
+                expr("id AS col3")
+            ))
+        .coalesce(1);
+  }
+}
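A closing note on the read benchmarks above: they all funnel their datasets through a materialize(df) helper from the (not shown) base classes. For the measurements to be meaningful, that helper must force every row to be read and decoded without adding driver-side collection overhead; a minimal sketch of what such a helper might look like, under that assumption (the class and method shown here are hypothetical):

    import org.apache.spark.api.java.function.ForeachFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public final class MaterializeUtil {
      private MaterializeUtil() {}

      // Consume every row on the executors; nothing is collected to the driver,
      // so the measured cost is dominated by scanning and decoding.
      public static void materialize(Dataset<Row> df) {
        df.foreach((ForeachFunction<Row>) row -> { });
      }
    }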