Skip to content

Conversation

@arunmahadevan
Copy link
Contributor

What changes were proposed in this pull request?

Currently a "StreamingQueryListener" can only be registered programatically. We could have a new config "spark.sql.streamingQueryListeners" similar to "spark.sql.queryExecutionListeners" and "spark.extraListeners" for users to register custom streaming listeners.

How was this patch tested?

New unit test and running example programs.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@arunmahadevan
Copy link
Contributor Author

ping @tdas @jose-torres @HeartSaVioR

@arunmahadevan arunmahadevan changed the title SPARK-24480: Added config for registering streamingQueryListeners SPARK-24479: Added config for registering streamingQueryListeners Jun 7, 2018
@HyukjinKwon
Copy link
Member

Mind fixing the PR title to [SPARK-24479][SS] Added config for registering streamingQueryListeners?

private var lastTerminatedQuery: StreamingQuery = null

sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS)
.foreach { classNames =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
  ...
}

Copy link

@merlintang merlintang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM +1


import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryStartedEvent, QueryTerminatedEvent, _}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: import looks a bit odd {QueryStartedEvent, QueryTerminatedEvent, _}.

@SparkQA
Copy link

SparkQA commented Jun 7, 2018

Test build #91510 has finished for PR 21504 at commit d3a3baa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"List of class names implementing StreamingQueryListener that will be automatically \" +

@arunmahadevan arunmahadevan changed the title SPARK-24479: Added config for registering streamingQueryListeners [SPARK-24479][SS] Added config for registering streamingQueryListeners Jun 7, 2018
@arunmahadevan
Copy link
Contributor Author

@HyukjinKwon , thanks for reviewing. Addressed comments.

@SparkQA
Copy link

SparkQA commented Jun 7, 2018

Test build #91532 has finished for PR 21504 at commit f721ebe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
sparkSession.sparkContext.conf).foreach(addListener)
}

Copy link

@merlintang merlintang Jun 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two comments here:

  1. we need to log the registration information here
  2. we need to use try catch for this, it is possible that register fail. this would break the job.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Addressed, please check.

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems fine.

Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
sparkSession.sparkContext.conf).foreach(listener => {
addListener(listener)
logInfo(s"Registered listener ${listener.getClass.getName}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would do this at debug level ..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either debug or info is fine for me, since it would add just couple of log lines only once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since its only once and provides information to user I guess info is fine. Similar pattern here https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2359

}
} catch {
case e: Exception =>
throw new SparkException(s"Exception when registering StreamingQueryListener", e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s seems not needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except comments from @HyukjinKwon.

@SparkQA
Copy link

SparkQA commented Jun 8, 2018

Test build #91540 has finished for PR 21504 at commit 02b2973.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@SparkQA
Copy link

SparkQA commented Jun 8, 2018

Test build #91570 has finished for PR 21504 at commit 421e16b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Jun 8, 2018

Test build #91577 has finished for PR 21504 at commit 421e16b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor

retest this, please

@SparkQA
Copy link

SparkQA commented Jun 9, 2018

Test build #91585 has finished for PR 21504 at commit 421e16b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor

Test failures were from kafka.

retest this, please

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Jun 11, 2018

Test build #91645 has finished for PR 21504 at commit 421e16b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.toSequence
.createOptional

val STREAMING_QUERY_LISTENERS = buildStaticConf("spark.sql.streamingQueryListeners")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe -> spark.sql.streaming.streamingQueryListeners for consistency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok makes sense. renamed.

@arunmahadevan
Copy link
Contributor Author

@HyukjinKwon , addressed comments. Can you take it forward?

@HyukjinKwon
Copy link
Member

Looks fine but let me leave this open for more days before merging it in case someone has some comments on it.

@SparkQA
Copy link

SparkQA commented Jun 12, 2018

Test build #91715 has finished for PR 21504 at commit b87c90b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Jun 13, 2018

Test build #91737 has finished for PR 21504 at commit b87c90b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.



override protected def sparkConf: SparkConf =
super.sparkConf.set("spark.sql.streamingQueryListeners",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems a mistake here.

Change-Id: I030ef8a27ec2962170981f2d1b4ed16165e2aef9
@jose-torres
Copy link
Contributor

(Don't block on me - I won't have time to review in detail unless needed, but broadly the PR looks fine)

@SparkQA
Copy link

SparkQA commented Jun 13, 2018

Test build #91741 has finished for PR 21504 at commit bbec6ce.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Jun 13, 2018

Test build #91761 has finished for PR 21504 at commit bbec6ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

Merged to master.

@asfgit asfgit closed this in 7703b46 Jun 13, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants