
Commit 16f7c4c

Merge pull request #1 from anishshri-db/story/state-v2-changelog
Add support for changelog checkpointing with new operator
2 parents: 3a93b92 + 1ff70db

7 files changed: +413 −163 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 15 additions & 13 deletions
@@ -144,12 +144,6 @@ class RocksDB(
   @volatile private var numKeysOnWritingVersion = 0L
   @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
 
-  // TODO: support changelog checkpointing with column families
-  if (useColumnFamilies && enableChangelogCheckpointing) {
-    throw new RuntimeException("Changelog checkpointing is not supported with multiple " +
-      "column families")
-  }
-
   // SPARK-46249 - Keep track of recorded metrics per version which can be used for querying later
   // Updates and access to recordedMetrics are protected by the DB instance lock
   @GuardedBy("acquireLock")
@@ -205,7 +199,7 @@ class RocksDB(
     if (enableChangelogCheckpointing && !readOnly) {
       // Make sure we don't leak resource.
       changelogWriter.foreach(_.abort())
-      changelogWriter = Some(fileManager.getChangeLogWriter(version + 1))
+      changelogWriter = Some(fileManager.getChangeLogWriter(version + 1, useColumnFamilies))
     }
     this
   }
@@ -217,12 +211,18 @@ class RocksDB(
     for (v <- loadedVersion + 1 to endVersion) {
       var changelogReader: StateStoreChangelogReader = null
       try {
-        changelogReader = fileManager.getChangelogReader(v)
-        changelogReader.foreach { case (key, value) =>
-          if (value != null) {
-            put(key, value)
-          } else {
-            remove(key)
+        changelogReader = fileManager.getChangelogReader(v, useColumnFamilies)
+        changelogReader.foreach { case (recordType, key, value, colFamilyName) =>
+          if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
+            createColFamilyIfAbsent(colFamilyName)
+          }
+
+          recordType match {
+            case RecordType.PUT_RECORD =>
+              put(key, value, colFamilyName)
+
+            case RecordType.DELETE_RECORD =>
+              remove(key, colFamilyName)
           }
         }
       } finally {
@@ -289,6 +289,7 @@ class RocksDB(
         }
       }
       db.put(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
+      changelogWriter.foreach(_.put(key, value, colFamilyName))
     } else {
       if (conf.trackTotalNumberOfRows) {
         val oldValue = db.get(readOptions, key)
@@ -319,6 +320,7 @@ class RocksDB(
         }
      }
       db.delete(colFamilyNameToHandleMap(colFamilyName), writeOptions, key)
+      changelogWriter.foreach(_.delete(key, colFamilyName))
     } else {
       if (conf.trackTotalNumberOfRows) {
         val value = db.get(readOptions, key)
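Note: changelog checkpointing itself is opt-in on the RocksDB state store provider, so the replay and write paths above only take effect when it is enabled. A minimal sketch of the session configuration follows; the config keys come from the existing RocksDB state store provider and are not introduced by this commit.

  // Sketch: enable the RocksDB provider and changelog checkpointing for a streaming query.
  spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  spark.conf.set(
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")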

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala

Lines changed: 14 additions & 4 deletions
@@ -149,21 +149,31 @@ class RocksDBFileManager(
 
   @volatile private var rootDirChecked: Boolean = false
 
-  def getChangeLogWriter(version: Long): StateStoreChangelogWriter = {
+  def getChangeLogWriter(version: Long,
+      useColumnFamilies: Boolean = false): StateStoreChangelogWriter = {
     val changelogFile = dfsChangelogFile(version)
     if (!rootDirChecked) {
       val rootDir = new Path(dfsRootDir)
       if (!fm.exists(rootDir)) fm.mkdirs(rootDir)
       rootDirChecked = true
     }
-    val changelogWriter = new StateStoreChangelogWriter(fm, changelogFile, codec)
+    val changelogWriter = if (useColumnFamilies) {
+      new StateStoreChangelogWriterV2(fm, changelogFile, codec)
+    } else {
+      new StateStoreChangelogWriterV1(fm, changelogFile, codec)
+    }
     changelogWriter
   }
 
   // Get the changelog file at version
-  def getChangelogReader(version: Long): StateStoreChangelogReader = {
+  def getChangelogReader(version: Long,
+      useColumnFamilies: Boolean = false): StateStoreChangelogReader = {
     val changelogFile = dfsChangelogFile(version)
-    new StateStoreChangelogReader(fm, changelogFile, codec)
+    if (useColumnFamilies) {
+      new StateStoreChangelogReaderV2(fm, changelogFile, codec)
+    } else {
+      new StateStoreChangelogReaderV1(fm, changelogFile, codec)
+    }
   }
 
   /**
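The new useColumnFamilies flag defaults to false, so existing call sites keep producing and reading the V1 changelog format; only column-family-aware stores opt into the V2 writer and reader. A rough usage sketch follows; the fileManager value, the byte-array variables, and the commit() call are assumed from the surrounding RocksDB code rather than shown in this diff.

  // Sketch: write one put and one delete record in the V2 (column-family-aware) format.
  val writer = fileManager.getChangeLogWriter(version + 1, useColumnFamilies = true)
  writer.put(keyBytes, valueBytes, "default")   // V2 records carry the column family name
  writer.delete(staleKeyBytes, "default")
  writer.commit()                               // finalize the changelog file for this version

  // Sketch: replay the same version, dispatching on the record type.
  val reader = fileManager.getChangelogReader(version + 1, useColumnFamilies = true)
  reader.foreach { case (recordType, key, value, colFamilyName) =>
    // apply the change to the store for colFamilyName
  }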

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala

Lines changed: 191 additions & 28 deletions
@@ -33,12 +33,20 @@ import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 import org.apache.spark.util.NextIterator
 
 /**
- * Write changes to the key value state store instance to a changelog file.
- * There are 2 types of records, put and delete.
- * A put record is written as: | key length | key content | value length | value content |
- * A delete record is written as: | key length | key content | -1 |
- * Write an Int -1 to signal the end of file.
- * The overall changelog format is: | put record | delete record | ... | put record | -1 |
+ * Enum used to write record types to changelog files used with RocksDBStateStoreProvider.
+ */
+object RecordType extends Enumeration {
+  type RecordType = Value
+
+  val PUT_RECORD = Value("put_record")
+  val DELETE_RECORD = Value("delete_record")
+}
+
+/**
+ * Base class for state store changelog writer
+ * @param fm - checkpoint file manager used to manage streaming query checkpoint
+ * @param file - name of file to use to write changelog
+ * @param compressionCodec - compression method used for writing changelog file
  */
 class StateStoreChangelogWriter(
     fm: CheckpointFileManager,
@@ -52,25 +60,27 @@ class StateStoreChangelogWriter(
 
   private var backingFileStream: CancellableFSDataOutputStream =
     fm.createAtomic(file, overwriteIfPossible = true)
-  private var compressedStream: DataOutputStream = compressStream(backingFileStream)
+  protected var compressedStream: DataOutputStream = compressStream(backingFileStream)
   var size = 0
 
   def put(key: Array[Byte], value: Array[Byte]): Unit = {
-    assert(compressedStream != null)
-    compressedStream.writeInt(key.length)
-    compressedStream.write(key)
-    compressedStream.writeInt(value.length)
-    compressedStream.write(value)
-    size += 1
+    throw new UnsupportedOperationException("Operation not supported on base changelog writer " +
+      "implementation")
+  }
+
+  def put(key: Array[Byte], value: Array[Byte], colFamilyName: String): Unit = {
+    throw new UnsupportedOperationException("Operation not supported on base changelog writer " +
+      "implementation")
   }
 
   def delete(key: Array[Byte]): Unit = {
-    assert(compressedStream != null)
-    compressedStream.writeInt(key.length)
-    compressedStream.write(key)
-    // -1 in the value field means record deletion.
-    compressedStream.writeInt(-1)
-    size += 1
+    throw new UnsupportedOperationException("Operation not supported on base changelog writer " +
      "implementation")
+  }
+
+  def delete(key: Array[Byte], colFamilyName: String): Unit = {
+    throw new UnsupportedOperationException("Operation not supported on base changelog writer " +
+      "implementation")
   }
 
   def abort(): Unit = {
@@ -109,18 +119,94 @@
   }
 }
 
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put record | -1 |
+ */
+class StateStoreChangelogWriterV1(
+    fm: CheckpointFileManager,
+    file: Path,
+    compressionCodec: CompressionCodec)
+  extends StateStoreChangelogWriter(fm, file, compressionCodec) {
+
+  override def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  override def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+}
 
 /**
- * Read an iterator of change records from the changelog file.
- * A record is represented by ByteArrayPair(key: Array[Byte], value: Array[Byte])
- * A put record is returned as a ByteArrayPair(key, value)
- * A delete record is returned as a ByteArrayPair(key, null)
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | record type | key length
+ * | key content | value length | value content | col family name length | col family name | -1 |
+ * A delete record is written as: | record type | key length | key content | -1
+ * | col family name length | col family name | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put record | -1 |
+ */
+class StateStoreChangelogWriterV2(
+    fm: CheckpointFileManager,
+    file: Path,
+    compressionCodec: CompressionCodec)
+  extends StateStoreChangelogWriter(fm, file, compressionCodec) {
+
+  override def put(key: Array[Byte], value: Array[Byte], colFamilyName: String): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(RecordType.PUT_RECORD.toString.getBytes.size)
+    compressedStream.write(RecordType.PUT_RECORD.toString.getBytes)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    compressedStream.writeInt(colFamilyName.getBytes.size)
+    compressedStream.write(colFamilyName.getBytes)
+    size += 1
+  }
+
+  override def delete(key: Array[Byte], colFamilyName: String): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(RecordType.DELETE_RECORD.toString.getBytes.size)
+    compressedStream.write(RecordType.DELETE_RECORD.toString.getBytes)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    compressedStream.writeInt(colFamilyName.getBytes.size)
+    compressedStream.write(colFamilyName.getBytes)
+    size += 1
+  }
+}
+
+/**
+ * Base class for state store changelog reader
+ * @param fm - checkpoint file manager used to manage streaming query checkpoint
+ * @param fileToRead - name of file to use to read changelog
+ * @param compressionCodec - decompression method used for reading changelog file
 */
 class StateStoreChangelogReader(
     fm: CheckpointFileManager,
     fileToRead: Path,
     compressionCodec: CompressionCodec)
-  extends NextIterator[(Array[Byte], Array[Byte])] with Logging {
+  extends NextIterator[(RecordType.Value, Array[Byte], Array[Byte], String)]
+  with Logging {
 
   private def decompressStream(inputStream: DataInputStream): DataInputStream = {
     val compressed = compressionCodec.compressedInputStream(inputStream)
@@ -133,11 +219,30 @@ class StateStoreChangelogReader(
     case f: FileNotFoundException =>
       throw QueryExecutionErrors.failedToReadStreamingStateFileError(fileToRead, f)
   }
-  private val input: DataInputStream = decompressStream(sourceStream)
+  protected val input: DataInputStream = decompressStream(sourceStream)
 
   def close(): Unit = { if (input != null) input.close() }
 
-  override def getNext(): (Array[Byte], Array[Byte]) = {
+  override def getNext(): (RecordType.Value, Array[Byte], Array[Byte], String) = {
+    throw new UnsupportedOperationException("Iterator operations not supported on base " +
+      "changelog reader implementation")
+  }
+}
+
+/**
+ * Read an iterator of change records from the changelog file.
+ * A record is represented by a tuple (recordType: RecordType.Value,
+ * key: Array[Byte], value: Array[Byte], colFamilyName: String)
+ * A put record is returned as (recordType, key, value, colFamilyName)
+ * A delete record is returned as (recordType, key, null, colFamilyName)
+ */
+class StateStoreChangelogReaderV1(
+    fm: CheckpointFileManager,
+    fileToRead: Path,
+    compressionCodec: CompressionCodec) extends StateStoreChangelogReader(fm, fileToRead,
+    compressionCodec) {
+
+  override def getNext(): (RecordType.Value, Array[Byte], Array[Byte], String) = {
     val keySize = input.readInt()
     // A -1 key size means end of file.
     if (keySize == -1) {
@@ -153,12 +258,70 @@
       val valueSize = input.readInt()
       if (valueSize < 0) {
         // A deletion record
-        (keyBuffer, null)
+        (RecordType.DELETE_RECORD, keyBuffer, null, StateStore.DEFAULT_COL_FAMILY_NAME)
       } else {
         val valueBuffer = new Array[Byte](valueSize)
         ByteStreams.readFully(input, valueBuffer, 0, valueSize)
         // A put record.
-        (keyBuffer, valueBuffer)
+        (RecordType.PUT_RECORD, keyBuffer, valueBuffer, StateStore.DEFAULT_COL_FAMILY_NAME)
+      }
+    }
+  }
+}
+
+/**
+ * Read an iterator of change records from the changelog file.
+ * A record is represented by a tuple (recordType: RecordType.Value,
+ * key: Array[Byte], value: Array[Byte], colFamilyName: String)
+ * A put record is returned as (recordType, key, value, colFamilyName)
+ * A delete record is returned as (recordType, key, null, colFamilyName)
+ */
+class StateStoreChangelogReaderV2(
+    fm: CheckpointFileManager,
+    fileToRead: Path,
+    compressionCodec: CompressionCodec) extends StateStoreChangelogReader(fm, fileToRead,
+    compressionCodec) {
+
+  private def parseBuffer(input: DataInputStream): Array[Byte] = {
+    val blockSize = input.readInt()
+    val blockBuffer = new Array[Byte](blockSize)
+    ByteStreams.readFully(input, blockBuffer, 0, blockSize)
+    blockBuffer
+  }
+
+  override def getNext(): (RecordType.Value, Array[Byte], Array[Byte], String) = {
+    val recordTypeSize = input.readInt()
+    // A -1 record type size means end of file.
+    if (recordTypeSize == -1) {
+      finished = true
+      null
+    } else if (recordTypeSize < 0) {
+      throw new IOException(
+        s"Error reading streaming state file $fileToRead: " +
+        s"record type size cannot be $recordTypeSize")
+    } else {
+      val recordTypeBuffer = new Array[Byte](recordTypeSize)
+      ByteStreams.readFully(input, recordTypeBuffer, 0, recordTypeSize)
+      val recordTypeStr = recordTypeBuffer.map(_.toChar).mkString
+      val recordType = RecordType.withName(recordTypeStr)
+      recordType match {
+        case RecordType.PUT_RECORD =>
+          val keyBuffer = parseBuffer(input)
+          val valueBuffer = parseBuffer(input)
+          val colFamilyNameBuffer = parseBuffer(input)
+          (RecordType.PUT_RECORD, keyBuffer, valueBuffer,
+            colFamilyNameBuffer.map(_.toChar).mkString)
+
+        case RecordType.DELETE_RECORD =>
+          val keyBuffer = parseBuffer(input)
+          val valueSize = input.readInt()
+          assert(valueSize == -1)
+          val colFamilyNameBuffer = parseBuffer(input)
+          (RecordType.DELETE_RECORD, keyBuffer, null,
+            colFamilyNameBuffer.map(_.toChar).mkString)
+
+        case _ =>
+          throw new IOException("Failed to process unknown record type")
       }
     }
   }
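To make the V2 framing concrete, the sketch below hand-encodes a single put record with plain java.io streams and reads it back, mirroring StateStoreChangelogWriterV2.put and StateStoreChangelogReaderV2.getNext above. It deliberately skips the compression codec and checkpoint file manager, which wrap the streams in the real implementation, and uses made-up key/value/column-family bytes.

  import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

  // Sketch: V2 framing of one put record followed by the end-of-file marker.
  val bytesOut = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytesOut)

  val recordType = "put_record".getBytes   // RecordType.PUT_RECORD.toString
  val key = "k1".getBytes
  val value = "v1".getBytes
  val colFamilyName = "default".getBytes

  out.writeInt(recordType.length); out.write(recordType)
  out.writeInt(key.length); out.write(key)
  out.writeInt(value.length); out.write(value)
  out.writeInt(colFamilyName.length); out.write(colFamilyName)
  out.writeInt(-1)                         // end-of-file marker

  // Read it back the way StateStoreChangelogReaderV2 does: each block is length-prefixed.
  val in = new DataInputStream(new ByteArrayInputStream(bytesOut.toByteArray))
  def readBlock(): Array[Byte] = {
    val size = in.readInt()
    val buf = new Array[Byte](size)
    in.readFully(buf)
    buf
  }
  val parsedType = new String(readBlock())   // "put_record"
  val parsedKey = new String(readBlock())    // "k1"
  val parsedValue = new String(readBlock())  // "v1"
  val parsedCf = new String(readBlock())     // "default"
  assert(in.readInt() == -1)                 // end of file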
