@@ -63,7 +63,7 @@ class PrometheusServletSuite extends SparkFunSuite with PrivateMethodTester {
val key = "local-1592132938718.driver.LiveListenerBus." +
"listenerProcessingTime.org.apache.spark.HeartbeatReceiver"
val sink = createPrometheusServlet()
- val suffix = sink invokePrivate PrivateMethod[String]('normalizeKey)(key)
+ val suffix = sink invokePrivate PrivateMethod[String](Symbol("normalizeKey"))(key)
Contributor: Will this be broken again if Scala removes Symbol completely?

Contributor: OK, it seems they won't. Then I'm OK with it.

Member Author: It seems so, but I don't know whether they have a plan to remove Symbol completely. At least Scala 3 itself still seems to have Symbol:
https://scalacenter.github.io/scala-3-migration-guide/docs/incompatibilities/dropped-features.html

Contributor: Quoting the sentence in the doc:

Although scala.Symbol is useful for migration, beware that it is deprecated and that it will be removed from the scala-library. You are recommended, as a second step, to replace them with plain string literals "xwy" or a dedicated class.

Unless the sentence is incorrect, I would consider Symbol as targeted for future removal.

Member Author: I missed that. Scala 3 does seem to deprecate Symbol.
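To make the migration under discussion concrete, here is a minimal sketch of the three styles side by side. It assumes a recent Spark 3.x build; the column name and the local[*] master are illustrative only, not taken from this PR.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SymbolMigrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = spark.range(10)

    // Before this PR: single-quote symbol literals, deprecated in Scala 2.13.
    //   df.select('id)

    // After this PR: the explicit Symbol(...) constructor; the same implicit
    // Symbol-to-Column conversion from spark.implicits._ still applies.
    df.select(Symbol("id")).show()

    // Symbol-free alternatives if Symbol is ever removed outright, in the
    // spirit of the quoted guide ("plain string literals ... or a dedicated class"):
    df.select($"id").show()     // ColumnName interpolator from spark.implicits._
    df.select(col("id")).show() // plain function from org.apache.spark.sql.functions

    spark.stop()
  }
}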

assert(suffix == "metrics_local_1592132938718_driver_LiveListenerBus_" +
"listenerProcessingTime_org_apache_spark_HeartbeatReceiver_")
}
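For the ScalaTest helper touched in the hunk above, this is roughly how PrivateMethodTester is driven with the explicit Symbol constructor. A hedged, self-contained sketch: Greeter and greet are hypothetical names, not from the Spark codebase, and the imports assume ScalaTest 3.1+.

import org.scalatest.PrivateMethodTester
import org.scalatest.funsuite.AnyFunSuite

class Greeter {
  private def greet(name: String): String = s"hello, $name"
}

class GreeterSuite extends AnyFunSuite with PrivateMethodTester {
  test("invoke a private method by name") {
    // Symbol("greet") names the private method, replacing the deprecated
    // 'greet symbol literal used before this PR.
    val greet = PrivateMethod[String](Symbol("greet"))
    assert((new Greeter) invokePrivate greet("spark") == "hello, spark")
  }
}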
@@ -31,7 +31,7 @@ object SimpleTypedAggregator {
.getOrCreate()

import spark.implicits._
- val ds = spark.range(20).select(('id % 3).as("key"), 'id).as[(Long, Long)]
+ val ds = spark.range(20).select((Symbol("id") % 3).as("key"), Symbol("id")).as[(Long, Long)]
println("input data:")
ds.show()

@@ -36,11 +36,11 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
import testImplicits._

test("roundtrip in to_avro and from_avro - int and string") {
- val df = spark.range(10).select('id, 'id.cast("string").as("str"))
+ val df = spark.range(10).select(Symbol("id"), Symbol("id").cast("string").as("str"))

val avroDF = df.select(
- functions.to_avro('id).as("a"),
- functions.to_avro('str).as("b"))
+ functions.to_avro(Symbol("id")).as("a"),
+ functions.to_avro(Symbol("str")).as("b"))
val avroTypeLong = s"""
|{
| "type": "int",
@@ -54,13 +54,14 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
|}
""".stripMargin
checkAnswer(avroDF.select(
- functions.from_avro('a, avroTypeLong),
- functions.from_avro('b, avroTypeStr)), df)
+ functions.from_avro(Symbol("a"), avroTypeLong),
+ functions.from_avro(Symbol("b"), avroTypeStr)), df)
}

test("roundtrip in to_avro and from_avro - struct") {
- val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
- val avroStructDF = df.select(functions.to_avro('struct).as("avro"))
+ val df = spark.range(10).select(
+   struct(Symbol("id"), Symbol("id").cast("string").as("str")).as("struct"))
+ val avroStructDF = df.select(functions.to_avro(Symbol("struct")).as("avro"))
val avroTypeStruct = s"""
|{
| "type": "record",
@@ -72,13 +73,13 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
|}
""".stripMargin
checkAnswer(avroStructDF.select(
- functions.from_avro('avro, avroTypeStruct)), df)
+ functions.from_avro(Symbol("avro"), avroTypeStruct)), df)
}

test("handle invalid input in from_avro") {
val count = 10
- val df = spark.range(count).select(struct('id, 'id.as("id2")).as("struct"))
- val avroStructDF = df.select(functions.to_avro('struct).as("avro"))
+ val df = spark.range(count).select(struct(Symbol("id"), Symbol("id").as("id2")).as("struct"))
+ val avroStructDF = df.select(functions.to_avro(Symbol("struct")).as("avro"))
val avroTypeStruct = s"""
|{
| "type": "record",
@@ -93,15 +94,15 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
intercept[SparkException] {
avroStructDF.select(
functions.from_avro(
'avro, avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect()
Symbol("avro"), avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect()
}

// For PERMISSIVE mode, the result should be row of null columns.
val expected = (0 until count).map(_ => Row(Row(null, null)))
checkAnswer(
avroStructDF.select(
functions.from_avro(
'avro, avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)),
Symbol("avro"), avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)),
expected)
}

@@ -161,9 +162,9 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {

test("SPARK-27506: roundtrip in to_avro and from_avro with different compatible schemas") {
val df = spark.range(10).select(
struct('id.as("col1"), 'id.cast("string").as("col2")).as("struct")
struct(Symbol("id").as("col1"), Symbol("id").cast("string").as("col2")).as("struct")
)
- val avroStructDF = df.select(functions.to_avro('struct).as("avro"))
+ val avroStructDF = df.select(functions.to_avro(Symbol("struct")).as("avro"))
val actualAvroSchema =
s"""
|{
@@ -190,20 +191,24 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
|""".stripMargin

val expected = spark.range(10).select(
struct('id.as("col1"), 'id.cast("string").as("col2"), lit("").as("col3")).as("struct")
struct(
Symbol("id").as("col1"),
Symbol("id").cast("string").as("col2"),
lit("").as("col3")).as("struct")
)

checkAnswer(
avroStructDF.select(
functions.from_avro(
- 'avro,
+ Symbol("avro"),
actualAvroSchema,
Map("avroSchema" -> evolvedAvroSchema).asJava)),
expected)
}

test("roundtrip in to_avro and from_avro - struct with nullable Avro schema") {
- val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
+ val df = spark.range(10).select(
+   struct(Symbol("id"), Symbol("id").cast("string").as("str")).as("struct"))
val avroTypeStruct = s"""
|{
| "type": "record",
@@ -214,13 +219,14 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
| ]
|}
""".stripMargin
- val avroStructDF = df.select(functions.to_avro('struct, avroTypeStruct).as("avro"))
+ val avroStructDF = df.select(functions.to_avro(Symbol("struct"), avroTypeStruct).as("avro"))
checkAnswer(avroStructDF.select(
- functions.from_avro('avro, avroTypeStruct)), df)
+ functions.from_avro(Symbol("avro"), avroTypeStruct)), df)
}

test("to_avro with unsupported nullable Avro schema") {
- val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
+ val df = spark.range(10).select(
+   struct(Symbol("id"), Symbol("id").cast("string").as("str")).as("struct"))
for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
val avroTypeStruct = s"""
|{
@@ -233,7 +239,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
|}
""".stripMargin
val message = intercept[SparkException] {
- df.select(functions.to_avro('struct, avroTypeStruct).as("avro")).show()
+ df.select(functions.to_avro(Symbol("struct"), avroTypeStruct).as("avro")).show()
}.getCause.getMessage
assert(message.contains("Only UNION of a null type and a non-null type is supported"))
}
@@ -122,7 +122,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val expected = timestampInputData.map(t => Row(new Timestamp(t._1)))
val timestampAvro = timestampFile(dir.getAbsolutePath)
val df = spark.read.format("avro").load(timestampAvro).select('timestamp_millis)
val df = spark.read.format("avro").load(timestampAvro).select(Symbol("timestamp_millis"))

checkAnswer(df, expected)

@@ -137,7 +137,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val expected = timestampInputData.map(t => Row(new Timestamp(t._2)))
val timestampAvro = timestampFile(dir.getAbsolutePath)
val df = spark.read.format("avro").load(timestampAvro).select('timestamp_micros)
val df = spark.read.format("avro").load(timestampAvro).select(Symbol("timestamp_micros"))

checkAnswer(df, expected)

@@ -151,8 +151,8 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
test("Logical type: user specified output schema with different timestamp types") {
withTempDir { dir =>
val timestampAvro = timestampFile(dir.getAbsolutePath)
- val df =
-   spark.read.format("avro").load(timestampAvro).select('timestamp_millis, 'timestamp_micros)
+ val df = spark.read.format("avro").load(timestampAvro).select(
+   Symbol("timestamp_millis"), Symbol("timestamp_micros"))

val expected = timestampInputData.map(t => Row(new Timestamp(t._1), new Timestamp(t._2)))

@@ -184,7 +184,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val timestampAvro = timestampFile(dir.getAbsolutePath)
val schema = StructType(StructField("long", TimestampType, true) :: Nil)
val df = spark.read.format("avro").schema(schema).load(timestampAvro).select('long)
val df = spark.read.format("avro").schema(schema).load(timestampAvro).select(Symbol("long"))

val expected = timestampInputData.map(t => Row(new Timestamp(t._3)))

@@ -34,9 +34,9 @@ class DeprecatedAvroFunctionsSuite extends QueryTest with SharedSparkSession {
import testImplicits._

test("roundtrip in to_avro and from_avro - int and string") {
- val df = spark.range(10).select('id, 'id.cast("string").as("str"))
+ val df = spark.range(10).select(Symbol("id"), Symbol("id").cast("string").as("str"))

- val avroDF = df.select(to_avro('id).as("a"), to_avro('str).as("b"))
+ val avroDF = df.select(to_avro(Symbol("id")).as("a"), to_avro(Symbol("str")).as("b"))
val avroTypeLong = s"""
|{
| "type": "int",
@@ -49,12 +49,14 @@ class DeprecatedAvroFunctionsSuite extends QueryTest with SharedSparkSession {
| "name": "str"
|}
""".stripMargin
- checkAnswer(avroDF.select(from_avro('a, avroTypeLong), from_avro('b, avroTypeStr)), df)
+ checkAnswer(
+   avroDF.select(from_avro(Symbol("a"), avroTypeLong), from_avro(Symbol("b"), avroTypeStr)), df)
}

test("roundtrip in to_avro and from_avro - struct") {
- val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
- val avroStructDF = df.select(to_avro('struct).as("avro"))
+ val df = spark.range(10).select(
+   struct(Symbol("id"), Symbol("id").cast("string").as("str")).as("struct"))
+ val avroStructDF = df.select(to_avro(Symbol("struct")).as("avro"))
val avroTypeStruct = s"""
|{
| "type": "record",
@@ -65,7 +67,7 @@ class DeprecatedAvroFunctionsSuite extends QueryTest with SharedSparkSession {
| ]
|}
""".stripMargin
- checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df)
+ checkAnswer(avroStructDF.select(from_avro(Symbol("avro"), avroTypeStruct)), df)
}

test("roundtrip in to_avro and from_avro - array with null") {
@@ -554,9 +554,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {

val windowedAggregation = kafka
.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start") as 'window, $"count")
.groupBy(window($"timestamp", "5 seconds") as Symbol("window"))
.agg(count("*") as Symbol("count"))
.select($"window".getField("start") as Symbol("window"), $"count")

val query = windowedAggregation
.writeStream
@@ -524,7 +524,7 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
test("SPARK-20496: batch - enforce analyzed plans") {
val inputEvents =
spark.range(1, 1000)
.select(to_json(struct("*")) as 'value)
.select(to_json(struct("*")) as Symbol("value"))

val topic = newTopic()
testUtils.createTopic(topic)
mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala (5 additions, 3 deletions)
@@ -34,7 +34,7 @@ class FunctionsSuite extends MLTest {
(Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
).toDF("vec", "oldVec")

- val result = df.select(vector_to_array('vec), vector_to_array('oldVec))
+ val result = df.select(vector_to_array(Symbol("vec")), vector_to_array(Symbol("oldVec")))
.as[(Seq[Double], Seq[Double])].collect().toSeq

val expected = Seq(
@@ -65,7 +65,8 @@ class FunctionsSuite extends MLTest {
(Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
).toDF("vec", "oldVec")
val dfArrayFloat = df3.select(
vector_to_array('vec, dtype = "float32"), vector_to_array('oldVec, dtype = "float32"))
vector_to_array(Symbol("vec"), dtype = "float32"),
vector_to_array(Symbol("oldVec"), dtype = "float32"))

// Check values are correct
val result3 = dfArrayFloat.as[(Seq[Float], Seq[Float])].collect().toSeq
@@ -82,7 +83,8 @@ class FunctionsSuite extends MLTest {

val thrown2 = intercept[IllegalArgumentException] {
df3.select(
vector_to_array('vec, dtype = "float16"), vector_to_array('oldVec, dtype = "float16"))
vector_to_array(Symbol("vec"), dtype = "float16"),
vector_to_array(Symbol("oldVec"), dtype = "float16"))
}
assert(thrown2.getMessage.contains(
s"Unsupported dtype: float16. Valid values: float64, float32."))
@@ -52,7 +52,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
test("fail for unresolved plan") {
intercept[AnalysisException] {
// `testRelation` does not have column `b`.
- testRelation.select('b).analyze
+ testRelation.select(Symbol("b")).analyze
}
}

@@ -27,10 +27,10 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
*/
class PullOutNondeterministicSuite extends AnalysisTest {

- private lazy val a = 'a.int
- private lazy val b = 'b.int
+ private lazy val a = Symbol("a").int
+ private lazy val b = Symbol("b").int
private lazy val r = LocalRelation(a, b)
- private lazy val rnd = Rand(10).as('_nondeterministic)
+ private lazy val rnd = Rand(10).as(Symbol("_nondeterministic"))
private lazy val rndref = rnd.toAttribute

test("no-op on filter") {
@@ -27,14 +27,14 @@ import org.apache.spark.sql.types._

class ResolveGroupingAnalyticsSuite extends AnalysisTest {

- lazy val a = 'a.int
- lazy val b = 'b.string
- lazy val c = 'c.string
+ lazy val a = Symbol("a").int
+ lazy val b = Symbol("b").string
+ lazy val c = Symbol("c").string
lazy val unresolved_a = UnresolvedAttribute("a")
lazy val unresolved_b = UnresolvedAttribute("b")
lazy val unresolved_c = UnresolvedAttribute("c")
- lazy val gid = 'spark_grouping_id.long.withNullability(false)
- lazy val hive_gid = 'grouping__id.long.withNullability(false)
+ lazy val gid = Symbol("spark_grouping_id").long.withNullability(false)
+ lazy val hive_gid = Symbol("grouping__id").long.withNullability(false)
lazy val grouping_a = Cast(ShiftRight(gid, 1) & 1L, ByteType, Option(TimeZone.getDefault().getID))
lazy val nulInt = Literal(null, IntegerType)
lazy val nulStr = Literal(null, StringType)
@@ -287,7 +287,7 @@ class ResolveGroupingAnalyticsSuite extends AnalysisTest {
GroupingSets(Seq(Seq(), Seq(unresolved_a), Seq(unresolved_a, unresolved_b)),
Seq(unresolved_a, unresolved_b), r1, Seq(unresolved_a, unresolved_b)))
val expected = Project(Seq(a, b), Sort(
- Seq(SortOrder('aggOrder.byte.withNullability(false), Ascending)), true,
+ Seq(SortOrder(Symbol("aggOrder").byte.withNullability(false), Ascending)), true,
Aggregate(Seq(a, b, gid),
Seq(a, b, grouping_a.as("aggOrder")),
Expand(
@@ -308,7 +308,7 @@ class ResolveGroupingAnalyticsSuite extends AnalysisTest {
GroupingSets(Seq(Seq(), Seq(unresolved_a), Seq(unresolved_a, unresolved_b)),
Seq(unresolved_a, unresolved_b), r1, Seq(unresolved_a, unresolved_b)))
val expected3 = Project(Seq(a, b), Sort(
- Seq(SortOrder('aggOrder.long.withNullability(false), Ascending)), true,
+ Seq(SortOrder(Symbol("aggOrder").long.withNullability(false), Ascending)), true,
Aggregate(Seq(a, b, gid),
Seq(a, b, gid.as("aggOrder")),
Expand(
@@ -71,9 +71,9 @@ class ResolveHintsSuite extends AnalysisTest {
test("do not traverse past existing broadcast hints") {
checkAnalysis(
UnresolvedHint("MAPJOIN", Seq("table"),
ResolvedHint(table("table").where('a > 1), HintInfo(strategy = Some(BROADCAST)))),
ResolvedHint(testRelation.where('a > 1), HintInfo(strategy = Some(BROADCAST))).analyze,
caseSensitive = false)
ResolvedHint(table("table").where(Symbol("a") > 1), HintInfo(strategy = Some(BROADCAST)))),
ResolvedHint(testRelation.where(
Symbol("a") > 1), HintInfo(strategy = Some(BROADCAST))).analyze, caseSensitive = false)
}

test("should work for subqueries") {
@@ -83,7 +83,7 @@
caseSensitive = false)

checkAnalysis(
UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").subquery(Symbol("tableAlias"))),
ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))),
caseSensitive = false)

@@ -96,8 +96,10 @@

test("do not traverse past subquery alias") {
checkAnalysis(
UnresolvedHint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)),
testRelation.where('a > 1).analyze,
UnresolvedHint(
"MAPJOIN", Seq("table"),
table("table").where(Symbol("a") > 1).subquery(Symbol("tableAlias"))),
testRelation.where(Symbol("a") > 1).analyze,
caseSensitive = false)
}

@@ -109,8 +111,9 @@
|SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
""".stripMargin
),
- ResolvedHint(testRelation.where('a > 1).select('a), HintInfo(strategy = Some(BROADCAST)))
-   .select('a).analyze,
+ ResolvedHint(
+   testRelation.where(Symbol("a") > 1).select(Symbol("a")),
+   HintInfo(strategy = Some(BROADCAST))).select(Symbol("a")).analyze,
caseSensitive = false)
}

@@ -122,7 +125,7 @@
|SELECT /*+ BROADCAST(table) */ * FROM ctetable
""".stripMargin
),
- testRelation.where('a > 1).select('a).select('a).analyze,
+ testRelation.where(Symbol("a") > 1).select(Symbol("a")).select(Symbol("a")).analyze,
caseSensitive = false)
}
