
Commit c2ce566

zsxwing authored and Seongjin Cho committed
[SPARK-30936][CORE] Set FAIL_ON_UNKNOWN_PROPERTIES to false by default to parse Spark events
### What changes were proposed in this pull request?

Set `FAIL_ON_UNKNOWN_PROPERTIES` to `false` in `JsonProtocol` so that unknown fields in a Spark event are ignored. After this change, if we add new fields to a Spark event parsed by `ObjectMapper`, the event JSON string generated by a new Spark version can still be read by an old Spark History Server. Since the Spark History Server is an extra service, it usually takes time to upgrade, and it's possible that a Spark application is upgraded before the SHS. Forwards compatibility allows an old SHS to support new Spark applications (it may lose some new features, but most functions should still work).

### Why are the changes needed?

`JsonProtocol` is supposed to provide strong backwards-compatibility and forwards-compatibility guarantees: any version of Spark should be able to read JSON output written by any other version, including newer versions. However, the forwards-compatibility guarantee is broken for events parsed by `ObjectMapper`. If a new field is added to an event parsed by `ObjectMapper` (e.g., apache@6dc5921#diff-dc5c7a41fbb7479cef48b67eb41ad254R33), the event JSON string generated by a new Spark version currently cannot be parsed by an old version of the SHS.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

The newly added tests.

Closes apache#27680 from zsxwing/SPARK-30936.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
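To make the compatibility behavior concrete, here is a minimal stand-alone sketch (not part of the patch) assuming a Jackson `ObjectMapper` configured the same way as the patched `JsonProtocol` mapper, with a hypothetical `StageEvent` case class standing in for a Spark event:

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical event shape, used only for this sketch.
case class StageEvent(stageId: Int, name: String)

object ForwardCompatSketch {
  def main(args: Array[String]): Unit = {
    // Configured like the patched JsonProtocol mapper: unknown JSON fields are
    // silently skipped instead of failing deserialization.
    val tolerantMapper = new ObjectMapper()
      .registerModule(DefaultScalaModule)
      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

    // JSON written by a "newer" producer that added an extra field.
    val newerJson = """{"stageId": 1, "name": "collect", "addedInNewVersion": true}"""
    println(tolerantMapper.readValue(newerJson, classOf[StageEvent]))
    // StageEvent(1,collect) -- the unknown field is ignored

    // JSON missing a field still deserializes; the Int falls back to 0,
    // mirroring the backwards-compatibility test added to JsonProtocolSuite below.
    val olderJson = """{"name": "collect"}"""
    println(tolerantMapper.readValue(olderJson, classOf[StageEvent]))
    // StageEvent(0,collect)
  }
}
```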
1 parent 31d7dbc commit c2ce566

3 files changed: +41 −15 lines changed

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -22,7 +22,7 @@ import java.util.{Properties, UUID}
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
-import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST._
@@ -59,6 +59,7 @@ private[spark] object JsonProtocol {
   private implicit val format = DefaultFormats
 
   private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
 
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
```

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 24 additions & 0 deletions

```diff
@@ -483,6 +483,28 @@ class JsonProtocolSuite extends SparkFunSuite {
     testAccumValue(Some("anything"), blocks, JString(blocks.toString))
     testAccumValue(Some("anything"), 123, JString("123"))
   }
+
+  test("SPARK-30936: forwards compatibility - ignore unknown fields") {
+    val expected = TestListenerEvent("foo", 123)
+    val unknownFieldsJson =
+      """{
+        | "Event" : "org.apache.spark.util.TestListenerEvent",
+        | "foo" : "foo",
+        | "bar" : 123,
+        | "unknown" : "unknown"
+        |}""".stripMargin
+    assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected)
+  }
+
+  test("SPARK-30936: backwards compatibility - set default values for missing fields") {
+    val expected = TestListenerEvent("foo", 0)
+    val unknownFieldsJson =
+      """{
+        | "Event" : "org.apache.spark.util.TestListenerEvent",
+        | "foo" : "foo"
+        |}""".stripMargin
+    assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected)
+  }
 }
@@ -2313,3 +2335,5 @@ private[spark] object JsonProtocolSuite extends Assertions {
     |}
   """.stripMargin
 }
+
+case class TestListenerEvent(foo: String, bar: Int) extends SparkListenerEvent
```

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 15 additions & 14 deletions

```diff
@@ -382,28 +382,27 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_0") {
     // query-event-logs-version-2.0.0.txt has all types of events generated by
-    // Structured Streaming in Spark 2.0.0.
+    // Structured Streaming in Spark 2.0.0. Because we renamed the classes,
     // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
     // to verify that we can skip broken jsons generated by Structured Streaming.
-    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
+    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1)
   }
 
-  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") {
     // query-event-logs-version-2.0.1.txt has all types of events generated by
-    // Structured Streaming in Spark 2.0.1.
+    // Structured Streaming in Spark 2.0.1. Because we renamed the classes,
     // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
     // to verify that we can skip broken jsons generated by Structured Streaming.
-    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt")
+    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1)
   }
 
-  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2") {
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") {
     // query-event-logs-version-2.0.2.txt has all types of events generated by
-    // Structured Streaming in Spark 2.0.2.
-    // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
-    // to verify that we can skip broken jsons generated by Structured Streaming.
-    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt")
+    // Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events
+    // in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2.
+    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5)
   }
 
   test("listener propagates observable metrics") {
@@ -463,7 +462,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = {
+  private def testReplayListenerBusWithBorkenEventJsons(
+      fileName: String,
+      expectedEventSize: Int): Unit = {
     val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
     val events = mutable.ArrayBuffer[SparkListenerEvent]()
     try {
@@ -479,8 +480,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       replayer.addListener(new SparkListener {})
       replayer.replay(input, fileName)
       // SparkListenerApplicationEnd is the only valid event
-      assert(events.size === 1)
-      assert(events(0).isInstanceOf[SparkListenerApplicationEnd])
+      assert(events.size === expectedEventSize)
+      assert(events.last.isInstanceOf[SparkListenerApplicationEnd])
     } finally {
       input.close()
     }
```
