@@ -21,6 +21,7 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
 
 /**
@@ -255,5 +257,13 @@ package object debug {
     override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
       consume(ctx, input)
     }
+
+    override def doExecuteBroadcast[T](): Broadcast[T] = {
+      child.executeBroadcast()
+    }
+
+    override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+      child.executeColumnar()
+    }
   }
 }
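
For context: SparkPlan's default doExecuteBroadcast (and doExecuteColumnar) implementations throw UnsupportedOperationException, so before this change wrapping a plan in DebugExec broke any query whose parent requests broadcast or columnar execution from the node. The two overrides above simply delegate to the child plan. A minimal repro sketch, assuming a running session named spark; the join on two tiny ranges plans a BroadcastExchange under the default autoBroadcastJoinThreshold:

// Sketch only: demonstrates the failure mode this PR fixes. Before the
// doExecuteBroadcast/doExecuteColumnar overrides, DebugExec fell back to
// SparkPlan's defaults, which throw UnsupportedOperationException.
import org.apache.spark.sql.execution.debug._  // brings in the debug() implicit

val left = spark.range(10)
val right = spark.range(10)

// debug() wraps every node of the physical plan in DebugExec and prints
// per-node tuple counts; previously this threw on the broadcast side.
left.join(right, left("id") === right("id")).debug()

The regression tests in the suite diff below exercise exactly these two paths.
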
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.debug
 
+import java.io.ByteArrayOutputStream
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -48,4 +50,45 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext {
     assert(res.forall{ case (subtree, code) =>
       subtree.contains("Range") && code.contains("Object[]")})
   }
+
+  test("SPARK-28537: DebugExec cannot debug broadcast related queries") {
+    val rightDF = spark.range(10)
+    val leftDF = spark.range(10)
+    val joinedDF = leftDF.join(rightDF, leftDF("id") === rightDF("id"))
+
+    val captured = new ByteArrayOutputStream()
+    Console.withOut(captured) {
+      joinedDF.debug()
+    }
+
+    val output = captured.toString()
+    assert(output.contains(
+      """== BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) ==
+        |Tuples output: 0
+        | id LongType: {}
+        |== WholeStageCodegen ==
+        |Tuples output: 10
+        | id LongType: {java.lang.Long}
+        |== Range (0, 10, step=1, splits=2) ==
+        |Tuples output: 0
+        | id LongType: {}""".stripMargin))
+  }
+
+  test("SPARK-28537: DebugExec cannot debug columnar related queries") {
+    val df = spark.range(5)

    [Review comment · Contributor] can we split this into 2 different tests?

+    df.persist()

    [Review comment · Contributor] shall we unpersist this? (see the try/finally sketch after this diff)

+
+    val captured = new ByteArrayOutputStream()
+    Console.withOut(captured) {
+      df.debug()
+    }
+    df.unpersist()
+
+    val output = captured.toString().replaceAll("#\\d+", "#x")
+    assert(output.contains(
+      """== InMemoryTableScan [id#xL] ==
+        |Tuples output: 0
+        | id LongType: {}
+        |""".stripMargin))
+  }
 }
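
On the unpersist question above: a minimal sketch of one way to make the cleanup unconditional, assuming the same test fixtures as in the suite — wrapping the capture in try/finally releases the cached DataFrame even when the assertion fails. This is a sketch of one option, not part of this PR:

// Sketch only (not in the PR): try/finally guarantees the cached DataFrame
// is released even if debug() or the assertion throws.
val df = spark.range(5)
df.persist()
try {
  val captured = new ByteArrayOutputStream()
  Console.withOut(captured) {
    df.debug()
  }
  val output = captured.toString().replaceAll("#\\d+", "#x")
  assert(output.contains("== InMemoryTableScan [id#xL] =="))
} finally {
  df.unpersist()
}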