CacheManager.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

@@ -253,15 +254,30 @@ class CacheManager extends Logging {
     plan match {
       case lr: LogicalRelation => lr.relation match {
         case hr: HadoopFsRelation =>
-          val prefixToInvalidate = qualifiedPath.toString
-          val invalidate = hr.location.rootPaths
-            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
-            .exists(_.startsWith(prefixToInvalidate))
-          if (invalidate) hr.location.refresh()
-          invalidate
+          refreshFileIndexIfNecessary(hr.location, fs, qualifiedPath)
         case _ => false
       }
 
+      case DataSourceV2Relation(fileTable: FileTable, _, _) =>
+        refreshFileIndexIfNecessary(fileTable.fileIndex, fs, qualifiedPath)
+
       case _ => false
     }
   }
+
+  /**
+   * Refresh the given [[FileIndex]] if any of its root paths starts with `qualifiedPath`.
+   * @return whether the [[FileIndex]] is refreshed.
+   */
+  private def refreshFileIndexIfNecessary(
+      fileIndex: FileIndex,
+      fs: FileSystem,
+      qualifiedPath: Path): Boolean = {
+    val prefixToInvalidate = qualifiedPath.toString
+    val needToRefresh = fileIndex.rootPaths
+      .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+      .exists(_.startsWith(prefixToInvalidate))
+    if (needToRefresh) fileIndex.refresh()
+    needToRefresh
+  }
 }
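As context rather than part of the diff, here is a minimal sketch of the path-prefix check that refreshFileIndexIfNecessary performs, assuming the default local Hadoop FileSystem; the paths are made up for illustration:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  val fs = FileSystem.get(new Configuration())
  // Both the invalidated path and each root path of the FileIndex are fully
  // qualified first, then compared as plain string prefixes.
  val qualifiedPath = new Path("/data/events").makeQualified(fs.getUri, fs.getWorkingDirectory)
  val rootPath = new Path("/data/events/date=2019-04-09")
    .makeQualified(fs.getUri, fs.getWorkingDirectory)
  // A root path that lives under the invalidated path means the FileIndex is stale,
  // so refreshFileIndexIfNecessary would call fileIndex.refresh() and return true.
  assert(rootPath.toString.startsWith(qualifiedPath.toString))

Comparing fully qualified path strings by prefix is what lets a refresh of a parent directory invalidate caches built over any of its subdirectories.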
FileBasedDataSourceSuite.scala
@@ -494,6 +494,38 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll
     }
   }

test("Do not use cache on overwrite") {
Seq("", "orc").foreach { useV1SourceReaderList =>
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
withTempDir { dir =>
val path = dir.toString
spark.range(1000).write.mode("overwrite").orc(path)
[Inline review comments on this line]

@dongjoon-hyun (Member), Apr 9, 2019:
@gengliangwang . In this suite, we need to test all available data sources like the following instead of using .orc:
    Seq("csv", "orc", "text").foreach { format =>

@dongjoon-hyun (Member), Apr 9, 2019:
Could you generalize this test case? Also, please add a JIRA issue for the Parquet DSv2 migration as a TODO comment.

Member Author:
@dongjoon-hyun The cache invalidation is for all file sources. Testing ORC here is quite sufficient, just like only Parquet is tested in #13566.

@dongjoon-hyun (Member), Apr 9, 2019:
If that is the logic, let's hold on this until Parquet is ready to migrate. We don't need to move the test logic around from here to there. We are going to migrate Parquet anyway, aren't we?

@dongjoon-hyun (Member), Apr 9, 2019:
In general, this test suite is designed to verify all data sources from the beginning.

(A hedged sketch of the suggested generalization appears after this test, below.)

+          val df = spark.read.orc(path).cache()
+          assert(df.count() == 1000)
+          spark.range(10).write.mode("overwrite").orc(path)
+          assert(df.count() == 10)
+          assert(spark.read.orc(path).count() == 10)
+        }
+      }
+    }
+  }
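To make the review suggestion above concrete, here is a minimal sketch (not part of this PR) of one way the overwrite test could be generalized across several built-in formats. The format list, the cast to string (the text source only accepts a single string column), and the TODO placeholder standing in for the requested JIRA id are illustrative assumptions, not the author's code:

  // Hypothetical generalization of the test above, following the reviewer's suggestion.
  test("Do not use cache on overwrite - multiple file formats") {
    Seq("csv", "orc", "text").foreach { format =>
      Seq("", format).foreach { useV1SourceReaderList =>
        withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
          withTempDir { dir =>
            val path = dir.toString
            // Cast the id column so the single-string-column text source can write it.
            spark.range(1000).selectExpr("cast(id as string)")
              .write.mode("overwrite").format(format).save(path)
            val df = spark.read.format(format).load(path).cache()
            assert(df.count() == 1000)
            spark.range(10).selectExpr("cast(id as string)")
              .write.mode("overwrite").format(format).save(path)
            // The cached plan must be invalidated by the overwrite.
            assert(df.count() == 10)
            assert(spark.read.format(format).load(path).count() == 10)
            // TODO: add "parquet" once the Parquet DSv2 migration is done.
          }
        }
      }
    }
  }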

test("Do not use cache on append") {
Seq("", "orc").foreach { useV1SourceReaderList =>
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
withTempDir { dir =>
val path = dir.toString
spark.range(1000).write.mode("append").orc(path)
[Inline review comment on this line]

Member:
ditto.

+          val df = spark.read.orc(path).cache()
+          assert(df.count() == 1000)
+          spark.range(10).write.mode("append").orc(path)
+          assert(df.count() == 1010)
+          assert(spark.read.orc(path).count() == 1010)
+        }
+      }
+    }
+  }

test("Return correct results when data columns overlap with partition columns") {
Seq("parquet", "orc", "json").foreach { format =>
withTempPath { path =>
ParquetQuerySuite.scala
@@ -70,30 +70,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}

test("SPARK-15678: not use cache on overwrite") {
[Inline review comments on this line]

Member Author:
Delete the original test cases for the following reasons:

  1. The cache invalidation is for all file sources.
  2. The two test cases in ParquetQuerySuite are covered by the new test cases of this PR.
  3. The Parquet data source is not migrated to V2 yet.

Member:
+1

-    withTempDir { dir =>
-      val path = dir.toString
-      spark.range(1000).write.mode("overwrite").parquet(path)
-      val df = spark.read.parquet(path).cache()
-      assert(df.count() == 1000)
-      spark.range(10).write.mode("overwrite").parquet(path)
-      assert(df.count() == 10)
-      assert(spark.read.parquet(path).count() == 10)
-    }
-  }
-
-  test("SPARK-15678: not use cache on append") {
-    withTempDir { dir =>
-      val path = dir.toString
-      spark.range(1000).write.mode("append").parquet(path)
-      val df = spark.read.parquet(path).cache()
-      assert(df.count() == 1000)
-      spark.range(10).write.mode("append").parquet(path)
-      assert(df.count() == 1010)
-      assert(spark.read.parquet(path).count() == 1010)
-    }
-  }

test("self-join") {
// 4 rows, cells of column 1 of row 2 and row 4 are null
val data = (1 to 4).map { i =>