Commit 22d7e66

Fixed a lot of things
1 parent d963efa commit 22d7e66

5 files changed: +100 −57 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 7 additions & 12 deletions

@@ -143,7 +143,7 @@ class HDFSBackedStateStoreProvider(
      */
     override def iterator(): Iterator[InternalRow] = {
       verify(state == COMMITTED, "Cannot get iterator of store data before comitting")
-      HDFSBackedStateStoreProvider.this.iterator(version)
+      HDFSBackedStateStoreProvider.this.iterator(newVersion)
     }
 
     /**
@@ -245,10 +245,8 @@ class HDFSBackedStateStoreProvider(
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version < 0) return new MapType
-    synchronized {
-      loadedMaps.get(version)
-    }.getOrElse {
+    if (version <= 0) return new MapType
+    synchronized { loadedMaps.get(version) }.getOrElse {
       val mapFromFile = readSnapshotFile(version).getOrElse {
         val prevMap = loadMap(version - 1)
         val deltaUpdates = readDeltaFile(version)
@@ -328,9 +326,7 @@ class HDFSBackedStateStoreProvider(
       val lastVersion = files.last.version
       val deltaFilesForLastVersion =
         filesForVersion(files, lastVersion).filter(_.isSnapshot == false)
-      synchronized {
-        loadedMaps.get(lastVersion)
-      } match {
+      synchronized { loadedMaps.get(lastVersion) } match {
         case Some(map) =>
           if (deltaFilesForLastVersion.size > maxDeltaChainForSnapshots) {
             writeSnapshotFile(lastVersion, map)
@@ -342,7 +338,7 @@ class HDFSBackedStateStoreProvider(
       }
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Error doing snapshots for $this")
+        logWarning(s"Error doing snapshots for $this", e)
     }
   }
 
@@ -356,7 +352,7 @@ class HDFSBackedStateStoreProvider(
       val files = fetchFiles()
       if (files.nonEmpty) {
         val earliestVersionToRetain = files.last.version - numBatchesToRetain
-        if (earliestVersionToRetain >= 0) {
+        if (earliestVersionToRetain > 0) {
           val earliestFileToRetain = filesForVersion(files, earliestVersionToRetain).head
           synchronized {
             loadedMaps.keys.filter(_ < earliestVersionToRetain).foreach(loadedMaps.remove)
@@ -368,7 +364,7 @@ class HDFSBackedStateStoreProvider(
       }
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Error cleaning up files for $this")
+        logWarning(s"Error cleaning up files for $this", e)
     }
   }
 
@@ -380,7 +376,6 @@ class HDFSBackedStateStoreProvider(
         .filter(_.isSnapshot == true)
         .takeWhile(_.version <= version)
         .lastOption
-
       val deltaBatchFiles = latestSnapshotFileBeforeVersion match {
         case Some(snapshotFile) =>
           val deltaBatchIds = (snapshotFile.version + 1) to version
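Note: loadMap rebuilds a version by walking back to the nearest snapshot and replaying deltas, and the new `<= 0` guard makes version 0 the empty base case. A minimal plain-Scala sketch of that recovery scheme, where readSnapshot/readDelta and the map contents are invented stand-ins for the provider's real file readers:

// Sketch only: a version is the nearest snapshot plus all later deltas.
object DeltaChainSketch {
  type MapType = Map[String, Int]

  // Stand-ins for readSnapshotFile / readDeltaFile (assumed behavior).
  def readSnapshot(version: Long): Option[MapType] =
    if (version == 5) Some(Map("a" -> 5)) else None
  def readDelta(version: Long): MapType = Map("a" -> version.toInt)

  def loadMap(version: Long): MapType = {
    if (version <= 0) return Map.empty  // version 0 is the empty base case
    readSnapshot(version).getOrElse {
      // No snapshot at this version: rebuild the previous version first,
      // then apply this version's delta updates on top.
      loadMap(version - 1) ++ readDelta(version)
    }
  }

  def main(args: Array[String]): Unit = {
    // Snapshot exists at 5; versions 6 and 7 are reconstructed from deltas.
    assert(loadMap(7) == Map("a" -> 7))
  }
}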

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

Lines changed: 5 additions & 6 deletions

@@ -30,10 +30,12 @@ class StateStoreRDD[INPUT: ClassTag, OUTPUT: ClassTag](
     dataRDD: RDD[INPUT],
     storeUpdateFunction: (StateStore, Iterator[INPUT]) => Iterator[OUTPUT],
     operatorId: Long,
-    newStoreVersion: Long,
+    storeVersion: Long,
     storeDirectory: String,
     storeCoordinator: StateStoreCoordinator) extends RDD[OUTPUT](dataRDD) {
 
+  val nextVersion = storeVersion + 1
+
   override protected def getPartitions: Array[Partition] = dataRDD.partitions
   override def getPreferredLocations(partition: Partition): Seq[String] = {
     Seq.empty
@@ -47,11 +49,8 @@ class StateStoreRDD[INPUT: ClassTag, OUTPUT: ClassTag](
     var store: StateStore = null
 
     Utils.tryWithSafeFinally {
-      store = StateStore.get(
-        StateStoreId(operatorId, partition.index),
-        storeDirectory,
-        newStoreVersion - 1
-      )
+      val storeId = StateStoreId(operatorId, partition.index)
+      store = StateStore.get(storeId, storeDirectory, storeVersion)
       val inputIter = dataRDD.compute(partition, ctxt)
       val outputIter = storeUpdateFunction(store, inputIter)
       assert(store.hasCommitted)
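Note: the rename pins down the version contract. The RDD is now constructed with the version each task reads (storeVersion) and commits storeVersion + 1 (nextVersion), rather than being handed newStoreVersion and subtracting one at the call site. A toy illustration of that convention; StoreStub and its members are invented for the sketch:

// Sketch of the read-version / commit-version convention, with a stub store.
object VersionContractSketch {
  final case class StoreStub(loadedVersion: Long) {
    def commit(): Long = loadedVersion + 1  // committing produces version + 1
  }

  def get(storeVersion: Long): StoreStub = StoreStub(storeVersion)

  def main(args: Array[String]): Unit = {
    val storeVersion = 0L              // version the task reads (0 = empty store)
    val nextVersion = storeVersion + 1 // version the task will produce
    assert(get(storeVersion).commit() == nextVersion)
  }
}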

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala

Lines changed: 3 additions & 3 deletions

@@ -27,12 +27,12 @@ package object state {
     def withStateStores[OUTPUT: ClassTag](
         storeUpdateFunction: (StateStore, Iterator[INPUT]) => Iterator[OUTPUT],
         operatorId: Long,
-        newStoreVersion: Long,
+        storeVersion: Long,
         storeDirectory: String,
         storeCoordinator: StateStoreCoordinator
-      ): RDD[OUTPUT] = {
+      ): StateStoreRDD[INPUT, OUTPUT] = {
       new StateStoreRDD(
-        dataRDD, storeUpdateFunction, operatorId, newStoreVersion, storeDirectory, storeCoordinator)
+        dataRDD, storeUpdateFunction, operatorId, storeVersion, storeDirectory, storeCoordinator)
     }
   }
 }
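Note: presumably the narrower return type (StateStoreRDD[INPUT, OUTPUT] instead of RDD[OUTPUT]) lets callers reach store-specific members such as nextVersion without a cast. A generic illustration of the pattern, with hypothetical Base/Derived types:

// Returning the concrete subtype keeps its extra members visible to callers.
object ReturnTypeSketch {
  class Base
  class Derived extends Base { val extra: Int = 42 }

  def makeWide(): Base = new Derived      // extra is hidden behind Base
  def makeNarrow(): Derived = new Derived // extra stays accessible

  def main(args: Array[String]): Unit = {
    // makeWide().extra would not compile; the narrow type needs no cast.
    assert(makeNarrow().extra == 42)
  }
}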

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala

Lines changed: 46 additions & 4 deletions

@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
+import java.io.File
+import java.nio.file.Files
+
 import scala.util.Random
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
@@ -29,15 +32,21 @@ import org.apache.spark.util.Utils
 class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterAll {
 
   private val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getCanonicalName)
-  private var tempDir = Utils.createTempDir().toString
+  private var tempDir = Files.createTempDirectory("StateStoreRDDSuite").toString
+  println(tempDir)
 
   import StateStoreSuite._
 
   after {
     StateStore.clearAll()
   }
 
-  test("versioning and immuability") {
+  override def afterAll(): Unit = {
+    super.afterAll()
+    Utils.deleteRecursively(new File(tempDir))
+  }
+
+  test("versioning and immutability") {
     withSpark(new SparkContext(conf)) { sc =>
       val path = Utils.createDirectory(tempDir, Random.nextString(10)).toString
       val increment = (store: StateStore, iter: Iterator[String]) => {
@@ -53,20 +62,53 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
       }
       val opId = 0
       val rdd1 = makeRDD(sc, Seq("a", "b", "a"))
-        .withStateStores(increment, opId, newStoreVersion = 0, path, null)
+        .withStateStores(increment, opId, storeVersion = 0, path, null)
       assert(rdd1.collect().toSet === Set("a" -> 2, "b" -> 1))
 
       // Generate next version of stores
       val rdd2 = makeRDD(sc, Seq("a", "c"))
-        .withStateStores(increment, opId, newStoreVersion = 1, path, null)
+        .withStateStores(increment, opId, storeVersion = 1, path, null)
       assert(rdd2.collect().toSet === Set("a" -> 3, "b" -> 1, "c" -> 1))
 
       // Make sure the previous RDD still has the same data.
       assert(rdd1.collect().toSet === Set("a" -> 2, "b" -> 1))
     }
   }
 
+  test("recovering from files") {
+    val opId = 0
+    val path = Utils.createDirectory(tempDir, Random.nextString(10)).toString
+
+    def makeStoreRDD(sc: SparkContext, seq: Seq[String], storeVersion: Int): RDD[(String, Int)] = {
+      makeRDD(sc, Seq("a")).withStateStores(increment, opId, storeVersion, path, null)
+    }
+
+    // Generate RDDs and state store data
+    withSpark(new SparkContext(conf)) { sc =>
+      for (i <- 1 to 20) {
+        require(makeStoreRDD(sc, Seq("a"), i - 1).collect().toSet === Set("a" -> i))
+      }
+    }
+
+    // With a new context, try using the earlier state store data
+    withSpark(new SparkContext(conf)) { sc =>
+      assert(makeStoreRDD(sc, Seq("a"), 20).collect().toSet === Set("a" -> 21))
+    }
+  }
+
   private def makeRDD(sc: SparkContext, seq: Seq[String]): RDD[String] = {
     sc.makeRDD(seq, 2).groupBy(x => x).flatMap(_._2)
   }
+
+  private val increment = (store: StateStore, iter: Iterator[String]) => {
+    iter.foreach { s =>
+      store.update(
+        wrapKey(s), oldRow => {
+          val oldValue = oldRow.map(unwrapValue).getOrElse(0)
+          wrapValue(oldValue + 1)
+        })
+    }
+    store.commit()
+    store.iterator().map(unwrapKeyValue)
+  }
 }
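Note: the shared increment closure follows a read-modify-write pattern; update takes the key and a function from the old value (if any) to the new one. A plain mutable-Map sketch of the same pattern, without the suite's wrapKey/wrapValue row helpers:

// The update(key, oldValue => newValue) pattern over a plain mutable map.
object IncrementSketch {
  def main(args: Array[String]): Unit = {
    val store = scala.collection.mutable.Map[String, Int]()
    def update(key: String, f: Option[Int] => Int): Unit =
      store(key) = f(store.get(key))

    // Same input as rdd1 in the suite; counts match Set("a" -> 2, "b" -> 1).
    Seq("a", "b", "a").foreach(s => update(s, old => old.getOrElse(0) + 1))
    assert(store.toMap == Map("a" -> 2, "b" -> 1))
  }
}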

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 39 additions & 32 deletions

@@ -74,9 +74,8 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     assert(store.commit() === 1)
 
     assert(store.hasCommitted)
-    assert(store.iterator() === Set("b" -> 2))
-    assert(store.updates() === Set("b" -> 2))
-    assert(provider.latestIterator() === Set("b" -> 2))
+    assert(unwrapToSet(store.iterator()) === Set("b" -> 2))
+    assert(unwrapToSet(provider.latestIterator()) === Set("b" -> 2))
     assert(fileExists(provider, version = 1, isSnapshot = false))
     assert(getDataFromFiles(provider) === Set("b" -> 2))
 
@@ -92,8 +91,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val reloadedStore = new HDFSBackedStateStoreProvider(store.id, provider.directory).getStore(1)
     update(reloadedStore, "c", 4)
     assert(reloadedStore.commit() === 2)
-    assert(reloadedStore.iterator() === Set("b" -> 2, "c" -> 4))
-    assert(reloadedStore.updates() === Set("c" -> 4))
+    assert(unwrapToSet(reloadedStore.iterator()) === Set("b" -> 2, "c" -> 4))
     assert(getDataFromFiles(provider) === Set("b" -> 2, "c" -> 4))
     assert(getDataFromFiles(provider, version = 1) === Set("b" -> 2))
     assert(getDataFromFiles(provider, version = 2) === Set("b" -> 2, "c" -> 4))
@@ -104,7 +102,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val store = provider.getStore(0)
     update(store, "a", 1)
     store.commit()
-    assert(store.iterator() === Set("a" -> 1))
+    assert(unwrapToSet(store.iterator()) === Set("a" -> 1))
 
     // cancelUpdates should not change the data in the files
     val store1 = provider.getStore(1)
@@ -124,87 +122,92 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val store = provider.getStore(0)
     update(store, "a", 1)
     assert(store.commit() === 1)
-    assert(store.iterator() === Set("a" -> 1))
+    assert(unwrapToSet(store.iterator()) === Set("a" -> 1))
 
     intercept[IllegalStateException] {
       provider.getStore(2)
     }
 
     // Update store version with some data
-    provider.getStore(1)
-    update(store, "b", 1)
-    assert(store.commit() === 2)
-    assert(store.iterator() === Set("a" -> 1, "b" -> 1))
+    val store1 = provider.getStore(1)
+    update(store1, "b", 1)
+    assert(store1.commit() === 2)
+    assert(unwrapToSet(store1.iterator()) === Set("a" -> 1, "b" -> 1))
     assert(getDataFromFiles(provider) === Set("a" -> 1, "b" -> 1))
 
     // Overwrite the version with other data
-    provider.getStore(1)
-    update(store, "c", 1)
-    assert(store.commit() === 2)
-    assert(store.iterator() === Set("a" -> 1, "c" -> 1))
+    val store2 = provider.getStore(1)
+    update(store2, "c", 1)
+    assert(store2.commit() === 2)
+    assert(unwrapToSet(store2.iterator()) === Set("a" -> 1, "c" -> 1))
     assert(getDataFromFiles(provider) === Set("a" -> 1, "c" -> 1))
   }
 
   test("snapshotting") {
     val provider = newStoreProvider(maxDeltaChainForSnapshots = 5)
 
-    var currentVersion = -1
+    var currentVersion = 0
     def updateVersionTo(targetVersion: Int): Unit = {
       for (i <- currentVersion + 1 to targetVersion) {
-        val store = provider.getStore(i - 1)
+        val store = provider.getStore(currentVersion)
         update(store, "a", i)
         store.commit()
-
+        currentVersion += 1
       }
-      currentVersion = targetVersion
+      require(currentVersion === targetVersion)
     }
 
+
     updateVersionTo(2)
     require(getDataFromFiles(provider) === Set("a" -> 2))
     provider.manage() // should not generate snapshot files
     assert(getDataFromFiles(provider) === Set("a" -> 2))
-    for (i <- 0 to 2) {
+
+    for (i <- 1 to currentVersion) {
       assert(fileExists(provider, i, isSnapshot = false)) // all delta files present
       assert(!fileExists(provider, i, isSnapshot = true)) // no snapshot files present
     }
 
     // After version 6, snapshotting should generate one snapshot file
     updateVersionTo(6)
-    require(getDataFromFiles(provider) === Set("a" -> 6), "Store not updated correctly")
+    require(getDataFromFiles(provider) === Set("a" -> 6), "store not updated correctly")
    provider.manage() // should generate snapshot files
     assert(getDataFromFiles(provider) === Set("a" -> 6), "snapshotting messed up the data")
     assert(getDataFromFiles(provider) === Set("a" -> 6))
 
     val snapshotVersion = (0 to 6).find(version => fileExists(provider, version, isSnapshot = true))
-    assert(snapshotVersion.nonEmpty, "Snapshot file not generated")
-
+    assert(snapshotVersion.nonEmpty, "snapshot file not generated")
 
     // After version 20, snapshotting should generate newer snapshot files
     updateVersionTo(20)
-    require(getDataFromFiles(provider) === Set("a" -> 20), "Store not updated correctly")
+    require(getDataFromFiles(provider) === Set("a" -> 20), "store not updated correctly")
     provider.manage() // do snapshot
     assert(getDataFromFiles(provider) === Set("a" -> 20), "snapshotting messed up the data")
     assert(getDataFromFiles(provider) === Set("a" -> 20))
 
     val latestSnapshotVersion = (0 to 20).filter(version =>
       fileExists(provider, version, isSnapshot = true)).lastOption
-    assert(latestSnapshotVersion.nonEmpty, "No snapshot file found")
-    assert(latestSnapshotVersion.get > snapshotVersion.get, "Newer snapshot not generated")
+    assert(latestSnapshotVersion.nonEmpty, "no snapshot file found")
+    assert(latestSnapshotVersion.get > snapshotVersion.get, "newer snapshot not generated")
 
   }
 
   test("cleaning") {
     val provider = newStoreProvider(maxDeltaChainForSnapshots = 5)
 
-    for (i <- 0 to 20) {
-      val store = provider.getStore(i)
+    for (i <- 1 to 20) {
+      val store = provider.getStore(i - 1)
       update(store, "a", i)
       store.commit()
+      provider.manage() // do cleanup
     }
-    require(provider.latestIterator() === Set("a" -> 20), "Store not updated correctly")
-    provider.manage() // do cleanup
-    assert(fileExists(provider, 0, isSnapshot = false))
+    require(
+      unwrapToSet(provider.latestIterator()) === Set("a" -> 20),
+      "store not updated correctly")
+
+    assert(!fileExists(provider, version = 1, isSnapshot = false)) // first file should be deleted
 
+    // last couple of versions should be retrievable
     assert(getDataFromFiles(provider, 20) === Set("a" -> 20))
     assert(getDataFromFiles(provider, 19) === Set("a" -> 19))
   }
@@ -214,7 +217,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
       version: Int = -1): Set[(String, Int)] = {
     val reloadedProvider = new HDFSBackedStateStoreProvider(provider.id, provider.directory)
     if (version < 0) {
-      reloadedProvider.latestIterator.map(unwrapKeyValue).toSet
+      reloadedProvider.latestIterator().map(unwrapKeyValue).toSet
     } else {
       reloadedProvider.iterator(version).map(unwrapKeyValue).toSet
     }
@@ -295,4 +298,8 @@ private[state] object StateStoreSuite {
   def unwrapKeyValue(row: InternalRow): (String, Int) = {
     (row.getString(0), row.getInt(1))
   }
+
+  def unwrapToSet(iterator: Iterator[InternalRow]): Set[(String, Int)] = {
+    iterator.map(unwrapKeyValue).toSet
+  }
 }
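Note: the switch to unwrapToSet matters because an Iterator is never equal to a Set, so the old assertions like store.iterator() === Set(...) could not test what they claimed; materializing the iterator first makes the comparison meaningful. In plain Scala:

// An Iterator never equals a Set; convert before comparing.
object IteratorEqualitySketch {
  def main(args: Array[String]): Unit = {
    val rows: Iterator[(String, Int)] = Iterator("b" -> 2)
    assert(rows != Set("b" -> 2))                     // direct comparison always fails
    assert(Iterator("b" -> 2).toSet == Set("b" -> 2)) // compare materialized sets
  }
}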
