Skip to content

Conversation

@HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Dec 30, 2019

What changes were proposed in this pull request?

This patch fixes a small bug in the example of streaming query, as the type of observable metrics is Java Map instead of Scala Map, so to use foreach it should be converted first.

Why are the changes needed?

Described above.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Ran below query via spark-shell:

Streaming

import scala.collection.JavaConverters._
import scala.util.Random
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
        println(s"alert! error ratio: $ratio")
      }
    }
  }

  def onQueryStarted(event: QueryStartedEvent): Unit = {}
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})

val rates = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load

val rand = new Random()
val df = rates.map { row => (row.getLong(1), if (row.getLong(1) % 2 == 0) "error" else null) }.toDF
val ds = df.selectExpr("_1 AS id", "_2 AS error")
// Observe row count (rc) and error row count (erc) in the batch Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("console").start()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a minor bug on example: unlike observedMetrics available in QueryExecutionListener, event.progress.observedMetrics is a "Java" Map instead of "Scala" Map, hence to use foreach it needs asScala to convert to Scala Map.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fine. The alternative is to wrap it in Option(....get(...)).foreach but that's messier

@HeartSaVioR
Copy link
Contributor Author

Thanks for reviewing, reflected review comments, as well as updated PR description to reflect the change.

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good by me

@SparkQA
Copy link

SparkQA commented Dec 30, 2019

Test build #115935 has finished for PR 27046 at commit 6dfa13f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 30, 2019

Test build #115931 has finished for PR 27046 at commit 8398cf4.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 30, 2019

Test build #115936 has finished for PR 27046 at commit 4c16552.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

retest this please

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry last nit. this one too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @hvanhovell as I believe you're already using this API; this changes the method signature.

@hvanhovell
Copy link
Contributor

See my comments on this in #26127. I am -1 on this in its current form. I am fine with fixing the example.

@HeartSaVioR
Copy link
Contributor Author

For the record, the link of key comment is here: #26127 (comment)

@HeartSaVioR HeartSaVioR changed the title [SPARK-29348][SQL][FOLLOWUP] Add example of batch query for observable metrics [SPARK-29348][SQL][FOLLOWUP] Fix slight bug on streaming example for Dataset.observe Dec 30, 2019
@HeartSaVioR
Copy link
Contributor Author

Just updated. Please take a look again. Thanks in advance!

@SparkQA
Copy link

SparkQA commented Dec 30, 2019

Test build #115940 has finished for PR 27046 at commit 4c16552.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

Discussed with @hvanhovell offline as well. Yup, I am good with it.

@SparkQA
Copy link

SparkQA commented Dec 30, 2019

Test build #115954 has finished for PR 27046 at commit 2878cdb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

Merged to master.

@HeartSaVioR
Copy link
Contributor Author

Thanks for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-29348-FOLLOWUP branch December 30, 2019 16:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants