From 181e053c4a56ec323625e3242cfacec32e9f927f Mon Sep 17 00:00:00 2001 From: Yuheng Chang Date: Fri, 21 Nov 2025 10:38:26 -0800 Subject: [PATCH 1/4] fix --- .../spark/sql/connect/planner/SparkConnectPlanner.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 644784fa3db6c..045fd9e83e06c 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2992,6 +2992,13 @@ class SparkConnectPlanner( // the SQL command and defer the actual analysis and execution to the flow function. if (insidePipelineFlowFunction) { result.setRelation(relation) + responseObserver.onNext( + ExecutePlanResponse + .newBuilder() + .setSessionId(sessionHolder.sessionId) + .setServerSideSessionId(sessionHolder.serverSessionId) + .setSqlCommandResult(result) + .build) return } From 5ab76bcaa589d578d8a272db6a7d27541e724f97 Mon Sep 17 00:00:00 2001 From: Yuheng Chang Date: Fri, 21 Nov 2025 11:23:17 -0800 Subject: [PATCH 2/4] add tests --- .../connect/planner/SparkConnectPlanner.scala | 1 + ...SparkDeclarativePipelinesServerSuite.scala | 45 +++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 045fd9e83e06c..9af2e7cb46616 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2992,6 +2992,7 @@ class SparkConnectPlanner( // the SQL command and defer the actual analysis and execution to the flow function. if (insidePipelineFlowFunction) { result.setRelation(relation) + executeHolder.eventsManager.postFinished() responseObserver.onNext( ExecutePlanResponse .newBuilder() diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala index c9551646385c2..78b41b67ba5fa 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala @@ -850,4 +850,49 @@ class SparkDeclarativePipelinesServerSuite } } } + + test("spark.sql() inside a pipeline flow function should return a sql_command_result") { + withRawBlockingStub { implicit stub => + val graphId = createDataflowGraph + val pipelineAnalysisContext = proto.PipelineAnalysisContext + .newBuilder() + .setDataflowGraphId(graphId) + .setFlowName("flow1") + .build() + val userContext = proto.UserContext + .newBuilder() + .addExtensions(com.google.protobuf.Any.pack(pipelineAnalysisContext)) + .setUserId("test_user") + .build() + + val relation = proto.Plan + .newBuilder() + .setCommand( + proto.Command + .newBuilder() + .setSqlCommand( + proto.SqlCommand + .newBuilder() + .setInput( + proto.Relation + .newBuilder() + .setRead(proto.Read + .newBuilder() + .setNamedTable( + proto.Read.NamedTable.newBuilder().setUnparsedIdentifier("table")) + .build()) + .build())) + .build()) + .build() + + val sparkSqlRequest = proto.ExecutePlanRequest + .newBuilder() + .setUserContext(userContext) + .setPlan(relation) + .build() + val sparkSqlResponse = stub.executePlan(sparkSqlRequest).next() + assert(sparkSqlResponse.hasSqlCommandResult) + assert(sparkSqlResponse.getSqlCommandResult.getRelation == relation) + } + } } From a629e97d10a97e7cec1245638db6546a86040cef Mon Sep 17 00:00:00 2001 From: Yuheng Chang Date: Fri, 21 Nov 2025 11:27:55 -0800 Subject: [PATCH 3/4] fix tests --- .../pipelines/SparkDeclarativePipelinesServerSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala index 78b41b67ba5fa..3c0a4c3678bc7 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala @@ -889,10 +889,13 @@ class SparkDeclarativePipelinesServerSuite .newBuilder() .setUserContext(userContext) .setPlan(relation) + .setSessionId(UUID.randomUUID().toString) .build() val sparkSqlResponse = stub.executePlan(sparkSqlRequest).next() assert(sparkSqlResponse.hasSqlCommandResult) - assert(sparkSqlResponse.getSqlCommandResult.getRelation == relation) + assert( + sparkSqlResponse.getSqlCommandResult.getRelation == + relation.getCommand.getSqlCommand.getInput) } } } From 048057c5784255213fff25aa38a67667031c7a24 Mon Sep 17 00:00:00 2001 From: Yuheng Chang Date: Fri, 21 Nov 2025 12:54:20 -0800 Subject: [PATCH 4/4] address comment --- ...SparkDeclarativePipelinesServerSuite.scala | 48 ++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala index 3c0a4c3678bc7..3cb45fa6e1720 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala @@ -851,7 +851,8 @@ class SparkDeclarativePipelinesServerSuite } } - test("spark.sql() inside a pipeline flow function should return a sql_command_result") { + test( + "SPARK-54452: spark.sql() inside a pipeline flow function should return a sql_command_result") { withRawBlockingStub { implicit stub => val graphId = createDataflowGraph val pipelineAnalysisContext = proto.PipelineAnalysisContext @@ -898,4 +899,49 @@ class SparkDeclarativePipelinesServerSuite relation.getCommand.getSqlCommand.getInput) } } + + test( + "SPARK-54452: spark.sql() outside a pipeline flow function should return a " + + "sql_command_result") { + withRawBlockingStub { implicit stub => + val graphId = createDataflowGraph + val pipelineAnalysisContext = proto.PipelineAnalysisContext + .newBuilder() + .setDataflowGraphId(graphId) + .build() + val userContext = proto.UserContext + .newBuilder() + .addExtensions(com.google.protobuf.Any.pack(pipelineAnalysisContext)) + .setUserId("test_user") + .build() + + val relation = proto.Plan + .newBuilder() + .setCommand( + proto.Command + .newBuilder() + .setSqlCommand( + proto.SqlCommand + .newBuilder() + .setInput(proto.Relation + .newBuilder() + .setSql(proto.SQL.newBuilder().setQuery("SELECT * FROM RANGE(5)")) + .build()) + .build()) + .build()) + .build() + + val sparkSqlRequest = proto.ExecutePlanRequest + .newBuilder() + .setUserContext(userContext) + .setPlan(relation) + .setSessionId(UUID.randomUUID().toString) + .build() + val sparkSqlResponse = stub.executePlan(sparkSqlRequest).next() + assert(sparkSqlResponse.hasSqlCommandResult) + assert( + sparkSqlResponse.getSqlCommandResult.getRelation == + relation.getCommand.getSqlCommand.getInput) + } + } }