
Commit 763af15

Add typeChecking debugging functions
1 parent 8c69303

File tree

2 files changed: +75 -0 lines
  • sql
    • catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions
    • core/src/main/scala/org/apache/spark/sql/execution/debug


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala

Lines changed: 2 additions & 0 deletions

@@ -39,6 +39,8 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
     }
     new GenericRow(outputArray)
   }
+
+  override def toString = s"Row => [${exprArray.mkString(",")}]"
 }
 
 /**
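
With this change an InterpretedProjection prints its expression list instead of the default object hash, which makes plan dumps easier to read. A minimal REPL-style sketch (hypothetical expressions; assumes the catalyst Literal factory is in scope, and the exact rendering depends on each expression's own toString):

    import org.apache.spark.sql.catalyst.expressions._

    // Hypothetical projection over two literal expressions.
    val proj = new InterpretedProjection(Literal(1) :: Literal(2) :: Nil)

    // Before: default toString, e.g. InterpretedProjection@1b6d3586
    // After:  something like "Row => [1,2]"
    println(proj)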

sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala

Lines changed: 73 additions & 0 deletions

@@ -24,6 +24,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.SparkContext._
 import org.apache.spark.sql.{SchemaRDD, Row}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.catalyst.types._
 
 /**
  * :: DeveloperApi ::
@@ -56,6 +57,23 @@ package object debug {
         case _ =>
       }
     }
+
+    def typeCheck(): Unit = {
+      val plan = query.queryExecution.executedPlan
+      val visited = new collection.mutable.HashSet[TreeNodeRef]()
+      val debugPlan = plan transform {
+        case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
+          visited += new TreeNodeRef(s)
+          TypeCheck(s)
+      }
+      try {
+        println(s"Results returned: ${debugPlan.execute().count()}")
+      } catch {
+        case e: Exception =>
+          def unwrap(e: Throwable): Throwable = if (e.getCause == null) e else unwrap(e.getCause)
+          println(s"Deepest Error: ${unwrap(e)}")
+      }
+    }
   }
 
   private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
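
The new typeCheck() sits alongside the existing debug() on the implicit SchemaRDD wrapper in this package, so importing the debug package is enough to call it on any query. A sketch of intended use (hypothetical table and context names; assumes a SQLContext named sqlContext is in scope):

    import org.apache.spark.sql.execution.debug._

    val query = sqlContext.sql("SELECT key, value FROM src")

    // Wraps each physical operator (once, tracked via TreeNodeRef) in a
    // TypeCheck node, then runs the plan: on success it prints the row
    // count, on failure the innermost cause of the exception.
    query.typeCheck()
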
@@ -115,4 +133,59 @@ package object debug {
       }
     }
   }
+
+  object TypeCheck {
+    def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
+      case (null, _) =>
+
+      case (row: Row, StructType(fields)) =>
+        row.zip(fields.map(_.dataType)).foreach { case(d,t) => typeCheck(d,t) }
+      case (s: Seq[_], ArrayType(elemType, _)) =>
+        s.foreach(typeCheck(_, elemType))
+      case (m: Map[_, _], MapType(keyType, valueType, _)) =>
+        m.keys.foreach(typeCheck(_, keyType))
+        m.values.foreach(typeCheck(_, valueType))
+
+      case (_: Long, LongType) =>
+      case (_: Int, IntegerType) =>
+      case (_: String, StringType) =>
+      case (_: Float, FloatType) =>
+      case (_: Byte, ByteType) =>
+      case (_: Short, ShortType) =>
+      case (_: Boolean, BooleanType) =>
+      case (_: Double, DoubleType) =>
+
+      case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) expected $t")
+    }
+  }
+
+  case class TypeCheck(child: SparkPlan) extends SparkPlan {
+    import TypeCheck._
+    //def otherCopyArgs = null :: Nil
+
+    override def nodeName = ""
+
+    override def makeCopy(args: Array[Object]): this.type = TypeCheck(args(0).asInstanceOf[SparkPlan]).asInstanceOf[this.type]
+
+    def output = child.output
+
+    def children = child :: Nil
+
+    def execute() = {
+      child.execute().map { row =>
+        try typeCheck(row, child.schema) catch {
+          case e: Exception =>
+            sys.error(
+              s"""
+                |ERROR WHEN TYPE CHECKING QUERY
+                |==============================
+                |$e
+                |======== BAD TREE ============
+                |$child
+              """.stripMargin)
+        }
+        row
+      }
+    }
+  }
 }
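
The checker itself is the (data, schema) match in object TypeCheck: nulls always pass, rows, arrays, and maps are checked recursively against their element types, primitives must match their declared DataType exactly, and anything else hits the sys.error catch-all. A standalone sketch of that behavior (hypothetical schema and rows; GenericRow stands in for query output):

    import org.apache.spark.sql.catalyst.expressions.GenericRow
    import org.apache.spark.sql.catalyst.types._
    import org.apache.spark.sql.execution.debug.TypeCheck

    val schema = StructType(
      StructField("id", IntegerType, nullable = false) ::
      StructField("name", StringType, nullable = true) :: Nil)

    // Conforming row: passes silently (null is accepted for any type).
    TypeCheck.typeCheck(new GenericRow(Array[Any](1, null)), schema)

    // A Long where an Int is declared falls through to the catch-all,
    // e.g. "Invalid data found: got 1 (class java.lang.Long) expected IntegerType"
    TypeCheck.typeCheck(new GenericRow(Array[Any](1L, "a")), schema)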
