Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2992,6 +2992,14 @@ class SparkConnectPlanner(
// the SQL command and defer the actual analysis and execution to the flow function.
if (insidePipelineFlowFunction) {
result.setRelation(relation)
executeHolder.eventsManager.postFinished()
responseObserver.onNext(
ExecutePlanResponse
.newBuilder()
.setSessionId(sessionHolder.sessionId)
.setServerSideSessionId(sessionHolder.serverSessionId)
.setSqlCommandResult(result)
.build)
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,4 +850,98 @@ class SparkDeclarativePipelinesServerSuite
}
}
}

test(
  "SPARK-54452: spark.sql() inside a pipeline flow function should return a sql_command_result") {
  withRawBlockingStub { implicit stub =>
    val graphId = createDataflowGraph
    // Setting a flow name on the analysis context marks this request as originating
    // from inside a pipeline flow function, which is the code path under test.
    val pipelineAnalysisContext = proto.PipelineAnalysisContext
      .newBuilder()
      .setDataflowGraphId(graphId)
      .setFlowName("flow1")
      .build()
    val userContext = proto.UserContext
      .newBuilder()
      .addExtensions(com.google.protobuf.Any.pack(pipelineAnalysisContext))
      .setUserId("test_user")
      .build()

    // A SQL command whose input reads a named table. Inside a flow function the
    // planner defers analysis, so the unresolved relation should be echoed back
    // verbatim in the sql_command_result instead of being executed.
    val relation = proto.Plan
      .newBuilder()
      .setCommand(
        proto.Command
          .newBuilder()
          .setSqlCommand(
            proto.SqlCommand
              .newBuilder()
              .setInput(
                proto.Relation
                  .newBuilder()
                  .setRead(proto.Read
                    .newBuilder()
                    .setNamedTable(
                      proto.Read.NamedTable.newBuilder().setUnparsedIdentifier("table"))
                    .build())
                  .build()))
          .build())
      .build()

    val sparkSqlRequest = proto.ExecutePlanRequest
      .newBuilder()
      .setUserContext(userContext)
      .setPlan(relation)
      .setSessionId(UUID.randomUUID().toString)
      .build()
    val sparkSqlResponse = stub.executePlan(sparkSqlRequest).next()
    // The first response must carry a sql_command_result whose relation is exactly
    // the (unanalyzed) input relation of the submitted SQL command.
    assert(sparkSqlResponse.hasSqlCommandResult)
    assert(
      sparkSqlResponse.getSqlCommandResult.getRelation ==
        relation.getCommand.getSqlCommand.getInput)
  }
}

test(
  "SPARK-54452: spark.sql() outside a pipeline flow function should return a " +
    "sql_command_result") {
  withRawBlockingStub { implicit stub =>
    val graphId = createDataflowGraph
    // No flow name on the analysis context: this request is NOT inside a pipeline
    // flow function, exercising the ordinary SQL-command execution path.
    val pipelineAnalysisContext = proto.PipelineAnalysisContext
      .newBuilder()
      .setDataflowGraphId(graphId)
      .build()
    val userContext = proto.UserContext
      .newBuilder()
      .addExtensions(com.google.protobuf.Any.pack(pipelineAnalysisContext))
      .setUserId("test_user")
      .build()

    // Use a self-contained query (no table dependencies) so the command can be
    // analyzed and executed outside a flow function.
    val relation = proto.Plan
      .newBuilder()
      .setCommand(
        proto.Command
          .newBuilder()
          .setSqlCommand(
            proto.SqlCommand
              .newBuilder()
              .setInput(proto.Relation
                .newBuilder()
                .setSql(proto.SQL.newBuilder().setQuery("SELECT * FROM RANGE(5)"))
                .build())
              .build())
          .build())
      .build()

    val sparkSqlRequest = proto.ExecutePlanRequest
      .newBuilder()
      .setUserContext(userContext)
      .setPlan(relation)
      .setSessionId(UUID.randomUUID().toString)
      .build()
    val sparkSqlResponse = stub.executePlan(sparkSqlRequest).next()
    // Outside a flow function the response should still surface a sql_command_result
    // carrying the command's input relation.
    assert(sparkSqlResponse.hasSqlCommandResult)
    assert(
      sparkSqlResponse.getSqlCommandResult.getRelation ==
        relation.getCommand.getSqlCommand.getInput)
  }
}
}