Skip to content

Conversation

@maropu
Copy link
Member

@maropu maropu commented Nov 5, 2015

Refactored #5178 and added unit tests.

@SparkQA
Copy link

SparkQA commented Nov 5, 2015

Test build #45071 has finished for PR 9478 at commit 849eaa7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster\n

@SparkQA
Copy link

SparkQA commented Nov 5, 2015

Test build #45106 has finished for PR 9478 at commit 5c00a5a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster\n

@maropu maropu force-pushed the LocalReadBlockManager branch from 5c00a5a to 8d0a2f0 Compare November 5, 2015 15:33
@SparkQA
Copy link

SparkQA commented Nov 5, 2015

Test build #45117 has finished for PR 9478 at commit 8d0a2f0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster\n

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: each arg on its own line, with return value on same line as last arg

@maropu
Copy link
Member Author

maropu commented Nov 5, 2015

retest this please

@SparkQA
Copy link

SparkQA commented Nov 6, 2015

Test build #45163 has finished for PR 9478 at commit 8d0a2f0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster\n

@SparkQA
Copy link

SparkQA commented Nov 6, 2015

Test build #45166 has finished for PR 9478 at commit 211f0b9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster\n

@SparkQA
Copy link

SparkQA commented Nov 6, 2015

Test build #45170 has finished for PR 9478 at commit cebc895.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster\n

@maropu
Copy link
Member Author

maropu commented Nov 6, 2015

@andrewor14 Could you review this and give some suggestions?
btw, spark-dev said feature freeze done at v1.6, so is it better to change the target at v1.7 for this work?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't belong here. BlockDataManager should know nothing about shuffles.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed getShuffleBlockData into getBlockData. Is this fix correct for your comments?

@andrewor14
Copy link
Contributor

@maropu Thanks for taking this over. I took a pass and I believe the high level approach is correct. However, it seems that the abstractions can be simplified a little. Please also fix the style issues that I pointed out.

@andrewor14
Copy link
Contributor

By the way, have you done some benchmarking to measure how much this actually saves? I wonder if the gains are actually all that significant. Maybe it actually matters if we're fetching many small blocks, but even then I'm not 100% sure.

@maropu
Copy link
Member Author

maropu commented Nov 11, 2015

@andrewor14 Thx for your reviews.
I'll fix it and also do benchmarks .

@maropu maropu force-pushed the LocalReadBlockManager branch from cebc895 to 6b8f7bf Compare November 11, 2015 03:59
@SparkQA
Copy link

SparkQA commented Nov 11, 2015

Test build #45591 has finished for PR 9478 at commit 6b8f7bf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster\n * public class JavaLBFGSExample\n * class LDA @Since(\"1.6.0\") (\n * case class Metadata(\n * require(className == expectedClassName, s\"Error loading metadata: Expected class name\" +\n * class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T], val offset: Int)\n * class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int, val step: Int)\n

@andrewor14
Copy link
Contributor

@maropu any updates? Did you have a chance to do the benchmarks?

@maropu
Copy link
Member Author

maropu commented Nov 26, 2015

@andrewor14 I tried quick benchmarks though, I saw little difference, so I'm taking various tests on it.
I'll get the result in a day.

@maropu maropu force-pushed the LocalReadBlockManager branch from 6b8f7bf to 1653691 Compare November 26, 2015 03:25
@SparkQA
Copy link

SparkQA commented Nov 26, 2015

Test build #46739 has finished for PR 9478 at commit 1653691.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 26, 2015

Test build #46741 has finished for PR 9478 at commit 4ca2d72.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster\n * public final class OneWayMessage implements RequestMessage\n * case class Cast(child: Expression, dataType: DataType) extends UnaryExpression\n * class SparkPlanInfo(\n * class SQLMetricInfo(\n * case class SparkListenerSQLExecutionStart(\n * case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)\n

@maropu maropu force-pushed the LocalReadBlockManager branch from 4ca2d72 to ba94687 Compare November 27, 2015 02:05
@SparkQA
Copy link

SparkQA commented Nov 27, 2015

Test build #46794 has finished for PR 9478 at commit ba94687.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster\n

@maropu maropu force-pushed the LocalReadBlockManager branch from 0325d35 to 303abcd Compare November 27, 2015 06:56
@maropu
Copy link
Member Author

maropu commented Nov 27, 2015

@andrewor14 ISTM that this pr has a little effect on performance even in case of many partitions involved in shuffle.

Test settings:

  • 10 test runs except for the first run
  • 4 workers in single c4.8xlarge (each worker has 12g mem)
  • spark.driver.memory=1g
  • spark.executor.memory=12g
  • spark.shuffle.bypassNetworkAccess=true
  • spark.sql.shuffle.partitions=1000
  • total shuffle size = around 1.5g
  • query executed:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val schema = StructType(
  Seq(
    StructField(s"col0", IntegerType, true),
    StructField(s"col1", StringType, true)
  )
)

val rdd =
  sc.parallelize((1 to 36), 36).flatMap { j => (1 to 5000000).map ( i =>
    Row(i, s"${i}"))
  }

sqlContext.createDataFrame(rdd, schema).registerTempTable("shuffle")

sqlContext.sql("cache table shuffle")

def timer[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  val t1 = System.nanoTime()
  println("Elapsed time: " + ((t1 - t0 + 0.0) / 1000000000.0)+ "s")
  result
}

timer { sql(s"""select col0, col1 from shuffle cluster by col0""").queryExecution.executedPlan(1).execute().foreach(_ => {})
  • w/o bypassing
    Elapsed time: 9.344261745s,10.039675768s,11.644405718s,9.600332115s,9.745275262s,11.469732737s,11.495929827s,9.036307879s,9.159725732s,9.142030108s
  • w/ bypassing
    Elapsed time: 8.529972448s
    9.302905215s,9.933504313s,7.681615877s,9.16150229s,9.655643012s,8.948393461s,8.048391038s,8.780075475s,9.826335759s

I think that this patch might avoid issues caused by netty (i.e., gc pressures and network troubles) in a corner case though,
there is no big performance gain in this quick benchmarks.

@SparkQA
Copy link

SparkQA commented Nov 27, 2015

Test build #46806 has finished for PR 9478 at commit 303abcd.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):\n * case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster\n

@andrewor14
Copy link
Contributor

@maropu thanks for running the benchmarks. It seems that the gains are not really significant enough to warrant all the complexity this patch adds. This is to a certain extent as expected since network is generally pretty fast, and Spark tends to bottleneck on CPU rather than I/O or network.

@andrewor14
Copy link
Contributor

Can you close this PR?

@maropu
Copy link
Member Author

maropu commented Dec 1, 2015

@andrewor14 Okay and thanks. Also, can you close SPARK-6521? I left comments there.

@maropu maropu closed this Dec 1, 2015
@maropu maropu deleted the LocalReadBlockManager branch July 5, 2017 11:48
@maropu maropu restored the LocalReadBlockManager branch July 5, 2017 11:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants