From 83c9e69cf780c3e8324e30b15297eb86414195f3 Mon Sep 17 00:00:00 2001 From: vkorukanti Date: Thu, 29 Feb 2024 09:39:41 -0800 Subject: [PATCH 1/4] [Kernel][LogReplay] Make a single read request for all checkpoint files --- build.sbt | 2 + .../scala/io/delta/golden/GoldenTables.scala | 1 + .../internal/replay/ActionsIterator.java | 78 ++++-- .../delta/kernel/types/BasePrimitiveType.java | 2 +- .../defaults/client/DefaultTableClient.java | 2 +- .../BenchmarkParallelCheckpointReading.java | 230 ++++++++++++++++++ .../defaults/LogReplayMetricsSuite.scala | 122 ++++++++-- 7 files changed, 399 insertions(+), 38 deletions(-) create mode 100644 kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java diff --git a/build.sbt b/build.sbt index a9f01fa413b..1070bb63a88 100644 --- a/build.sbt +++ b/build.sbt @@ -283,6 +283,8 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults")) "commons-io" % "commons-io" % "2.8.0" % "test", "com.novocode" % "junit-interface" % "0.11" % "test", "org.slf4j" % "slf4j-log4j12" % "1.7.36" % "test", + "org.openjdk.jmh" % "jmh-core" % "1.37" % "test", + "org.openjdk.jmh" % "jmh-generator-annprocess" % "1.37" % "test", "org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests", "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests", diff --git a/connectors/golden-tables/src/test/scala/io/delta/golden/GoldenTables.scala b/connectors/golden-tables/src/test/scala/io/delta/golden/GoldenTables.scala index cd734de340d..0b082cb6a36 100644 --- a/connectors/golden-tables/src/test/scala/io/delta/golden/GoldenTables.scala +++ b/connectors/golden-tables/src/test/scala/io/delta/golden/GoldenTables.scala @@ -378,6 +378,7 @@ class GoldenTables extends QueryTest with SharedSparkSession { val commitInfoFile = CommitInfo( version = Some(0L), + inCommitTimestamp = None, timestamp = new Timestamp(1540415658000L), userId = Some("user_0"), userName = Some("username_0"), diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java index 3a7633e28a3..606b11a1459 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java @@ -18,10 +18,7 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Optional; +import java.util.*; import io.delta.kernel.client.TableClient; import io.delta.kernel.data.ColumnarBatch; @@ -31,7 +28,9 @@ import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.internal.util.Utils; +import static io.delta.kernel.internal.util.FileNames.checkpointVersion; import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator; +import static io.delta.kernel.internal.util.Utils.toCloseableIterator; /** * This class takes as input a list of delta files (.json, .checkpoint.parquet) and produces an @@ -45,17 +44,19 @@ class ActionsIterator implements CloseableIterator { private final TableClient tableClient; /** - * Iterator over the files. + * Linked list of iterator files (commit files and/or checkpoint files) + * {@link LinkedList} to allow removing the head of the list and also to peek at the head + * of the list. The {@link Iterator} doesn't provide a way to peek. *

-     * Each file will be split (by 1, or more) to yield an iterator of FileDataReadResults.
+     * Each of these files returns an iterator of {@link ColumnarBatch} containing the actions
      */
-    private final Iterator<FileStatus> filesIter;
+    private final LinkedList<FileStatus> filesList;

     private final StructType readSchema;

     /**
      * The current (ColumnarBatch, isFromCheckpoint) tuple. Whenever this iterator
-     * is exhausted, we will try and fetch the next one from the `filesIter`.
+     * is exhausted, we will try and fetch the next one from the `filesList`.
      *

* If it is ever empty, that means there are no more batches to produce. */ @@ -68,7 +69,8 @@ class ActionsIterator implements CloseableIterator { List files, StructType readSchema) { this.tableClient = tableClient; - this.filesIter = files.iterator(); + this.filesList = new LinkedList<>(); + this.filesList.addAll(files); this.readSchema = readSchema; this.actionsIter = Optional.empty(); } @@ -115,7 +117,7 @@ public void close() throws IOException { /** * If the current `actionsIter` has no more elements, this function finds the next - * non-empty file in `filesIter` and uses it to set `actionsIter`. + * non-empty file in `filesList` and uses it to set `actionsIter`. */ private void tryEnsureNextActionsIterIsReady() { if (actionsIter.isPresent()) { @@ -132,7 +134,7 @@ private void tryEnsureNextActionsIterIsReady() { } // Search for the next non-empty file and use that iter - while (filesIter.hasNext()) { + while (!filesList.isEmpty()) { actionsIter = Optional.of(getNextActionsIter()); if (actionsIter.get().hasNext()) { @@ -149,23 +151,22 @@ private void tryEnsureNextActionsIterIsReady() { } /** - * Get the next file from `filesIter` (.json or .checkpoint.parquet), contextualize it + * Get the next file from `filesList` (.json or .checkpoint.parquet), contextualize it * (allow the connector to split it), and then read it + inject the `isFromCheckpoint` * information. *

- * Requires that `filesIter.hasNext` is true. + * Requires that `filesList.isEmpty` is false. */ private CloseableIterator getNextActionsIter() { - final FileStatus nextFile = filesIter.next(); - - // TODO: [#1965] It should be possible to read our JSON and parquet files - // many-at-once instead of one at a time. - + final FileStatus nextFile = filesList.pop(); try { if (nextFile.getPath().endsWith(".json")) { final long fileVersion = FileNames.deltaVersion(nextFile.getPath()); - // Read that file + // We can not read multiple JSON files in parallel (like the checkpoint files), + // because each one has a different version, and we need to associate the version + // with actions read from the JSON file for further optimizations later on. + final CloseableIterator dataIter = tableClient.getJsonHandler().readJsonFiles( singletonCloseableIterator(nextFile), @@ -174,12 +175,17 @@ private CloseableIterator getNextActionsIter() { return combine(dataIter, false /* isFromCheckpoint */, fileVersion); } else if (nextFile.getPath().endsWith(".parquet")) { - final long fileVersion = FileNames.checkpointVersion(nextFile.getPath()); + final long fileVersion = checkpointVersion(nextFile.getPath()); + + // Try to retrieve the remaining checkpoint files (if there are any) and issue + // read request for all in one go, so that the {@link ParquetHandler} can do + // optimizations like reading multiple files in parallel. + CloseableIterator checkpointFilesIter = + retrieveRemainingCheckpointFiles(nextFile, fileVersion); - // Read that file final CloseableIterator dataIter = tableClient.getParquetHandler().readParquetFiles( - singletonCloseableIterator(nextFile), + checkpointFilesIter, readSchema, Optional.empty()); @@ -218,4 +224,32 @@ public void close() throws IOException { } }; } + + /** + * Given a checkpoint file, retrieve all the files that are part of the same checkpoint. + *

+ * This is done by looking at the file name and finding all the files that have the same + * version number. + */ + private CloseableIterator retrieveRemainingCheckpointFiles( + FileStatus checkpointFile, + long version) { + + // Filter out all the files that are not part of the same checkpoint + final List checkpointFiles = new ArrayList<>(); + + // Add the already retrieved checkpoint file to the list. + checkpointFiles.add(checkpointFile); + + while (filesList.peek() != null && + filesList.peek().getPath().endsWith(".parquet") && + checkpointVersion(filesList.peek().getPath()) == version) { + checkpointFiles.add(filesList.pop()); + } + + // Remove the files from the list + filesList.removeAll(checkpointFiles); + + return toCloseableIterator(checkpointFiles.iterator()); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/types/BasePrimitiveType.java b/kernel/kernel-api/src/main/java/io/delta/kernel/types/BasePrimitiveType.java index b5c4b4527d4..0b83f7aaf47 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/types/BasePrimitiveType.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/types/BasePrimitiveType.java @@ -27,7 +27,7 @@ public abstract class BasePrimitiveType extends DataType { * Create a primitive type {@link DataType} * * @param primitiveTypeName Primitive type name. - * @return + * @return {@link DataType} for given primitive type name */ public static DataType createPrimitive(String primitiveTypeName) { return Optional.ofNullable(nameToPrimitiveTypeMap.get().get(primitiveTypeName)) diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultTableClient.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultTableClient.java index 78ab128dbdd..972fd06ef99 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultTableClient.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultTableClient.java @@ -54,7 +54,7 @@ public ParquetHandler getParquetHandler() { * Create an instance of {@link DefaultTableClient}. * * @param hadoopConf Hadoop configuration to use. - * @return + * @return an instance of {@link DefaultTableClient}. */ public static DefaultTableClient create(Configuration hadoopConf) { return new DefaultTableClient(hadoopConf); diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java new file mode 100644 index 00000000000..56e06547819 --- /dev/null +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java @@ -0,0 +1,230 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.kernel.defaults.benchmarks; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import static java.util.concurrent.Executors.newFixedThreadPool; + +import org.apache.hadoop.conf.Configuration; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.delta.kernel.*; +import io.delta.kernel.client.ParquetHandler; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.*; +import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; + +import io.delta.kernel.internal.util.Utils; + +import io.delta.kernel.defaults.client.DefaultParquetHandler; +import io.delta.kernel.defaults.client.DefaultTableClient; + +import io.delta.kernel.defaults.internal.parquet.ParquetFileReader; + +/** + * Benchmark to measure the performance of reading multi-part checkpoint files, using a custom + * ParquetHandler that reads the files in parallel. To run this benchmark (from delta repo root): + * 1) Generate the test table by following the instructions at `testTablePath` member variable. + * $ build/sbt + * sbt:delta> project kernelDefaults + * sbt:delta> set fork in run := true + * sbt:delta> test:runMain io.delta.kernel.defaults.benchmarks.BenchmarkParallelCheckpointReading + */ +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 3) +@Fork(1) +public class BenchmarkParallelCheckpointReading { + + /** + * Following are the steps to generate a simple large table with multi-part checkpoint files + *

+ * bin/spark-shell --packages io.delta:delta-spark_2.12:3.1.0 \ --conf + * "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf + * "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \ --conf + * "spark.databricks.delta.checkpoint.partSize=100000" + *

+ * # Within the Spark shell, run the following commands + *

+ * spark.range(0, 100000) .withColumn("pCol", 'id % 100000) .repartition(10) + * .write.format("delta") .partitionBy("pCol") .mode("append") + * .save("~/test-tables/large-table") + *

+ * # Repeat the above steps for each of the next ranges # 100000 to 200000, 200000 to 300000 etc + * until enough log entries are reached. + *

+ * # Then create a checkpoint import org.apache.spark.sql.delta.DeltaLog + *

+ * # This step create a multi-part checkpoint with each checkpoint containing 100K records. + * DeltaLog.forTable(spark, "~/test-tables/large-table").checkpoint() + */ + private static final String testTablePath = " fill the path here"; + + @State(Scope.Benchmark) + public static class BenchmarkData { + // Variations of number of threads to read the multi-part checkpoint files + // When thread count is 0, we read using the current default parquet handler implementation + // In all other cases we use the custom parallel parquet handler implementation defined + // in this benchmark + @Param({"0", "1", "2", "4", "10", "20"}) + private int parallelReaderCount = 0; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public void benchmark(BenchmarkData benchmarkData, Blackhole blackhole) throws Exception { + TableClient tableClient = createTableClient(benchmarkData.parallelReaderCount); + Table table = Table.forPath( + createTableClient(benchmarkData.parallelReaderCount), + testTablePath); + + Snapshot snapshot = table.getLatestSnapshot(tableClient); + ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient); + Scan scan = scanBuilder.build(); + + Row row = scan.getScanState(tableClient); + long fileSize = 0; + try (CloseableIterator batchIter = scan.getScanFiles(tableClient)) { + while (batchIter.hasNext()) { + FilteredColumnarBatch batch = batchIter.next(); + try (CloseableIterator rowIter = batch.getRows()) { + while (rowIter.hasNext()) { + Row r = rowIter.next(); + long size = r.getStruct(0).getLong(2); + fileSize += size; + } + } + } + } + + // Consume the result to avoid dead code elimination by the JIT compiler + blackhole.consume(fileSize); + } + + public static void main(String[] args) throws Exception { + org.openjdk.jmh.Main.main(args); + } + + private static TableClient createTableClient(int numberOfParallelThreads) { + Configuration hadoopConf = new Configuration(); + if (numberOfParallelThreads <= 0) { + return DefaultTableClient.create(hadoopConf); + } + + return new DefaultTableClient(hadoopConf) { + @Override + public ParquetHandler getParquetHandler() { + return new ParallelParquetHandler(hadoopConf, numberOfParallelThreads); + } + }; + } + + /** + * Custom implementation of {@link ParquetHandler} to read the Parquet files in parallel. Reason + * for this not being in the {@link DefaultParquetHandler} is that this implementation keeps the + * contents of the Parquet files in memory, which is not suitable for default implementation + * without a proper design that allows limits on the memory usage. If the parallel reading of + * checkpoint becomes a common in connectors, we can look at adding the functionality in the + * default implementation. + */ + static class ParallelParquetHandler extends DefaultParquetHandler { + private final Configuration hadoopConf; + private final int numberOfParallelThreads; + + ParallelParquetHandler(Configuration hadoopConf, int numberOfParallelThreads) { + super(hadoopConf); + this.hadoopConf = hadoopConf; + this.numberOfParallelThreads = numberOfParallelThreads; + } + + @Override + public CloseableIterator readParquetFiles( + CloseableIterator fileIter, + StructType physicalSchema, + Optional predicate) throws IOException { + return new CloseableIterator() { + // Executor service will be closed as part of the returned `CloseableIterator`'s + // close method. 
+ private final ExecutorService executorService = + newFixedThreadPool(numberOfParallelThreads); + private Iterator>> futuresIter; + private Iterator currentBatchIter; + + @Override + public void close() throws IOException { + Utils.closeCloseables(fileIter, () -> executorService.shutdown()); + } + + @Override + public boolean hasNext() { + submitReadRequestsIfNotDone(); + if (currentBatchIter != null && currentBatchIter.hasNext()) { + return true; + } + + if (futuresIter.hasNext()) { + try { + currentBatchIter = futuresIter.next().get().iterator(); + return hasNext(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + return false; + } + + @Override + public ColumnarBatch next() { + return currentBatchIter.next(); + } + + private void submitReadRequestsIfNotDone() { + if (futuresIter != null) { + return; + } + List>> futures = new ArrayList<>(); + while (fileIter.hasNext()) { + String path = fileIter.next().getPath(); + futures.add( + executorService.submit( + () -> parquetFileReader(path, physicalSchema))); + } + futuresIter = futures.iterator(); + } + }; + } + + List parquetFileReader(String filePath, StructType readSchema) { + ParquetFileReader reader = new ParquetFileReader(hadoopConf); + try (CloseableIterator batchIter = reader.read(filePath, readSchema)) { + List batches = new ArrayList<>(); + while (batchIter.hasNext()) { + batches.add(batchIter.next()); + } + return batches; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala index 3c4ee7d04c8..51193d3ac90 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayMetricsSuite.scala @@ -24,15 +24,20 @@ import io.delta.kernel.defaults.client.{DefaultJsonHandler, DefaultParquetHandle import io.delta.kernel.expressions.Predicate import io.delta.kernel.internal.fs.Path import io.delta.kernel.internal.util.FileNames +import io.delta.kernel.internal.util.Utils.toCloseableIterator import io.delta.kernel.types.StructType import io.delta.kernel.utils.{CloseableIterator, FileStatus} import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.functions.col import org.apache.spark.sql.test.SharedSparkSession import java.util.Optional +import scala.collection.convert.ImplicitConversions._ +import scala.collection.mutable.ArrayBuffer class LogReplayMetricsSuite extends QueryTest with SharedSparkSession @@ -53,14 +58,46 @@ class LogReplayMetricsSuite extends QueryTest withTempDir { dir => f(dir, tableClient) } } - private def loadSnapshotAssertMetrics( + private def loadPandMCheckMetrics( tableClient: MetricsTableClient, table: Table, expJsonVersionsRead: Seq[Long], - expParquetVersionsRead: Seq[Long]): Unit = { + expParquetVersionsRead: Seq[Long], + expParquetReadSetSizes: Seq[Long] = Nil): Unit = { tableClient.resetMetrics() table.getLatestSnapshot(tableClient).getSchema(tableClient) + assertMetrics( + tableClient, + expJsonVersionsRead, + expParquetVersionsRead, + expParquetReadSetSizes) + } + + private def loadScanFilesCheckMetrics( + tableClient: 
MetricsTableClient, + table: Table, + expJsonVersionsRead: Seq[Long], + expParquetVersionsRead: Seq[Long], + expParquetReadSetSizes: Seq[Long]): Unit = { + tableClient.resetMetrics() + val scan = table.getLatestSnapshot(tableClient).getScanBuilder(tableClient).build() + // get all scan files and iterate through them to trigger the metrics collection + val scanFiles = scan.getScanFiles(tableClient) + while (scanFiles.hasNext) scanFiles.next() + + assertMetrics( + tableClient, + expJsonVersionsRead, + expParquetVersionsRead, + expParquetReadSetSizes) + } + + def assertMetrics( + tableClient: MetricsTableClient, + expJsonVersionsRead: Seq[Long], + expParquetVersionsRead: Seq[Long], + expParquetReadSetSizes: Seq[Long]): Unit = { val actualJsonVersionsRead = tableClient.getJsonHandler.getVersionsRead val actualParquetVersionsRead = tableClient.getParquetHandler.getVersionsRead @@ -72,11 +109,25 @@ class LogReplayMetricsSuite extends QueryTest actualParquetVersionsRead === expParquetVersionsRead, s"Expected to read parquet " + s"versions $expParquetVersionsRead but read $actualParquetVersionsRead" ) + + if (expParquetReadSetSizes.nonEmpty) { + val actualParquetReadSetSizes = tableClient.getParquetHandler.checkpointReadRequestSizes + assert( + actualParquetReadSetSizes === expParquetReadSetSizes, s"Expected parquet read set sizes " + + s"$expParquetReadSetSizes but read $actualParquetReadSetSizes" + ) + } } private def appendCommit(path: String): Unit = spark.range(10).write.format("delta").mode("append").save(path) + private def checkpoint(path: String, actionsPerFile: Int): Unit = { + withSQLConf(DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> actionsPerFile.toString) { + DeltaLog.forTable(spark, path).checkpoint() + } + } + /////////// // Tests // /////////// @@ -88,7 +139,7 @@ class LogReplayMetricsSuite extends QueryTest for (_ <- 0 to 9) { appendCommit(path) } val table = Table.forPath(tc, path) - loadSnapshotAssertMetrics(tc, table, 9L to 0L by -1L, Nil) + loadPandMCheckMetrics(tc, table, 9L to 0L by -1L, Nil) } } @@ -99,7 +150,7 @@ class LogReplayMetricsSuite extends QueryTest for (_ <- 0 to 14) { appendCommit(path) } val table = Table.forPath(tc, path) - loadSnapshotAssertMetrics(tc, table, 14L to 11L by -1L, Seq(10)) + loadPandMCheckMetrics(tc, table, 14L to 11L by -1L, Seq(10), Seq(1)) } } @@ -121,7 +172,7 @@ class LogReplayMetricsSuite extends QueryTest for (_ <- 14 to 16) { appendCommit(path) } val table = Table.forPath(tc, path) - loadSnapshotAssertMetrics(tc, table, 16L to 13L by -1L, Nil) + loadPandMCheckMetrics(tc, table, 16L to 13L by -1L, Nil) } } @@ -139,7 +190,7 @@ class LogReplayMetricsSuite extends QueryTest // A hint is now saved at v14 - loadSnapshotAssertMetrics(tc, table, Nil, Nil) + loadPandMCheckMetrics(tc, table, Nil, Nil) } } @@ -157,20 +208,20 @@ class LogReplayMetricsSuite extends QueryTest // Case: only one version change appendCommit(path) // v15 - loadSnapshotAssertMetrics(tc, table, Seq(15), Nil) + loadPandMCheckMetrics(tc, table, Seq(15), Nil) // A hint is now saved at v15 // Case: several version changes for (_ <- 16 to 19) { appendCommit(path) } - loadSnapshotAssertMetrics(tc, table, 19L to 16L by -1L, Nil) + loadPandMCheckMetrics(tc, table, 19L to 16L by -1L, Nil) // A hint is now saved at v19 // Case: [delta-io/delta#2262] [Fix me!] 
Read the entire checkpoint at v20, even if v20.json // and v19 hint are available appendCommit(path) // v20 - loadSnapshotAssertMetrics(tc, table, Nil, Seq(20)) + loadPandMCheckMetrics(tc, table, Nil, Seq(20)) } } @@ -195,7 +246,7 @@ class LogReplayMetricsSuite extends QueryTest .mode("append") .save(path) - loadSnapshotAssertMetrics(tc, table, Seq(4), Nil) + loadPandMCheckMetrics(tc, table, Seq(4), Nil) // a hint is now saved at v4 @@ -208,7 +259,39 @@ class LogReplayMetricsSuite extends QueryTest |) |""".stripMargin) - loadSnapshotAssertMetrics(tc, table, Seq(5), Nil) + loadPandMCheckMetrics(tc, table, Seq(5), Nil) + } + } + + test("read a table with multi-part checkpoint") { + withTempDirAndTableClient { (dir, tc) => + val path = dir.getAbsolutePath + + for (_ <- 0 to 14) { appendCommit(path) } + + // there should be one checkpoint file at version 10 + loadScanFilesCheckMetrics( + tc, + Table.forPath(tc, path), + expJsonVersionsRead = 14L to 11L by -1L, + expParquetVersionsRead = Seq(10), + // we read the checkpoint twice: once for the P &M and once for the scan files + expParquetReadSetSizes = Seq(1, 1)) + + // create a multi-part checkpoint + checkpoint(path, actionsPerFile = 2) + + // Reset metrics. + tc.resetMetrics() + + // expect the Parquet read set to contain one request with size of 15 + loadScanFilesCheckMetrics( + tc, + Table.forPath(tc, path), + expJsonVersionsRead = Nil, + expParquetVersionsRead = Seq(14), + // we read the checkpoint twice: once for the P &M and once for the scan files + expParquetReadSetSizes = Seq(15, 15)) } } } @@ -242,7 +325,10 @@ class MetricsTableClient(config: Configuration) extends TableClient { * 10.checkpoint.parquet) read */ trait FileReadMetrics { self: Object => - private val versionsRead = scala.collection.mutable.ArrayBuffer[Long]() + private val versionsRead = ArrayBuffer[Long]() + + // Number of checkpoint files requested read in each readParquetFiles call + val checkpointReadRequestSizes = new ArrayBuffer[Long]() private def updateVersionsRead(fileStatus: FileStatus): Unit = { val path = new Path(fileStatus.getPath) @@ -258,7 +344,10 @@ trait FileReadMetrics { self: Object => def getVersionsRead: Seq[Long] = versionsRead - def resetMetrics(): Unit = versionsRead.clear() + def resetMetrics(): Unit = { + versionsRead.clear() + checkpointReadRequestSizes.clear() + } def collectReadFiles(fileIter: CloseableIterator[FileStatus]): CloseableIterator[FileStatus] = { fileIter.map(file => { @@ -290,6 +379,11 @@ class MetricsParquetHandler(config: Configuration) fileIter: CloseableIterator[FileStatus], physicalSchema: StructType, predicate: Optional[Predicate]): CloseableIterator[ColumnarBatch] = { - super.readParquetFiles(collectReadFiles(fileIter), physicalSchema, predicate) + val fileReadSet = fileIter.toSeq + checkpointReadRequestSizes += fileReadSet.size + super.readParquetFiles( + collectReadFiles(toCloseableIterator(fileReadSet.iterator)), + physicalSchema, + predicate) } } From e586ef332c1f5b254d75219f520cefc1c1e1081c Mon Sep 17 00:00:00 2001 From: vkorukanti Date: Mon, 11 Mar 2024 15:06:00 -0700 Subject: [PATCH 2/4] review --- .../internal/replay/ActionsIterator.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java index 606b11a1459..792c64ffe76 100644 --- 
a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java @@ -151,9 +151,8 @@ private void tryEnsureNextActionsIterIsReady() { } /** - * Get the next file from `filesList` (.json or .checkpoint.parquet), contextualize it - * (allow the connector to split it), and then read it + inject the `isFromCheckpoint` - * information. + * Get the next file from `filesList` (.json or .checkpoint.parquet) + * read it + inject the `isFromCheckpoint` information. *

* Requires that `filesList.isEmpty` is false. */ @@ -165,7 +164,9 @@ private CloseableIterator getNextActionsIter() { // We can not read multiple JSON files in parallel (like the checkpoint files), // because each one has a different version, and we need to associate the version - // with actions read from the JSON file for further optimizations later on. + // with actions read from the JSON file for further optimizations later on (faster + // metadata & protocol loading in subsequent runs by remembering the version of + // the last version where the metadata and protocol are found). final CloseableIterator dataIter = tableClient.getJsonHandler().readJsonFiles( @@ -241,15 +242,14 @@ private CloseableIterator retrieveRemainingCheckpointFiles( // Add the already retrieved checkpoint file to the list. checkpointFiles.add(checkpointFile); - while (filesList.peek() != null && - filesList.peek().getPath().endsWith(".parquet") && - checkpointVersion(filesList.peek().getPath()) == version) { + FileStatus peek = filesList.peek(); + while (peek != null && + peek.getPath().endsWith(".parquet") && + checkpointVersion(peek.getPath()) == version) { checkpointFiles.add(filesList.pop()); + peek = filesList.peek(); } - // Remove the files from the list - filesList.removeAll(checkpointFiles); - return toCloseableIterator(checkpointFiles.iterator()); } } From e7c82aa2cb8f108a05e345aa24888269c678563c Mon Sep 17 00:00:00 2001 From: vkorukanti Date: Mon, 11 Mar 2024 16:18:56 -0700 Subject: [PATCH 3/4] update/fix comments --- .../BenchmarkParallelCheckpointReading.java | 61 ++++++++++++------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java index 56e06547819..d5c2be85231 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java @@ -44,11 +44,30 @@ /** * Benchmark to measure the performance of reading multi-part checkpoint files, using a custom * ParquetHandler that reads the files in parallel. To run this benchmark (from delta repo root): - * 1) Generate the test table by following the instructions at `testTablePath` member variable. - * $ build/sbt - * sbt:delta> project kernelDefaults - * sbt:delta> set fork in run := true - * sbt:delta> test:runMain io.delta.kernel.defaults.benchmarks.BenchmarkParallelCheckpointReading + *

+ *

+ * Sample benchmarks on a table with checkpoint (13) parts containing total of 1.3mil actions on + * Macbook Pro M2 Max with table stored locally. + *

{@code
+ * Benchmark  (parallelReaderCount)  Mode  Cnt Score Error  Units
+ * benchmark                      0  avgt    5  1565.520 ±  20.551  ms/op
+ * benchmark                      1  avgt    5  1064.850 ±  19.699  ms/op
+ * benchmark                      2  avgt    5   785.918 ± 176.285  ms/op
+ * benchmark                      4  avgt    5   729.487 ±  51.470  ms/op
+ * benchmark                     10  avgt    5   693.757 ±  41.252  ms/op
+ * benchmark                     20  avgt    5   702.656 ±  19.145  ms/op
+ * }
*/ @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) @@ -58,27 +77,27 @@ public class BenchmarkParallelCheckpointReading { /** * Following are the steps to generate a simple large table with multi-part checkpoint files - *

- * bin/spark-shell --packages io.delta:delta-spark_2.12:3.1.0 \ --conf - * "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf - * "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \ --conf - * "spark.databricks.delta.checkpoint.partSize=100000" - *

+     *

+     * {@code
+     * bin/spark-shell --packages io.delta:delta-spark_2.12:3.1.0 \
+     *   --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
+     *   --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
+     *   --conf "spark.databricks.delta.checkpoint.partSize=100000"
+     *
      * # Within the Spark shell, run the following commands
-     *
-     * spark.range(0, 100000) .withColumn("pCol", 'id % 100000) .repartition(10)
-     * .write.format("delta") .partitionBy("pCol") .mode("append")
-     * .save("~/test-tables/large-table")
-     *

+ * scala> spark.range(0, 100000) .withColumn("pCol", 'id % 100000) .repartition(10) + * .write.format("delta") .partitionBy("pCol") .mode("append") + * .save("~/test-tables/large-table") + * * # Repeat the above steps for each of the next ranges # 100000 to 200000, 200000 to 300000 etc * until enough log entries are reached. - *

- * # Then create a checkpoint import org.apache.spark.sql.delta.DeltaLog - *

+ * + * # Then create a checkpoint * # This step create a multi-part checkpoint with each checkpoint containing 100K records. - * DeltaLog.forTable(spark, "~/test-tables/large-table").checkpoint() + * scala> import org.apache.spark.sql.delta.DeltaLog + * scala> DeltaLog.forTable(spark, "~/test-tables/large-table").checkpoint() + * }

*/ - private static final String testTablePath = " fill the path here"; + public static final String testTablePath = " fill the path here"; @State(Scope.Benchmark) public static class BenchmarkData { From 93682e5820deba43a1fb314e230717e40228a35d Mon Sep 17 00:00:00 2001 From: vkorukanti Date: Mon, 11 Mar 2024 16:53:57 -0700 Subject: [PATCH 4/4] review --- .../kernel/internal/replay/ActionsIterator.java | 17 ++++++++++------- .../BenchmarkParallelCheckpointReading.java | 6 +++--- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java index 792c64ffe76..9340cbc3a2a 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java @@ -26,9 +26,11 @@ import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; +import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.internal.util.Utils; -import static io.delta.kernel.internal.util.FileNames.checkpointVersion; +import static io.delta.kernel.internal.fs.Path.getName; +import static io.delta.kernel.internal.util.FileNames.*; import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator; import static io.delta.kernel.internal.util.Utils.toCloseableIterator; @@ -158,9 +160,10 @@ private void tryEnsureNextActionsIterIsReady() { */ private CloseableIterator getNextActionsIter() { final FileStatus nextFile = filesList.pop(); + final Path nextFilePath = new Path(nextFile.getPath()); try { - if (nextFile.getPath().endsWith(".json")) { - final long fileVersion = FileNames.deltaVersion(nextFile.getPath()); + if (isCommitFile(nextFilePath.getName())) { + final long fileVersion = FileNames.deltaVersion(nextFilePath); // We can not read multiple JSON files in parallel (like the checkpoint files), // because each one has a different version, and we need to associate the version @@ -175,8 +178,8 @@ private CloseableIterator getNextActionsIter() { Optional.empty()); return combine(dataIter, false /* isFromCheckpoint */, fileVersion); - } else if (nextFile.getPath().endsWith(".parquet")) { - final long fileVersion = checkpointVersion(nextFile.getPath()); + } else if (isCheckpointFile(nextFilePath.getName())) { + final long fileVersion = checkpointVersion(nextFilePath); // Try to retrieve the remaining checkpoint files (if there are any) and issue // read request for all in one go, so that the {@link ParquetHandler} can do @@ -236,7 +239,7 @@ private CloseableIterator retrieveRemainingCheckpointFiles( FileStatus checkpointFile, long version) { - // Filter out all the files that are not part of the same checkpoint + // Find the contiguous parquet files that are part of the same checkpoint final List checkpointFiles = new ArrayList<>(); // Add the already retrieved checkpoint file to the list. 
@@ -244,7 +247,7 @@ private CloseableIterator retrieveRemainingCheckpointFiles( FileStatus peek = filesList.peek(); while (peek != null && - peek.getPath().endsWith(".parquet") && + isCheckpointFile(getName(peek.getPath())) && checkpointVersion(peek.getPath()) == version) { checkpointFiles.add(filesList.pop()); peek = filesList.peek(); diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java index d5c2be85231..688cad94cfa 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java @@ -113,15 +113,15 @@ public static class BenchmarkData { @BenchmarkMode(Mode.AverageTime) public void benchmark(BenchmarkData benchmarkData, Blackhole blackhole) throws Exception { TableClient tableClient = createTableClient(benchmarkData.parallelReaderCount); - Table table = Table.forPath( - createTableClient(benchmarkData.parallelReaderCount), - testTablePath); + Table table = Table.forPath(tableClient, testTablePath); Snapshot snapshot = table.getLatestSnapshot(tableClient); ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient); Scan scan = scanBuilder.build(); + // Scan state is not used, but get it so that we simulate the real use case. Row row = scan.getScanState(tableClient); + blackhole.consume(row); // To avoid dead code elimination by the JIT compiler long fileSize = 0; try (CloseableIterator batchIter = scan.getScanFiles(tableClient)) { while (batchIter.hasNext()) {