Closed
Changes from 3 commits
@@ -252,6 +252,9 @@ public static long parseSecondNano(String secondNano) throws IllegalArgumentExce
public final int months;
public final long microseconds;

/**
* Return the interval in milliseconds, not including the months in the interval.
*/
public final long milliseconds() {
@tdas (Contributor Author), Dec 16, 2016:

@rxin Documented as per your request

Contributor:

hm why don't you remove this and i will send a patch for docs.

return this.microseconds / MICROS_PER_MILLI;
}
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -572,6 +572,10 @@ class Dataset[T] private[sql](
val parsedDelay =
Option(CalendarInterval.fromString("interval " + delayThreshold))
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
// Threshold specified in months/years is non-deterministic
Contributor:

I think waiting 1 month for late data is a reasonable use case. Based on the definition of the watermark, it's actually okay for us to overestimate this delay too. Why not take the max (31 days, leap year)?

Contributor:

i worry users will think it aligns with month boundaries. It is safer to just say "31 days".

Contributor:

what does "safe" mean in this context? users must not rely on watermarks for correctness, as they can be arbitrarily delayed based on batch boundaries. I think this error actually confuses the point, as it is enforcing precision when this API cannot provide that.

Contributor:

when i have a record that's 29 days late in feb what should I expect? If we want to just change month to a fixed number of days, I'd say just use 30, and then document it clearly in the API (e.g. "if month is specified, 1 month = 30 days").

Contributor:

> when i have a record that's 29 days late in feb what should I expect?

If I set the watermark to 1 month I would expect that in no case will a record that arrives on the 1st (of any month) be dropped before that month is over. This expectation should hold true for February as well as January. As such, I would pick 31 days (and document it as you suggest).

You should never expect data to be dropped by a watermark. If you want to ensure that data will be dropped, you should use a filter.

Contributor:

sgtm
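
To make the convention agreed on in this thread concrete, here is a minimal Java sketch of turning a delay given in months into a fixed number of milliseconds by counting every month as 31 days. The class `WatermarkDelay`, the method `delayMs`, and the constants `MILLIS_PER_DAY` and `DAYS_PER_MONTH` are hypothetical names chosen for illustration and are not Spark APIs; only the split into `months` and `microseconds` mirrors the `CalendarInterval` fields in the diff above. This is a sketch of the idea under those assumptions, not necessarily how the final patch implements it.

```java
/** Hypothetical helper, not Spark code: converts an interval to a watermark delay. */
public final class WatermarkDelay {
    private static final long MICROS_PER_MILLI = 1_000L;
    private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;
    // Assumption taken from the review thread: overestimate every month as 31 days.
    private static final long DAYS_PER_MONTH = 31L;

    private WatermarkDelay() {}

    /**
     * @param months       whole months in the interval (a year counts as 12 months)
     * @param microseconds the remaining part of the interval, in microseconds
     * @return the delay in milliseconds, with each month counted as 31 days
     */
    public static long delayMs(int months, long microseconds) {
        // multiplyExact/addExact throw ArithmeticException on overflow instead of wrapping.
        long monthMs = Math.multiplyExact(months * DAYS_PER_MONTH, MILLIS_PER_DAY);
        return Math.addExact(monthMs, microseconds / MICROS_PER_MILLI);
    }

    public static void main(String[] args) {
        System.out.println(delayMs(1, 0L));           // one month as 31 days
        System.out.println(delayMs(0, 10_000_000L));  // ten seconds
    }
}
```

Under this convention a one-month watermark never drops a record that arrives on the 1st of a month before that month ends, at the cost of keeping state somewhat longer in shorter months.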

if (parsedDelay.months > 0) {
throw new AnalysisException(s"Cannot specify time delay in months or years, use days instead")
}
EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
}

@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -84,6 +84,8 @@ trait ProgressReporter extends Logging {
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

private val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty

@volatile
protected var currentStatus: StreamingQueryStatus = {
new StreamingQueryStatus(
@@ -182,7 +184,9 @@ trait ProgressReporter extends Logging {

/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
val watermarkTimestamp =
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
else Map.empty[String, String]

if (!hasNewData) {
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
@@ -26,8 +26,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.InternalOutputModes.Complete

class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {

import testImplicits._

@@ -52,48 +53,78 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
assert(e.getMessage contains "int")
}

test("error on delay in months/years") {
val inputData = MemoryStream[Int].toDF()

def check(delayThreshold: String): Unit = {
val e = intercept[AnalysisException] {
inputData.withWatermark("value", delayThreshold)
}
assert(e.getMessage.contains("month"))
assert(e.getMessage.contains("year"))
}
check("1 year")
check("2 months")
}

test("event time and watermark metrics") {
val inputData = MemoryStream[Int]
def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
true
}

val windowedAggregation = inputData.toDF()
// No event time metrics when there is no watermarking
val inputData1 = MemoryStream[Int]
val aggWithoutWatermark = inputData1.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

testStream(aggWithoutWatermark, outputMode = Complete)(
AddData(inputData1, 15),
CheckAnswer((15, 1)),
assertEventStats { e => assert(e.isEmpty) },
AddData(inputData1, 10, 12, 14),
CheckAnswer((10, 3), (15, 1)),
assertEventStats { e => assert(e.isEmpty) }
)

// All event time metrics where watermarking is set
val inputData2 = MemoryStream[Int]
val aggWithWatermark = inputData2.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
true
}

testStream(windowedAggregation)(
AddData(inputData, 15),
testStream(aggWithWatermark)(
AddData(inputData2, 15),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(15))
assert(e.get("min") === formatTimestamp(15))
assert(e.get("avg") === formatTimestamp(15))
assert(e.get("watermark") === formatTimestamp(0))
},
AddData(inputData, 10, 12, 14),
AddData(inputData2, 10, 12, 14),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(14))
assert(e.get("min") === formatTimestamp(10))
assert(e.get("avg") === formatTimestamp(12))
assert(e.get("watermark") === formatTimestamp(5))
},
AddData(inputData, 25),
AddData(inputData2, 25),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(25))
assert(e.get("min") === formatTimestamp(25))
assert(e.get("avg") === formatTimestamp(25))
assert(e.get("watermark") === formatTimestamp(5))
},
AddData(inputData, 25),
AddData(inputData2, 25),
CheckAnswer((10, 3)),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(25))