@pengzhon-db pengzhon-db commented May 11, 2023

What changes were proposed in this pull request?

This change adds a new Spark Connect relation type CachedDataFrame, which can represent a DataFrame that has been cached on the server side.

On the server side, each (userId, sessionId) pair has a map that caches DataFrames. A DataFrame is removed from the cache when the corresponding session expires. (The caller can also evict the DataFrame from the cache earlier, depending on the logic.)

On the client side, a new relation type and function are added. The new function creates a DataFrame reference given a key. The key is the ID of a cached DataFrame, which is usually passed from the server to the client. When transforming the DataFrame reference, the server looks up the actual DataFrame in the cache and substitutes it.

One use case of this function is streaming foreachBatch(): the server needs to call the user function for every batch, and that function takes a DataFrame as an argument. With the new function, we can cache the DataFrame on the server and pass its ID back to the client, which can then create the DataFrame reference.
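
To make the flow concrete, here is a minimal Scala sketch of that round trip. All names and function parameters below are assumptions for illustration only; they are not APIs added by this PR.

import java.util.UUID
import org.apache.spark.sql.DataFrame

// Hypothetical server-side driver for one microbatch of foreachBatch().
def runUserForeachBatch(
    batchDf: DataFrame,
    batchId: Long,
    cachePut: (String, DataFrame) => Unit,      // assumed: puts the DataFrame into the server-side cache
    cacheRemove: String => Unit,                // assumed: evicts it from the cache
    callClientWithRef: (String, Long) => Unit   // assumed: asks the client to run the user function on a reference
): Unit = {
  val dfId = UUID.randomUUID().toString
  cachePut(dfId, batchDf)              // 1. cache the microbatch DataFrame on the server
  try {
    callClientWithRef(dfId, batchId)   // 2. the client builds a DataFrame reference from dfId and runs the
                                       //    user function; when that plan reaches the server, the reference
                                       //    is resolved back to the cached DataFrame
  } finally {
    cacheRemove(dfId)                  // 3. evict early instead of waiting for session expiry
  }
}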

Why are the changes needed?

This change is needed to support streaming foreachBatch() in Spark Connect.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Scala unit tests.
Manual testing.
(More end-to-end tests will be added when foreachBatch() is supported. Currently there is no way to add a DataFrame to the server cache using Python.)

@pengzhon-db pengzhon-db changed the title from "Spark connect function to create dataframe ref" to "[SPARK-43474] [SS] [CONNECT] Add a spark connect function to create DataFrame reference" on May 12, 2023
// Represents a DataFrame that has been cached on server.
message CachedDataFrame {
// (Required) An identifier of the user which cached the dataframe
string userId = 1;
Contributor Author

We can also just get userId and sessionId from the server via the request, instead of passing them from here.
But that would require updating transformRelation() to take two more parameters, which means all those transform...() methods would need to be updated to take two more parameters.

@pengzhon-db
Contributor Author

@rangadi can you review this PR?

@rangadi rangadi left a comment

Overall LGTM. Made a few comments.
@grundprinzip, @amaliujia, @hvanhovell could one of you take a quick look? This is used to represent a runtime DataFrame that is the result of a microbatch (needed for foreachBatch()).

@pengzhon-db pengzhon-db force-pushed the spark_connect_create_df_ref branch from fefed84 to 4924f91 on June 8, 2023 23:30
@rangadi rangadi left a comment

@HyukjinKwon could you take a look? This looks good to me.

val SessionHolder(userId, sessionId, session) = notification.getValue
val blockManager = session.sparkContext.env.blockManager
blockManager.removeCache(userId, sessionId)
cachedDataFrameManager.remove(userId, sessionId)

Note to self: this reference should be removed from the streaming engine once the foreach batch completes.

* accessed from the same user within the same session. The DataFrame will be removed from the
* cache when the session expires.
*/
private[connect] class SparkConnectCachedDataFrameManager extends Logging {
Contributor

nit: do we need Logging? It doesn't seem to be used.


Removed. See the continuation of this PR: #41580

Comment on lines +38 to +46
private val dataFrameCache = mutable.Map[(String, String), mutable.Map[String, DataFrame]]()

def put(userId: String, sessionId: String, dataFrameId: String, value: DataFrame): Unit =
synchronized {
val sessionKey = (userId, sessionId)
val sessionDataFrameMap = dataFrameCache
.getOrElseUpdate(sessionKey, mutable.Map[String, DataFrame]())
sessionDataFrameMap.put(dataFrameId, value)
}
Contributor
@zhenlineo zhenlineo Jun 12, 2023

How about using two concurrent hash maps + compute to avoid synchronized? For example:

    dataFrameCache.compute(sessionKey, (_, sessionDataFrameMap) => {
      val newMap =
        if (sessionDataFrameMap == null) new ConcurrentHashMap[String, DataFrame]()
        else sessionDataFrameMap
      newMap.put(dataFrameId, value)
      newMap // compute keeps whatever the function returns, so return the map itself
    })

Similar logic applies for remove.
For get, you can just read directly without explicit locking.
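
For illustration, the remove case this comment alludes to could look roughly like the following, under the same assumption that both levels are ConcurrentHashMaps (a sketch, not code from the PR):

dataFrameCache.computeIfPresent(sessionKey, (_, sessionDataFrameMap) => {
  sessionDataFrameMap.remove(dataFrameId)
  // Returning null drops the outer entry once the inner map is empty.
  if (sessionDataFrameMap.isEmpty) null else sessionDataFrameMap
})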

Contributor

Only my personal taste:

I feel like synchronized is easier to reason about than ConcurrentHashMap for code readers, unless switching to a concurrent data structure somehow brings a significant performance gain.


Agree, we could use ConcurrentHashMap, but I often end up preferring synchronized as well, since this is not perf critical (used only for certain DataFrames), though I am not sure if there is any perf difference.
Added a @GuardedBy annotation.
See the continuation of this PR here: https://github.com/apache/spark/pull/41580/files#diff-1a8933e9723f5497c3991441c7ff21fe43db63d483354af9a0113043ea600b3eR42
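
For context, a minimal sketch of what a @GuardedBy-annotated cache can look like (the class name here is hypothetical; see the linked follow-up PR for the real code):

import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
import org.apache.spark.sql.DataFrame

class CachedDataFrameManagerSketch {
  // Documents that every access must hold this object's monitor.
  @GuardedBy("this")
  private val dataFrameCache = mutable.Map[(String, String), mutable.Map[String, DataFrame]]()

  def put(userId: String, sessionId: String, dataFrameId: String, value: DataFrame): Unit =
    synchronized {
      dataFrameCache
        .getOrElseUpdate((userId, sessionId), mutable.Map[String, DataFrame]())
        .put(dataFrameId, value)
    }
}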

Member
@HyukjinKwon HyukjinKwon left a comment

Looks fine, but it would be great if @grundprinzip finds some time to take a look.

Contributor
@grundprinzip grundprinzip left a comment

I believe the better design would be to embed the relation cache into the SessionHolder, because that is already keyed on the user ID and session ID.
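
As a rough illustration of that suggestion (class and method names are assumptions, not the actual SessionHolder API), the per-session cache could live directly on the holder, making user/session scoping implicit:

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.{DataFrame, SparkSession}

case class SessionHolderSketch(userId: String, sessionId: String, session: SparkSession) {
  // Cached DataFrames for this session only; no (userId, sessionId) key is needed.
  private val dataFrameCache = new ConcurrentHashMap[String, DataFrame]()

  def cacheDataFrame(dataFrameId: String, df: DataFrame): Unit =
    dataFrameCache.put(dataFrameId, df)

  def getCachedDataFrame(dataFrameId: String): Option[DataFrame] =
    Option(dataFrameCache.get(dataFrameId))

  // Called when the session expires so cached DataFrames do not leak.
  def clearCachedDataFrames(): Unit = dataFrameCache.clear()
}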

Comment on lines +402 to +405
string userId = 1;

// (Required) An identifier of the Spark session in which the relation is cached
string sessionId = 2;
Contributor

The user and session IDs can't be trusted coming from the proto. The cached relation must only carry the unique relation ID; the rest is resolved from the context of the query.


Agree. This is important. Changed the implementation to use SparkSession as the key (it has a sessionUUID).
[continue the discussion here]

Comment on lines +792 to +795
SparkConnectService.cachedDataFrameManager
.get(rel.getUserId, rel.getSessionId, rel.getRelationId)
.logicalPlan
}
Contributor

Conceptually, the cached data should come from the session holder, which could be passed to the planner instead.


Agree. For now, proposing to keep it in a separate class. Continue the discussion here.
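
A rough sketch of what resolving the relation through a session holder could look like on the planner side (this reuses the hypothetical SessionHolderSketch above and is not the PR's actual wiring):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical: resolve a CachedDataFrame relation against the session holder that is
// already associated with the current request, instead of a service-level global map.
def transformCachedDataFrame(sessionHolder: SessionHolderSketch, relationId: String): LogicalPlan = {
  sessionHolder
    .getCachedDataFrame(relationId)
    .getOrElse(throw new NoSuchElementException(s"No cached DataFrame for id $relationId"))
    .queryExecution
    .logical
}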

* accessed from the same user within the same session. The DataFrame will be removed from the
* cache when the session expires.
*/
private[connect] class SparkConnectCachedDataFrameManager extends Logging {
Contributor

Can we add this class to the session holder to make sure it is properly associated with the right user ID and session?


Discussed above. SessionHolder is not accessible yet. Also removed session_id and user_id from this cache, instead keying it on the actual SparkSession (user_id and session_id are implicit in that).

Comment on lines +40 to +46
def put(userId: String, sessionId: String, dataFrameId: String, value: DataFrame): Unit =
synchronized {
val sessionKey = (userId, sessionId)
val sessionDataFrameMap = dataFrameCache
.getOrElseUpdate(sessionKey, mutable.Map[String, DataFrame]())
sessionDataFrameMap.put(dataFrameId, value)
}
Contributor

This will also make this easier because you only need one concurrent map.


createDataFrame.__doc__ = PySparkSession.createDataFrame.__doc__

def _createCachedDataFrame(self, relationId: str) -> "DataFrame":
Contributor

this seems to be unused here?


Removed. It will be used in the foreachBatch implementation (in follow-up PRs).

@rangadi rangadi left a comment

Addressed the feedback in the continuation PR: #41580.
Please see the replies here, but comment in the above PR.

rangadi commented Jun 14, 2023

Please note that updates to this PR are in another PR: #41580

@hvanhovell hvanhovell closed this Jun 14, 2023
HyukjinKwon pushed a commit that referenced this pull request Jun 29, 2023
…frames by ID

[This is a continuation of #41146, to change the author of the PR. Retains the description.]

### What changes were proposed in this pull request?

This change adds a new Spark Connect relation type `CachedRemoteRelation`, which can represent a DataFrame that has been cached on the server side.

On the server side, each `SessionHolder` has a cache that maintains a mapping from DataFrame ID to the actual DataFrame.

On the client side, a new relation type and function are added. The new function creates a DataFrame reference given a key. The key is the ID of a cached DataFrame, which is usually passed from the server to the client. When transforming the DataFrame reference, the server looks up the actual DataFrame in the cache and substitutes it.

One use case of this function is streaming foreachBatch(): the server needs to call the user function for every batch, and that function takes a DataFrame as an argument. With the new function, we can cache the DataFrame on the server and pass its ID back to the client, which can then create the DataFrame reference.

### Why are the changes needed?

This change is needed to support streaming foreachBatch() in Spark Connect.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Scala unit tests.
Manual testing.
(More end-to-end tests will be added when foreachBatch() is supported. Currently there is no way to add a DataFrame to the server cache using Python.)

Closes #41580 from rangadi/df-ref.

Authored-by: Raghu Angadi <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>