Skip to content

Conversation

@viirya
Copy link
Member

@viirya viirya commented Jan 23, 2017

What changes were proposed in this pull request?

A logical Limit is performed physically by two operations LocalLimit and GlobalLimit.

Most of time, we gather all data into a single partition in order to run GlobalLimit. If we use a very big limit number, shuffling data causes performance issue also reduces parallelism.

We can avoid shuffling into single partition if we don't care data ordering. This patch implements this idea by doing a map stage during global limit. It collects the info of row numbers at each partition. For each partition, we locally retrieves limited data without any shuffling to finish this global limit.

For example, we have three partitions with rows (100, 100, 50) respectively. In global limit of 100 rows, we may take (34, 33, 33) rows for each partition locally. After global limit we still have three partitions.

If the data partition has certain ordering, we can't distribute required rows evenly to each partitions because it could change data ordering. But we still can avoid shuffling.

How was this patch tested?

Jenkins tests.

@viirya
Copy link
Member Author

viirya commented Jan 23, 2017

cc @rxin @wzhfy @scwf

@viirya viirya force-pushed the improve-global-limit-parallelism branch from e067b10 to 5dae2da Compare January 23, 2017 13:02
@SparkQA
Copy link

SparkQA commented Jan 23, 2017

Test build #71833 has finished for PR 16677 at commit e067b10.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@viirya
Copy link
Member Author

viirya commented Jan 23, 2017

retest this please.

@viirya viirya force-pushed the improve-global-limit-parallelism branch from 5dae2da to 0205cd9 Compare January 23, 2017 15:39
@SparkQA
Copy link

SparkQA commented Jan 23, 2017

Test build #71842 has finished for PR 16677 at commit 5dae2da.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@SparkQA
Copy link

SparkQA commented Jan 23, 2017

Test build #71856 has finished for PR 16677 at commit 0205cd9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@SparkQA
Copy link

SparkQA commented Jan 23, 2017

Test build #71848 has finished for PR 16677 at commit 5dae2da.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@viirya viirya force-pushed the improve-global-limit-parallelism branch 2 times, most recently from 3dec117 to 0a2e96f Compare January 24, 2017 03:55
@SparkQA
Copy link

SparkQA commented Jan 24, 2017

Test build #71901 has finished for PR 16677 at commit 3dec117.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@SparkQA
Copy link

SparkQA commented Jan 24, 2017

Test build #71905 has finished for PR 16677 at commit 0a2e96f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@viirya viirya force-pushed the improve-global-limit-parallelism branch from 0a2e96f to 4fb5e40 Compare January 24, 2017 11:57
@SparkQA
Copy link

SparkQA commented Jan 24, 2017

Test build #71931 has finished for PR 16677 at commit 4fb5e40.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@viirya viirya force-pushed the improve-global-limit-parallelism branch from 4fb5e40 to 9d4cadb Compare January 24, 2017 15:33
@SparkQA
Copy link

SparkQA commented Jan 24, 2017

Test build #71936 has finished for PR 16677 at commit 9d4cadb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@viirya viirya changed the title [WIP][SQL] Use map output statistices to improve global limit's parallelism [SPARK-19355][SQL] Use map output statistices to improve global limit's parallelism Jan 25, 2017
@viirya
Copy link
Member Author

viirya commented Jan 25, 2017

also cc @cloud-fan and @hvanhovell

@viirya viirya force-pushed the improve-global-limit-parallelism branch from 9d4cadb to 7f89c30 Compare January 25, 2017 00:59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about LocalPartitioning

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think here we should use the shuffle rdd to directly read the data from disk.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, right.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getOrElse(empty iter)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we won't reach here, but the change is ok.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its better to broadcast reduceAmounts

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, i have thought it before. forget to add it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can move the logical of construct the shuffled rdd to ShuffleExchange and in global limit we begin with the shuffle rdd.

  1. add a new Distribution for fake partitioning
  2. modify the ShuffledRowRDD to carry the row num of each partition

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this, I am more conservative. Because currently there are no other operators using this feature. So I would tend to not change ShuffleExchange right now.

@SparkQA
Copy link

SparkQA commented Jan 25, 2017

Test build #71955 has finished for PR 16677 at commit 7f89c30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@viirya viirya force-pushed the improve-global-limit-parallelism branch from 7f89c30 to def10e6 Compare January 25, 2017 05:34
@SparkQA
Copy link

SparkQA commented Jan 25, 2017

Test build #71972 has started for PR 16677 at commit def10e6.

@viirya viirya force-pushed the improve-global-limit-parallelism branch from def10e6 to 4e31bb7 Compare January 25, 2017 05:56
@SparkQA
Copy link

SparkQA commented Jan 25, 2017

Test build #71973 has started for PR 16677 at commit 4e31bb7.

@hvanhovell
Copy link
Contributor

Merging to master. Thanks!

@asfgit asfgit closed this in 4f17585 Aug 10, 2018
@viirya
Copy link
Member Author

viirya commented Aug 10, 2018

Thank you! @hvanhovell

-- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery
-- It includes correlated cases.

-- Disable global limit optimization
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have a problem here?

Copy link
Member Author

@viirya viirya Aug 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This disables the optimization to get the limited values exactly the same as the current golden results.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see, thanks!

// partitions. If disabled, scanning data partitions sequentially until reaching limit number.
// Besides, if child output has certain ordering, we can't evenly pick up rows from
// each parititon.
val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya dumb question, what is child.outputOrdering doing here? I am not entirely sure that we should guarantee that you should get the lowest elements of a dataset if you perform a limit in the middle of a query (a top level sort-limit does have this guarantee). I also don't think the SQL standard supports/mandates this.

Moreover checking child.outputOrdering only checks the order of the partition and not the order of the frame as a whole. You should also add the child.outputPartitioning.

I would be slightly in favor of removing the child.outputOrdering check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove it, we may need to feature flag it first since people may rely on the old behavior. Anyway all of this is up for debate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a query like select * from table order by a limit 10, I think the expected semantics is going to return top 10 elements, not any 10 elements. In order to not change this behavior, I add this check.

Moreover checking child.outputOrdering only checks the order of the partition and not the order of the frame as a whole. You should also add the child.outputPartitioning.

I think you are correct. We need to check child.outputPartitioning. I think we need to check there is a RangePartitioning. The check should be the child is a range partitioning and has some output ordering. WDYT?

I am not entirely sure that we should guarantee that you should get the lowest elements of a dataset if you perform a limit in the middle of a query (a top level sort-limit does have this guarantee). I also don't think the SQL standard supports/mandates this.
I would be slightly in favor of removing the child.outputOrdering check.

I am not sure for a limit in the middle of a query, if we don't need to consider this. When such query has sort, don't we need to return top limit elements?

cc @cloud-fan too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

select * from table order by a limit 10 gets planned differently right? It should use TakeOrderedAndProjectExec.

There is nothing in the SQL standard that mandates that a nested order by followed by a limit has to respect that ordering clause. In fact, AFAIR, the standard does not even support nested limits (they make stuff non-deterministic).

If we end up supporting this, then I'd rather have an explicit flag in GlobalLimitExec (orderedLimit or something like that) and set that during planning by matching on Limit(limit, Sort(order, true, child)). I want the explicit flag because then we can figure out what limit is doing by looking at the physical plan. I want to explicitly check for an underlying sort to match the current TakeOrderedAndProjectExec semantics and to avoid weird behavior because something way down the plan has set some arbitrary ordering.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I got your point. As the SQL standard doesn't mandates that. I think we can safely remove the child.outputPartitioning check.

Let me open a follow up PR for it.

/**
* The number of outputs for the map task.
*/
def numberOfOutput: Long
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this mean? output blocks? output files?

@rxin
Copy link
Contributor

rxin commented Sep 18, 2018

two questions about this (i just saw this from a different place):

  1. is numOutput about number of records?

  2. how much memory usage will be increased by, for the driver, at scale?

@hvanhovell
Copy link
Contributor

  1. numOutputs is the number or records
  2. 8 bytes per MapStatus.

checkAnswer(
limit2Df.groupBy("id").count().select($"id"),
limit2Df.select($"id"))
withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we set this flag here? we need to document it.

override def beforeAll(): Unit = {
super.beforeAll()
TestHive.setCacheTables(false)
TestHive.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we set this flag here? we need to document it.

@viirya
Copy link
Member Author

viirya commented Sep 18, 2018

@rxin Thanks for the comment. I will improve the document in a pr.

}
}
}
val broadMap = sparkContext.broadcast(takeAmountByPartition)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does "broad" here means broadcast? if yes, i don't think we have this convention in spark ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw why do we need to broadcast this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we want the map to be sent to each node just only once?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also let me change the variable name when improving the document.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also broadcast closures automatically, don't we? so just putting a variable in a closure would accomplish this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

broadcast is more efficient if data size is big, because of TorrentBroadcast. What's our expectation of the data size here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The size depends on the number of partitions. Each partition uses an int. If this is too small, we can remove the broadcast.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but tasks are already broadcasted

@rxin
Copy link
Contributor

rxin commented Sep 19, 2018

actually looking at the design - this could cause perf regressions in some cases too right? it introduces a barrier that was previously non-existent. if the number of records to take isn't substantially less than the actual records on each partition, perf would be much worse. also it feels to me this isn't shuffle at all, and we are piggybacking on the wrong infrastructure. what you really want is a way to buffer blocks temporarily, and can launch a 2nd wave of tasks to rerun some of them.

@viirya
Copy link
Member Author

viirya commented Sep 19, 2018

I'm not sure where it can cause perf regressions. Basically this just changes the way we retrieve records from partitions when performing limit. This doesn't do shuffling them together to single partition.

@cloud-fan
Copy link
Contributor

cloud-fan commented Sep 19, 2018

Let me take an example from the PR description

For example, we have three partitions with rows (100, 100, 50) respectively. In global limit of 100 rows, we may take (34, 33, 33) rows for each partition locally. After global limit we still have three partitions.

Without this patch, we need to take the first 100 rows from each partition, and then perform a shuffle to send all data into one partition and take the first 100 rows.

So if the limit is big, this patch is super useful, if the limit is small, this patch is not that useful but should not be slower.

The only overhead I can think of is, MapStatus needs to carry the numRecords metrics. It should be a small overhead, as MapStatus already carries many information.

@rxin
Copy link
Contributor

rxin commented Sep 19, 2018

ok after thinking about it more, i think we should just revert all of these changes and go back to the drawing board. here's why:

  1. the prs change some of the most common/core parts of spark, and are not properly designed (as in they haven't gone through actual discussions; there's not even a doc on how they work). the prs created a much more complicated implementations for limit / top k. you might be able to justify the complexity with the perf improvements, but we better write them down, discuss them, and make sure they are the right design choices. we also need to explain the execution strategies for limit in comments. this is just a comment about the process, not the actual design.

  2. now onto the design, i am having issues with two major parts:

  • 2a. what this pr really wanted was an abstraction to buffer data, and then have the driver analyze some statistics about data (records per map task), and then make decisions. because spark doesn't yet have that infrastructure, this pr just adds some hacks to shuffle to make it work. there is no proper abstraction here.
  • 2b. i'm not even sure if the algorithm here is the right one. the pr tries to parallelize as much as possible by keeping the same number of tasks. imo a simpler design that would work for more common cases is to buffer the data, get the records per map task, and create a new rdd with the first N number of partitions that reach limit. that way, we don't launch too many asks, and we retain ordering.
  1. the pr implementation quality is poor. variable names are confusing (output vs records); it's severely lacking documentation; the doc for the config option is arcane.

sorry about all of the above, but we gotta do better.

@cloud-fan
Copy link
Contributor

I'm convinced, there are 2 major issues:

  1. abusing shuffle. we need a new mechanism for driver to analyze some statistics about data (records per map task)
  2. too many small tasks. We need a better algorithm to decide the parallelism of limit.

@viirya
Copy link
Member Author

viirya commented Sep 19, 2018

I understood the two major concerns regarding this change. I'm going to submit a pr to revert the change. I will look into this idea further with new design.

asfgit pushed a commit that referenced this pull request Sep 20, 2018
## What changes were proposed in this pull request?

This goes to revert sequential PRs based on some discussion and comments at #16677 (comment).

#22344
#22330
#22239
#16677

## How was this patch tested?

Existing tests.

Closes #22481 from viirya/revert-SPARK-19355-1.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 89671a2)
Signed-off-by: Wenchen Fan <[email protected]>
@sujith71955
Copy link
Contributor

@viirya Are we also looking to optimize CollectLimitExec part? I saw in SparkPlan we have an executeTake() method which basically interpolate the number of partitions and processes the limit query. if driver analyze some statistics about data then i think even this algorithm we can optimize right.

@viirya
Copy link
Member Author

viirya commented Oct 10, 2018

@sujith71955 For executeTake, to optimize it we need to collect statistics of RDD. executeTake incrementally scans partitions. Ideally, it should just scan few partitions to return n rows, and remaining partitions can be skipped and don't need to be materialized. So going back to the beginning, IMHO, if we are going to collect the statistics, we will materialize all partitions, and that seems to be opposite to executeTake's optimization.

@sujith71955
Copy link
Contributor

@viirya I am having a usecase where a normal query is taking around 5 seconds where same query with limit 5000 is taking around 17 sec. when i was checking i could find bottleneck in the above mentioned flow.

@sujith71955
Copy link
Contributor

sujith71955 commented Oct 10, 2018

Mainly i think because we are trying to interpolate the number of partitions

@viirya
Copy link
Member Author

viirya commented Oct 10, 2018

@sujith71955 Thanks. I see. The case is somehow different with the problem this PR wants to solve. But I think it is a reasonable use case. May you want to create a ticket for us to track it?

@sujith71955
Copy link
Contributor

Yes sure , i will create a ticket for this issue and Keep you guys in loop. Thanks

@viirya viirya deleted the improve-global-limit-parallelism branch December 27, 2023 18:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.