@@ -134,7 +134,7 @@ case class FlatMapGroupsWithStateExec(
// If timeout is based on event time, then filter late data based on watermark
val filteredIter = watermarkPredicateForData match {
case Some(predicate) if timeoutConf == EventTimeTimeout =>
-        iter.filter(row => !predicate.eval(row))
+        applyRemovingRowsOlderThanWatermark(iter, predicate)
case _ =>
iter
}
@@ -432,7 +432,7 @@ case class StreamingSymmetricHashJoinExec(
WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match {
case Some(watermarkExpr) =>
val predicate = newPredicate(watermarkExpr, inputAttributes)
-        inputIter.filter { row => !predicate.eval(row) }
+        applyRemovingRowsOlderThanWatermark(inputIter, predicate)
case None =>
inputIter
}
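
Both operators above mix in StateStoreWriter, which is where the new counting helper and its backing metric are defined; that change comes next.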
@@ -77,6 +77,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numLateRows" -> SQLMetrics.createMetric(sparkContext,
"number of rows which are later than watermark"),
"numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
"numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
"allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "total time to update rows"),
@@ -93,7 +95,9 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
new StateOperatorProgress(
numRowsTotal = longMetric("numTotalStateRows").value,
numRowsUpdated = longMetric("numUpdatedStateRows").value,
memoryUsedBytes = longMetric("stateMemory").value)
memoryUsedBytes = longMetric("stateMemory").value,
numLateInputRows = longMetric("numLateRows").value
)
}

/** Records the duration of running `body` for the next query progress update. */
@@ -122,6 +126,15 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
}.toMap
}

+  protected def applyRemovingRowsOlderThanWatermark(iter: Iterator[InternalRow],
+      predicate: Predicate): Iterator[InternalRow] = {
+    iter.filter { row =>
+      val filteredIn = !predicate.eval(row)
+      if (!filteredIn) longMetric("numLateRows") += 1
+      filteredIn
+    }
+  }

/**
* Should the MicroBatchExecution run another batch based on this stateful operator and the
* current updated metadata.
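
The helper counts and drops in one pass: because Scala's Iterator.filter is lazy, numLateRows only reaches its final value once downstream code has drained the iterator. A standalone sketch of the same count-while-filtering pattern (plain Scala, no Spark; the timestamps and the var standing in for the SQLMetric are made up):

object LateRowFilterSketch {
  def main(args: Array[String]): Unit = {
    val watermarkMs = 15000L
    var numLateRows = 0L // stand-in for the "numLateRows" SQLMetric
    val eventTimesMs = Iterator(10000L, 25000L, 12000L, 30000L)
    // Keep rows at or above the watermark; count the ones dropped as late.
    // (In the operator, predicate.eval(row) is true for late rows, hence the negation.)
    val filtered = eventTimesMs.filter { ts =>
      val filteredIn = ts >= watermarkMs
      if (!filteredIn) numLateRows += 1
      filteredIn
    }
    println(filtered.toList) // List(25000, 30000) -- forcing the iterator updates the counter
    println(numLateRows)     // 2
  }
}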
@@ -301,7 +314,8 @@ case class StateStoreSaveExec(
// Assumption: watermark predicates must be non-empty if append mode is allowed
case Some(Append) =>
allUpdatesTimeMs += timeTakenMs {
-          val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))
+          val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
+            watermarkPredicateForData.get)
while (filteredIter.hasNext) {
val row = filteredIter.next().asInstanceOf[UnsafeRow]
val key = getKey(row)
@@ -344,7 +358,7 @@
new NextIterator[InternalRow] {
// Filter late date using watermark if specified
private[this] val baseIterator = watermarkPredicateForData match {
-            case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
+            case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
case None => iter
}
private val updatesStartTimeNs = System.nanoTime
@@ -421,14 +435,13 @@ case class StreamingDeduplicateExec(
Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
val numOutputRows = longMetric("numOutputRows")
val numTotalStateRows = longMetric("numTotalStateRows")
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
val commitTimeMs = longMetric("commitTimeMs")

val baseIterator = watermarkPredicateForData match {
-        case Some(predicate) => iter.filter(row => !predicate.eval(row))
+        case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
case None => iter
}

@@ -38,7 +38,8 @@ import org.apache.spark.annotation.InterfaceStability
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long,
-    val memoryUsedBytes: Long
+    val memoryUsedBytes: Long,
+    val numLateInputRows: Long
) extends Serializable {

/** The compact JSON representation of this progress. */
@@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
def prettyJson: String = pretty(render(jsonValue))

private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows)

private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated)) ~
("memoryUsedBytes" -> JInt(memoryUsedBytes))
("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
("numLateInputRows" -> JInt(numLateInputRows))
arunmahadevan (Contributor) commented:

Here you are measuring the number of "keys" filtered out of the state store since they have crossed the late threshold, correct? It may be better to rename this metric, here and in the other places, to "number of evicted rows". It's better if we could rather expose the actual number of events that were late.

HeartSaVioR (Contributor, Author) commented on Jun 26, 2018:

@arunmahadevan

> Here you are measuring the number of "keys" filtered out of the state store since they have crossed the late threshold, correct?

No, it is based on "input" rows which are filtered out due to the watermark threshold. Note that the meaning of "input" is relative: it doesn't refer to the input rows of the overall query, but to the input rows of the state operator.

> It's better if we could rather expose the actual number of events that were late.

I guess this comment comes from a misunderstanding, but I would say that to get a correct count of late events we would have to filter them out in the first phase of the query (not in the state operator). For now, intermediate filters affect the count.

arunmahadevan (Contributor) replied:

What I meant was: if the input to the state operator is the result of the aggregate, then we would not be counting the actual input rows to the group-by. There would be at most one row per key, which would give the impression that there are fewer late events than there really are.

If this is not the case then I am fine.

HeartSaVioR (Contributor, Author) replied:

@arunmahadevan Ah yes, got it. If we want an accurate number we need to filter out late events in the first phase of the query anyway. I guess we may need to defer addressing this until we change that behavior.
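
A toy illustration of the undercount discussed above (made-up data, plain Scala; the groupBy stands in for Spark's partial aggregation upstream of the state operator):

object LateCountUndercountSketch {
  def main(args: Array[String]): Unit = {
    val watermarkMs = 15000L
    // Five raw events for key "a", all below the watermark (all late).
    val rawEvents = Seq(("a", 10000L), ("a", 11000L), ("a", 12000L), ("a", 13000L), ("a", 14000L))
    // Stand-in for the upstream partial aggregation: at most one row per key
    // reaches the state operator.
    val aggregatedRows = rawEvents.groupBy(_._1).map { case (key, evs) =>
      (key, evs.map(_._2).max)
    }
    val actualLate = rawEvents.count(_._2 < watermarkMs)            // 5
    val numLateInputRows = aggregatedRows.count(_._2 < watermarkMs) // 1
    println(s"actual late events = $actualLate, numLateInputRows = $numLateInputRows")
  }
}

In other words, numLateInputRows counts rows arriving at the state operator, exactly as the author states; it is only a lower bound on the number of late source events.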

}

override def toString: String = prettyJson
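
For API consumers, the new field rides along with the existing ones on StateOperatorProgress. A sketch of how it could be observed on a build containing this patch (not part of the PR; MemoryStream is the internal test source Spark's own suites use, and the data mirrors the EventTimeWatermarkSuite changes later in this diff):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.window

object NumLateInputRowsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("late-rows").getOrCreate()
    import spark.implicits._
    implicit val sqlContext = spark.sqlContext

    val input = MemoryStream[Int] // values are seconds since epoch
    val windowedCounts = input.toDF()
      .select($"value".cast("timestamp").as("eventTime"))
      .withWatermark("eventTime", "10 seconds")
      .groupBy(window($"eventTime", "5 seconds"))
      .count()

    val query = windowedCounts.writeStream
      .format("memory").queryName("counts").outputMode("append").start()

    input.addData(25)   // advances the watermark to 15 seconds once processed
    query.processAllAvailable()
    input.addData(10)   // 10 < 15: dropped as late by the state operator
    query.processAllAvailable()

    // The new field, next to numRowsTotal / numRowsUpdated / memoryUsedBytes.
    println(query.lastProgress.stateOperators(0).numLateInputRows) // expected: 1
    query.stop()
  }
}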
@@ -164,9 +164,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((10, 5)),
assertNumStateRows(2),
+      assertNumLateInputRows(0),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
-      assertNumStateRows(2)
+      assertNumStateRows(2),
+      assertNumLateInputRows(1)
)
}

@@ -187,12 +189,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((25, 1)),
assertNumStateRows(2),
+      assertNumLateInputRows(0),
AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
CheckNewAnswer((25, 2)),
assertNumStateRows(2),
+      assertNumLateInputRows(1),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
-      assertNumStateRows(2)
+      assertNumStateRows(2),
+      assertNumLateInputRows(1)
)
}

@@ -491,6 +496,13 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
true
}

+  private def assertNumLateInputRows(numLateInputRows: Long): AssertOnQuery = AssertOnQuery { q =>
+    q.processAllAvailable()
+    val progressWithData = q.recentProgress.lastOption.get
+    assert(progressWithData.stateOperators(0).numLateInputRows === numLateInputRows)
+    true
+  }

private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
AssertOnQuery { q =>
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
@@ -29,8 +29,10 @@ trait StateStoreMetricsTest extends StreamTest {
lastCheckedRecentProgressIndex = -1
}

-  def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
-    AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated") { q =>
+  def assertNumStateRows(total: Seq[Long], updated: Seq[Long],
+      lateInputRows: Seq[Long]): AssertOnQuery =
+    AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated" +
+      s", late input rows = $lateInputRows") { q =>
val recentProgress = q.recentProgress
require(recentProgress.nonEmpty, "No progress made, cannot check num state rows")
require(recentProgress.length < spark.sessionState.conf.streamingProgressRetention,
@@ -57,12 +59,15 @@ trait StateStoreMetricsTest extends StreamTest {
val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")

+      val numLateInputRows = recentProgress.last.stateOperators.map(_.numLateInputRows)
+      assert(numLateInputRows === lateInputRows, s"incorrect late input rows, $debugString")

lastCheckedRecentProgressIndex = recentProgress.length - 1
true
}

-  def assertNumStateRows(total: Long, updated: Long): AssertOnQuery =
-    assertNumStateRows(Seq(total), Seq(updated))
+  def assertNumStateRows(total: Long, updated: Long, lateInputRows: Long = 0): AssertOnQuery =
+    assertNumStateRows(Seq(total), Seq(updated), Seq(lateInputRows))

def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {
if (arraySeq.isEmpty) return Seq.fill(arrayLength)(0L)
@@ -76,15 +76,15 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
testStream(result, Append)(
AddData(inputData, "a" -> 1),
CheckLastBatch("a" -> 1),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),

AddData(inputData, "a" -> 2), // Dropped from the second `dropDuplicates`
CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L), lateInputRows = Seq(0L, 0L)),

AddData(inputData, "b" -> 1),
CheckLastBatch("b" -> 1),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
)
}

@@ -107,7 +107,7 @@

AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
-      assertNumStateRows(total = 1, updated = 0),
+      assertNumStateRows(total = 1, updated = 0, lateInputRows = 1),

AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25
CheckNewAnswer(45),
@@ -131,23 +131,23 @@
CheckLastBatch(),
// states in aggregate in [10, 14), [15, 20) (2 windows)
// states in deduplicate is 10 to 15
-      assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L)),
+      assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L), lateInputRows = Seq(0L, 0L)),

AddData(inputData, 25), // Advance watermark to 15 seconds
CheckLastBatch((10 -> 5)), // 5 items (10 to 14) after deduplicate, emitted with no-data-batch
// states in aggregate in [15, 20) and [25, 30); no-data-batch removed [10, 14)
// states in deduplicate is 25, no-data-batch removed 10 to 14
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),

AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckLastBatch(),
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 1L)),

AddData(inputData, 40), // Advance watermark to 30 seconds
CheckLastBatch((15 -> 1), (25 -> 1)),
// states in aggregate is [40, 45); no-data-batch removed [15, 20) and [25, 30)
// states in deduplicate is 40; no-data-batch removed 25
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
)
}

@@ -163,16 +163,16 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
testStream(result, Update)(
AddData(inputData, "a" -> 1),
CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "a" -> 1), // Dropped
CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "a" -> 2),
CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "b" -> 1),
CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
)
}

@@ -188,16 +188,16 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
testStream(result, Complete)(
AddData(inputData, "a" -> 1),
CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "a" -> 1), // Dropped
CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "a" -> 2),
CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
AddData(inputData, "b" -> 1),
CheckLastBatch("a" -> 3L, "b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
)
}

@@ -159,11 +159,11 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with

AddData(input2, 1),
CheckNewAnswer(), // Should not join as < 15 removed
      assertNumStateRows(total = 2, updated = 0), // row not add as 1 < state key watermark = 15

AddData(input1, 5),
CheckNewAnswer(), // Same reason as above
-      assertNumStateRows(total = 2, updated = 0)
+      assertNumStateRows(total = 2, updated = 0, lateInputRows = 1) // row later than watermark
)
}

@@ -216,12 +216,12 @@
// (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state
AddData(rightInput, (1, 20), (1, 21), (1, 28)),
CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
-      assertNumStateRows(total = 5, updated = 1),
+      assertNumStateRows(total = 5, updated = 1, lateInputRows = 1),

// New data to left input with leftTime <= 20 should be filtered due to event time watermark
AddData(leftInput, (1, 20), (1, 21)),
CheckNewAnswer((1, 21, 28)),
-      assertNumStateRows(total = 6, updated = 1)
+      assertNumStateRows(total = 6, updated = 1, lateInputRows = 1)
)
}

@@ -290,7 +290,7 @@

AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to state
CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)),
-      assertNumStateRows(total = 11, updated = 1), // only 31 added
+      assertNumStateRows(total = 11, updated = 1, lateInputRows = 1), // only 31 added

// Advance the watermark
AddData(rightInput, (1, 80)),
@@ -304,7 +304,7 @@

AddData(rightInput, (1, 46), (1, 50)), // 46 should not be processed or added to state
CheckNewAnswer((1, 49, 50), (1, 50, 50)),
-      assertNumStateRows(total = 7, updated = 1) // 50 added
+      assertNumStateRows(total = 7, updated = 1, lateInputRows = 1) // 50 added
)
}

@@ -58,7 +58,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 2
| "memoryUsedBytes" : 2,
| "numLateInputRows" : 0
| } ],
| "sources" : [ {
| "description" : "source",
@@ -91,7 +92,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 2
| "memoryUsedBytes" : 2,
| "numLateInputRows" : 0
| } ],
| "sources" : [ {
| "description" : "source",
@@ -230,7 +232,7 @@ object StreamingQueryStatusAndProgressSuite {
"avg" -> "2016-12-05T20:54:20.827Z",
"watermark" -> "2016-12-05T20:54:20.827Z").asJava),
stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numLateInputRows = 0)),
sources = Array(
new SourceProgress(
description = "source",
@@ -254,7 +256,7 @@ object StreamingQueryStatusAndProgressSuite {
// empty maps should be handled correctly
eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numLateInputRows = 0)),
sources = Array(
new SourceProgress(
description = "source",