[SPARK-40460][SS] Fix streaming metrics when selecting _metadata (#37905)
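For context, the kind of query this fix is about is a streaming read over a file-based source that also selects the hidden `_metadata` column; before the fix, progress metrics for such queries could be reported incorrectly. A minimal sketch, with hypothetical paths and schema:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("metadata-metrics-repro").getOrCreate()

// Any file-based streaming source applies; the path and schema here are made up.
val df = spark.readStream
  .format("json")
  .schema("id LONG, value STRING")
  .load("/tmp/input")
  .select("*", "_metadata") // select the hidden file-metadata column

val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
```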
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming

 import scala.collection.mutable.{Map => MutableMap}
+import scala.collection.mutable

 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -553,7 +554,7 @@ class MicroBatchExecution(
       logDebug(s"Running batch $currentBatchId")

       // Request unprocessed data from all sources.
-      newData = reportTimeTaken("getBatch") {
+      val mutableNewData = mutable.Map.empty ++ reportTimeTaken("getBatch") {
         availableOffsets.flatMap {
           case (source: Source, available: Offset)
             if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
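As an aside on the idiom introduced here: prepending the immutable result of `getBatch` onto `mutable.Map.empty` builds a mutable copy that later steps can update in place without touching the original. A tiny REPL-style sketch with made-up entries:

```scala
import scala.collection.mutable

val fromSources = Map("sourceA" -> 10, "sourceB" -> 20) // stand-in for getBatch results

// ++ on a mutable map returns a new mutable map holding all of the entries.
val copy = mutable.Map.empty[String, Int] ++ fromSources

copy.put("sourceA", 42) // only the mutable copy changes
// fromSources("sourceA") == 10, copy("sourceA") == 42
```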
@@ -590,7 +591,7 @@ class MicroBatchExecution(
     val newBatchesPlan = logicalPlan transform {
       // For v1 sources.
       case StreamingExecutionRelation(source, output, catalogTable) =>
-        newData.get(source).map { dataPlan =>
+        mutableNewData.get(source).map { dataPlan =>
           val hasFileMetadata = output.exists {
Contributor:
Looking at the code, it seems the problem is that we resolve the metadata columns in every micro-batch. Shouldn't we resolve them only once?

Contributor:
That would require the Source to indicate that a metadata column was requested and to produce the logical plan accordingly when getBatch is called. My understanding is that a DSv1 source has no interface for receiving information about which columns the actual query will reference.
            case FileSourceMetadataAttribute(_) => true
            case _ => false
@@ -608,6 +609,11 @@
           }
           newRelation
         }
+        // SPARK-40460: overwrite the entry with the new logicalPlan
+        // because it might contain the _metadata column. It is a necessary change,
+        // in the ProgressReporter, we use the following mapping to get correct streaming metrics:
+        // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
+        mutableNewData.put(source, finalDataPlan)
         val maxFields = SQLConf.get.maxToStringFields
         assert(output.size == finalDataPlan.output.size,
           s"Invalid batch: ${truncatedString(output, ",", maxFields)} != " +
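The code comment above is the heart of the fix. As a toy illustration (not the actual ProgressReporter code; the names below are made up): metrics are attributed by associating the plan recorded in `newData` for each source with what actually ends up in the trigger's plan, so if the map still points at the plan without `_metadata`, the association misses.

```scala
// Toy model of the association, REPL-style. Not Spark code.
case class Plan(desc: String)

val source = "fileSource"
val planWithoutMetadata = Plan("relation")
val planWithMetadata    = Plan("relation with _metadata")

// Before the fix: newData still holds the plan without _metadata, while the
// trigger's logical plan contains the metadata-augmented relation.
val newDataBefore = Map(source -> planWithoutMetadata)
val foundBefore = newDataBefore.values.exists(_ == planWithMetadata) // false -> metrics lost

// After the fix (what mutableNewData.put(source, finalDataPlan) does above):
val newDataAfter = newDataBefore.updated(source, planWithMetadata)
val foundAfter = newDataAfter.values.exists(_ == planWithMetadata)   // true -> metrics line up
```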
@@ -623,14 +629,14 @@

       // For v2 sources.
       case r: StreamingDataSourceV2Relation =>
-        newData.get(r.stream).map {
+        mutableNewData.get(r.stream).map {
           case OffsetHolder(start, end) =>
             r.copy(startOffset = Some(start), endOffset = Some(end))
         }.getOrElse {
           LocalRelation(r.output, isStreaming = true)
         }
     }

+    newData = mutableNewData.toMap
     // Rewire the plan to use the new attributes that were returned by the source.
     val newAttributePlan = newBatchesPlan.transformAllExpressionsWithPruning(
       _.containsPattern(CURRENT_LIKE)) {
Reviewer:
While we are here, a probably less intrusive change would be to move L594 ~ L610 up to L567. After that change we wouldn't need to touch newData here.
Author:
Thanks, I initially thought about that, but we need to know the `output` from `StreamingExecutionRelation(source, output, catalogTable)` to resolve `_metadata`, right (L591 ~ L593)?
Reviewer:
Yeah, you're right, I missed that.
Btw, it looks like my change (tagging catalogTable into LogicalRelation) would also have run into this bug. Thanks for fixing this.
Author:
Np - an unintentional fix :-) Thanks for helping!
Reviewer:
We may want to check the self-union / self-join cases to verify we really didn't break anything. This only works when `leaf : source = 1 : 1` holds (otherwise we are overwriting the value in the map), while the code comment in ProgressReporter suggests there are counterexamples.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it - could you share an example? In this case, does that mean the
leaf : source = 1 : N?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code comment actually doesn't say much and I'm speculating. Let's just try a best effort, self-union and self-join.
df = spark.readStream... -> df.union(df)/df = spark.readStream... -> df.join(df)
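A sketch of what such a best-effort check might look like, reusing the hypothetical `spark` session and input path from the earlier sketch; the point is that one streaming source ends up backing more than one leaf of the logical plan:

```scala
// Same file-based streaming source selected with _metadata, as before.
val df = spark.readStream
  .format("json")
  .schema("id LONG, value STRING")
  .load("/tmp/input")
  .select("*", "_metadata")

// Self-union: the single source appears behind two leaves of the plan.
val unioned = df.union(df)

// Self-join (inner equi-join on a regular column): likewise two leaves, one source.
val joined = df.as("l").join(df.as("r"), "id")

// Either query can then be started against a sink to check that numInputRows and
// the other progress metrics are still attributed correctly.
```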