@@ -22,13 +22,11 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
2222
2323/**
2424 * Specifies how tuples that share common expressions will be distributed when a query is executed
25- * in parallel on many machines. Distribution can be used to refer to two distinct physical
26- * properties:
27- * - Inter-node partitioning of data: In this case the distribution describes how tuples are
28- * partitioned across physical machines in a cluster. Knowing this property allows some
29- * operators (e.g., Aggregate) to perform partition local operations instead of global ones.
30- * - Intra-partition ordering of data: In this case the distribution describes guarantees made
31- * about how tuples are distributed within a single partition.
25+ * in parallel on many machines.
26+ *
27+ * Distribution here refers to inter-node partitioning of data. That is, it describes how tuples
28+ * are partitioned across physical machines in a cluster. Knowing this property allows some
29+ * operators (e.g., Aggregate) to perform partition local operations instead of global ones.
3230 */
3331sealed trait Distribution {
3432 /**
@@ -70,9 +68,7 @@ case object AllTuples extends Distribution {
7068
7169/**
7270 * Represents data where tuples that share the same values for the `clustering`
73- * [[Expression Expressions ]] will be co-located. Based on the context, this
74- * can mean such tuples are either co-located in the same partition or they will be contiguous
75- * within a single partition.
71+ * [[Expression Expressions ]] will be co-located in the same partition.
7672 */
7773case class ClusteredDistribution (
7874 clustering : Seq [Expression ],
@@ -118,10 +114,12 @@ case class HashClusteredDistribution(
118114
119115/**
120116 * Represents data where tuples have been ordered according to the `ordering`
121- * [[Expression Expressions ]]. This is a strictly stronger guarantee than
122- * [[ClusteredDistribution ]] as an ordering will ensure that tuples that share the
123- * same value for the ordering expressions are contiguous and will never be split across
124- * partitions.
117+ * [[Expression Expressions ]]. Its requirement is defined as the following:
118+ * - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
119+ * equal to any row in the first partition, according to the `ordering` expressions.
120+ *
121+ * In other words, this distribution requires the rows to be ordered across partitions, but not
122+ * necessarily within a partition.
125123 */
126124case class OrderedDistribution (ordering : Seq [SortOrder ]) extends Distribution {
127125 require(
@@ -241,12 +239,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
241239
242240/**
243241 * Represents a partitioning where rows are split across partitions based on some total ordering of
244- * the expressions specified in `ordering`. When data is partitioned in this manner the following
245- * two conditions are guaranteed to hold:
246- * - All row where the expressions in `ordering` evaluate to the same values will be in the same
247- * partition.
248- * - Each partition will have a `min` and `max` row, relative to the given ordering. All rows
249- * that are in between `min` and `max` in this `ordering` will reside in this partition .
242+ * the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees:
243+ * Given any 2 adjacent partitions, all the rows of the second partition must be larger than any row
244+ * in the first partition, according to the `ordering` expressions.
245+ *
246+ * This is a strictly stronger guarantee than what `OrderedDistribution(ordering)` requires, as
247+ * there is no overlap between partitions .
250248 *
251249 * This class extends expression primarily so that transformations over expression will descend
252250 * into its child.
@@ -262,6 +260,22 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
262260 super .satisfies0(required) || {
263261 required match {
264262 case OrderedDistribution (requiredOrdering) =>
263+ // If `ordering` is a prefix of `requiredOrdering`:
264+ // Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. According to the
265+ // RangePartitioning definition, any [a, b] in a previous partition must be smaller
266+ // than any [a, b] in the following partition. This also means any [a, b, c] in a
267+ // previous partition must be smaller than any [a, b, c] in the following partition.
268+ // Thus `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`.
269+ //
270+ // If `requiredOrdering` is a prefix of `ordering`:
271+ // Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. According to the
272+ // RangePartitioning definition, any [a, b, c] in a previous partition must be smaller
273+ // than any [a, b, c] in the following partition. If there is a [a1, b1] from a previous
274+ // partition which is larger than a [a2, b2] from the following partition, then there
275+ // must be a [a1, b1 c1] larger than [a2, b2, c2], which violates RangePartitioning
276+ // definition. So it's guaranteed that, any [a, b] in a previous partition must not be
277+ // greater(i.e. smaller or equal to) than any [a, b] in the following partition. Thus
278+ // `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`.
265279 val minSize = Seq (requiredOrdering.size, ordering.size).min
266280 requiredOrdering.take(minSize) == ordering.take(minSize)
267281 case ClusteredDistribution (requiredClustering, _) =>
0 commit comments