@davies
Contributor

@davies davies commented Aug 16, 2014

Use an external sort to support sorting large datasets in the reduce stage.
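
For readers unfamiliar with the technique, here is a minimal sketch of an external sort. It is not the `ExternalSorter` added by this PR: the function names and the fixed `chunk_size` are assumptions made for illustration (a real implementation would more likely spill based on memory usage). Each chunk is sorted in memory, spilled to a temporary file, and the sorted runs are merged lazily.

```python
import heapq
import itertools
import os
import pickle
import tempfile


def _spill(sorted_chunk, directory):
    """Write one sorted chunk to a temporary file and return its path."""
    fd, path = tempfile.mkstemp(dir=directory, suffix=".run")
    with os.fdopen(fd, "wb") as f:
        for item in sorted_chunk:
            pickle.dump(item, f)
    return path


def _read_run(path):
    """Lazily yield the items of one spilled run, then delete the file."""
    try:
        with open(path, "rb") as f:
            while True:
                try:
                    yield pickle.load(f)
                except EOFError:
                    return
    finally:
        os.remove(path)


def external_sort(iterator, chunk_size=10000, directory=None):
    """Sort an iterator whose contents may not fit in memory.

    chunk_size (items per in-memory run) is a simplification chosen
    for this sketch; it is not a parameter from the PR.
    """
    iterator = iter(iterator)
    paths = []
    while True:
        # Sort at most chunk_size items in memory at a time.
        chunk = sorted(itertools.islice(iterator, chunk_size))
        if not chunk:
            break
        paths.append(_spill(chunk, directory))
    # Merge the sorted runs lazily; only one item per run is held in memory.
    return heapq.merge(*[_read_run(p) for p in paths])
```

Usage: `list(external_sort(records, chunk_size=100000))` returns the same result as `sorted(records)`, while only holding one chunk (and later one item per run) in memory.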

@JoshRosen
Contributor

Jenkins, test this please.

@SparkQA

SparkQA commented Aug 16, 2014

QA tests have started for PR 1978 at commit 55602ee.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 16, 2014

QA tests have finished for PR 1978 at commit 55602ee.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class Serializer
    • abstract class SerializerInstance
    • abstract class SerializationStream
    • abstract class DeserializationStream
    • class ExternalSorter(object):

@JoshRosen
Contributor

I think we also need to add a license statement to the LICENSE file (like we've done with CloudPickle and Py4J).

Contributor

As of my new PR, this will need to be changed to "SPARK_LOCAL_DIRS" (plural).

@JoshRosen
Contributor

Why do we need to use heapq3? Is there a way to support this feature using the standard Python 2.7 heapq?

@davies
Contributor Author

davies commented Aug 19, 2014

In Python 2.6 and 2.7, heapq.merge() does not support the key and reverse arguments.
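
To illustrate the gap, here is a hedged sketch of how a key/reverse-aware merge can be layered on top of the plain `heapq.merge()`. The names `merge_sorted` and `_Descending` are hypothetical and are not taken from heapq3 or from this PR. Each item is decorated with its key and the index of its run, and for `reverse=True` the key is wrapped so that the min-heap pops the largest key first.

```python
import heapq


class _Descending(object):
    """Wrap a key so that a min-heap pops the largest key first."""
    __slots__ = ("value",)

    def __init__(self, value):
        self.value = value

    def __lt__(self, other):
        return other.value < self.value

    def __eq__(self, other):
        return self.value == other.value


def merge_sorted(iterables, key=None, reverse=False):
    """Merge iterables that are already sorted by key (descending if reverse).

    Hypothetical helper: it only uses the argument-free form of
    heapq.merge(), so it does not need built-in key/reverse support.
    """
    if key is None:
        key = lambda item: item
    wrap = _Descending if reverse else (lambda k: k)

    def decorate(index, iterable):
        # The run index breaks ties, so the items themselves are never compared.
        for item in iterable:
            yield wrap(key(item)), index, item

    runs = [decorate(i, it) for i, it in enumerate(iterables)]
    for _, _, item in heapq.merge(*runs):
        yield item
```

Usage: `merge_sorted([run_a, run_b], key=lambda kv: kv[0], reverse=True)` merges runs that were each sorted in descending key order.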

davies added 2 commits August 19, 2014 15:09
change SPARK_LOCAL_DIR into SPARK_LOCAL_DIRS
@davies
Contributor Author

davies commented Aug 19, 2014

cc @mateiz

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 1978 at commit 644abaf.

  • This patch does not merge cleanly!

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have finished for PR 1978 at commit 644abaf.

  • This patch fails unit tests.
  • This patch does not merge cleanly!

@davies
Contributor Author

davies commented Aug 20, 2014

Jenkins, retest this please.

Conflicts:
	python/pyspark/tests.py
@SparkQA

SparkQA commented Aug 20, 2014

QA tests have started for PR 1978 at commit 644abaf.

  • This patch does not merge cleanly!

@SparkQA

SparkQA commented Aug 20, 2014

QA tests have finished for PR 1978 at commit 644abaf.

  • This patch fails unit tests.
  • This patch does not merge cleanly!

@SparkQA

SparkQA commented Aug 20, 2014

QA tests have started for PR 1978 at commit eb53ca6.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 20, 2014

QA tests have finished for PR 1978 at commit eb53ca6.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ExternalSorter(object):

@JoshRosen
Contributor

Test failure is due to RAT complaining about some temporary files left behind by another test:

=========================================================================
Running Apache RAT checks
=========================================================================
Could not find Apache license headers in the following files:
 !????? /home/jenkins/workspace/SparkPullRequestBuilder@2/mllib/checkpoint/.temp.crc
 !????? /home/jenkins/workspace/SparkPullRequestBuilder@2/mllib/checkpoint/temp

There are a few ways to fix this:

  • Add an exclude to .rat-excludes
  • Modify the pull request builder to use git clean to remove these untracked files from the working tree.
  • Configure RAT to only check files tracked by git (by using git ls-files)

@JoshRosen
Contributor

@massie @jey @pwendell Is there a reason why it would be unsafe to run git clean in the pull request builder? Would this inadvertently delete any files that it needs, such as Spark configurations?

@JoshRosen
Contributor

I ssh'd into the worker and deleted that checkpoint directory, so maybe it will work now.

Jenkins, retest this please.

@davies
Contributor Author

davies commented Aug 20, 2014

Jenkins, retest this please

@SparkQA

SparkQA commented Aug 20, 2014

QA tests have started for PR 1978 at commit eb53ca6.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 20, 2014

QA tests have finished for PR 1978 at commit eb53ca6.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	python/pyspark/rdd.py
	python/pyspark/shuffle.py
@davies
Contributor Author

davies commented Aug 20, 2014

Jenkins, test this please.

@SparkQA

SparkQA commented Aug 20, 2014

QA tests have started for PR 1978 at commit 1f075ed.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 21, 2014

QA tests have finished for PR 1978 at commit 1f075ed.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
    • class ExternalSorter(object):

Contributor

Because there will be multiple Python worker processes running on the same node, if they all need to spill, it looks like they'll use the same directories in order here. Can you instead start each one at a random ID and then increment that to have it cycle through?

I'm not sure whether this can also affect the external hashing code, but if so, it would be good to fix that too (as a separate JIRA).

Contributor

Basically I'm worried that everyone writes to disk1 first, then everyone writes to disk2, etc, and we only use one disk at a time.

Contributor Author

Good catch. Maybe shuffling the directories randomly at the beginning would be better.
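
A minimal sketch of that idea, assuming `SPARK_LOCAL_DIRS` holds a comma-separated list of directories. The helper names, the `/tmp` fallback, and the `seed` parameter are hypothetical and only there to make the example testable; this is not necessarily how the patch implements it. Each worker shuffles the list once and then cycles through it, so concurrent workers are unlikely to hit the same disk in the same order.

```python
import os
import random


def shuffled_local_dirs(env=None, seed=None):
    """Return the configured local directories in a per-process random order.

    seed=None lets random.Random seed itself from system entropy, so
    separate worker processes get independent orderings.
    """
    env = os.environ if env is None else env
    # Assumption: fall back to /tmp when the variable is unset or empty.
    dirs = [d for d in env.get("SPARK_LOCAL_DIRS", "/tmp").split(",") if d]
    dirs = dirs or ["/tmp"]
    random.Random(seed).shuffle(dirs)
    return dirs


def next_spill_dir(dirs, spill_count):
    """Pick the directory for the n-th spill by cycling through the list."""
    return dirs[spill_count % len(dirs)]
```

Usage: call `shuffled_local_dirs()` once per sorter instance, then `next_spill_dir(dirs, n)` for the n-th spill file.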

PS: Could there be a configurable policy for choosing local disks, such as using the first one as much as possible? It would be useful when one of the local disks is an SSD.

Contributor

Yeah good question. We don't have that yet, but in the future we'll have support for multiple local storage levels.

@davies
Contributor Author

davies commented Aug 26, 2014

@mateiz I have addressed the comments above. The change also fixes the same problem for the external merger. Please take another look, thanks.

@SparkQA

SparkQA commented Aug 26, 2014

QA tests have started for PR 1978 at commit b125d2f.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 26, 2014

QA tests have finished for PR 1978 at commit b125d2f.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ExternalSorter(object):

Contributor

Have you tested that this actually spills any data? I guess it does because the bare Python interpreter already consumes more than 1 MB?

@mateiz
Contributor

mateiz commented Aug 26, 2014

Looks pretty good. I just added one question about the test.

@davies
Contributor Author

davies commented Aug 26, 2014

@mateiz I added a check for spilled bytes in the tests.

@SparkQA

SparkQA commented Aug 26, 2014

QA tests have started for PR 1978 at commit bbcd9ba.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 26, 2014

QA tests have finished for PR 1978 at commit bbcd9ba.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "$
    • $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "$
    • case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
    • In multiclass classification, all$2^`
    • public final class JavaDecisionTree
    • class KMeansModel (val clusterCenters: Array[Vector]) extends Serializable
    • class BoundedFloat(float):
    • class ExternalSorter(object):
    • class JoinedRow2 extends Row
    • class JoinedRow3 extends Row
    • class JoinedRow4 extends Row
    • class JoinedRow5 extends Row
    • class GenericRow(protected[sql] val values: Array[Any]) extends Row
    • abstract class MutableValue extends Serializable
    • final class MutableInt extends MutableValue
    • final class MutableFloat extends MutableValue
    • final class MutableBoolean extends MutableValue
    • final class MutableDouble extends MutableValue
    • final class MutableShort extends MutableValue
    • final class MutableLong extends MutableValue
    • final class MutableByte extends MutableValue
    • final class MutableAny extends MutableValue
    • final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression
    • case class CollectHashSetFunction(
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression
    • case class CombineSetsAndCountFunction(
    • case class CountDistinctFunction(
    • case class MaxOf(left: Expression, right: Expression) extends Expression
    • case class NewSet(elementType: DataType) extends LeafExpression
    • case class AddItemToSet(item: Expression, set: Expression) extends Expression
    • case class CombineSets(left: Expression, right: Expression) extends BinaryExpression
    • case class CountSet(child: Expression) extends UnaryExpression
    • case class ExplainCommand(plan: LogicalPlan, extended: Boolean = false) extends Command

@mateiz
Contributor

mateiz commented Aug 26, 2014

Cool, thanks! Going to merge this.

@mateiz
Contributor

mateiz commented Aug 26, 2014

BTW can you send a PR for the randomizing change to branch-1.1? I don't think we'll add sorting in branch-1.1 since it's a new feature, but we can add that randomizing patch as a bug fix. Or do you think it won't matter much?

@asfgit asfgit closed this in f1e71d4 Aug 27, 2014
@davies
Contributor Author

davies commented Aug 27, 2014

@mateiz PR #2152

xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
Using external sort to support sort large datasets in reduce stage.

Author: Davies Liu <[email protected]>

Closes apache#1978 from davies/sort and squashes the following commits:

bbcd9ba [Davies Liu] check spilled bytes in tests
b125d2f [Davies Liu] add test for external sort in rdd
eae0176 [Davies Liu] choose different disks from different processes and instances
1f075ed [Davies Liu] Merge branch 'master' into sort
eb53ca6 [Davies Liu] Merge branch 'master' into sort
644abaf [Davies Liu] add license in LICENSE
19f7873 [Davies Liu] improve tests
55602ee [Davies Liu] use external sort in sortBy() and sortByKey()
@davies davies deleted the sort branch September 15, 2014 22:19