Conversation

@bbossy
Contributor

@bbossy bbossy commented Jun 11, 2017

What changes were proposed in this pull request?

This PR changes InMemoryFileIndex.listLeafFiles behaviour so that it launches at most one Spark job to list leaf files.

## JIRA
https://issues.apache.org/jira/browse/SPARK-21056

Given a partitioned file relation (e.g. parquet) with layout
root/a=../b=../c=..
InMemoryFileIndex.listLeafFiles runs numberOfPartitions(a) * numberOfPartitions(b) Spark jobs sequentially to list leaf files, if both numberOfPartitions(a) and numberOfPartitions(b) are below spark.sql.sources.parallelPartitionDiscovery.threshold and numberOfPartitions(c) is above it.

Since the jobs are run sequentially, the per-job overhead dominates and the file listing operation can become significantly slower than listing the files from the driver.
I propose that InMemoryFileIndex.listLeafFiles should launch at most one Spark job for listing leaf files.
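For illustration, a minimal sketch of the scenario described above (the HDFS path and threshold value are made up; only the config key is the real one):

// Illustrative sketch only: the path and threshold below are examples, not part of the patch.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-listing-example").getOrCreate()

// Parallel (job-based) listing kicks in at a directory level once the number of paths
// to list at that level exceeds this threshold.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")

// Layout root/a=../b=../c=..: before this change, listing could launch one job per (a, b)
// directory, sequentially; with this change, at most one listing job is launched.
val df = spark.read.parquet("hdfs://namenode/path/to/root")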

How was this patch tested?

Adapted existing tests to match expected behaviour.

@AmplabJenkins

Can one of the admins verify this patch?

Contributor Author

Would it be safe to use the same instance of fs for all the paths in an InMemoryFileIndex? If so, I can move this back to where it was before.

Member

Yes, it's the same instance and should be reused.

Contributor Author

Thanks! Fixed.

@gatorsmile
Member

Could you please update the PR description by copying the contents from the JIRA?

Any performance number you can share?

@bbossy
Contributor Author

bbossy commented Jun 12, 2017

@gatorsmile :

I ran a synthetic scenario to show what changes, since deploying this branch would be more involved.

I created two very simple relations on a small HDFS cluster (4 datanodes), running Spark with master local[16]. The machine has 16 physical cores plus 16 hyper-threaded. The namenode is on a remote machine in the same network.

Setup:

scala> case class Foo(a: Int, b: Int, c: Int, d: Int)
defined class Foo

// manypartitions: 4 * 4 * 100 partitions. Parallel listing kicks in when listing level 'c', both without and with this PR.
scala> val data = for {
     | a <- 1.to(4)
     | b <- 1.to(4)
     | c <- 1.to(100)
     | } yield Foo(a,b,c,100)
data: scala.collection.immutable.IndexedSeq[Foo] = Vector(Foo(1,1,1,100), ...

scala> data.toDS.write.partitionBy("a", "b", "c").parquet("hdfs://namenode/user/bbossy/manypartitions")

// morepartitions: 10 * 10 * 100 partitions. Before this PR, 100 parallel listing jobs are spawned to list c, since each b contains more directories than the threshold.
// With this PR, one parallel listing job is spawned to list all b partitions, since at level b there are more paths to list than the threshold.
scala> val data = for {
     | a <- 1.to(10)
     | b <- 1.to(10)
     | c <- 1.to(100)
     | } yield Foo(a,b,c,1000)
data: scala.collection.immutable.IndexedSeq[Foo] = Vector(Foo(1,1,1,1000), ...
scala> data.toDS.write.partitionBy("a", "b", "c").parquet("hdfs://namenode/user/bbossy/morepartitions")

Using master branch before my commits:

scala> :pa
// Entering paste mode (ctrl-D to finish)

def time[R](block: => R): R = {
  val t0 = System.currentTimeMillis()
  val result = block
  println("Elapsed time: " + (System.currentTimeMillis - t0) + "ms")
  result
}

// Exiting paste mode, now interpreting.

time: [R](block: => R)R

scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 6506ms
res2: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 2905ms
res3: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 2744ms
res4: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 2683ms
res5: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...


scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 16068ms
res6: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 16047ms
res7: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 15691ms
res8: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 15767ms
res9: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala>

UI timeline:
[screenshot 2017-06-12 at 13:51:48]

Using this PR:

// omitting def time...

scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 6790ms
res0: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 4481ms
res1: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 4465ms
res2: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/manypartitions").collect)
Elapsed time: 4103ms
res3: Array[org.apache.spark.sql.Row] = Array([100,3,2,4], ...


scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 4717ms
res4: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 4434ms
res5: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 5219ms
res6: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala> time(spark.read.parquet("hdfs://namenode/user/bbossy/morepartitions").collect)
Elapsed time: 4429ms
res7: Array[org.apache.spark.sql.Row] = Array([1000,6,8,31], ...
scala>

UI timeline:
[screenshot 2017-06-12 at 13:57:15]

Is there something more specific that I should look into?

@bbossy
Contributor Author

bbossy commented Jun 13, 2017

ping @gatorsmile @srowen and possibly @cloud-fan : Would like to hear your thoughts on this.

Contributor

how about

if (paths.isEmpty) {
  Nil
} else {
  val fs = paths.head.getFileSystem(hadoopConf)
  ......
}

Contributor

can we merge these flatMaps?

Contributor

something like

paths.flatMap { path =>
  try {
    val statuses = fs.get.listStatus(path)
    val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
    ....
  } catch ...
}
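A more complete (hypothetical) sketch of that merged shape could look like the following; shouldFilterOut here is a simplified stand-in for the real InMemoryFileIndex helper, and the error handling only gestures at the existing behaviour:

// Hypothetical expansion of the sketch above, not the final patch.
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Simplified stand-in for InMemoryFileIndex.shouldFilterOut (hidden/temporary file filtering).
def shouldFilterOut(name: String): Boolean =
  name.startsWith(".") || name.startsWith("_temporary") || name == "_SUCCESS"

def listFilteredStatuses(paths: Seq[Path], fs: FileSystem): Seq[(Path, FileStatus)] =
  paths.flatMap { path =>
    try {
      fs.listStatus(path).toSeq
        .filterNot(status => shouldFilterOut(status.getPath.getName))
        .map(status => path -> status)
    } catch {
      // Mirror the existing behaviour of skipping paths that disappear while listing.
      case _: java.io.FileNotFoundException => Seq.empty[(Path, FileStatus)]
    }
  }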

Contributor

then we can still keep the previous code structure

Member

@kiszk kiszk Jun 15, 2017

nit: style issue. This if-then-else should be shifted left by 2 spaces.

@mallman
Contributor

mallman commented Jun 16, 2017

@bbossy I've built and deployed a branch of Spark 2.2 with your patch and compared its behavior to the same branch of Spark 2.2 without your patch. I'm seeing different behavior, but not what I expected.

My test table has three partition columns, ds, h and chunk. There is 1 ds value, 2 h values, and 51 chunk values, split into 27 and 24 partitions under the two h directories. I set spark.sql.sources.parallelPartitionDiscovery.threshold to 10. I believe this fits the scenario you're trying to remedy.

I use spark.read.parquet to load the table. When I load the table using the unpatched branch, Spark launches three jobs with 27, 24 and 1 stages, in that order. When I load the table using the patched branch, Spark launches three jobs with 51, 1 and 51 stages, in that order. Does this match your expectations? I was expecting to see Spark launch two jobs.

@bbossy
Contributor Author

bbossy commented Jun 18, 2017

@mallman I'm not sure where this difference in behaviour is coming from. The following test in FileIndexSuite passes:

 test("mallman's scenario") {
    withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "10") {
      withTempDir { dir =>
        for (chunk <- 1.to(27)) {
          val f = new File(dir, s"ds=1/h=one/chunk=$chunk/file.parquet")
          f.getParentFile.mkdirs()
          f.createNewFile()
        }
        for (chunk <- 1.to(24)) {
          val f = new File(dir, s"ds=1/h=two/chunk=$chunk/file.parquet")
          f.getParentFile.mkdirs()
          f.createNewFile()
        }
        HiveCatalogMetrics.reset()
        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
      }
    }
  }

Does it match your scenario?

I'll dig around a bit later to see if I can come up with an explanation.

@bbossy
Contributor Author

bbossy commented Jun 18, 2017

@cloud-fan Could you take another look, please?

try {
  val fStatuses = fs.listStatus(path)
  val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName))
  if (filtered.nonEmpty) {
Contributor

@cloud-fan cloud-fan Jun 19, 2017

nit: filtered.map(path -> _), so that we don't need the if-else here, and the flatMap there

fStatuses.map { f => path -> f }
}.partition { case (_, fStatus) => fStatus.isDirectory }
val pathsToList = dirs.map { case (_, fStatus) => fStatus.getPath }
val nestedFiles = if (pathsToList.nonEmpty) {
Contributor

do we need this if check?

@cloud-fan
Contributor

let's wait for @mallman's response to make sure this patch does fix the problem

@mallman
Contributor

mallman commented Jun 21, 2017

Hi @bbossy

Does it match your scenario?

It does not match my scenario. I'm reading files from HDFS. In your test, you're reading files from the local filesystem. Can you try a test using files stored in HDFS?

Also, I'm not testing with InMemoryFileIndex directly. Rather, I'm using spark.read.parquet. Can you try a test with that scenario?
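(For reference, such a check driven through the reader API might look roughly like the sketch below; the HDFS URI and threshold are illustrative, and HiveCatalogMetrics is the same counter used in the FileIndexSuite test above.)

// Sketch only: count parallel listing jobs when reading a partitioned table from HDFS.
import org.apache.spark.metrics.source.HiveCatalogMetrics

spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "10")
HiveCatalogMetrics.reset()
spark.read.parquet("hdfs://namenode/user/test/ds_h_chunk_table")
// With the patch, the expectation is a single parallel listing job for the whole relation.
println(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())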

@HyukjinKwon
Member

gentle ping @bbossy, I just want to be sure whether this is still in progress.

@bbossy
Contributor Author

bbossy commented Jul 24, 2017

@HyukjinKwon I'll try to address the outstanding review comments in the next day or two.

@asfgit asfgit closed this in 3a45c7f Aug 5, 2017
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
## What changes were proposed in this pull request?

This PR proposes to close stale PRs, mostly the same instances with apache#18017

Closes apache#14085 - [SPARK-16408][SQL] SparkSQL Added file get Exception: is a directory …
Closes apache#14239 - [SPARK-16593] [CORE] [WIP] Provide a pre-fetch mechanism to accelerate shuffle stage.
Closes apache#14567 - [SPARK-16992][PYSPARK] Python Pep8 formatting and import reorganisation
Closes apache#14579 - [SPARK-16921][PYSPARK] RDD/DataFrame persist()/cache() should return Python context managers
Closes apache#14601 - [SPARK-13979][Core] Killed executor is re spawned without AWS key…
Closes apache#14830 - [SPARK-16992][PYSPARK][DOCS] import sort and autopep8 on Pyspark examples
Closes apache#14963 - [SPARK-16992][PYSPARK] Virtualenv for Pylint and pep8 in lint-python
Closes apache#15227 - [SPARK-17655][SQL]Remove unused variables declarations and definations in a WholeStageCodeGened stage
Closes apache#15240 - [SPARK-17556] [CORE] [SQL] Executor side broadcast for broadcast joins
Closes apache#15405 - [SPARK-15917][CORE] Added support for number of executors in Standalone [WIP]
Closes apache#16099 - [SPARK-18665][SQL] set statement state to "ERROR" after user cancel job
Closes apache#16445 - [SPARK-19043][SQL]Make SparkSQLSessionManager more configurable
Closes apache#16618 - [SPARK-14409][ML][WIP] Add RankingEvaluator
Closes apache#16766 - [SPARK-19426][SQL] Custom coalesce for Dataset
Closes apache#16832 - [SPARK-19490][SQL] ignore case sensitivity when filtering hive partition columns
Closes apache#17052 - [SPARK-19690][SS] Join a streaming DataFrame with a batch DataFrame which has an aggregation may not work
Closes apache#17267 - [SPARK-19926][PYSPARK] Make pyspark exception more user-friendly
Closes apache#17371 - [SPARK-19903][PYSPARK][SS] window operator miss the `watermark` metadata of time column
Closes apache#17401 - [SPARK-18364][YARN] Expose metrics for YarnShuffleService
Closes apache#17519 - [SPARK-15352][Doc] follow-up: add configuration docs for topology-aware block replication
Closes apache#17530 - [SPARK-5158] Access kerberized HDFS from Spark standalone
Closes apache#17854 - [SPARK-20564][Deploy] Reduce massive executor failures when executor count is large (>2000)
Closes apache#17979 - [SPARK-19320][MESOS][WIP]allow specifying a hard limit on number of gpus required in each spark executor when running on mesos
Closes apache#18127 - [SPARK-6628][SQL][Branch-2.1] Fix ClassCastException when executing sql statement 'insert into' on hbase table
Closes apache#18236 - [SPARK-21015] Check field name is not null and empty in GenericRowWit…
Closes apache#18269 - [SPARK-21056][SQL] Use at most one spark job to list files in InMemoryFileIndex
Closes apache#18328 - [SPARK-21121][SQL] Support changing storage level via the spark.sql.inMemoryColumnarStorage.level variable
Closes apache#18354 - [SPARK-18016][SQL][CATALYST][BRANCH-2.1] Code Generation: Constant Pool Limit - Class Splitting
Closes apache#18383 - [SPARK-21167][SS] Set kafka clientId while fetch messages
Closes apache#18414 - [SPARK-21169] [core] Make sure to update application status to RUNNING if executors are accepted and RUNNING after recovery
Closes apache#18432 - resolve com.esotericsoftware.kryo.KryoException
Closes apache#18490 - [SPARK-21269][Core][WIP] Fix FetchFailedException when enable maxReqSizeShuffleToMem and KryoSerializer
Closes apache#18585 - SPARK-21359
Closes apache#18609 - Spark SQL merge small files to big files Update InsertIntoHiveTable.scala

Added:
Closes apache#18308 - [SPARK-21099][Spark Core] INFO Log Message Using Incorrect Executor I…
Closes apache#18599 - [SPARK-21372] spark writes one log file even I set the number of spark_rotate_log to 0
Closes apache#18619 - [SPARK-21397][BUILD]Maven shade plugin adding dependency-reduced-pom.xml to …
Closes apache#18667 - Fix the simpleString used in error messages
Closes apache#18782 - Branch 2.1

Added:
Closes apache#17694 - [SPARK-12717][PYSPARK] Resolving race condition with pyspark broadcasts when using multiple threads

Added:
Closes apache#16456 - [SPARK-18994] clean up the local directories for application in future by annother thread
Closes apache#18683 - [SPARK-21474][CORE] Make number of parallel fetches from a reducer configurable
Closes apache#18690 - [SPARK-21334][CORE] Add metrics reporting service to External Shuffle Server

Added:
Closes apache#18827 - Merge pull request 1 from apache/master

## How was this patch tested?

N/A

Author: hyukjinkwon <[email protected]>

Closes apache#18780 from HyukjinKwon/close-prs.