Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,22 @@ private[spark] object JsonProtocol {
case v: Long => JInt(v)
// We only have 3 kind of internal accumulator types, so if it's not int or long, it must be
// the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`
case v =>
JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
case v: java.util.List[_] =>
JArray(v.asScala.toList.flatMap {
case (id: BlockId, status: BlockStatus) =>
Some(
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
)
case _ =>
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should
// not crash.
None
})
case _ =>
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not
// crash.
JNothing
}
} else {
// For all external accumulators, just use strings
Expand Down
48 changes: 48 additions & 0 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,54 @@ class JsonProtocolSuite extends SparkFunSuite {
testAccumValue(Some("anything"), 123, JString("123"))
}

/** Create an AccumulableInfo and verify we can serialize and deserialize it. */
private def testAccumulableInfo(
name: String,
value: Option[Any],
expectedValue: Option[Any]): Unit = {
val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
val accum = AccumulableInfo(
123L,
Some(name),
update = value,
value = value,
internal = isInternal,
countFailedValues = false)
val json = JsonProtocol.accumulableInfoToJson(accum)
val newAccum = JsonProtocol.accumulableInfoFromJson(json)
assert(newAccum == accum.copy(update = expectedValue, value = expectedValue))
}

test("SPARK-31923: unexpected value type of internal accumulator") {
// Because a user may use `METRICS_PREFIX` in an accumulator name, we should test unexpected
// types to make sure we don't crash.
import InternalAccumulator.METRICS_PREFIX
testAccumulableInfo(
METRICS_PREFIX + "fooString",
value = Some("foo"),
expectedValue = None)
testAccumulableInfo(
METRICS_PREFIX + "fooList",
value = Some(java.util.Arrays.asList("string")),
expectedValue = Some(java.util.Collections.emptyList())
)
val blocks = Seq(
(TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
(TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
testAccumulableInfo(
METRICS_PREFIX + "fooList",
value = Some(java.util.Arrays.asList(
"string",
blocks(0),
blocks(1))),
expectedValue = Some(blocks.asJava)
)
testAccumulableInfo(
METRICS_PREFIX + "fooSet",
value = Some(Set("foo")),
expectedValue = None)
}

test("SPARK-30936: forwards compatibility - ignore unknown fields") {
val expected = TestListenerEvent("foo", 123)
val unknownFieldsJson =
Expand Down