
[SPARK-21669] Internal API for collecting metrics/stats during FileFormatWriter jobs #18884

Closed
adrian-ionescu wants to merge 10 commits into apache:master from adrian-ionescu:write-stats-tracker-api

Conversation

@adrian-ionescu
Contributor

What changes were proposed in this pull request?

This patch introduces an internal interface for tracking metrics and/or statistics on data on the fly, as it is being written to disk during a FileFormatWriter job, and partially reimplements SPARK-20703 in terms of it.

The interface basically consists of 3 traits:

  • WriteTaskStats: just a tag for classes that represent statistics collected during a WriteTask.
    The only constraint it adds is that the class should be Serializable, as instances of it will be collected on the driver from all executors at the end of the WriteJob.
  • WriteTaskStatsTracker: a trait for classes that can actually compute statistics based on tuples that are processed by a given WriteTask and eventually produce a WriteTaskStats instance.
  • WriteJobStatsTracker: a trait for classes that act as containers of Serializable state that's necessary for instantiating WriteTaskStatsTracker on executors and finally process the resulting collection of WriteTaskStats, once they're gathered back on the driver.
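A minimal sketch of how these three traits might fit together, based on the description above (the method names and signatures here are illustrative, not the exact Spark API):

```scala
// Illustrative sketch only; the real Spark traits carry richer callbacks
// (e.g. per-file and per-partition hooks) than shown here.
trait WriteTaskStats extends Serializable

trait WriteTaskStatsTracker {
  // Called by the write task for each processed row.
  def newRow(row: Any): Unit
  // Produce the per-task stats once the task has finished writing.
  def getFinalStats(): WriteTaskStats
}

trait WriteJobStatsTracker extends Serializable {
  // Instantiated on executors, once per write task.
  def newTaskInstance(): WriteTaskStatsTracker
  // Called on the driver with one WriteTaskStats per successful task.
  def processStats(stats: Seq[WriteTaskStats]): Unit
}
```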

Potential future use of this interface is e.g. CBO stats maintenance during INSERT INTO table ... operations.

How was this patch tested?

Existing tests for SPARK-20703 exercise the new code: hive/SQLMetricsSuite, sql/JavaDataFrameReaderWriterSuite, etc.

@rxin
Contributor

rxin commented Aug 8, 2017

Jenkins, add to white list.

class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
extends WriteTaskStatsTracker {

var numPartitions: Int = 0
Contributor

private[this] ?
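The suggestion here is to narrow the mutable field's visibility. A quick illustration of the difference (the class names are made up for this example):

```scala
class LooseTracker {
  // A plain `var` is public: any caller can read or reassign it.
  var numPartitions: Int = 0
}

class TightTracker {
  // `private[this]` limits access to this very instance (not even other
  // instances of the same class can touch it), and lets the compiler skip
  // generating accessor methods for the field.
  private[this] var numPartitions: Int = 0
  def newPartition(): Unit = { numPartitions += 1 }
  def partitionCount: Int = numPartitions
}
```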

* It is therefore important that such an object is [[Serializable]], as it will be sent
* from the driver to all executors.
*/
trait WriteJobStatsTracker
Contributor

so i think the general approach is that the final implementation should add serializable, and the trait shouldn't ...

Contributor Author

No strong preference, just curious.. why is that preferable?
The way I see it, this way you're sure to get it; otherwise you might forget to mix it in and then you'll only realize it at runtime when faced with a "task not serializable" exception.
Is there some disadvantage to mixing it into the trait?

@SparkQA

SparkQA commented Aug 8, 2017

Test build #3885 has finished for PR 18884 at commit 3665f2f.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = None,
statsTrackers = Nil,
Contributor

don't we want that as well?

Contributor Author

We might, but it wasn't originally handled in #18159 and so FileStreamSink doesn't extend DataWritingCommand.
@viirya, do you remember if there was any particular reason for this, or was it just overlooked / deemed out of scope?
Anyway, I could try to add handling for it in this PR, but I'd say it's rather orthogonal.

Member

@viirya viirya Aug 9, 2017

I think FileStreamSink is not a RunnableCommand? #18159 adds data writing metrics for certain RunnableCommand.

Do we have a common physical node representing data writing operation in a FileStreamSink so we can bind its SQLMetric and update the metrics after insertion? Looks like the addBatch accepts arbitrary DataFrame and writes the data from it directly.

I'd think it might be another issue.

assert(statsTrackers.length == statsPerTracker.length,
s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker.
|statsTrackers = ${statsTrackers}
|statsPerTracker = ${statsPerTracker}
Member

In case of many stats, this might result in a big error message. Would just printing the lengths be enough?

* the corresponding [[WriteTaskStats]] from all executors.
*/
private def processStats(
statsTrackers: Seq[WriteJobStatsTracker],
Member

The current framework looks like the trackers can't share a collection of stats or some common metrics. Isn't that a likely use case? When two trackers need the same metrics, we will need to collect them in two copies of stats.

Contributor Author

Not sure if that's a common use case..
But if you do need to share some stats in two trackers, I can think of two solutions within the current framework:

  1. in the processStats of the first tracker, store the stats somewhere (e.g. the catalog) and then retrieve them during processStats of the second tracker
  2. replace the two trackers with a single, combined one (inheritance / composition)
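The second option, a single combined tracker, could be sketched like this (all names here are hypothetical): the shared metric is measured once and the result is handed to every interested consumer.

```scala
// Hypothetical sketch: one combined tracker computes a potentially costly
// metric (here, a simple row count) once and fans the result out to all
// registered consumers, instead of two trackers each recomputing it.
class CombinedTracker(consumers: Seq[Long => Unit]) {
  private[this] var rowCount = 0L
  def newRow(row: Any): Unit = { rowCount += 1 }
  // At the end of the job, deliver the single measurement to each consumer.
  def finish(): Unit = consumers.foreach(consume => consume(rowCount))
}
```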

Member

Because some metrics might be costly to compute, I was just thinking of a case where more than one tracker needs a few overlapping metrics. Instead of measuring the metrics into two different collections of stats, measuring once and keeping just one copy of the metrics seems more reasonable. For now, this may lead to overdesign; I'm just curious how we could deal with it easily. We can consider that once we hit it.

Contributor

If two trackers have overlapping metrics, I think we probably need to combine them into one (via inheritance or composition).

@rxin
Contributor

rxin commented Aug 9, 2017

This looks good to me, but I didn't review super carefully. It's a great clean-up and abstraction.

cc @cloud-fan

partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
statsTrackers: Seq[WriteJobStatsTracker],
refreshFunction: (Seq[ExecutedWriteSummary]) => Unit,
Contributor

instead of having the refreshFunction, can we just let this write method return Seq[ExecutedWriteSummary]?

Contributor

or return Set[String] as updated partitions
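Schematically, the two shapes being contrasted look like this (the signatures are simplified stand-ins for illustration, not the actual method):

```scala
// Callback style: the caller supplies a function that receives the result.
def writeWithCallback(data: Seq[String], refresh: Set[String] => Unit): Unit = {
  val updatedPartitions = data.toSet // placeholder for the real write logic
  refresh(updatedPartitions)
}

// Return-value style, as suggested: the result flows back to the caller,
// who decides what to do with it.
def writeReturning(data: Seq[String]): Set[String] =
  data.toSet // placeholder for the real write logic
```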


val numStatsTrackers = statsTrackers.length
assert(statsPerTask.forall(_.length == numStatsTrackers),
s"""Every WriteTask should have produced one `WriteTaskStats` object for every tracker.
Contributor

nit: $numStatsTrackers

* Process the given collection of stats computed during this job.
* E.g. aggregate them, write them to memory / disk, issue warnings, whatever.
* @param stats One [[WriteTaskStats]] object from each successful write task.
* @note The type here is too generic. These classes should probably be parametrized:
Contributor

@cloud-fan cloud-fan Aug 10, 2017

nit: should be only one space before @note

@cloud-fan
Contributor

LGTM except some minor comments, great clean up!

@cloud-fan
Contributor

LGTM, pending jenkins

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Aug 10, 2017

Test build #80493 has finished for PR 18884 at commit 7ec545b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait WriteJobStatsTracker extends Serializable

@rxin
Contributor

rxin commented Aug 10, 2017

Merging in master.

@asfgit asfgit closed this in 95ad960 Aug 10, 2017