@@ -36,11 +36,11 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
import testImplicits._

test("roundtrip in to_avro and from_avro - int and string") {
-val df = spark.range(10).select('id, 'id.cast("string").as("str"))
+val df = spark.range(10).select($"id", $"id".cast("string").as("str"))

val avroDF = df.select(
-functions.to_avro('id).as("a"),
-functions.to_avro('str).as("b"))
+functions.to_avro($"id").as("a"),
+functions.to_avro($"str").as("b"))
val avroTypeLong = s"""
|{
| "type": "int",
@@ -54,13 +54,13 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
|}
""".stripMargin
checkAnswer(avroDF.select(
-functions.from_avro('a, avroTypeLong),
-functions.from_avro('b, avroTypeStr)), df)
+functions.from_avro($"a", avroTypeLong),
+functions.from_avro($"b", avroTypeStr)), df)
}

test("roundtrip in to_avro and from_avro - struct") {
-val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
-val avroStructDF = df.select(functions.to_avro('struct).as("avro"))
+val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
+val avroStructDF = df.select(functions.to_avro($"struct").as("avro"))
val avroTypeStruct = s"""
|{
| "type": "record",
@@ -72,13 +72,13 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
|}
""".stripMargin
checkAnswer(avroStructDF.select(
-functions.from_avro('avro, avroTypeStruct)), df)
+functions.from_avro($"avro", avroTypeStruct)), df)
}

test("handle invalid input in from_avro") {
val count = 10
-val df = spark.range(count).select(struct('id, 'id.as("id2")).as("struct"))
-val avroStructDF = df.select(functions.to_avro('struct).as("avro"))
+val df = spark.range(count).select(struct($"id", $"id".as("id2")).as("struct"))
+val avroStructDF = df.select(functions.to_avro($"struct").as("avro"))
val avroTypeStruct = s"""
|{
| "type": "record",
@@ -93,15 +93,15 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
intercept[SparkException] {
avroStructDF.select(
functions.from_avro(
-'avro, avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect()
+$"avro", avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect()
}

// For PERMISSIVE mode, the result should be row of null columns.
val expected = (0 until count).map(_ => Row(Row(null, null)))
checkAnswer(
avroStructDF.select(
functions.from_avro(
-'avro, avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)),
+$"avro", avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)),
expected)
}

@@ -161,9 +161,9 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {

test("SPARK-27506: roundtrip in to_avro and from_avro with different compatible schemas") {
val df = spark.range(10).select(
struct('id.as("col1"), 'id.cast("string").as("col2")).as("struct")
struct($"id".as("col1"), $"id".cast("string").as("col2")).as("struct")
)
-val avroStructDF = df.select(functions.to_avro('struct).as("avro"))
+val avroStructDF = df.select(functions.to_avro($"struct").as("avro"))
val actualAvroSchema =
s"""
|{
@@ -190,20 +190,20 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
|""".stripMargin

val expected = spark.range(10).select(
struct('id.as("col1"), 'id.cast("string").as("col2"), lit("").as("col3")).as("struct")
struct($"id".as("col1"), $"id".cast("string").as("col2"), lit("").as("col3")).as("struct")
)

checkAnswer(
avroStructDF.select(
functions.from_avro(
-'avro,
+$"avro",
actualAvroSchema,
Map("avroSchema" -> evolvedAvroSchema).asJava)),
expected)
}

test("roundtrip in to_avro and from_avro - struct with nullable Avro schema") {
-val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
+val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
val avroTypeStruct = s"""
|{
| "type": "record",
@@ -214,13 +214,13 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
| ]
|}
""".stripMargin
-val avroStructDF = df.select(functions.to_avro('struct, avroTypeStruct).as("avro"))
+val avroStructDF = df.select(functions.to_avro($"struct", avroTypeStruct).as("avro"))
checkAnswer(avroStructDF.select(
-functions.from_avro('avro, avroTypeStruct)), df)
+functions.from_avro($"avro", avroTypeStruct)), df)
}

test("to_avro with unsupported nullable Avro schema") {
-val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
+val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
for (unsupportedAvroType <- Seq("""["null", "int", "long"]""", """["int", "long"]""")) {
val avroTypeStruct = s"""
|{
@@ -233,7 +233,7 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
|}
""".stripMargin
val message = intercept[SparkException] {
-df.select(functions.to_avro('struct, avroTypeStruct).as("avro")).show()
+df.select(functions.to_avro($"struct", avroTypeStruct).as("avro")).show()
}.getCause.getMessage
assert(message.contains("Only UNION of a null type and a non-null type is supported"))
}
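A note on the pattern driving every hunk above: Scala 2.13 deprecates symbol literals, so column references written as 'id are rewritten either to the $"id" interpolator (provided by spark.implicits._) or, where an actual Symbol value is still wanted, to an explicit Symbol("id"). A minimal sketch of the equivalence, assuming a local SparkSession named spark; the object name is illustrative only:

    import org.apache.spark.sql.SparkSession

    object SymbolMigrationSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._

        val df = spark.range(3).select($"id", $"id".cast("string").as("str"))

        // Deprecated in Scala 2.13: df.select('id, 'str)
        // The $-interpolator builds the same Column values...
        val viaDollar = df.select($"id", $"str")
        // ...and an explicit Symbol(...) still resolves through the same
        // implicit conversion (symbolToColumn) where a Symbol is required.
        val viaSymbol = df.select(Symbol("id"), Symbol("str"))

        assert(viaDollar.collect().sameElements(viaSymbol.collect()))
        spark.stop()
      }
    }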
@@ -129,7 +129,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val expected = timestampInputData.map(t => Row(new Timestamp(t._1)))
val timestampAvro = timestampFile(dir.getAbsolutePath)
-val df = spark.read.format("avro").load(timestampAvro).select('timestamp_millis)
+val df = spark.read.format("avro").load(timestampAvro).select($"timestamp_millis")

checkAnswer(df, expected)

@@ -144,7 +144,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val expected = timestampInputData.map(t => Row(new Timestamp(t._2)))
val timestampAvro = timestampFile(dir.getAbsolutePath)
-val df = spark.read.format("avro").load(timestampAvro).select('timestamp_micros)
+val df = spark.read.format("avro").load(timestampAvro).select($"timestamp_micros")

checkAnswer(df, expected)

@@ -160,7 +160,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
val expected = timestampInputData.map(t =>
Row(DateTimeUtils.microsToLocalDateTime(DateTimeUtils.millisToMicros(t._3))))
val timestampAvro = timestampFile(dir.getAbsolutePath)
-val df = spark.read.format("avro").load(timestampAvro).select('local_timestamp_millis)
+val df = spark.read.format("avro").load(timestampAvro).select($"local_timestamp_millis")

checkAnswer(df, expected)

@@ -176,7 +176,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
val expected = timestampInputData.map(t =>
Row(DateTimeUtils.microsToLocalDateTime(DateTimeUtils.millisToMicros(t._4))))
val timestampAvro = timestampFile(dir.getAbsolutePath)
-val df = spark.read.format("avro").load(timestampAvro).select('local_timestamp_micros)
+val df = spark.read.format("avro").load(timestampAvro).select($"local_timestamp_micros")

checkAnswer(df, expected)

@@ -194,7 +194,8 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val timestampAvro = timestampFile(dir.getAbsolutePath)
val df =
spark.read.format("avro").load(timestampAvro).select('timestamp_millis, 'timestamp_micros)
spark.read.format("avro").load(timestampAvro)
.select($"timestamp_millis", $"timestamp_micros")

val expected = timestampInputData.map(t => Row(new Timestamp(t._1), new Timestamp(t._2)))

@@ -226,7 +227,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val timestampAvro = timestampFile(dir.getAbsolutePath)
val df = spark.read.format("avro").load(timestampAvro).select(
-'local_timestamp_millis, 'local_timestamp_micros)
+$"local_timestamp_millis", $"local_timestamp_micros")

val expected = timestampInputData.map(t =>
Row(DateTimeUtils.microsToLocalDateTime(DateTimeUtils.millisToMicros(t._3)),
@@ -260,7 +261,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val timestampAvro = timestampFile(dir.getAbsolutePath)
val schema = StructType(StructField("long", TimestampType, true) :: Nil)
val df = spark.read.format("avro").schema(schema).load(timestampAvro).select('long)
val df = spark.read.format("avro").schema(schema).load(timestampAvro).select($"long")

val expected = timestampInputData.map(t => Row(new Timestamp(t._5)))

@@ -272,7 +273,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val timestampAvro = timestampFile(dir.getAbsolutePath)
val schema = StructType(StructField("long", TimestampNTZType, true) :: Nil)
val df = spark.read.format("avro").schema(schema).load(timestampAvro).select('long)
val df = spark.read.format("avro").schema(schema).load(timestampAvro).select($"long")

val expected = timestampInputData.map(t =>
Row(DateTimeUtils.microsToLocalDateTime(DateTimeUtils.millisToMicros(t._5))))
@@ -34,9 +34,9 @@ class DeprecatedAvroFunctionsSuite extends QueryTest with SharedSparkSession {
import testImplicits._

test("roundtrip in to_avro and from_avro - int and string") {
-val df = spark.range(10).select('id, 'id.cast("string").as("str"))
+val df = spark.range(10).select($"id", $"id".cast("string").as("str"))

-val avroDF = df.select(to_avro('id).as("a"), to_avro('str).as("b"))
+val avroDF = df.select(to_avro($"id").as("a"), to_avro($"str").as("b"))
val avroTypeLong = s"""
|{
| "type": "int",
@@ -49,12 +49,12 @@ class DeprecatedAvroFunctionsSuite extends QueryTest with SharedSparkSession {
| "name": "str"
|}
""".stripMargin
-checkAnswer(avroDF.select(from_avro('a, avroTypeLong), from_avro('b, avroTypeStr)), df)
+checkAnswer(avroDF.select(from_avro($"a", avroTypeLong), from_avro($"b", avroTypeStr)), df)
}

test("roundtrip in to_avro and from_avro - struct") {
-val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
-val avroStructDF = df.select(to_avro('struct).as("avro"))
+val df = spark.range(10).select(struct($"id", $"id".cast("string").as("str")).as("struct"))
+val avroStructDF = df.select(to_avro($"struct").as("avro"))
val avroTypeStruct = s"""
|{
| "type": "record",
@@ -65,7 +65,7 @@ class DeprecatedAvroFunctionsSuite extends QueryTest with SharedSparkSession {
| ]
|}
""".stripMargin
-checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df)
+checkAnswer(avroStructDF.select(from_avro($"avro", avroTypeStruct)), df)
}

test("roundtrip in to_avro and from_avro - array with null") {
@@ -792,9 +792,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {

val windowedAggregation = kafka
.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start") as 'window, $"count")
.groupBy(window($"timestamp", "5 seconds") as Symbol("window"))
.agg(count("*") as Symbol("count"))
.select($"window".getField("start") as Symbol("window"), $"count")

val query = windowedAggregation
.writeStream
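One observation on the aliases in this hunk: where the alias is just a name, Column.as also has a plain String overload, so the Symbol(...) spelling is only needed when an actual Symbol is required. A hedged alternative sketch of the same aggregation, assuming the streaming DataFrame kafka from this test and the usual imports:

    // Assumes: import spark.implicits._ and
    // import org.apache.spark.sql.functions.{window, count}
    // Same windowed aggregation using String aliases instead of Symbol(...):
    val windowedAggregation = kafka
      .withWatermark("timestamp", "10 seconds")
      .groupBy(window($"timestamp", "5 seconds").as("window"))
      .agg(count("*").as("count"))
      .select($"window".getField("start").as("window"), $"count")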
@@ -524,7 +524,7 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
test("SPARK-20496: batch - enforce analyzed plans") {
val inputEvents =
spark.range(1, 1000)
.select(to_json(struct("*")) as 'value)
.select(to_json(struct("*")) as Symbol("value"))

val topic = newTopic()
testUtils.createTopic(topic)
@@ -63,7 +63,7 @@ class PrometheusServletSuite extends SparkFunSuite with PrivateMethodTester {
val key = "local-1592132938718.driver.LiveListenerBus." +
"listenerProcessingTime.org.apache.spark.HeartbeatReceiver"
val sink = createPrometheusServlet()
-val suffix = sink invokePrivate PrivateMethod[String]('normalizeKey)(key)
+val suffix = sink invokePrivate PrivateMethod[String](Symbol("normalizeKey"))(key)
assert(suffix == "metrics_local_1592132938718_driver_LiveListenerBus_" +
"listenerProcessingTime_org_apache_spark_HeartbeatReceiver_")
}
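For context on the PrivateMethod change just above: ScalaTest's PrivateMethodTester identifies the target method by a Symbol, so the deprecated literal 'normalizeKey becomes an explicit Symbol("normalizeKey") rather than a $-interpolated column. A minimal sketch of the pattern, with a hypothetical Greeter class standing in for the servlet:

    import org.scalatest.PrivateMethodTester
    import org.scalatest.funsuite.AnyFunSuite

    class Greeter {
      private def greet(name: String): String = s"hello, $name"
    }

    class GreeterSuite extends AnyFunSuite with PrivateMethodTester {
      test("invoke a private method via an explicit Symbol") {
        // PrivateMethod is keyed by a Symbol naming the private method.
        val greet = PrivateMethod[String](Symbol("greet"))
        val result = (new Greeter) invokePrivate greet("world")
        assert(result == "hello, world")
      }
    }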
@@ -70,7 +70,7 @@ object StructuredSessionization {
// Sessionize the events. Track number of events, start and end timestamps of session,
// and report session updates.
val sessionUpdates = events
.groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
.groupBy(session_window($"eventTime", "10 seconds") as Symbol("session"), $"sessionId")
.agg(count("*").as("numEvents"))
.selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
"CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
6 changes: 3 additions & 3 deletions mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
@@ -34,7 +34,7 @@ class FunctionsSuite extends MLTest {
(Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
).toDF("vec", "oldVec")

-val result = df.select(vector_to_array('vec), vector_to_array('oldVec))
+val result = df.select(vector_to_array($"vec"), vector_to_array($"oldVec"))
.as[(Seq[Double], Seq[Double])].collect().toSeq

val expected = Seq(
@@ -65,7 +65,7 @@ class FunctionsSuite extends MLTest {
(Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
).toDF("vec", "oldVec")
val dfArrayFloat = df3.select(
-vector_to_array('vec, dtype = "float32"), vector_to_array('oldVec, dtype = "float32"))
+vector_to_array($"vec", dtype = "float32"), vector_to_array($"oldVec", dtype = "float32"))

// Check values are correct
val result3 = dfArrayFloat.as[(Seq[Float], Seq[Float])].collect().toSeq
@@ -82,7 +82,7 @@ class FunctionsSuite extends MLTest {

val thrown2 = intercept[IllegalArgumentException] {
df3.select(
-vector_to_array('vec, dtype = "float16"), vector_to_array('oldVec, dtype = "float16"))
+vector_to_array($"vec", dtype = "float16"), vector_to_array($"oldVec", dtype = "float16"))
}
assert(thrown2.getMessage.contains(
s"Unsupported dtype: float16. Valid values: float64, float32."))
@@ -55,7 +55,7 @@ import org.apache.spark.unsafe.types.UTF8String
* // SQL verbs can be used to construct logical query plans.
* scala> import org.apache.spark.sql.catalyst.plans.logical._
* scala> import org.apache.spark.sql.catalyst.dsl.plans._
-* scala> LocalRelation('key.int, 'value.string).where('key === 1).select('value).analyze
+* scala> LocalRelation($"key".int, $"value".string).where($"key" === 1).select($"value").analyze
* res3: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
* Project [value#3]
* Filter (key#2 = 1)
@@ -641,20 +641,20 @@ class AnalysisErrorSuite extends AnalysisTest {
}

test("Join can work on binary types but can't work on map types") {
val left = LocalRelation(Symbol("a").binary, Symbol("b").map(StringType, StringType))
val right = LocalRelation(Symbol("c").binary, Symbol("d").map(StringType, StringType))
val left = LocalRelation($"a".binary, Symbol("b").map(StringType, StringType))
val right = LocalRelation($"c".binary, Symbol("d").map(StringType, StringType))

val plan1 = left.join(
right,
joinType = Cross,
condition = Some(Symbol("a") === Symbol("c")))
condition = Some($"a" === $"c"))

assertAnalysisSuccess(plan1)

val plan2 = left.join(
right,
joinType = Cross,
condition = Some(Symbol("b") === Symbol("d")))
condition = Some($"b" === $"d"))
assertAnalysisError(plan2, "EqualTo does not support ordering on type map" :: Nil)
}

@@ -722,7 +722,7 @@ class AnalysisErrorSuite extends AnalysisTest {
test("Error on filter condition containing aggregate expressions") {
val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val plan = Filter(Symbol("a") === UnresolvedFunction("max", Seq(b), true), LocalRelation(a, b))
val plan = Filter($"a" === UnresolvedFunction("max", Seq(b), true), LocalRelation(a, b))
assertAnalysisError(plan,
"Aggregate/Window/Generate expressions are not valid in where clause of the query" :: Nil)
}