@brkyvz
Contributor

@brkyvz brkyvz commented Mar 26, 2020

What changes were proposed in this pull request?

In Structured Streaming, we provide progress updates every 10 seconds when a stream doesn't have any new data upstream. When providing this progress though, we zero out the input information but not the output information. This PR fixes that bug.

Why are the changes needed?

Fixes a bug around incorrect metrics

Does this PR introduce any user-facing change?

Fixes a bug in the metrics

How was this patch tested?

New regression test
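
A minimal sketch of the behavior described above, using hypothetical stand-in types rather than Spark's actual progress classes. The names `Progress`, `reportBeforeFix` and `reportAfterFix` are assumptions made for illustration, and sinks that do not report output rows are ignored here.

```scala
object NoDataProgressSketch {
  // Hypothetical, simplified progress record; not Spark's StreamingQueryProgress.
  final case class Progress(numInputRows: Long, numOutputRows: Long)

  // Behavior described as the bug: when no batch ran, the input count is
  // zeroed but the output count of the last executed batch leaks through.
  def reportBeforeFix(hasExecuted: Boolean, last: Progress): Progress =
    if (hasExecuted) last else last.copy(numInputRows = 0L)

  // Behavior after the fix: both counts are zeroed when no batch ran.
  def reportAfterFix(hasExecuted: Boolean, last: Progress): Progress =
    if (hasExecuted) last else Progress(numInputRows = 0L, numOutputRows = 0L)
}
```

With a last executed batch of 5 input rows and 3 output rows, the first function would keep reporting 3 output rows on an idle trigger, while the second reports 0.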

observedMetrics = new java.util.HashMap(observedMetrics.asJava))

if (hasNewData) {
if (hasExecuted) {
Contributor Author

This was also incorrect for no new data micro batches

Contributor

@HeartSaVioR HeartSaVioR Mar 27, 2020

Nice find. We hadn't noticed the bug because lastNoDataProgressEventTime is set to Long.MinValue, which makes the next no-new-data micro-batch update the progress immediately and hides the bug. (If that's intentional, it's too tricky and we should have left a comment here.)

Maybe we should also rename lastNoDataProgressEventTime, since the fix changes its semantics?

And we may want to revisit whether our intention really is to update progress immediately whenever a batch does not run right after a batch has run.
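
To illustrate the point about Long.MinValue, here is a small sketch of a throttle on no-data progress updates. It is a simplified model, not the actual ProgressReporter code: the class name `NoDataThrottle`, the method `shouldReport` and the exact comparison are hypothetical. Only the 10-second interval and the Long.MinValue starting value come from the discussion.

```scala
object NoDataThrottleSketch {
  // 10 seconds, the no-data progress interval mentioned in the PR description.
  val IntervalMs: Long = 10000L

  // Hypothetical throttle: decides whether an idle trigger publishes progress.
  final class NoDataThrottle {
    // Starting at Long.MinValue means the very first check always passes.
    private var lastEventTime: Long = Long.MinValue

    def shouldReport(nowMs: Long): Boolean =
      if (nowMs - IntervalMs >= lastEventTime) {
        lastEventTime = nowMs // remember when progress was last published
        true
      } else {
        false
      }
  }
}
```

In this model the first idle trigger reports right away, and later idle triggers report only once the interval has elapsed.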

Contributor Author

oh, that's why I'm facing issues... I understand better now

@SparkQA

SparkQA commented Mar 26, 2020

Test build #120438 has finished for PR 28040 at commit 7def4de.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 26, 2020

Test build #120442 has finished for PR 28040 at commit b57aa74.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val clock = new StreamManualClock()

testStream(aggWithWatermark)(
StartStream(Trigger.ProcessingTime("interval 1 second"), clock),
Contributor Author

@tdas This test tests append mode output for no-data microbatches

}

def stateOperatorProgresses: Seq[StateOperatorProgress] = {
val operatorProgress = mutable.ArrayBuffer[StateOperatorProgress]()
Contributor Author

I didn't find this code readable, therefore removed the hack here

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 27, 2020

I raised a similar issue (a different bug, but the approach to resolving it would be similar) in #25987 in Oct. 2019. It didn't get much love. Could we please revisit it as well? Thanks in advance.

@SparkQA

SparkQA commented Mar 27, 2020

Test build #120444 has finished for PR 28040 at commit 4655611.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 10 = 113 after this
check((100L, 105L) -> 2L) // no-data-batch emits results on 100-105,
assert(query.lastProgress.sink.numOutputRows === 1)
Contributor

The last progress here is for "no data & no run" because of the reason I commented earlier - that's why the test fails.

Contributor Author

actually no. FileStreamSink is a V1 sink and doesn't support output metrics it seems

Contributor

@HeartSaVioR HeartSaVioR Mar 28, 2020

The value is -1, not 0, if the sink doesn't support output metrics. As you can see from the error message in the Jenkins build, the value here is 0 instead of -1, because the patch overwrites the value with 0 when the batch hasn't run. So yes, the last progress here is for "no data & no run", though the new commit should fix this problem.

V1 suite

{
  "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
  "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
  "name" : null,
  "timestamp" : "2020-03-28T23:33:08.567Z",
  "batchId" : 0,
  "numInputRows" : 1,
  "inputRowsPerSecond" : 83.33333333333333,
  "processedRowsPerSecond" : 0.3835826620636747,
  "durationMs" : {
    "addBatch" : 2055,
    "getBatch" : 2,
    "latestOffset" : 0,
    "queryPlanning" : 449,
    "triggerExecution" : 2607,
    "walCommit" : 49
  },
  "eventTime" : {
    "avg" : "1970-01-01T00:01:40.000Z",
    "max" : "1970-01-01T00:01:40.000Z",
    "min" : "1970-01-01T00:01:40.000Z",
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 1,
    "numRowsUpdated" : 1,
    "memoryUsedBytes" : 1400,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 0,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 680
    }
  } ],
  "sources" : [ {
    "description" : "MemoryStream[value#1L]",
    "startOffset" : null,
    "endOffset" : 0,
    "numInputRows" : 1,
    "inputRowsPerSecond" : 83.33333333333333,
    "processedRowsPerSecond" : 0.3835826620636747
  } ],
  "sink" : {
    "description" : "FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
    "numOutputRows" : -1
  }
}
{
  "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
  "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
  "name" : null,
  "timestamp" : "2020-03-28T23:33:11.185Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 935,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 52,
    "triggerExecution" : 1101,
    "walCommit" : 70
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:01:30.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 1,
    "numRowsUpdated" : 0,
    "memoryUsedBytes" : 2272,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 10,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 720
    }
  } ],
  "sources" : [ {
    "description" : "MemoryStream[value#1L]",
    "startOffset" : 0,
    "endOffset" : 0,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
    "numOutputRows" : -1
  }
}
{
  "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
  "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
  "name" : null,
  "timestamp" : "2020-03-28T23:33:12.287Z",
  "batchId" : 2,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 0,
    "triggerExecution" : 0
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:01:30.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 1,
    "numRowsUpdated" : 0,
    "memoryUsedBytes" : 2272,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 10,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 720
    }
  } ],
  "sources" : [ {
    "description" : "MemoryStream[value#1L]",
    "startOffset" : 0,
    "endOffset" : 0,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
    "numOutputRows" : 0
  }
}
{
  "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
  "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
  "name" : null,
  "timestamp" : "2020-03-28T23:33:13.066Z",
  "batchId" : 2,
  "numInputRows" : 2,
  "inputRowsPerSecond" : 153.84615384615384,
  "processedRowsPerSecond" : 3.2258064516129035,
  "durationMs" : {
    "addBatch" : 482,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 50,
    "triggerExecution" : 620,
    "walCommit" : 44
  },
  "eventTime" : {
    "avg" : "1970-01-01T00:01:53.500Z",
    "max" : "1970-01-01T00:02:03.000Z",
    "min" : "1970-01-01T00:01:44.000Z",
    "watermark" : "1970-01-01T00:01:30.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 2,
    "numRowsUpdated" : 2,
    "memoryUsedBytes" : 2584,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 20,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 920
    }
  } ],
  "sources" : [ {
    "description" : "MemoryStream[value#1L]",
    "startOffset" : 0,
    "endOffset" : 1,
    "numInputRows" : 2,
    "inputRowsPerSecond" : 153.84615384615384,
    "processedRowsPerSecond" : 3.2258064516129035
  } ],
  "sink" : {
    "description" : "FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
    "numOutputRows" : -1
  }
}
{
  "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
  "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
  "name" : null,
  "timestamp" : "2020-03-28T23:33:13.688Z",
  "batchId" : 3,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 987,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 43,
    "triggerExecution" : 1117,
    "walCommit" : 44
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:01:53.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 1,
    "numRowsUpdated" : 0,
    "memoryUsedBytes" : 2512,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 30,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 720
    }
  } ],
  "sources" : [ {
    "description" : "MemoryStream[value#1L]",
    "startOffset" : 1,
    "endOffset" : 1,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
    "numOutputRows" : -1
  }
}
{
  "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
  "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
  "name" : null,
  "timestamp" : "2020-03-28T23:33:14.806Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 0,
    "triggerExecution" : 0
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:01:53.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 1,
    "numRowsUpdated" : 0,
    "memoryUsedBytes" : 2512,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 30,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 720
    }
  } ],
  "sources" : [ {
    "description" : "MemoryStream[value#1L]",
    "startOffset" : 1,
    "endOffset" : 1,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
    "numOutputRows" : 0
  }
}

Contributor

That said, we may not want to overwrite the value with 0 if it is negative: it would be odd for the value to be -1 because the sink doesn't support numOutputRows, yet sometimes show up as 0.
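
One way to keep the two cases apart can be sketched as follows. This is an illustration under assumptions, not the code in the patch: the names `sinkOutput` and `render` are hypothetical, and an absent count (`None`) stands for a sink that does not report output rows, which the progress JSON above shows as -1.

```scala
object SinkOutputSketch {
  // None models a sink that does not report output rows at all.
  // When no batch executed, zero the count only if the sink reports one.
  def sinkOutput(hasExecuted: Boolean, committedRows: Option[Long]): Option[Long] =
    if (hasExecuted) committedRows
    else committedRows.map(_ => 0L)

  // An absent count is rendered as -1, matching the JSON in this thread.
  def render(rows: Option[Long]): Long = rows.getOrElse(-1L)
}
```

With this shape, a sink without output metrics stays at -1 on every progress report, whether or not a batch ran.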

@brkyvz
Contributor Author

brkyvz commented Mar 27, 2020

@HeartSaVioR Thanks for bringing that PR to my attention. We should get that in as well! Would you like to take over both?

@HeartSaVioR
Contributor

I'm not sure I get it. My PR fixes a different bug, so while the two may conflict, each is valid on its own. Why not deal with this PR (or my PR) quickly, rebase, and then deal with the other? I think this PR is getting close to merging.

@SparkQA

SparkQA commented Mar 28, 2020

Test build #120539 has finished for PR 28040 at commit 62044dc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
AddData(inputData, 10, 12, 14),
AdvanceManualClock(1000L), // watermark = 5, runs no-data microbatch
Contributor

This surprises me, although it's not directly related to this PR, so treat it as OFF-TOPIC.

Based on the test, it sounds like we need to wait for the next trigger interval to run the no-data microbatch, and that the no-data microbatch runs even when input is available. The input is then handled in the following trigger.

My expectation was that the no-data microbatch would be consolidated with the data microbatch if input is available. Ideally, a no-data microbatch should not have to wait for the next trigger interval.

Contributor Author

I agree. I think this is because of manual clock synchronization and the next batch is already planned before the data appears to be there

AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 },
AssertOnQuery { _.lastProgress.sink.numOutputRows == 0 },
AdvanceManualClock(1000L), // runs batch with no new data and watermark progresses
Contributor

@HeartSaVioR HeartSaVioR Mar 29, 2020

This is also something hard to understand (requires two trigger intervals instead of one - ideally zero - to run no-data microbatch) but yes this is OFF-TOPIC.

Contributor

This is actually not off-topic, because (i) not understanding this correctly can lead to flaky tests, and (ii) I was afraid that the fixes made in this PR had changed the semantic behavior of no-data batches. That is not the case; I tested it in this unit test myself.

I think all the confusion is starting from the fact that you don't need to advance manual clock after StartStream to trigger the first batch. So the first AdvanceManualClock is not really necessary. Rather, what it is doing is advancing the clock, thus allowing the 2nd batch to be automatically triggered as soon as the first batch finishes. This is what leads to the confusion about why the second batch is not picking up the new data: the next batch has already been unblocked (i.e., before AddData(10, 12, 14)) by the first AdvanceManualClock. This weird asynchronousness despite using the manual clock makes the test incomprehensible and is also a perfect recipe for flakiness.

Contributor

@tdas tdas Apr 7, 2020

@brkyvz here is my proposed test. @HeartSaVioR please take a look and see whether this is more understandable.

    testStream(aggWithWatermark)(
      // batchId 0
      AddData(inputData, 15),
      StartStream(Trigger.ProcessingTime("interval 1 second"), clock),
      CheckAnswer(), // watermark = 0
      AssertOnQuery { _.stateNodes.size === 1 },
      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

      // batchId 1 without data
      AdvanceManualClock(1000L), // watermark = 5
      Execute { q =>             // wait for the no data batch to complete
        eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 1) }
      },
      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

      // batchId 2 with data
      AddData(inputData, 10, 12, 14),
      AdvanceManualClock(1000L), // watermark = 5
      CheckAnswer(),
      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

      // batchId 3 with data
      AddData(inputData, 25),
      AdvanceManualClock(1000L), // watermark = 5
      CheckAnswer(),
      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 },
      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

      // batchId 4 without data
      AdvanceManualClock(1000L), // watermark = 15
      Execute { q =>             // wait for the no data batch to complete
        eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 4) }
      },
      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 1 }
    )
  }

Contributor

@HeartSaVioR HeartSaVioR Apr 7, 2020

@tdas

> I think all the confusion is starting from the fact that you don't need to advance manual clock after StartStream to trigger the first batch.
> Rather, what it is doing is advancing the clock, thus allowing the 2nd batch to be automatically triggered as soon as the first batch finishes.
> This weird asynchronousness despite using the manual clock makes the test incomprehensible and is also a perfect recipe for flakiness.

Ah, nice find. Great analysis. That's what I missed (and it's very confusing behavior, TBH). The proposal looks great and is easier to understand. I have comments on the new proposal, but since it is already reflected in the PR, I'll comment directly on the PR.

@HeartSaVioR
Contributor

Looks like the change introduced a side effect and the build failure is related. Could you please fix these tests as well?

@brkyvz
Contributor Author

brkyvz commented Mar 29, 2020

I reverted the changes with respect to the empty progress. Seemed to be a bit more risky than I'd like, as I'd like to warmfix this into Spark 3.0

@SparkQA

SparkQA commented Mar 29, 2020

Test build #120544 has finished for PR 28040 at commit 5fbbf41.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

Retest this, please

@SparkQA

SparkQA commented Mar 29, 2020

Test build #120553 has finished for PR 28040 at commit 5fbbf41.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
AddData(inputData, 10, 12, 14),
AdvanceManualClock(1000L), // watermark = 5, runs with the just added data
AdvanceManualClock(1000L), // watermark = 0, runs with the just added data
Contributor

Let's just remove the watermark from the comment here, as you've done for the later AdvanceManualClock calls.

@SparkQA

SparkQA commented Apr 2, 2020

Test build #120702 has finished for PR 28040 at commit 68848d9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz
Contributor Author

brkyvz commented Apr 2, 2020

retest this please

@SparkQA

SparkQA commented Apr 2, 2020

Test build #120701 has finished for PR 28040 at commit d501127.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

Looks like the flaky test is below (and this PR made it flaky), not in streaming aggregation.

org.apache.spark.sql.kafka010.KafkaSinkMicroBatchStreamingSuite.streaming - sink progress is produced

@SparkQA

SparkQA commented Apr 2, 2020

Test build #120703 has finished for PR 28040 at commit 68848d9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 3, 2020

Test build #120743 has finished for PR 28040 at commit 2c9ed55.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz
Contributor Author

brkyvz commented Apr 4, 2020

@brkyvz brkyvz changed the title from "[DO NOT MERGE][SPARK-31278][SS] Fix StreamingQuery output rows metric" to "[SPARK-31278][SS] Fix StreamingQuery output rows metric" on Apr 4, 2020
@SparkQA

SparkQA commented Apr 5, 2020

Test build #120814 has finished for PR 28040 at commit 786d921.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
AdvanceManualClock(1000L), // runs batch with no new data and watermark progresses
CheckAnswer(), // watermark = 15, but nothing yet
Contributor

I feel like this will be flaky. CheckAnswer() works reliably only when there is new data to process, because it waits for the new data's offset to be reported as processed. Here there is no new data in the no-data batch, so it's possible that this CheckAnswer won't wait for the no-data batch to finish before the last-progress checks start.

Instead, it's (probably) more reliable to use eventually, where you check that the lastProgress has the expected batchId.
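
The polling idea behind that suggestion can be sketched without any test framework. The helper below is a dependency-free stand-in for ScalaTest's `eventually`. The name `waitUntil` and its parameters are hypothetical, and in the real suite the condition would compare the query's last progress batchId against the expected one.

```scala
object WaitForBatchSketch {
  // Re-evaluates `condition` every `pollMs` milliseconds until it holds or
  // `timeoutMs` elapses. Returns whether the condition was eventually met.
  def waitUntil(timeoutMs: Long, pollMs: Long = 10L)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var satisfied = condition
    while (!satisfied && System.nanoTime() < deadline) {
      Thread.sleep(pollMs)
      satisfied = condition
    }
    satisfied
  }
}
```

A test would then block on something like `waitUntil(timeoutMs) { lastBatchId == expectedBatchId }` before asserting on metrics, instead of relying on new data offsets being processed.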

@SparkQA

SparkQA commented Apr 7, 2020

Test build #120931 has finished for PR 28040 at commit 68bf147.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz
Contributor Author

brkyvz commented Apr 7, 2020

Unrelated flaky test: org.apache.spark.sql.hive.thriftserver.CliSuite.SPARK-11188 Analysis error reporting

@brkyvz
Contributor Author

brkyvz commented Apr 7, 2020

retest this please


// batchId 1 without data
AdvanceManualClock(1000L), // watermark = 5
Execute { q => // wait for the no data batch to complete
Contributor

@HeartSaVioR HeartSaVioR Apr 7, 2020

(Good to have) It might be good to have a new operation to consolidate waiting for "no data batch" and checking the answer (as they have same pattern except the desired batch ID).

Not mandatory to do it in this PR.

Contributor Author

yeah, it'd be nice to provide an inbuilt function for it if this pattern is used more over time

@tdas
Contributor

tdas commented Apr 8, 2020

LGTM for this PR. @brkyvz feel free to merge it.

@brkyvz
Contributor Author

brkyvz commented Apr 8, 2020

Thanks @HeartSaVioR @tdas for the review. Merging to master/3.0. Let's jump on #25987 next.

@asfgit asfgit closed this in 8ab2a0c Apr 8, 2020
asfgit pushed a commit that referenced this pull request Apr 8, 2020
Closes #28040 from brkyvz/sinkMetrics.

Lead-authored-by: Burak Yavuz <[email protected]>
Co-authored-by: Burak Yavuz <[email protected]>
Signed-off-by: Burak Yavuz <[email protected]>
(cherry picked from commit 8ab2a0c)
Signed-off-by: Burak Yavuz <[email protected]>
@SparkQA

SparkQA commented Apr 8, 2020

Test build #120932 has finished for PR 28040 at commit 68bf147.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
Closes apache#28040 from brkyvz/sinkMetrics.

Lead-authored-by: Burak Yavuz <[email protected]>
Co-authored-by: Burak Yavuz <[email protected]>
Signed-off-by: Burak Yavuz <[email protected]>
@jainshashank24

jainshashank24 commented Sep 6, 2020

Hi @HeartSaVioR, regarding the above PR, I didn't understand the -1 value shown for numOutputRows with the Elasticsearch sink.
What does -1 represent in numOutputRows?
Is it that the ES sink doesn't support this metric, or is there a bug?

I can see that the SinkProgress description says:
"@param numOutputRows Number of rows written to the sink or -1 for Continuous Mode (temporarily) or Sink V1 (until decommissioned)."

So does the Elasticsearch sink fall under Sink V1?
