
Commit daabb27

Commit message: Fix
Parent: 9d032d0

11 files changed: 143 additions (+), 35 deletions (-)

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 7 additions & 7 deletions

@@ -100,7 +100,7 @@ private[kafka010] class KafkaSource(
   override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
     out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
     val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
-    writer.write(VERSION)
+    writer.write("v" + VERSION + "\n")
     writer.write(metadata.json)
     writer.flush
   }

@@ -111,13 +111,13 @@ private[kafka010] class KafkaSource(
     // HDFSMetadataLog guarantees that it never creates a partial file.
     assert(content.length != 0)
     if (content(0) == 'v') {
-      if (content.startsWith(VERSION)) {
-        KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length)))
+      val indexOfNewLine = content.indexOf("\n")
+      if (indexOfNewLine > 0) {
+        val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+        KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
       } else {
-        val versionInFile = content.substring(0, content.indexOf("\n"))
         throw new IllegalStateException(
-          s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
-            s"but was $versionInFile. Please upgrade your Spark.")
+          s"Log file was malformed: failed to detect the log file version line.")
       }
     } else {
       // The log was generated by Spark 2.1.0

@@ -351,7 +351,7 @@ private[kafka010] object KafkaSource {
       | source option "failOnDataLoss" to "false".
     """.stripMargin

-  private val VERSION = "v1\n"
+  private[kafka010] val VERSION = 1

   def getSortedExecutorList(sc: SparkContext): Array[String] = {
     val bm = sc.env.blockManager
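Note on the new on-disk layout: after this change, the offset file that KafkaSource.serialize writes consists of the compatibility zero byte, a "v<version>" line, and the offset JSON. A minimal self-contained sketch of that layout, assuming VERSION = 1 and a hypothetical offset payload (the JSON below is illustrative, not taken from this commit):

import java.io.{BufferedWriter, ByteArrayOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets

object KafkaOffsetLayoutSketch {
  val VERSION = 1 // stands in for KafkaSource.VERSION after this commit

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    out.write(0) // zero byte kept for Spark 2.1.0 compatibility (SPARK-19517)
    val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    writer.write("v" + VERSION + "\n") // version line, "v1", replaces the old raw VERSION string
    writer.write("""{"topic-0":{"0":100}}""") // hypothetical offset JSON payload
    writer.flush()
    // Everything after the leading zero byte reads back as: v1\n{"topic-0":{"0":100}}
    println(new String(out.toByteArray.drop(1), StandardCharsets.UTF_8))
  }
}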

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

Lines changed: 7 additions & 2 deletions

@@ -203,7 +203,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
       override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
         out.write(0)
         val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
-        writer.write(s"v0\n${metadata.json}")
+        writer.write(s"v99999\n${metadata.json}")
         writer.flush
       }
     }

@@ -225,7 +225,12 @@ class KafkaSourceSuite extends KafkaSourceTest {
       source.getOffset.get // Read initial offset
     }

-    assert(e.getMessage.contains("Please upgrade your Spark"))
+    Seq(
+      s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999",
+      "produced by a newer version of Spark and cannot be read by this version"
+    ).foreach { message =>
+      assert(e.getMessage.contains(message))
+    }
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala

Lines changed: 3 additions & 6 deletions

@@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession
  * doing a compaction, it will read all old log files and merge them with the new batch.
  */
 abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
-    metadataLogVersion: String,
+    metadataLogVersion: Int,
     sparkSession: SparkSession,
     path: String)
   extends HDFSMetadataLog[Array[T]](sparkSession, path) {

@@ -134,7 +134,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](

   override def serialize(logData: Array[T], out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(metadataLogVersion.getBytes(UTF_8))
+    out.write(("v" + metadataLogVersion).getBytes(UTF_8))
     logData.foreach { data =>
       out.write('\n')
       out.write(Serialization.write(data).getBytes(UTF_8))

@@ -146,10 +146,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file")
     }
-    val version = lines.next()
-    if (version != metadataLogVersion) {
-      throw new IllegalStateException(s"Unknown log version: ${version}")
-    }
+    val version = parseVersion(lines.next(), metadataLogVersion)
     lines.map(Serialization.read[T]).toArray
   }
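For reference, with the "v" prefix now added by serialize, a compact log file holding three entries is laid out as below; this matches the expected string asserted in CompactibleFileStreamLogSuite further down:

v1
"entry_1"
"entry_2"
"entry_3"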

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala

Lines changed: 2 additions & 2 deletions

@@ -77,7 +77,7 @@ object SinkFileStatus {
  * (drops the deleted files).
  */
 class FileStreamSinkLog(
-    metadataLogVersion: String,
+    metadataLogVersion: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {

@@ -106,7 +106,7 @@ class FileStreamSinkLog(
 }

 object FileStreamSinkLog {
-  val VERSION = "v1"
+  val VERSION = 1
   val DELETE_ACTION = "delete"
   val ADD_ACTION = "add"
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala

Lines changed: 2 additions & 2 deletions

@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf

 class FileStreamSourceLog(
-    metadataLogVersion: String,
+    metadataLogVersion: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {

@@ -120,5 +120,5 @@ class FileStreamSourceLog(
 }

 object FileStreamSourceLog {
-  val VERSION = "v1"
+  val VERSION = 1
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 36 additions & 0 deletions

@@ -231,6 +231,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     val input = fileManager.open(batchMetadataFile)
     try {
       Some(deserialize(input))
+    } catch {
+      case ise: IllegalStateException =>
+        // re-throw the exception with the log file path added
+        throw new IllegalStateException(
+          s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
     } finally {
       IOUtils.closeQuietly(input)
     }

@@ -304,6 +309,37 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       new FileSystemManager(metadataPath, hadoopConf)
     }
   }
+
+  /**
+   * Parse the log version from the given `text` -- will throw exception when the parsed version
+   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1",
+   * "v123xyz" etc.)
+   */
+  private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = {
+    if (text.length > 0 && text(0) == 'v') {
+      val version =
+        try {
+          text.substring(1, text.length).toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
+              s"version from $text.")
+        }
+      if (version > 0) {
+        if (version > maxSupportedVersion) {
+          throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
+            s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
+            s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
+        } else {
+          return version
+        }
+      }
+    }

+    // reaching here means we failed to read the correct log version
+    throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
+      s"version from $text.")
+  }
 }

 object HDFSMetadataLog {
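The contract of the new parseVersion helper, summarized as a usage sketch (these calls mirror the assertions in HDFSMetadataLogSuite below; they are not additional production code):

metadataLog.parseVersion("v10", 100)  // returns 10
metadataLog.parseVersion("v10", 10)   // returns 10 (exactly at the supported maximum)
metadataLog.parseVersion("v200", 100) // throws IllegalStateException: "maximum supported log version is v100, but encountered v200 ..."
metadataLog.parseVersion("", 100)     // throws IllegalStateException: "Log file was malformed ..."
metadataLog.parseVersion("xyz", 100)  // malformed: missing the 'v' prefix
metadataLog.parseVersion("v0", 100)   // malformed: version must be positive
metadataLog.parseVersion("v-10", 100) // malformed: negative version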

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala

Lines changed: 4 additions & 6 deletions

@@ -55,10 +55,8 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file")
     }
-    val version = lines.next()
-    if (version != OffsetSeqLog.VERSION) {
-      throw new IllegalStateException(s"Unknown log version: ${version}")
-    }
+
+    val version = parseVersion(lines.next(), OffsetSeqLog.VERSION)

     // read metadata
     val metadata = lines.next().trim match {

@@ -70,7 +68,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)

   override protected def serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
+    out.write(("v" + OffsetSeqLog.VERSION).getBytes(UTF_8))

     // write metadata
     out.write('\n')

@@ -88,6 +86,6 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
 }

 object OffsetSeqLog {
-  private val VERSION = "v1"
+  private[streaming] val VERSION = 1
   private val SERIALIZED_VOID_OFFSET = "-"
 }
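Taken together with the metadata and per-source offset lines that serialize already writes, an offset log file now begins with a version line. A sketch of a resulting file, assuming one Kafka source plus one source with no recorded offset ("-" is SERIALIZED_VOID_OFFSET; the JSON payloads are illustrative, not from this commit):

v1
{"batchWatermarkMs":0,"batchTimestampMs":1480000000000}
{"topic-0":{"0":1}}
-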

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala

Lines changed: 34 additions & 6 deletions

@@ -122,7 +122,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val logs = Array("entry_1", "entry_2", "entry_3")
-        val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
+        val expected = s"""v${FakeCompactibleFileStreamLog.VERSION}
           |"entry_1"
           |"entry_2"
           |"entry_3"""".stripMargin

@@ -132,7 +132,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext

         baos.reset()
         compactibleLog.serialize(Array(), baos)
-        assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name()))
+        assert(s"v${FakeCompactibleFileStreamLog.VERSION}" === baos.toString(UTF_8.name()))
       })
   }

@@ -142,7 +142,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       defaultCompactInterval = 3,
       defaultMinBatchesToRetain = 1,
       compactibleLog => {
-        val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
+        val logs = s"""v${FakeCompactibleFileStreamLog.VERSION}
           |"entry_1"
           |"entry_2"
           |"entry_3"""".stripMargin

@@ -152,10 +152,36 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext

         assert(Nil ===
           compactibleLog.deserialize(
-            new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8))))
+            new ByteArrayInputStream(s"v${FakeCompactibleFileStreamLog.VERSION}".getBytes(UTF_8))))
       })
   }

+  test("deserialization log written by future version") {
+    withTempDir { dir =>
+      def newFakeCompactibleFileStreamLog(version: Int): FakeCompactibleFileStreamLog =
+        new FakeCompactibleFileStreamLog(
+          version,
+          _fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case
+          _defaultCompactInterval = 3, // this param does not matter here in this test case
+          _defaultMinBatchesToRetain = 1, // this param does not matter here in this test case
+          spark,
+          dir.getCanonicalPath)
+
+      val writer = newFakeCompactibleFileStreamLog(version = 2)
+      val reader = newFakeCompactibleFileStreamLog(version = 1)
+      writer.add(0, Array("entry"))
+      val e = intercept[IllegalStateException] {
+        reader.get(0)
+      }
+      Seq(
+        "maximum supported log version is v1, but encountered v2",
+        "produced by a newer version of Spark and cannot be read by this version"
+      ).foreach { message =>
+        assert(e.getMessage.contains(message))
+      }
+    }
+  }
+
   testWithUninterruptibleThread("compact") {
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,

@@ -219,6 +245,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
     ): Unit = {
     withTempDir { file =>
       val compactibleLog = new FakeCompactibleFileStreamLog(
+        FakeCompactibleFileStreamLog.VERSION,
         fileCleanupDelayMs,
         defaultCompactInterval,
         defaultMinBatchesToRetain,

@@ -230,17 +257,18 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
 }

 object FakeCompactibleFileStreamLog {
-  val VERSION = "test_version"
+  val VERSION = 1
 }

 class FakeCompactibleFileStreamLog(
+    metadataLogVersion: Int,
     _fileCleanupDelayMs: Long,
     _defaultCompactInterval: Int,
     _defaultMinBatchesToRetain: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[String](
-    FakeCompactibleFileStreamLog.VERSION,
+    metadataLogVersion,
     sparkSession,
     path
   ) {

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala

Lines changed: 4 additions & 4 deletions

@@ -74,7 +74,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
         action = FileStreamSinkLog.ADD_ACTION))

       // scalastyle:off
-      val expected = s"""$VERSION
+      val expected = s"""v$VERSION
         |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
         |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
        |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin

@@ -84,14 +84,14 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       assert(expected === baos.toString(UTF_8.name()))
       baos.reset()
       sinkLog.serialize(Array(), baos)
-      assert(VERSION === baos.toString(UTF_8.name()))
+      assert(s"v$VERSION" === baos.toString(UTF_8.name()))
     }
   }

   test("deserialize") {
     withFileStreamSinkLog { sinkLog =>
       // scalastyle:off
-      val logs = s"""$VERSION
+      val logs = s"""v$VERSION
         |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
         |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
        |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin

@@ -125,7 +125,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {

       assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))

-      assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8))))
+      assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(s"v$VERSION".getBytes(UTF_8))))
     }
   }

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala

Lines changed: 27 additions & 0 deletions

@@ -128,6 +128,33 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }

+  test("HDFSMetadataLog: parseVersion") {
+    withTempDir { dir =>
+      val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
+      def assertLogFileMalformed(func: => Int): Unit = {
+        val e = intercept[IllegalStateException] { func }
+        assert(e.getMessage.contains(s"Log file was malformed: failed to read correct log version"))
+      }
+      assertLogFileMalformed { metadataLog.parseVersion("", 100) }
+      assertLogFileMalformed { metadataLog.parseVersion("xyz", 100) }
+      assertLogFileMalformed { metadataLog.parseVersion("v10.x", 100) }
+      assertLogFileMalformed { metadataLog.parseVersion("10", 100) }
+      assertLogFileMalformed { metadataLog.parseVersion("v0", 100) }
+      assertLogFileMalformed { metadataLog.parseVersion("v-10", 100) }
+
+      assert(metadataLog.parseVersion("v10", 10) === 10)
+      assert(metadataLog.parseVersion("v10", 100) === 10)
+
+      val e = intercept[IllegalStateException] { metadataLog.parseVersion("v200", 100) }
+      Seq(
+        "maximum supported log version is v100, but encountered v200",
+        "produced by a newer version of Spark and cannot be read by this version"
+      ).foreach { message =>
+        assert(e.getMessage.contains(message))
+      }
+    }
+  }
+
   testWithUninterruptibleThread("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
