diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala index 0cce44c6e3d9..76958f055f2e 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala @@ -397,6 +397,22 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { assertContains("noException: Boolean = true", output) } + test("broadcast works with REPL generated code") { + val input = + """ + |val add1 = udf((i: Long) => i + 1) + |val tableA = spark.range(2).alias("a") + |val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b") + |tableA.join(tableB). + | where(col("a.id")===col("b.id")). + | select(col("a.id").alias("a_id"), col("b.id").alias("b_id")). + | collect(). + | mkString("[", ", ", "]") + |""".stripMargin + val output = runCommandsInShell(input) + assertContains("""String = "[[1,1]]"""", output) + } + test("closure cleaner") { val input = """ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index e839d2c06913..561deacfb72d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.util.control.NonFatal -import org.apache.spark.{ErrorMessageFormat, SparkException, SparkThrowable, SparkThrowableHelper} +import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkException, SparkThrowable, SparkThrowableHelper} import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX} @@ -255,7 +255,8 @@ object SQLExecution extends Logging { val activeSession = sparkSession val sc = sparkSession.sparkContext val localProps = Utils.cloneProperties(sc.getLocalProperties) - exec.submit(() => { + val artifactState = JobArtifactSet.getCurrentJobArtifactState.orNull + exec.submit(() => JobArtifactSet.withActiveJobArtifactState(artifactState) { val originalSession = SparkSession.getActiveSession val originalLocalProps = sc.getLocalProperties SparkSession.setActiveSession(activeSession)