Skip to content

[SPARK-29450][SS] Measure the number of output rows for streaming aggregation with append mode#26104

Closed
HeartSaVioR wants to merge 1 commit intoapache:masterfrom
HeartSaVioR:SPARK-29450
Closed

[SPARK-29450][SS] Measure the number of output rows for streaming aggregation with append mode#26104
HeartSaVioR wants to merge 1 commit intoapache:masterfrom
HeartSaVioR:SPARK-29450

Conversation

@HeartSaVioR
Copy link
Copy Markdown
Contributor

@HeartSaVioR HeartSaVioR commented Oct 13, 2019

What changes were proposed in this pull request?

This patch addresses missing metric, the number of output rows for streaming aggregation with append mode. Other modes are correctly measuring it.

Why are the changes needed?

Without the patch, the value for such metric is always 0.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test added. Also manually tested with below query:

query

import spark.implicits._

spark.conf.set("spark.sql.shuffle.partitions", "5")

val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1000)
  .load()
  .withWatermark("timestamp", "5 seconds")
  .selectExpr("timestamp", "mod(value, 100) as mod", "value")
  .groupBy(window($"timestamp", "10 seconds"), $"mod")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))

val query = df
  .writeStream
  .format("memory")
  .option("queryName", "test")
  .outputMode("append")
  .start()

query.awaitTermination()

before the patch

screenshot-before-SPARK-29450

after the patch

screenshot-after-SPARK-29450

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

cc. @tdas @zsxwing @jose-torres @gaborgsomogyi
also cc. to @jaceklaskowski as reporter of the issue

@SparkQA
Copy link
Copy Markdown

SparkQA commented Oct 13, 2019

Test build #111992 has finished for PR 26104 at commit bf14f96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

I've added the query and screenshots to show which issue this PR fixes.

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

retest this, please

@SparkQA
Copy link
Copy Markdown

SparkQA commented Nov 18, 2019

Test build #113983 has finished for PR 26104 at commit bf14f96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

@tdas @zsxwing @jose-torres @gaborgsomogyi Kindly reminder.

Copy link
Copy Markdown
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks fine to me.

@dongjoon-hyun
Copy link
Copy Markdown
Member

Retest this please.

@SparkQA
Copy link
Copy Markdown

SparkQA commented Dec 19, 2019

Test build #115529 has finished for PR 26104 at commit bf14f96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Copy Markdown
Member

Merged to master.

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

Thanks all for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-29450 branch December 19, 2019 09:58
finished = true
null
} else {
numOutputRows += 1
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A regression introduced in #18107 ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, looks like so. The API seemed to be revised completely in #18107 and I don't have background though.

@gatorsmile
Copy link
Copy Markdown
Member

@HeartSaVioR Could you help backport this to 2.4?

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

Ah yes I didn't get the intention on comment and now I'm seeing the intention. Happy to submit a PR for porting back of this. Thanks!

HeartSaVioR added a commit to HeartSaVioR/spark that referenced this pull request Jan 15, 2020
…regation with append mode

### What changes were proposed in this pull request?

This patch addresses missing metric, the number of output rows for streaming aggregation with append mode. Other modes are correctly measuring it.

### Why are the changes needed?

Without the patch, the value for such metric is always 0.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit test added. Also manually tested with below query:

> query

```
import spark.implicits._

spark.conf.set("spark.sql.shuffle.partitions", "5")

val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1000)
  .load()
  .withWatermark("timestamp", "5 seconds")
  .selectExpr("timestamp", "mod(value, 100) as mod", "value")
  .groupBy(window($"timestamp", "10 seconds"), $"mod")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))

val query = df
  .writeStream
  .format("memory")
  .option("queryName", "test")
  .outputMode("append")
  .start()

query.awaitTermination()
```

> before the patch

![screenshot-before-SPARK-29450](https://user-images.githubusercontent.com/1317309/69023217-58d7bc80-0a01-11ea-8cac-40f1cced6d16.png)

> after the patch

![screenshot-after-SPARK-29450](https://user-images.githubusercontent.com/1317309/69023221-5c6b4380-0a01-11ea-8a66-7bf1b7d09fc7.png)

Closes apache#26104 from HeartSaVioR/SPARK-29450.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants