@HyukjinKwon
Member

What changes were proposed in this pull request?

This PR proposes to factor the common attributes out from FlatMapGroupsWithStateExec to FlatMapGroupsWithStateExecBase.

Why are the changes needed?

There is a lot of logic to share when implementing another version of FlatMapGroupsWithStateExec, so it is better to factor it out.
This is also part of #37285, which demonstrates how the refactored trait is used.
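
To illustrate the kind of refactoring described here, the following is a minimal sketch and not the actual Spark code. Every name in it (`GroupsWithStateExecBase`, `ScalaGroupsExec`, `OtherGroupsExec`, the simplified `TimeoutConf` hierarchy, and `describe`) is a hypothetical stand-in, and the watermark check is simplified to an `Option` test. It only shows how members shared by several operator variants could live in one base trait.

```scala
// Hypothetical, simplified stand-ins; these are NOT the real Spark classes.
sealed trait TimeoutConf
case object NoTimeout extends TimeoutConf
case object ProcessingTimeTimeout extends TimeoutConf

// Base trait holding the members that every operator variant needs.
trait GroupsWithStateExecBase {
  // Abstract members that each concrete operator supplies.
  def timeoutConf: TimeoutConf
  def eventTimeWatermark: Option[Long]

  // Shared logic derived from the abstract members.
  // lazy val avoids reading fields before the concrete class initializes them.
  protected lazy val isTimeoutEnabled: Boolean = timeoutConf != NoTimeout
  protected lazy val watermarkPresent: Boolean = eventTimeWatermark.isDefined

  def describe: String =
    s"timeoutEnabled=$isTimeoutEnabled, watermarkPresent=$watermarkPresent"
}

// Two variants reuse the shared members instead of duplicating them.
case class ScalaGroupsExec(timeoutConf: TimeoutConf, eventTimeWatermark: Option[Long])
  extends GroupsWithStateExecBase

case class OtherGroupsExec(timeoutConf: TimeoutConf, eventTimeWatermark: Option[Long])
  extends GroupsWithStateExecBase
```

With this shape, a second implementation only has to provide the abstract members; the derived flags come from the base trait.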

Does this PR introduce any user-facing change?

No, this is refactoring-only.

How was this patch tested?

Existing test cases should cover it.

@HyukjinKwon
Member Author

cc @HeartSaVioR and @viirya FYI

Contributor

@HeartSaVioR HeartSaVioR left a comment

Looks OK in general. Minor/nit comments.


private val isTimeoutEnabled = timeoutConf != NoTimeout
private val watermarkPresent = child.output.exists {
val stateInfo: Option[StatefulOperatorStateInfo]
Contributor

stateInfo / eventTimeWatermark: would they still work if we changed them to protected?

Member Author

I remember exposing them for a reason at the time. Let me change them to protected and see if it passes.

Member Author

The error is like this:

[error] /.../spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:51:17: overriding method stateInfo in trait StatefulOperator of type => Option[org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo];
[error]  value stateInfo has weaker access privileges; it should be public
[error]   protected val stateInfo: Option[StatefulOperatorStateInfo]
[error]                 ^
[error] /.../spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:57:17: overriding method eventTimeWatermark in trait WatermarkSupport of type => Option[Long];
[error]  value eventTimeWatermark has weaker access privileges; it should be public
[error]   protected val eventTimeWatermark: Option[Long]
[error]                 ^
[error] two errors found
[error] (sql / Compile / compileIncremental) Compilation failed
[error] Total time: 160 s (02:40), completed Sep 13, 2022 1:03:01 PM
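
The compiler messages above reflect a general Scala rule: an overriding member may not have weaker access than the member it overrides. The following is a minimal sketch of that rule. `HasStateInfo`, `PublicBase`, `ProtectedBase`, and `Exec` are hypothetical names, and `Option[String]` stands in for the real state info type; this is not the Spark source.

```scala
// Hypothetical stand-in for a trait that declares a public member,
// in the way StatefulOperator declares stateInfo.
trait HasStateInfo {
  def stateInfo: Option[String] // public by default
}

// Compiles: the redeclared member stays public.
trait PublicBase extends HasStateInfo {
  val stateInfo: Option[String]
}

// Rejected by the compiler if uncommented, with a "weaker access privileges"
// error like the one in the log above, because protected is narrower than public.
// trait ProtectedBase extends HasStateInfo {
//   protected val stateInfo: Option[String]
// }

// A concrete operator supplies the value through its constructor.
final case class Exec(stateInfo: Option[String]) extends PublicBase
```

Because the members are inherited from other traits as public, the base trait has to keep them public as well.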

Contributor

Ah OK, that came from another trait. Thanks for the explanation.

@HyukjinKwon
Member Author

Let me get this in. I am pretty confident that this is the right change too :-).

Merged to master.

import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}

/**
* Physical operator for executing `FlatMapGroupsWithState`
Member

It would be nice if we could update this doc too.

private val getValueObj =
ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjectType)
abstract class InputProcessor(store: StateStore) {
Member

It might be good to have some doc for this abstract class?

Member

@viirya viirya left a comment

Looks okay, just minor comments about the doc.

@HyukjinKwon HyukjinKwon deleted the SPARK-40411 branch January 15, 2024 00:50

3 participants