
Conversation

Contributor

@hvanhovell hvanhovell commented Oct 15, 2019

What changes were proposed in this pull request?

Observable metrics are named, arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (e.g., a batch query finishes or a streaming epoch is reached), a named event is emitted that contains the metrics for the data processed since the last completion point.

A user can observe these metrics by attaching a listener to the Spark session; which listener to attach depends on the execution mode (a usage sketch follows the list below):

  • Batch: QueryExecutionListener. This will be called when the query completes. A user can access the metrics by using the QueryExecution.observedMetrics map.
  • (Micro-batch) Streaming: StreamingQueryListener. This will be called when the streaming query completes an epoch. A user can access the metrics by using the StreamingQueryProgress.observedMetrics map. Please note that continuous execution is currently not supported.
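For the batch case this looks roughly as follows. This is a minimal sketch, assuming a `SparkSession` named `spark`; the metric and column names are invented for illustration:

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions._
import org.apache.spark.sql.util.QueryExecutionListener

// Attach named aggregate metrics ("my_metrics") to the query.
val observed = spark.range(100)
  .observe("my_metrics", count(lit(1)).as("rows"), max(col("id")).as("max_id"))

// Listen for query completion and read the observed metrics by name.
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    qe.observedMetrics.get("my_metrics").foreach { row =>
      println(s"rows=${row.getAs[Long]("rows")}, max_id=${row.getAs[Long]("max_id")}")
    }
  }
  override def onFailure(funcName: String, qe: QueryExecution, error: Exception): Unit = ()
})

observed.collect() // reaching the completion point fires the listener
```

The streaming case is analogous: read StreamingQueryProgress.observedMetrics inside StreamingQueryListener.onQueryProgress.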

Why are the changes needed?

This enables observable metrics.

Does this PR introduce any user-facing change?

Yes. It adds the observe method to Dataset.
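For reference, the method added to Dataset should have the following shape (per the PR description; exact formatting may differ in the source):

```scala
// One metric name, followed by one or more aggregate columns.
def observe(name: String, expr: Column, exprs: Column*): Dataset[T]
```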

How was this patch tested?

  • Added unit tests for the CollectMetrics logical node to the AnalysisSuite.
  • Added unit tests for StreamingProgress JSON serialization to the StreamingQueryStatusAndProgressSuite.
  • Added integration tests for streaming to the StreamingQueryListenerSuite.
  • Added integration tests for batch to the DataFrameCallbackSuite.

@hvanhovell hvanhovell added the SQL label Oct 15, 2019
@hvanhovell hvanhovell changed the title [SPARK-29348][SQL] This PR adds observable metrics to DBR. [SPARK-29348][SQL] This PR adds observable metrics Oct 15, 2019
@hvanhovell hvanhovell changed the title [SPARK-29348][SQL] This PR adds observable metrics [SPARK-29348][SQL] Add observable metrics Oct 15, 2019
@hvanhovell hvanhovell changed the title [SPARK-29348][SQL] Add observable metrics [SPARK-29348][SQL] Add observable Metrics for Streaming queries Oct 15, 2019

SparkQA commented Oct 15, 2019

Test build #112110 has finished for PR 26127 at commit fb36fa6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CollectMetrics(
  • class DataTypeJsonSerializer extends JsonSerializer[DataType]
  • class DataTypeJsonDeserializer extends JsonDeserializer[DataType]
  • case class CollectMetricsExec(

@HeartSaVioR
Contributor

I like the idea in general. Just for the sake of understanding: is this meant to be an alternative to #21721, or does #21721 remain unresolved even after this?

And this approach would have the same issue pointed out in #21721: it won't work for continuous processing. I would like to see some discussion/decision on whether we can ignore continuous processing when designing an API for streaming queries.

@cloud-fan
Contributor

This seems to have nothing to do with DS v2; it's a new API in Dataset that allows users to collect metrics.

From the PR description, this should support both micro-batch and continuous streaming, as an epoch is a general concept. I haven't looked into the code yet, but do we support continuous streaming in this PR?


SparkQA commented Oct 16, 2019

Test build #112158 has finished for PR 26127 at commit 6937a9a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

  1. StreamingQueryListener doesn't work with continuous mode. Please refer to [SPARK-23887][SS] continuous query progress reporting #24537, which proposes to support StreamingQueryListener in continuous mode. (I haven't looked at the details though... I'd spend my time on continuous mode once we are clear it's still making progress.)

  2. In continuous mode, a task never completes; the patch seems to do something when the task completes, which will never happen (a sketch of the pattern follows the quoted comment):

      // Only publish the value of the accumulator when the task has completed. This is done by
      // updating a task local accumulator ('updater') which will be merged with the actual
      // accumulator as soon as the task completes. This avoids the following problems during the
      // heartbeat:
      // - Correctness issues due to partially completed/visible updates.
      // - Performance issues due to excessive serialization.
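A hedged sketch of the publish-on-completion pattern that comment describes (simplified, not the actual CollectMetricsExec internals; the helper name is invented):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.util.AccumulatorV2

// Per-task updates go into a task-local copy ('updater') and are merged into
// the real accumulator only when the task completes, avoiding partially
// visible updates and per-heartbeat serialization costs.
def withTaskLocalUpdates[IN, OUT](acc: AccumulatorV2[IN, OUT])(
    body: AccumulatorV2[IN, OUT] => Unit): Unit = {
  val updater = acc.copyAndReset()
  TaskContext.get().addTaskCompletionListener[Unit] { _ =>
    acc.merge(updater) // publish once, at task completion
  }
  body(updater)
}
```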

@hvanhovell
Contributor Author

@cloud-fan @HeartSaVioR the current approach uses ProgressReporter, so it currently does not work with ContinuousExecution (see https://issues.apache.org/jira/browse/SPARK-23887). I would rather not wait until this is fixed. At some point ContinuousExecution should report progress through ProgressReporter (the approach taken in #24537); once that happens, we can make observable metrics work with it.

I will update the description to underline that this only works with (micro) batch.


SparkQA commented Oct 17, 2019

Test build #112224 has finished for PR 26127 at commit a58c04e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

If it's justified to have ProgressReporter support micro-batch only, I think it's also justified to add more features to ProgressReporter.


/**
* Validate that collected metrics names are unique. The same name cannot be used for metrics
* with different results. However, multiple instances of metrics with the same result and name
Contributor

will we eliminate the duplicated metrics (same name and result)?

Contributor Author

I will do that in a follow-up.

* - Has only attributes that are nested inside an aggregate function.
*
* @param e expression to check.
* @param seenAggregate `true` iff one of the parents of the expression is an aggregate function.
Contributor

shall we check this in CheckAnalysis? We have a similar check there for Aggregate.

Contributor Author

Done.
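For illustration, a hedged sketch of the kind of recursive check described above (names and structure assumed; per the discussion, the actual check now lives in CheckAnalysis):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression

// An expression is a valid metric iff every attribute it references sits
// below an aggregate function, and aggregate functions are not nested.
def checkMetric(e: Expression, seenAggregate: Boolean = false): Boolean = e match {
  case _: AggregateExpression if seenAggregate => false // no nested aggregates
  case a: AggregateExpression => a.children.forall(checkMetric(_, seenAggregate = true))
  case _: Attribute => seenAggregate // bare attributes must be under an aggregate
  case other => other.children.forall(checkMetric(_, seenAggregate))
}
```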

* completion point.
* Please note that continuous execution is currently not supported.
*
* The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
Contributor

Does it follow the same rule as Aggregate#aggregateExpressions?

Contributor Author

It follows the rules for a global aggregate.
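Concretely (a hedged illustration; `df` and the column names are invented):

```scala
import org.apache.spark.sql.functions._

df.observe("ok", count(lit(1)).as("rows"), sum(col("value")).as("total")) // valid
// df.observe("bad", col("value"))            // invalid: bare attribute, no aggregate
// df.observe("bad2", sum(sum(col("value")))) // invalid: nested aggregate function
```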

val sources: Array[SourceProgress],
val sink: SinkProgress) extends Serializable {
val sink: SinkProgress,
@JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
Contributor

what does it mean?

Contributor Author

It means that the deserializer can assume the rows are instances of GenericRowWithSchema.
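In other words, something like this hedged sketch (field context simplified; the class name is invented):

```scala
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

// Row is an abstract interface, so Jackson needs a concrete target class
// for the map values when deserializing the progress JSON.
class ProgressLike(
    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
    val observedMetrics: java.util.Map[String, Row])
```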

@hvanhovell
Contributor Author

Alright, let me update this one.

@hvanhovell
Contributor Author

@cloud-fan I have updated, PTAL


SparkQA commented Nov 29, 2019

Test build #114590 has finished for PR 26127 at commit cdc390a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor Author

retest this please


SparkQA commented Nov 29, 2019

Test build #114636 has finished for PR 26127 at commit cdc390a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

class DataTypeJsonSerializer extends JsonSerializer[DataType] {
private val delegate = new JValueSerializer
override def serialize(
value: DataType,
Contributor

nit: 4 space indentation

StructType.fromAttributes(metricExpressions.map(_.toAttribute))
}

private lazy val toRowConverter: InternalRow => Row = {
Contributor

let's add a comment that it's better to use the interpreted version here, as it's not called very frequently
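Something along these lines (a hedged sketch; `metricsSchema` is assumed to be the StructType of the metric columns):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}

// Use the interpreted converter: this runs once per completion point rather
// than per row, so code generation would buy nothing here.
private lazy val toRowConverter: InternalRow => Row = {
  val converter = CatalystTypeConverters.createToScalaConverter(metricsSchema)
  (row: InternalRow) => converter(row).asInstanceOf[Row]
}
```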


override def outputPartitioning: Partitioning = child.outputPartitioning

override def outputOrdering: Seq[SortOrder] = child.outputOrdering
Contributor

shall we override canonicalize?

Contributor Author

I don't think we should. If we do that we might eliminate a CollectMetrics operator when reusing exchanges.

Contributor

@cloud-fan cloud-fan left a comment

LGTM except a few minor comments


SparkQA commented Dec 3, 2019

Test build #114738 has finished for PR 26127 at commit cb69e55.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.


SparkQA commented Dec 3, 2019

Test build #114739 has finished for PR 26127 at commit a07474c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Subquery(child: LogicalPlan, correlated: Boolean) extends OrderPreservingUnaryNode
  • case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan]
  • class HiveThriftServer2AppStatusStore(
  • class HiveThriftServer2HistoryServerPlugin extends AppHistoryServerPlugin

@cloud-fan
Contributor

retest this please


SparkQA commented Dec 3, 2019

Test build #114758 has finished for PR 26127 at commit a07474c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Subquery(child: LogicalPlan, correlated: Boolean) extends OrderPreservingUnaryNode
  • case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan]
  • class HiveThriftServer2AppStatusStore(
  • class HiveThriftServer2HistoryServerPlugin extends AppHistoryServerPlugin

@hvanhovell
Contributor Author

Merging this. Thanks for the reviews!

@hvanhovell hvanhovell closed this in d7b268a Dec 3, 2019
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Dec 6, 2019
Closes apache#26127 from hvanhovell/SPARK-29348.

Authored-by: herman <[email protected]>
Signed-off-by: herman <[email protected]>
executedPlan.execute(), sparkSession.sessionState.conf)

/** Get the metrics observed during the execution of the query plan. */
def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan)
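For context, a hedged sketch of what such a helper could look like (assumed shape, not the verbatim Spark code; e.g. subquery handling is omitted):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.{CollectMetricsExec, SparkPlan}

// Walk the physical plan and gather the metrics row of every
// CollectMetricsExec node, keyed by the metric name.
def collect(plan: SparkPlan): Map[String, Row] =
  plan.collect {
    case collector: CollectMetricsExec => collector.name -> collector.collectedMetrics
  }.toMap
```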
Member

@gatorsmile gatorsmile Dec 30, 2019

Based on this PR description, observedMetrics sounds like a public API for end users to use.

However, it conflicts with what we commented above [in toRDD]. QueryExecution is not a public class and we discourage users from calling these internal functions.

Member

I think queryExecution is exposed in Dataset as an unstable API. I think the comments in the class imply that as well:

* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
*
* While this is not a public class, we should avoid changing the function names for the sake of
* changing them, because a lot of developers use the feature for debugging.

I agree that using methods in this class here is discouraged though. Maybe we have to mark Dataset.observe as an unstable API or developer API too for now, if it is difficult to avoid adding and using an API here.

Contributor

Technically, QueryExecution should be a public class, as two methods in QueryExecutionListener are marked as @DeveloperApi and the signatures of those methods contain QueryExecution. If we worry about restrictions on modifying QueryExecution due to it being a public class, we may need another class used only for reporting to QueryExecutionListener.

Member

@HyukjinKwon HyukjinKwon Dec 30, 2019

@DeveloperApi implies:

A lower-level, unstable API intended for developers.

I agree that it might have to be considered a public but unstable class. I am sure most of the methods there are discouraged even for developers to use without knowing exactly what they do.

Contributor

Ah my bad. Thanks for pointing it out.

Though I still feel the listener stuff is more like a public API, as it can be used for more than debugging. QueryExecutionListener and StreamingQueryListener take different approaches: QueryExecutionListener directly exposes a low-level internal instance and lets end users use it with caution, whereas StreamingQueryListener has individual classes that contain information for reporting only, without exposing internals.

IMHO, I feel StreamingQueryListener is the right way to go, but QueryExecutionListener has served for a long time, so no strong opinion.

Member

Yeah, StreamingQueryProgress.observedMetrics with StreamingQueryListener seems fine. The problem here looks to be only QueryExecution.observedMetrics with QueryExecutionListener, which carries a contradiction about its stability. It seems it has to be fixed by either avoiding adding it to QueryExecution or explicitly marking Dataset.observe as an unstable API (or developer API).

Contributor

+1. We should make the batch listener not rely on QueryExecution.

Contributor Author

I am not sure what the issue is.

By definition, anything in QueryExecution is an internal, unstable API. The reason I added it here is that it is a good narrow waist for this: you want to collect the metrics for the entire query. It is public because most other methods in this class are public; this allows for some clever integrations (for the more adventurous developer) and makes debugging easier.

The batch listener is marked as experimental. A developer should be warned when (s)he uses this API for anything (including using it to collect observable metrics). I am not sure how realistic stabilizing the batch listener API is if it includes QueryExecution (or a stabilized version of it). We could expose a stable callback, e.g. onObservedMetrics(...), for observed metrics. On the other hand, it is kind of annoying that we can't expose this through the DataFrame itself, because when we execute the DataFrame we often use a different one under the hood.

Contributor

The issue is not about this PR itself, but about the batch query listener relying on QueryExecution, which is not going to be public. I think the streaming query listener is better: it's still unstable now, but we are going to make it stable in the future.

Contributor

@HeartSaVioR HeartSaVioR Dec 30, 2019

The batch listener is no longer marked as experimental in 3.0, as there have been some slight modifications (enough for @Evolving), but it hasn't changed majorly during its 4 years of life - see #25558. If we feel concerned it could be rolled back to Experimental/Unstable, though I know there are a couple of projects in the Spark ecosystem already leveraging it, which shows its use is not restricted to debugging.

The major feature of the batch listener is that (unlike the streaming query listener, which summarizes information and stores it into a new data structure) it exposes the various logical and physical plans, and I don't imagine these plans being used for execution purposes. Mostly read-only. It might not be unrealistic to provide these plans as read-only (cloned, not executable), but yeah, it may not be as easy as it seems (I'm not familiar enough with it).

*
* A user can observe these metrics by adding either a
* [[org.apache.spark.sql.streaming.StreamingQueryListener]] or a
* [[org.apache.spark.sql.util.QueryExecutionListener]] to the spark session.
Member

Can we have an example with QueryExecutionListener too?

Contributor

I'll raise a PR for this, as I found a slight bug in the example for streaming as well.

Contributor

Raised #27046. Please note that the patch relies on the current status.

Contributor Author

So you want to encourage users to use an unstable API?

Member

I would like to show the documented usage.

Member

and mark this as unstable too, given that it relies on an unstable API.

Contributor

Let's sort it out a bit.

Was the batch use case excluded intentionally due to the use of DeveloperApi? If that's the intention, then yes, #27046 can just fix the slight bug in the streaming example.

If the concern is focused on having QueryExecution as a private API while exposing it through DeveloperApi, I think it's time to fix that, as no one would complain about the change in a major release.

Contributor Author

We shouldn't document unstable features unless we commit to stabilizing them in the short term. Please don't add that example. A savvy developer will know how to work with this, and can weigh the risks involved themselves.

The same goes for unstable. Please don't mark it as unstable; the function itself is stable. It is stable for streaming. The batch case relies on QueryExecutionListener and isn't, but that API is marked as such, so there shouldn't be any confusion there.

@HeartSaVioR it would be great if we could fix the streaming example.

Contributor

OK thanks for clarification. Totally makes sense. Will update the PR there.

Member

Okay, I discussed this offline with @hvanhovell. I am okay with it.
