4 changes: 4 additions & 0 deletions connect/src/main/protobuf/spark/connect/expressions.proto
@@ -34,6 +34,7 @@ message Expression {
UnresolvedAttribute unresolved_attribute = 2;
UnresolvedFunction unresolved_function = 3;
ExpressionString expression_string = 4;
UnresolvedStar unresolved_star = 5;
}

message Literal {
@@ -155,4 +156,7 @@ message Expression {
string expression = 1;
}

// UnresolvedStar is used to expand all the fields of a relation or struct.
message UnresolvedStar {
}
}
@@ -96,7 +96,9 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
rel: proto.Project,
common: Option[proto.RelationCommon]): LogicalPlan = {
val baseRel = transformRelation(rel.getInput)
val projection = if (rel.getExpressionsCount == 0) {
// TODO: support the target field for *.
val projection =
if (rel.getExpressionsCount == 1 && rel.getExpressions(0).hasUnresolvedStar) {
Seq(UnresolvedStar(Option.empty))
} else {
rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_))
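
Roughly, the star branch above yields a Catalyst plan like the sketch below (a simplified illustration, not the planner's exact code; the real child comes from transformRelation(rel.getInput) rather than a hard-coded UnresolvedRelation, and the table name is made up):

import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.plans.logical.Project

// A Project over UnresolvedStar(None): the analyzer later expands the star into
// every column of the child relation, much like DataFrame.select("*").
val plan = Project(
  projectList = Seq(UnresolvedStar(None)),
  child = UnresolvedRelation(Seq("name")))
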
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.SparkFunSuite
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.Expression.UnresolvedStar
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

@@ -91,6 +92,22 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
assert(res.nodeName == "UnresolvedRelation")
}

test("Simple Project") {
val readWithTable = proto.Read.newBuilder()
.setNamedTable(proto.Read.NamedTable.newBuilder.addParts("name").build())
.build()
val project =
proto.Project.newBuilder()
.setInput(proto.Relation.newBuilder().setRead(readWithTable).build())
.addExpressions(
proto.Expression.newBuilder()
.setUnresolvedStar(UnresolvedStar.newBuilder().build()).build()
).build()
val res = transform(proto.Relation.newBuilder.setProject(project).build())
assert(res !== null)
assert(res.nodeName == "Project")
}

test("Simple Sort") {
val sort = proto.Sort.newBuilder
.addAllSortFields(Seq(proto.Sort.SortField.newBuilder().build()).asJava)
130 changes: 130 additions & 0 deletions core/src/main/resources/error/error-classes.json
@@ -3002,5 +3002,135 @@
"message" : [
"Failed to execute command because subquery expressions are not allowed in DEFAULT values."
]
},
"_LEGACY_ERROR_TEMP_2000" : {
"message" : [
"<message>. If necessary set <ansiConfig> to false to bypass this error."
]
},
"_LEGACY_ERROR_TEMP_2001" : {
"message" : [
"<message> If necessary set <ansiConfig> to false to bypass this error"
]
},
"_LEGACY_ERROR_TEMP_2002" : {
"message" : [
"<message>"
]
},
"_LEGACY_ERROR_TEMP_2003" : {
"message" : [
"Unsuccessful try to zip maps with <size> unique keys due to exceeding the array size limit <maxRoundedArrayLength>"
]
},
"_LEGACY_ERROR_TEMP_2004" : {
"message" : [
"no default for type <dataType>"
]
},
"_LEGACY_ERROR_TEMP_2005" : {
"message" : [
"Type <dataType> does not support ordered operations"
]
},
"_LEGACY_ERROR_TEMP_2006" : {
"message" : [
"The specified group index cannot be less than zero"
]
},
"_LEGACY_ERROR_TEMP_2007" : {
"message" : [
"Regex group count is <groupCount>, but the specified group index is <groupIndex>"
]
},
"_LEGACY_ERROR_TEMP_2008" : {
"message" : [
"Find an invalid url string <url>. If necessary set <ansiConfig> to false to bypass this error."
]
},
"_LEGACY_ERROR_TEMP_2009" : {
"message" : [
"dataType"
]
},
"_LEGACY_ERROR_TEMP_2010" : {
"message" : [
"Window Functions do not support merging."
]
},
"_LEGACY_ERROR_TEMP_2011" : {
"message" : [
"Unexpected data type <dataType>"
]
},
"_LEGACY_ERROR_TEMP_2012" : {
"message" : [
"Unexpected type <dataType>"
]
},
"_LEGACY_ERROR_TEMP_2013" : {
"message" : [
"Negative values found in <frequencyExpression>"
]
},
"_LEGACY_ERROR_TEMP_2014" : {
"message" : [
"<funcName> is not matched at addNewFunction"
]
},
"_LEGACY_ERROR_TEMP_2015" : {
"message" : [
"Cannot generate <codeType> code for incomparable type: <dataType>"
]
},
"_LEGACY_ERROR_TEMP_2016" : {
"message" : [
"Can not interpolate <arg> into code block."
]
},
"_LEGACY_ERROR_TEMP_2017" : {
"message" : [
"not resolved"
]
},
"_LEGACY_ERROR_TEMP_2018" : {
"message" : [
"class `<cls>` is not supported by `MapObjects` as resulting collection."
]
},
"_LEGACY_ERROR_TEMP_2019" : {
"message" : [
"Cannot use null as map key!"
]
},
"_LEGACY_ERROR_TEMP_2020" : {
"message" : [
"Couldn't find a valid constructor on <cls>"
]
},
"_LEGACY_ERROR_TEMP_2021" : {
"message" : [
"Couldn't find a primary constructor on <cls>"
]
},
"_LEGACY_ERROR_TEMP_2022" : {
"message" : [
"Unsupported natural join type <joinType>"
]
},
"_LEGACY_ERROR_TEMP_2023" : {
"message" : [
"Unresolved encoder expected, but <attr> was found."
]
},
"_LEGACY_ERROR_TEMP_2024" : {
"message" : [
"Only expression encoders are supported for now."
]
},
"_LEGACY_ERROR_TEMP_2025" : {
"message" : [
"<className> must override either <m1> or <m2>"
]
}
}
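
Each entry above is a message template; the <param> tokens are placeholders that are filled from the error's message parameters when the exception is built. A minimal, self-contained sketch of that substitution (a hypothetical helper, not Spark's actual error framework):

// Hypothetical helper: fill <param> placeholders of an error-class template.
def formatErrorMessage(template: String, params: Map[String, String]): String =
  params.foldLeft(template) { case (msg, (key, value)) => msg.replace(s"<$key>", value) }

// Example using the _LEGACY_ERROR_TEMP_2007 template above (parameter values are made up):
val msg = formatErrorMessage(
  "Regex group count is <groupCount>, but the specified group index is <groupIndex>",
  Map("groupCount" -> "2", "groupIndex" -> "3"))
// msg == "Regex group count is 2, but the specified group index is 3"
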
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2119,7 +2119,9 @@ class SparkContext(config: SparkConf) extends Logging {
Utils.tryLogNonFatalError {
_plugins.foreach(_.shutdown())
}
FallbackStorage.cleanUp(_conf, _hadoopConfiguration)
Utils.tryLogNonFatalError {
FallbackStorage.cleanUp(_conf, _hadoopConfiguration)
}
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
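
Wrapping the call means a failure in this cleanup step (here FallbackStorage.cleanUp) no longer aborts the rest of SparkContext.stop(). A rough sketch of what a helper like Utils.tryLogNonFatalError does (simplified; the real one logs through Spark's Logging trait):

import scala.util.control.NonFatal

// Sketch: run a cleanup block and swallow non-fatal failures after logging them,
// so the caller can continue with its remaining cleanup steps.
def tryLogNonFatalError(block: => Unit): Unit = {
  try {
    block
  } catch {
    case NonFatal(t) =>
      println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
  }
}
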
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -25,7 +25,6 @@ import scala.io.Codec
import scala.language.implicitConversions
import scala.ref.WeakReference
import scala.reflect.{classTag, ClassTag}
import scala.util.hashing

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
@@ -50,7 +49,7 @@ import org.apache.spark.util.Utils
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
Utils => collectionUtils}
import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
SamplingUtils}
SamplingUtils, XORShiftRandom}

/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
@@ -505,7 +504,7 @@ abstract class RDD[T: ClassTag](
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
var position = new XORShiftRandom(index).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
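
Seeding a per-partition generator with the partition index gives each input partition its own starting output partition; XORShiftRandom scrambles small consecutive seeds better than the byteswap32-seeded java.util.Random it replaces, which for some partition counts produced clustered starting positions and skewed output. A simplified sketch of the round-robin key assignment (scala.util.Random stands in for the package-private XORShiftRandom):

import scala.util.Random

// Simplified sketch: pair every element with a target-partition key, starting from a
// position derived from the partition index and then advancing round-robin.
// Spark uses new XORShiftRandom(index).nextInt(numPartitions) for the starting position.
def distributePartition[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
  var position = new Random(index).nextInt(numPartitions)
  items.map { t =>
    position += 1
    (position % numPartitions, t)
  }
}
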
16 changes: 12 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2890,10 +2890,18 @@ private[spark] class DAGScheduler(
}

def stop(): Unit = {
messageScheduler.shutdownNow()
shuffleMergeFinalizeScheduler.shutdownNow()
eventProcessLoop.stop()
taskScheduler.stop()
Utils.tryLogNonFatalError {
messageScheduler.shutdownNow()
}
Utils.tryLogNonFatalError {
shuffleMergeFinalizeScheduler.shutdownNow()
}
Utils.tryLogNonFatalError {
eventProcessLoop.stop()
}
Utils.tryLogNonFatalError {
taskScheduler.stop()
}
}

eventProcessLoop.start()
@@ -972,15 +972,23 @@ private[spark] class TaskSchedulerImpl(
}

override def stop(): Unit = {
speculationScheduler.shutdown()
Utils.tryLogNonFatalError {
speculationScheduler.shutdown()
}
if (backend != null) {
backend.stop()
Utils.tryLogNonFatalError {
backend.stop()
}
}
if (taskResultGetter != null) {
taskResultGetter.stop()
Utils.tryLogNonFatalError {
taskResultGetter.stop()
}
}
if (barrierCoordinator != null) {
barrierCoordinator.stop()
Utils.tryLogNonFatalError {
barrierCoordinator.stop()
}
}
starvationTimer.cancel()
abortTimer.cancel()
@@ -280,8 +280,9 @@ private[storage] class BlockManagerDecommissioner(
.sortBy(b => (b.shuffleId, b.mapId))
shufflesToMigrate.addAll(newShufflesToMigrate.map(x => (x, 0)).asJava)
migratingShuffles ++= newShufflesToMigrate
val remainedShuffles = migratingShuffles.size - numMigratedShuffles.get()
logInfo(s"${newShufflesToMigrate.size} of ${localShuffles.size} local shuffles " +
s"are added. In total, ${migratingShuffles.size} shuffles are remained.")
s"are added. In total, $remainedShuffles shuffles are remained.")

// Update the threads doing migrations
val livePeerSet = bm.getPeers(false).toSet
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/client.py
@@ -141,7 +141,7 @@ def _build_metrics(self, metrics: "pb2.Response.Metrics") -> typing.List[PlanMet
def sql(self, sql_string: str) -> "DataFrame":
return DataFrame.withPlan(SQL(sql_string), self)

def collect(self, plan: pb2.Plan) -> pandas.DataFrame:
def _to_pandas(self, plan: pb2.Plan) -> pandas.DataFrame:
req = pb2.Request()
req.user_context.user_id = self._user_id
req.plan.CopyFrom(plan)
10 changes: 6 additions & 4 deletions python/pyspark/sql/connect/data_frame.py
@@ -27,6 +27,8 @@
TYPE_CHECKING,
)

import pandas

import pyspark.sql.connect.plan as plan
from pyspark.sql.connect.column import (
ColumnOrString,
@@ -225,11 +227,11 @@ def _print_plan(self) -> str:
return ""

def collect(self):
query = self._plan.collect(self._session)
return self._session.collect(query)
raise NotImplementedError("Please use toPandas().")

def toPandas(self):
return self.collect()
def toPandas(self) -> pandas.DataFrame:
query = self._plan.collect(self._session)
return self._session._to_pandas(query)

def explain(self) -> str:
query = self._plan.collect(self._session)
4 changes: 2 additions & 2 deletions python/pyspark/sql/tests/connect/test_spark_connect.py
@@ -57,7 +57,7 @@ class SparkConnectTests(SparkConnectSQLTestCase):
def test_simple_read(self) -> None:
"""Tests that we can access the Spark Connect GRPC service locally."""
df = self.connect.read.table(self.tbl_name)
data = df.limit(10).collect()
data = df.limit(10).toPandas()
# Check that the limit is applied
assert len(data.index) == 10

@@ -67,7 +67,7 @@ def conv_udf(x) -> str:

u = udf(conv_udf)
df = self.connect.read.table(self.tbl_name)
result = df.select(u(df.id)).collect()
result = df.select(u(df.id)).toPandas()
assert result is not None

def test_simple_explain_string(self) -> None:
@@ -1470,6 +1470,11 @@ private[spark] object Client extends Logging {
addClasspathEntry(getClusterPath(sparkConf, cp), env)
}

val cpSet = extraClassPath match {
case Some(classPath) if Utils.isTesting => classPath.split(File.pathSeparator).toSet
case _ => Set.empty[String]
}

addClasspathEntry(Environment.PWD.$$(), env)

addClasspathEntry(Environment.PWD.$$() + Path.SEPARATOR + LOCALIZED_CONF_DIR, env)
@@ -1513,7 +1518,13 @@
}

sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
// SPARK-40635: when testing, de-duplicate jars on the classpath so that the
// launch command does not fail because the classpath is too long.
val newCp = if (Utils.isTesting) {
cp.split(File.pathSeparator)
.filterNot(cpSet.contains).mkString(File.pathSeparator)
} else cp
addClasspathEntry(getClusterPath(sparkConf, newCp), env)
}

// Add the localized Hadoop config at the end of the classpath, in case it contains other
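
A small standalone illustration of the test-only de-duplication above: entries that already appear in the extra classpath are filtered out of the distribution classpath before it is appended (the paths below are hypothetical):

import java.io.File

// Hypothetical inputs; in the real code these come from the extraClassPath option
// and the SPARK_DIST_CLASSPATH environment variable.
val cpSet = Set("/opt/spark/jars/a.jar", "/opt/spark/jars/b.jar")
val distClassPath = Seq("/opt/spark/jars/b.jar", "/opt/hadoop/lib/c.jar").mkString(File.pathSeparator)

// Drop entries already covered by cpSet, keeping the remaining ones in order.
val deduped = distClassPath.split(File.pathSeparator).filterNot(cpSet.contains).mkString(File.pathSeparator)
// deduped == "/opt/hadoop/lib/c.jar"
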
@@ -25,7 +25,7 @@ import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffledb.DBBackend
import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
import org.apache.spark.tags.ExtendedYarnTest
import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedYarnTest}

/**
* SPARK-34828: Integration test for the external shuffle service with an alternate name and
@@ -77,6 +77,7 @@ abstract class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegratio
}
}
}
@ExtendedLevelDBTest
@ExtendedYarnTest
class YarnShuffleAlternateNameConfigWithLevelDBBackendSuite
extends YarnShuffleAlternateNameConfigSuite {