From ee631d2d98f72d99da00d8922fc4cf6a66cf063c Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Tue, 31 May 2016 11:27:41 -0700
Subject: [PATCH 1/2] Drop cache on appends and overwrites

---
 .../spark/sql/execution/CacheManager.scala         | 24 +++++++++++++++++++
 .../InsertIntoHadoopFsRelationCommand.scala        |  4 ++++
 .../parquet/ParquetQuerySuite.scala                | 22 +++++++++++++++++
 3 files changed, 50 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c8bdb0d22c9f8..063e21265fe78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -19,10 +19,13 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
+import org.apache.hadoop.fs.{FileSystem, Path}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
@@ -157,4 +160,25 @@ private[sql] class CacheManager extends Logging {
       case _ =>
     }
   }
+
+  /**
+   * Invalidates the cache of any data that contains `qualifiedPath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  private[sql] def invalidateCachedPath(fs: FileSystem, qualifiedPath: Path): Unit = writeLock {
+    cachedData.foreach {
+      case data if data.plan.find {
+        case lr: LogicalRelation => lr.relation match {
+          case hr: HadoopFsRelation =>
+            hr.location.paths
+              .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
+              .contains(qualifiedPath)
+        }
+      }.isDefined =>
+        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
+        data.cachedRepresentation.uncache(blocking = false)
+        cachedData.remove(dataIndex)
+      case _ =>
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 1426dcf4697ff..928b73e337365 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -91,8 +91,12 @@ private[sql] case class InsertIntoHadoopFsRelationCommand(
           throw new IOException(s"Unable to clear output " +
             s"directory $qualifiedOutputPath prior to writing to it")
         }
+        // Invalidate all caches with this in path
+        sparkSession.sharedState.cacheManager.invalidateCachedPath(fs, qualifiedOutputPath)
         true
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
+        // Invalidate all caches with this in path
+        sparkSession.sharedState.cacheManager.invalidateCachedPath(fs, qualifiedOutputPath)
         true
       case (SaveMode.Ignore, exists) =>
         !exists
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 0a2fb0ef50661..2100950096ddf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -67,6 +67,28 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       TableIdentifier("tmp"), ignoreIfNotExists = true)
   }
 
+  test("drop cache on overwrite") {
+    withTempDir { dir =>
+      val path = dir.toString
+      spark.range(1000).write.mode("overwrite").parquet(path)
+      val df = sqlContext.read.parquet(path).cache()
+      assert(df.count() == 1000)
+      sqlContext.range(10).write.mode("overwrite").parquet(path)
+      assert(sqlContext.read.parquet(path).count() == 10)
+    }
+  }
+
+  test("drop cache on append") {
+    withTempDir { dir =>
+      val path = dir.toString
+      spark.range(1000).write.mode("append").parquet(path)
+      val df = sqlContext.read.parquet(path).cache()
+      assert(df.count() == 1000)
+      sqlContext.range(10).write.mode("append").parquet(path)
+      assert(sqlContext.read.parquet(path).count() == 1010)
+    }
+  }
+
   test("self-join") {
     // 4 rows, cells of column 1 of row 2 and row 4 are null
     val data = (1 to 4).map { i =>

From a21013ab27a7400c850aebe66dfcd0e7e6c2ea3c Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Tue, 31 May 2016 18:24:26 -0700
Subject: [PATCH 2/2] more elegant solution

---
 .../spark/sql/execution/CacheManager.scala         | 24 ------------------------
 .../InsertIntoHadoopFsRelationCommand.scala        |  4 ----
 .../datasources/ListingFileCatalog.scala           |  3 ++-
 .../parquet/ParquetQuerySuite.scala                | 18 +++++++-------
 4 files changed, 12 insertions(+), 37 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 063e21265fe78..c8bdb0d22c9f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -19,13 +19,10 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import org.apache.hadoop.fs.{FileSystem, Path}
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
@@ -160,25 +157,4 @@ private[sql] class CacheManager extends Logging {
       case _ =>
     }
   }
-
-  /**
-   * Invalidates the cache of any data that contains `qualifiedPath` in one or more
-   * `HadoopFsRelation` node(s) as part of its logical plan.
-   */
-  private[sql] def invalidateCachedPath(fs: FileSystem, qualifiedPath: Path): Unit = writeLock {
-    cachedData.foreach {
-      case data if data.plan.find {
-        case lr: LogicalRelation => lr.relation match {
-          case hr: HadoopFsRelation =>
-            hr.location.paths
-              .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
-              .contains(qualifiedPath)
-        }
-      }.isDefined =>
-        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-        data.cachedRepresentation.uncache(blocking = false)
-        cachedData.remove(dataIndex)
-      case _ =>
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 928b73e337365..1426dcf4697ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -91,12 +91,8 @@ private[sql] case class InsertIntoHadoopFsRelationCommand(
           throw new IOException(s"Unable to clear output " +
             s"directory $qualifiedOutputPath prior to writing to it")
         }
-        // Invalidate all caches with this in path
-        sparkSession.sharedState.cacheManager.invalidateCachedPath(fs, qualifiedOutputPath)
         true
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
-        // Invalidate all caches with this in path
-        sparkSession.sharedState.cacheManager.invalidateCachedPath(fs, qualifiedOutputPath)
         true
       case (SaveMode.Ignore, exists) =>
         !exists
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 644e5d65d612c..8fece96139453 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -121,7 +121,8 @@ class ListingFileCatalog(
   }
 
   override def equals(other: Any): Boolean = other match {
-    case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
+    case hdfs: ListingFileCatalog =>
+      cachedLeafDirToChildrenFiles == hdfs.cachedLeafDirToChildrenFiles
     case _ => false
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 2100950096ddf..b06be59649248 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -67,25 +67,27 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       TableIdentifier("tmp"), ignoreIfNotExists = true)
   }
 
-  test("drop cache on overwrite") {
+  test("SPARK-15678: not use cache on overwrite") {
     withTempDir { dir =>
       val path = dir.toString
       spark.range(1000).write.mode("overwrite").parquet(path)
-      val df = sqlContext.read.parquet(path).cache()
+      val df = spark.read.parquet(path).cache()
       assert(df.count() == 1000)
-      sqlContext.range(10).write.mode("overwrite").parquet(path)
-      assert(sqlContext.read.parquet(path).count() == 10)
+      spark.range(10).write.mode("overwrite").parquet(path)
+      assert(df.count() == 1000)
+      assert(spark.read.parquet(path).count() == 10)
     }
   }
 
test("drop cache on append") { + test("SPARK-15678: not use cache on append") { withTempDir { dir => val path = dir.toString spark.range(1000).write.mode("append").parquet(path) - val df = sqlContext.read.parquet(path).cache() + val df = spark.read.parquet(path).cache() + assert(df.count() == 1000) + spark.range(10).write.mode("append").parquet(path) assert(df.count() == 1000) - sqlContext.range(10).write.mode("append").parquet(path) - assert(sqlContext.read.parquet(path).count() == 1010) + assert(spark.read.parquet(path).count() == 1010) } }