diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 5205a2d568ac..1bc4962093e1 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -135,8 +135,11 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
   private[this] val defaultSeed: Int = 0x9747b28c // LZ4BlockOutputStream.DEFAULT_SEED
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
+    compressedOutputStream(s, syncFlush = false)
+  }
+
+  def compressedOutputStream(s: OutputStream, syncFlush: Boolean): OutputStream = {
     val blockSize = conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt
-    val syncFlush = false
     new LZ4BlockOutputStream(
       s,
       blockSize,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9be0497e4660..d0a8353fe13c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1464,6 +1464,21 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
 
+  val FILE_SINK_LOG_WRITE_METADATA_VERSION =
+    buildConf("spark.sql.streaming.fileSink.log.writeMetadataVersion")
+      .doc("The version of the file stream sink metadata log. By default this is set to the " +
+        "highest version the current Spark release supports, since newer versions are " +
+        "generally more compact. You may want to set a lower value when the output must " +
+        "also be readable by an older version of Spark. " +
+        "Note that this does not 'rewrite' old batch files: for the metadata to be readable " +
+        "by an older version of Spark, the log must either be written with the configured " +
+        "version from the start, or at least one compact batch must be written with it. " +
+        "Available metadata versions: 1 (all Spark versions), 2 (Spark 3.1.0+).")
+      .version("3.1.0")
+      .intConf
+      .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
+      .createOptional
+
   val FILE_SOURCE_LOG_DELETION = buildConf("spark.sql.streaming.fileSource.log.deletion")
     .internal()
     .doc("Whether to delete the expired log files in file stream source.")
@@ -2808,6 +2823,8 @@ class SQLConf extends Serializable with Logging {
 
   def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
 
+  def fileSinkWriteMetadataLogVersion: Option[Int] = getConf(FILE_SINK_LOG_WRITE_METADATA_VERSION)
+
   def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)
 
   def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 29537cc0e573..45c3d2e20c62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -17,16 +17,19 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{InputStream, IOException, OutputStream}
+import java.io.{DataInputStream, DataOutputStream, InputStream, IOException, OutputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 
+import scala.collection.mutable
 import scala.io.{Source => IOSource}
 import scala.reflect.ClassTag
 
+import com.google.common.io.ByteStreams
 import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.io.LZ4CompressionCodec
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.{SizeEstimator, Utils}
 
@@ -68,6 +71,19 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 
   protected def defaultCompactInterval: Int
 
+  /**
+   * In some cases, log files written by application A must be readable by application B, and the
+   * two applications may run different versions of Spark. To support writing a log file that is
+   * readable by an older version of Spark, this method supplies an additional metadata log
+   * version which is used only for writing.
+   *
+   * Note that this class does not "rewrite" old batch files: for the metadata to be readable by
+   * an older version of Spark, the log must either be written with the proper version from the
+   * start, or at least one compact batch must be written with the proper version (so that readers
+   * ignore earlier batch logs which may have been written with a higher version).
+   */
+  protected def writeMetadataLogVersion: Option[Int] = None
+
   protected final lazy val compactInterval: Int = {
     // SPARK-18187: "compactInterval" can be set by user via defaultCompactInterval.
     // If there are existing log entries, then we should ensure a compatible compactInterval
@@ -106,6 +122,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     interval
   }
 
+  private val sparkConf = sparkSession.sparkContext.getConf
+
   /**
    * Filter out the obsolete logs.
   */
@@ -132,24 +150,88 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     }
   }
 
+  protected def serializeEntryToV2(data: T): Array[Byte]
+  protected def deserializeEntryFromV2(serialized: Array[Byte]): T
+
   override def serialize(logData: Array[T], out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(("v" + metadataLogVersion).getBytes(UTF_8))
+    val version = writeMetadataLogVersion.getOrElse(metadataLogVersion)
+    out.write(("v" + version).getBytes(UTF_8))
+    version match {
+      case 1 => serializeToV1(out, logData)
+      case 2 => serializeToV2(out, logData)
+      case _ =>
+        throw new IllegalStateException(s"UnsupportedLogVersion: cannot write metadata log " +
+          s"with unknown version v$version.")
+    }
+  }
+
+  private def serializeToV1(out: OutputStream, logData: Array[T]): Unit = {
     logData.foreach { data =>
       out.write('\n')
       out.write(Serialization.write(data).getBytes(UTF_8))
     }
   }
 
+  private def serializeToV2(out: OutputStream, logData: Array[T]): Unit = {
+    out.write('\n')
+    val dos = compressStream(out)
+    if (logData.nonEmpty) {
+      logData.foreach { data =>
+        val serialized = serializeEntryToV2(data)
+        dos.writeInt(serialized.length)
+        dos.write(serialized)
+      }
+    }
+    dos.writeInt(-1)
+    dos.flush()
+  }
+
   override def deserialize(in: InputStream): Array[T] = {
-    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
-    if (!lines.hasNext) {
+    val line = readLine(in)
+    if (line == null || line.isEmpty) {
       throw new IllegalStateException("Incomplete log file")
     }
-    validateVersion(lines.next(), metadataLogVersion)
+
+    val version = parseVersion(line)
+    version match {
+      case 1 if version <= metadataLogVersion => deserializeFromV1(in)
+      case 2 if version <= metadataLogVersion => deserializeFromV2(in)
+      case version =>
+        throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
+          s"is v${metadataLogVersion}, but encountered v$version. The log file was produced " +
+          s"by a newer version of Spark and cannot be read by this version. 
Please upgrade.") + } + } + + private def deserializeFromV1(in: InputStream): Array[T] = { + val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() lines.map(Serialization.read[T]).toArray } + private def deserializeFromV2(in: InputStream): Array[T] = { + val list = new scala.collection.mutable.ArrayBuffer[T] + + val dis = decompressStream(in) + var eof = false + + while (!eof) { + val size = dis.readInt() + if (size == -1) { + eof = true + } else if (size < 0) { + throw new IOException( + s"Error to deserialize file: size cannot be $size") + } else { + val buffer = new Array[Byte](size) + ByteStreams.readFully(dis, buffer, 0, size) + list += deserializeEntryFromV2(buffer) + } + } + + list.toArray + } + override def add(batchId: Long, logs: Array[T]): Boolean = { val batchAdded = if (isCompactionBatch(batchId, compactInterval)) { @@ -284,6 +366,33 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( } } } + + private def readLine(in: InputStream): String = { + val line = new mutable.ArrayBuffer[Byte]() + var eol = false + while (!eol) { + val b = in.read() + if (b == -1 || b == '\n') { + eol = true + } else { + line += b.toByte + } + } + + new String(line.toArray, UTF_8) + } + + private def compressStream(outputStream: OutputStream): DataOutputStream = { + // set syncFlush to true since we don't call close for compressed stream but call flush instead + val compressed = new LZ4CompressionCodec(sparkConf) + .compressedOutputStream(outputStream, syncFlush = true) + new DataOutputStream(compressed) + } + + private def decompressStream(inputStream: InputStream): DataInputStream = { + val compressed = new LZ4CompressionCodec(sparkConf).compressedInputStream(inputStream) + new DataInputStream(compressed) + } } object CompactibleFileStreamLog { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 17b6874a6164..ceaab47589d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} import java.net.URI import org.apache.hadoop.fs.{FileStatus, Path} @@ -97,6 +98,10 @@ class FileStreamSinkLog( s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " + "to a positive value.") + // The validation of version is done in SQLConf. 
+ protected override val writeMetadataLogVersion: Option[Int] = + sparkSession.sessionState.conf.fileSinkWriteMetadataLogVersion + override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = { val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet if (deletedFiles.isEmpty) { @@ -105,10 +110,45 @@ class FileStreamSinkLog( logs.filter(f => !deletedFiles.contains(f.path)) } } + + override protected def serializeEntryToV2(data: SinkFileStatus): Array[Byte] = { + val baos = new ByteArrayOutputStream() + val dos = new DataOutputStream(baos) + + dos.writeUTF(data.path) + dos.writeLong(data.size) + dos.writeBoolean(data.isDir) + dos.writeLong(data.modificationTime) + dos.writeInt(data.blockReplication) + dos.writeLong(data.blockSize) + dos.writeUTF(data.action) + dos.close() + + baos.toByteArray + } + + override protected def deserializeEntryFromV2(serialized: Array[Byte]): SinkFileStatus = { + val bais = new ByteArrayInputStream(serialized) + val dis = new DataInputStream(bais) + + val status = SinkFileStatus( + dis.readUTF(), + dis.readLong(), + dis.readBoolean(), + dis.readLong(), + dis.readInt(), + dis.readLong(), + dis.readUTF()) + + dis.close() + + status + } } object FileStreamSinkLog { - val VERSION = 1 + val VERSION = 2 + val SUPPORTED_VERSIONS = Seq(1, 2) val DELETE_ACTION = "delete" val ADD_ACTION = "add" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index c43887774c13..8d386268625b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} import java.util.{LinkedHashMap => JLinkedHashMap} import java.util.Map.Entry @@ -122,8 +123,34 @@ class FileStreamSourceLog( } batches } + + override protected def serializeEntryToV2(data: FileEntry): Array[Byte] = { + val baos = new ByteArrayOutputStream() + val dos = new DataOutputStream(baos) + + dos.writeUTF(data.path) + dos.writeLong(data.timestamp) + dos.writeLong(data.batchId) + dos.close() + + baos.toByteArray + } + + override protected def deserializeEntryFromV2(serialized: Array[Byte]): FileEntry = { + val bais = new ByteArrayInputStream(serialized) + val dis = new DataInputStream(bais) + + val entry = FileEntry( + dis.readUTF(), + dis.readLong(), + dis.readLong()) + + dis.close() + + entry + } } object FileStreamSourceLog { - val VERSION = 1 + val VERSION = 2 } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 5c86f8a50dda..ddac2c269a27 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -249,6 +249,16 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: * "v123xyz" etc.) 
*/ private[sql] def validateVersion(text: String, maxSupportedVersion: Int): Int = { + val version = parseVersion(text) + if (version > maxSupportedVersion) { + throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " + + s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " + + s"by a newer version of Spark and cannot be read by this version. Please upgrade.") + } + version + } + + private[sql] def parseVersion(text: String): Int = { if (text.length > 0 && text(0) == 'v') { val version = try { @@ -258,15 +268,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: throw new IllegalStateException(s"Log file was malformed: failed to read correct log " + s"version from $text.") } - if (version > 0) { - if (version > maxSupportedVersion) { - throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " + - s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " + - s"by a newer version of Spark and cannot be read by this version. Please upgrade.") - } else { - return version - } - } + + if (version > 0) return version } // reaching here means we failed to read the correct log version diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala index ead17d50b4e1..79235e6893a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -109,45 +109,49 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession { }) } - test("serialize") { + test("serialize & deserialize") { withFakeCompactibleFileStreamLog( fileCleanupDelayMs = Long.MaxValue, defaultCompactInterval = 3, defaultMinBatchesToRetain = 1, compactibleLog => { val logs = Array("entry_1", "entry_2", "entry_3") - val expected = s"""v${FakeCompactibleFileStreamLog.VERSION} - |"entry_1" - |"entry_2" - |"entry_3"""".stripMargin + val baos = new ByteArrayOutputStream() compactibleLog.serialize(logs, baos) - assert(expected === baos.toString(UTF_8.name())) + + val actualLogs = compactibleLog.deserialize(new ByteArrayInputStream(baos.toByteArray)) + assert(actualLogs === logs) baos.reset() compactibleLog.serialize(Array(), baos) - assert(s"v${FakeCompactibleFileStreamLog.VERSION}" === baos.toString(UTF_8.name())) + val actualLogs2 = compactibleLog.deserialize(new ByteArrayInputStream(baos.toByteArray)) + assert(actualLogs2.isEmpty) }) } - test("deserialize") { - withFakeCompactibleFileStreamLog( - fileCleanupDelayMs = Long.MaxValue, - defaultCompactInterval = 3, - defaultMinBatchesToRetain = 1, - compactibleLog => { - val logs = s"""v${FakeCompactibleFileStreamLog.VERSION} - |"entry_1" - |"entry_2" - |"entry_3"""".stripMargin - val expected = Array("entry_1", "entry_2", "entry_3") - assert(expected === - compactibleLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8)))) - - assert(Nil === - compactibleLog.deserialize( - new ByteArrayInputStream(s"v${FakeCompactibleFileStreamLog.VERSION}".getBytes(UTF_8)))) - }) + test("write older version of metadata for compatibility") { + withTempDir { dir => + def newFakeCompactibleFileStreamLog( + readVersion: Int, + writeVersion: Option[Int]): FakeCompactibleFileStreamLog = + new FakeCompactibleFileStreamLog( + readVersion, + 
writeVersion, + _fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case + _defaultCompactInterval = 3, // this param does not matter here in this test case + _defaultMinBatchesToRetain = 1, // this param does not matter here in this test case + spark, + dir.getCanonicalPath) + + // writer understands version 2 but to ensure compatibility it sets the write version to 1 + val writer = newFakeCompactibleFileStreamLog(2, Some(1)) + // suppose a reader only understand version 1 + val reader = newFakeCompactibleFileStreamLog(1, None) + writer.add(0, Array("entry")) + // reader can read the metadata log writer just wrote + assert(Array("entry") === reader.get(0).get) + } } test("deserialization log written by future version") { @@ -155,6 +159,7 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession { def newFakeCompactibleFileStreamLog(version: Int): FakeCompactibleFileStreamLog = new FakeCompactibleFileStreamLog( version, + None, _fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case _defaultCompactInterval = 3, // this param does not matter here in this test case _defaultMinBatchesToRetain = 1, // this param does not matter here in this test case @@ -263,6 +268,7 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession { withTempDir { file => val compactibleLog = new FakeCompactibleFileStreamLog( FakeCompactibleFileStreamLog.VERSION, + None, fileCleanupDelayMs, defaultCompactInterval, defaultMinBatchesToRetain, @@ -274,11 +280,12 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession { } object FakeCompactibleFileStreamLog { - val VERSION = 1 + val VERSION = 2 } class FakeCompactibleFileStreamLog( metadataLogVersion: Int, + _writeMetadataLogVersion: Option[Int], _fileCleanupDelayMs: Long, _defaultCompactInterval: Int, _defaultMinBatchesToRetain: Int, @@ -296,7 +303,17 @@ class FakeCompactibleFileStreamLog( override protected def defaultCompactInterval: Int = _defaultCompactInterval + override protected def writeMetadataLogVersion: Option[Int] = _writeMetadataLogVersion + override protected val minBatchesToRetain: Int = _defaultMinBatchesToRetain override def compactLogs(logs: Seq[String]): Seq[String] = logs + + override protected def serializeEntryToV2(data: String): Array[Byte] = { + data.getBytes(UTF_8) + } + + override protected def deserializeEntryFromV2(serialized: Array[Byte]): String = { + new String(serialized, UTF_8) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index 6d615b5ef044..3a6ef124f46a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -37,7 +37,25 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { import CompactibleFileStreamLog._ import FileStreamSinkLog._ - test("compactLogs") { + def executeFuncWithMetadataVersion(metadataVersion: Int, func: => Any): Unit = { + withSQLConf( + Seq(SQLConf.FILE_SINK_LOG_WRITE_METADATA_VERSION.key -> metadataVersion.toString): _*) { + func + } + } + + // This makes sure tests are passing for all supported versions on write version, where + // the read version is set to the highest supported version. This ensures Spark can read + // older versions of file stream sink metadata log. 
+ def testWithAllMetadataVersions(name: String)(func: => Any): Unit = { + for (version <- FileStreamSinkLog.SUPPORTED_VERSIONS) { + test(s"$name - metadata version $version") { + executeFuncWithMetadataVersion(version, func) + } + } + } + + testWithAllMetadataVersions("compactLogs") { withFileStreamSinkLog { sinkLog => val logs = Seq( newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION), @@ -53,7 +71,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { } } - test("serialize") { + testWithAllMetadataVersions("serialize & deserialize") { withFileStreamSinkLog { sinkLog => val logs = Array( SinkFileStatus( @@ -81,63 +99,20 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { blockSize = 30000L, action = FileStreamSinkLog.ADD_ACTION)) - // scalastyle:off - val expected = s"""v$VERSION - |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"} - |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"} - |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin - // scalastyle:on val baos = new ByteArrayOutputStream() sinkLog.serialize(logs, baos) - assert(expected === baos.toString(UTF_8.name())) - baos.reset() - sinkLog.serialize(Array(), baos) - assert(s"v$VERSION" === baos.toString(UTF_8.name())) - } - } - - test("deserialize") { - withFileStreamSinkLog { sinkLog => - // scalastyle:off - val logs = s"""v$VERSION - |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"} - |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"} - |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin - // scalastyle:on - - val expected = Seq( - SinkFileStatus( - path = "/a/b/x", - size = 100L, - isDir = false, - modificationTime = 1000L, - blockReplication = 1, - blockSize = 10000L, - action = FileStreamSinkLog.ADD_ACTION), - SinkFileStatus( - path = "/a/b/y", - size = 200L, - isDir = false, - modificationTime = 2000L, - blockReplication = 2, - blockSize = 20000L, - action = FileStreamSinkLog.DELETE_ACTION), - SinkFileStatus( - path = "/a/b/z", - size = 300L, - isDir = false, - modificationTime = 3000L, - blockReplication = 3, - blockSize = 30000L, - action = FileStreamSinkLog.ADD_ACTION)) - assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8)))) + val actualLogs = sinkLog.deserialize(new ByteArrayInputStream(baos.toByteArray)) + assert(actualLogs === logs) - assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(s"v$VERSION".getBytes(UTF_8)))) + baos.reset() + sinkLog.serialize(Array(), baos) + val actualLogs2 = sinkLog.deserialize(new ByteArrayInputStream(baos.toByteArray)) + assert(actualLogs2.isEmpty) } } - test("compact") { + testWithAllMetadataVersions("compact") { withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") { withFileStreamSinkLog { sinkLog => for (batchId <- 0 to 10) { @@ -157,7 +132,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession { } } - test("delete expired file") { + testWithAllMetadataVersions("delete expired file") { // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour // deterministically and 
retain a minimum of one batch
     withSQLConf(
@@ -233,7 +208,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     }
   }
 
-  test("read Spark 2.1.0 log format") {
+  testWithAllMetadataVersions("read Spark 2.1.0 log format") {
     assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
       // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
       SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
@@ -248,7 +223,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     ))
   }
 
-  test("getLatestBatchId") {
+  testWithAllMetadataVersions("getLatestBatchId") {
     withCountOpenLocalFileSystemAsLocalFileSystem {
       val scheme = CountOpenLocalFileSystem.scheme
       withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
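
For reference, a minimal usage sketch of the new spark.sql.streaming.fileSink.log.writeMetadataVersion option (not part of the patch itself). Only the configuration key and its accepted values (1 or 2) come from the patch above; the session setup, the rate source, and the output/checkpoint paths are illustrative assumptions.

// Usage sketch (assumptions noted above): pin the sink metadata log to the V1 (JSON) format so
// that an application running an older Spark version can still read the output directory. Until
// at least one compact batch has been written with this setting, older readers may still fail on
// previously written V2 batch files, as described in the config doc.
import org.apache.spark.sql.SparkSession

object WriteV1SinkMetadataExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("file-sink-v1-metadata").getOrCreate()
    import spark.implicits._

    // New in this patch: force the file sink metadata log to be written as version 1.
    spark.conf.set("spark.sql.streaming.fileSink.log.writeMetadataVersion", "1")

    val query = spark.readStream
      .format("rate")
      .load()
      .select($"timestamp", $"value")
      .writeStream
      .format("parquet")
      .option("path", "/tmp/v1-metadata-output")             // illustrative path
      .option("checkpointLocation", "/tmp/v1-metadata-chk")  // illustrative path
      .start()

    query.awaitTermination()
  }
}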
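
The V2 on-disk layout produced by serializeToV2 above can also be illustrated with a self-contained roundtrip: a version header line, then an LZ4-compressed stream of length-prefixed entries terminated by a -1 sentinel. The sketch below uses plain UTF-8 strings as entry payloads purely for illustration (the real logs serialize SinkFileStatus/FileEntry records) and relies on the compressedOutputStream(s, syncFlush) overload added by this patch.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import com.google.common.io.ByteStreams

import org.apache.spark.SparkConf
import org.apache.spark.io.LZ4CompressionCodec

object V2FramingSketch {
  def main(args: Array[String]): Unit = {
    val codec = new LZ4CompressionCodec(new SparkConf())
    val entries = Seq("entry_1", "entry_2")

    // Write: "v2" header line, then a compressed sequence of [length, bytes] pairs ending with -1.
    val out = new ByteArrayOutputStream()
    out.write("v2\n".getBytes(UTF_8))
    val dos = new DataOutputStream(codec.compressedOutputStream(out, syncFlush = true))
    entries.foreach { e =>
      val bytes = e.getBytes(UTF_8)
      dos.writeInt(bytes.length)
      dos.write(bytes)
    }
    dos.writeInt(-1)
    dos.flush()

    // Read it back: consume the header line, then decode entries until the -1 sentinel.
    val in = new ByteArrayInputStream(out.toByteArray)
    var b = in.read()
    while (b != -1 && b != '\n') { b = in.read() }
    val dis = new DataInputStream(codec.compressedInputStream(in))
    val decoded = Iterator.continually(dis.readInt())
      .takeWhile(_ != -1)
      .map { len =>
        val buf = new Array[Byte](len)
        ByteStreams.readFully(dis, buf, 0, len)
        new String(buf, UTF_8)
      }
      .toList
    assert(decoded == entries)
  }
}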