Skip to content

Commit 7112da8

Browse files
committed
[SPARK-2554][SQL] CountDistinct partial aggregation and object allocation improvements
Author: Michael Armbrust <[email protected]> Author: Gregory Owen <[email protected]> Closes apache#1935 from marmbrus/countDistinctPartial and squashes the following commits: 5c7848d [Michael Armbrust] turn off caching in the constructor 8074a80 [Michael Armbrust] fix tests 32d216f [Michael Armbrust] reynolds comments c122cca [Michael Armbrust] Address comments, add tests b2e8ef3 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial fae38f4 [Michael Armbrust] Fix style fdca896 [Michael Armbrust] cleanup 93d0f64 [Michael Armbrust] metastore concurrency fix. db44a30 [Michael Armbrust] JIT hax. 3868f6c [Michael Armbrust] Merge pull request #9 from GregOwen/countDistinctPartial c9e67de [Gregory Owen] Made SpecificRow and types serializable by Kryo 2b46c4b [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial 8ff6402 [Michael Armbrust] Add specific row. 58d15f1 [Michael Armbrust] disable codegen logging 87d101d [Michael Armbrust] Fix isNullAt bug abee26d [Michael Armbrust] WIP 27984d0 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into countDistinctPartial 57ae3b1 [Michael Armbrust] Fix order dependent test b3d0f64 [Michael Armbrust] Add golden files. c1f7114 [Michael Armbrust] Improve tests / fix serialization. f31b8ad [Michael Armbrust] more fixes 38c7449 [Michael Armbrust] comments and style 9153652 [Michael Armbrust] better toString d494598 [Michael Armbrust] Fix tests now that the planner is better 41fbd1d [Michael Armbrust] Never try and create an empty hash set. 050bb97 [Michael Armbrust] Skip no-arg constructors for kryo, bd08239 [Michael Armbrust] WIP 213ada8 [Michael Armbrust] First draft of partially aggregated and code generated count distinct / max (cherry picked from commit 7e191fe) Signed-off-by: Michael Armbrust <[email protected]>
1 parent 9309786 commit 7112da8

File tree

33 files changed

+1239
-34
lines changed

33 files changed

+1239
-34
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala

Lines changed: 342 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
2727
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
2828
this(expressions.map(BindReferences.bindReference(_, inputSchema)))
2929

30-
protected val exprArray = expressions.toArray
30+
// null check is required for when Kryo invokes the no-arg constructor.
31+
protected val exprArray = if (expressions != null) expressions.toArray else null
3132

3233
def apply(input: Row): Row = {
3334
val outputArray = new Array[Any](exprArray.length)
@@ -109,7 +110,346 @@ class JoinedRow extends Row {
109110
def apply(i: Int) =
110111
if (i < row1.size) row1(i) else row2(i - row1.size)
111112

112-
def isNullAt(i: Int) = apply(i) == null
113+
def isNullAt(i: Int) =
114+
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)
115+
116+
def getInt(i: Int): Int =
117+
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)
118+
119+
def getLong(i: Int): Long =
120+
if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)
121+
122+
def getDouble(i: Int): Double =
123+
if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)
124+
125+
def getBoolean(i: Int): Boolean =
126+
if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)
127+
128+
def getShort(i: Int): Short =
129+
if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)
130+
131+
def getByte(i: Int): Byte =
132+
if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)
133+
134+
def getFloat(i: Int): Float =
135+
if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)
136+
137+
def getString(i: Int): String =
138+
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)
139+
140+
def copy() = {
141+
val totalSize = row1.size + row2.size
142+
val copiedValues = new Array[Any](totalSize)
143+
var i = 0
144+
while(i < totalSize) {
145+
copiedValues(i) = apply(i)
146+
i += 1
147+
}
148+
new GenericRow(copiedValues)
149+
}
150+
151+
override def toString() = {
152+
val row = (if (row1 != null) row1 else Seq[Any]()) ++ (if (row2 != null) row2 else Seq[Any]())
153+
s"[${row.mkString(",")}]"
154+
}
155+
}
156+
157+
/**
158+
* JIT HACK: Replace with macros
159+
* The `JoinedRow` class is used in many performance-critical situations. Unfortunately, since there
160+
* are multiple different types of `Rows` that could be stored as `row1` and `row2` most of the
161+
* calls in the critical path are polymorphic. By creating special versions of this class that are
162+
* used in only a single location of the code, we increase the chance that only a single type of
163+
* Row will be referenced, increasing the opportunity for the JIT to play tricks. This sounds
164+
* crazy but in benchmarks it had noticeable effects.
165+
*/
166+
class JoinedRow2 extends Row {
167+
private[this] var row1: Row = _
168+
private[this] var row2: Row = _
169+
170+
def this(left: Row, right: Row) = {
171+
this()
172+
row1 = left
173+
row2 = right
174+
}
175+
176+
/** Updates this JoinedRow to point at two new base rows. Returns itself. */
177+
def apply(r1: Row, r2: Row): Row = {
178+
row1 = r1
179+
row2 = r2
180+
this
181+
}
182+
183+
/** Updates this JoinedRow by updating its left base row. Returns itself. */
184+
def withLeft(newLeft: Row): Row = {
185+
row1 = newLeft
186+
this
187+
}
188+
189+
/** Updates this JoinedRow by updating its right base row. Returns itself. */
190+
def withRight(newRight: Row): Row = {
191+
row2 = newRight
192+
this
193+
}
194+
195+
def iterator = row1.iterator ++ row2.iterator
196+
197+
def length = row1.length + row2.length
198+
199+
def apply(i: Int) =
200+
if (i < row1.size) row1(i) else row2(i - row1.size)
201+
202+
def isNullAt(i: Int) =
203+
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)
204+
205+
def getInt(i: Int): Int =
206+
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)
207+
208+
def getLong(i: Int): Long =
209+
if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)
210+
211+
def getDouble(i: Int): Double =
212+
if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)
213+
214+
def getBoolean(i: Int): Boolean =
215+
if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)
216+
217+
def getShort(i: Int): Short =
218+
if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)
219+
220+
def getByte(i: Int): Byte =
221+
if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)
222+
223+
def getFloat(i: Int): Float =
224+
if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)
225+
226+
def getString(i: Int): String =
227+
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)
228+
229+
def copy() = {
230+
val totalSize = row1.size + row2.size
231+
val copiedValues = new Array[Any](totalSize)
232+
var i = 0
233+
while(i < totalSize) {
234+
copiedValues(i) = apply(i)
235+
i += 1
236+
}
237+
new GenericRow(copiedValues)
238+
}
239+
240+
override def toString() = {
241+
val row = (if (row1 != null) row1 else Seq[Any]()) ++ (if (row2 != null) row2 else Seq[Any]())
242+
s"[${row.mkString(",")}]"
243+
}
244+
}
245+
246+
/**
247+
* JIT HACK: Replace with macros
248+
*/
249+
class JoinedRow3 extends Row {
250+
private[this] var row1: Row = _
251+
private[this] var row2: Row = _
252+
253+
def this(left: Row, right: Row) = {
254+
this()
255+
row1 = left
256+
row2 = right
257+
}
258+
259+
/** Updates this JoinedRow to point at two new base rows. Returns itself. */
260+
def apply(r1: Row, r2: Row): Row = {
261+
row1 = r1
262+
row2 = r2
263+
this
264+
}
265+
266+
/** Updates this JoinedRow by updating its left base row. Returns itself. */
267+
def withLeft(newLeft: Row): Row = {
268+
row1 = newLeft
269+
this
270+
}
271+
272+
/** Updates this JoinedRow by updating its right base row. Returns itself. */
273+
def withRight(newRight: Row): Row = {
274+
row2 = newRight
275+
this
276+
}
277+
278+
def iterator = row1.iterator ++ row2.iterator
279+
280+
def length = row1.length + row2.length
281+
282+
def apply(i: Int) =
283+
if (i < row1.size) row1(i) else row2(i - row1.size)
284+
285+
def isNullAt(i: Int) =
286+
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)
287+
288+
def getInt(i: Int): Int =
289+
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)
290+
291+
def getLong(i: Int): Long =
292+
if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)
293+
294+
def getDouble(i: Int): Double =
295+
if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)
296+
297+
def getBoolean(i: Int): Boolean =
298+
if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)
299+
300+
def getShort(i: Int): Short =
301+
if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)
302+
303+
def getByte(i: Int): Byte =
304+
if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)
305+
306+
def getFloat(i: Int): Float =
307+
if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)
308+
309+
def getString(i: Int): String =
310+
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)
311+
312+
def copy() = {
313+
val totalSize = row1.size + row2.size
314+
val copiedValues = new Array[Any](totalSize)
315+
var i = 0
316+
while(i < totalSize) {
317+
copiedValues(i) = apply(i)
318+
i += 1
319+
}
320+
new GenericRow(copiedValues)
321+
}
322+
323+
override def toString() = {
324+
val row = (if (row1 != null) row1 else Seq[Any]()) ++ (if (row2 != null) row2 else Seq[Any]())
325+
s"[${row.mkString(",")}]"
326+
}
327+
}
328+
329+
/**
330+
* JIT HACK: Replace with macros
331+
*/
332+
class JoinedRow4 extends Row {
333+
private[this] var row1: Row = _
334+
private[this] var row2: Row = _
335+
336+
def this(left: Row, right: Row) = {
337+
this()
338+
row1 = left
339+
row2 = right
340+
}
341+
342+
/** Updates this JoinedRow to point at two new base rows. Returns itself. */
343+
def apply(r1: Row, r2: Row): Row = {
344+
row1 = r1
345+
row2 = r2
346+
this
347+
}
348+
349+
/** Updates this JoinedRow by updating its left base row. Returns itself. */
350+
def withLeft(newLeft: Row): Row = {
351+
row1 = newLeft
352+
this
353+
}
354+
355+
/** Updates this JoinedRow by updating its right base row. Returns itself. */
356+
def withRight(newRight: Row): Row = {
357+
row2 = newRight
358+
this
359+
}
360+
361+
def iterator = row1.iterator ++ row2.iterator
362+
363+
def length = row1.length + row2.length
364+
365+
def apply(i: Int) =
366+
if (i < row1.size) row1(i) else row2(i - row1.size)
367+
368+
def isNullAt(i: Int) =
369+
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)
370+
371+
def getInt(i: Int): Int =
372+
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)
373+
374+
def getLong(i: Int): Long =
375+
if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)
376+
377+
def getDouble(i: Int): Double =
378+
if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)
379+
380+
def getBoolean(i: Int): Boolean =
381+
if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)
382+
383+
def getShort(i: Int): Short =
384+
if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)
385+
386+
def getByte(i: Int): Byte =
387+
if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)
388+
389+
def getFloat(i: Int): Float =
390+
if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)
391+
392+
def getString(i: Int): String =
393+
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)
394+
395+
def copy() = {
396+
val totalSize = row1.size + row2.size
397+
val copiedValues = new Array[Any](totalSize)
398+
var i = 0
399+
while(i < totalSize) {
400+
copiedValues(i) = apply(i)
401+
i += 1
402+
}
403+
new GenericRow(copiedValues)
404+
}
405+
406+
override def toString() = {
407+
val row = (if (row1 != null) row1 else Seq[Any]()) ++ (if (row2 != null) row2 else Seq[Any]())
408+
s"[${row.mkString(",")}]"
409+
}
410+
}
411+
412+
/**
413+
* JIT HACK: Replace with macros
414+
*/
415+
class JoinedRow5 extends Row {
416+
private[this] var row1: Row = _
417+
private[this] var row2: Row = _
418+
419+
def this(left: Row, right: Row) = {
420+
this()
421+
row1 = left
422+
row2 = right
423+
}
424+
425+
/** Updates this JoinedRow to point at two new base rows. Returns itself. */
426+
def apply(r1: Row, r2: Row): Row = {
427+
row1 = r1
428+
row2 = r2
429+
this
430+
}
431+
432+
/** Updates this JoinedRow by updating its left base row. Returns itself. */
433+
def withLeft(newLeft: Row): Row = {
434+
row1 = newLeft
435+
this
436+
}
437+
438+
/** Updates this JoinedRow by updating its right base row. Returns itself. */
439+
def withRight(newRight: Row): Row = {
440+
row2 = newRight
441+
this
442+
}
443+
444+
def iterator = row1.iterator ++ row2.iterator
445+
446+
def length = row1.length + row2.length
447+
448+
def apply(i: Int) =
449+
if (i < row1.size) row1(i) else row2(i - row1.size)
450+
451+
def isNullAt(i: Int) =
452+
if (i < row1.size) row1.isNullAt(i) else row2.isNullAt(i - row1.size)
113453

114454
def getInt(i: Int): Int =
115455
if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ object EmptyRow extends Row {
127127
* the array is not copied, and thus could technically be mutated after creation, this is not
128128
* allowed.
129129
*/
130-
class GenericRow(protected[catalyst] val values: Array[Any]) extends Row {
130+
class GenericRow(protected[sql] val values: Array[Any]) extends Row {
131131
/** No-arg constructor for serialization. */
132132
def this() = this(null)
133133

0 commit comments

Comments
 (0)