Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,29 @@ private class HistoryServerDiskManager(

// Go through the recorded store directories and remove any that may have been removed by
// external code.
val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info =>
!new File(info.path).exists()
}.toSeq
val (existences, orphans) = listing
.view(classOf[ApplicationStoreInfo])
.asScala
.toSeq
.partition { info =>
new File(info.path).exists()
}

orphans.foreach { info =>
listing.delete(info.getClass(), info.path)
}

// Reading level db would trigger table file compaction, then it may cause size of level db
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could apply "partition" instead of filter in L78, so that we only call iterate with view once and retrieve two lists (orphans, existing) - L90 can use existing to avoid calling view again.

Copy link
Contributor Author

@zhli1142015 zhli1142015 Jun 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HeartSaVioR , thank you for your comments, updated it.

// directory changed. When service restarts, "currentUsage" is calculated from real directory
// size. Update "ApplicationStoreInfo.size" to ensure "currentUsage" equals
// sum of "ApplicationStoreInfo.size".
existences.foreach { info =>
val fileSize = sizeOf(new File(info.path))
if (fileSize != info.size) {
listing.write(info.copy(size = fileSize))
}
}

logInfo("Initialized disk manager: " +
s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
s"max usage = ${Utils.bytesToString(maxUsage)}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,50 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
assert(manager.approximateSize(50L, true) > 50L)
}

test("SPARK-32024: update ApplicationStoreInfo.size during initializing") {
val manager = mockManager()
val leaseA = manager.lease(2)
doReturn(3L).when(manager).sizeOf(meq(leaseA.tmpPath))
val dstA = leaseA.commit("app1", None)
assert(manager.free() === 0)
assert(manager.committed() === 3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we track and verify ApplicationStoreInfo in store as well? I see the test represents how HistoryServerDiskManager fails without the patch, but it would be pretty much intuitive if we also verify what we've changed directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, updated.

// Listing store tracks dstA now.
assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 3)

// Simulate: service restarts, new disk manager (manager1) is initialized.
val manager1 = mockManager()
// Simulate: event KVstore compaction before restart, directory size reduces.
doReturn(2L).when(manager1).sizeOf(meq(dstA))
doReturn(2L).when(manager1).sizeOf(meq(new File(testDir, "apps")))
manager1.initialize()
// "ApplicationStoreInfo.size" is updated for dstA.
assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 2)
assert(manager1.free() === 1)
// If "ApplicationStoreInfo.size" is not correctly updated, "IllegalStateException"
// would be thrown.
val leaseB = manager1.lease(2)
assert(manager1.free() === 1)
doReturn(2L).when(manager1).sizeOf(meq(leaseB.tmpPath))
val dstB = leaseB.commit("app2", None)
assert(manager1.committed() === 2)
// Listing store tracks dstB only, dstA is evicted by "makeRoom()".
assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 2)

val manager2 = mockManager()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add some empty lines in overall to bring readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated

// Simulate: cache entities are written after replaying, directory size increases.
doReturn(3L).when(manager2).sizeOf(meq(dstB))
doReturn(3L).when(manager2).sizeOf(meq(new File(testDir, "apps")))
manager2.initialize()
// "ApplicationStoreInfo.size" is updated for dstB.
assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 3)
assert(manager2.free() === 0)
val leaseC = manager2.lease(2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to explicitly mention it leads eviction on cached entities, hence free() goes to 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated

doReturn(2L).when(manager2).sizeOf(meq(leaseC.tmpPath))
val dstC = leaseC.commit("app3", None)
assert(manager2.free() === 1)
assert(manager2.committed() === 2)
// Listing store tracks dstC only, dstB is evicted by "makeRoom()".
assert(store.read(classOf[ApplicationStoreInfo], dstC.getAbsolutePath).size === 2)
}

}