@@ -106,7 +106,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
     }
   }
 
-  test("vacuum removes files from failed tasks immediately") {
+  testWithMonotonicClockFs("vacuum removes files from failed tasks immediately") {
     withTempDir { dir =>
       val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv"
       val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv"
@@ -143,7 +143,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
     }
   }
 
-  test("vacuum removes uncommitted files after timeout") {
+  testWithMonotonicClockFs("vacuum removes uncommitted files after timeout") {
     withTempDir { dir =>
       val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv"
       val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv"
@@ -174,7 +174,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
     }
   }
 
-  test("vacuum removes deleted files after timeout") {
+  testWithMonotonicClockFs("vacuum removes deleted files after timeout") {
     withTempDir { dir =>
       val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv"
       val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv"
@@ -205,7 +205,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
     }
   }
 
-  test("zero-length commit markers mean txn is pending") {
+  testWithMonotonicClockFs("zero-length commit markers mean txn is pending") {
     withTempDir { dir =>
       val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv"
       create(dir, f1)
@@ -337,34 +337,75 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
     }
   }
 
-  private def makeFakeTimedFs(clock: Clock): FileSystem = {
-    val fakeFs = new RawLocalFileSystem() {
-      private val creationTimes = mutable.Map[Path, Long]()
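+  /**
+   * A [[RawLocalFileSystem]] whose file timestamps come from the supplied
+   * [[Clock]] rather than from the OS, so tests can control what vacuum sees.
+   */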
+  private class FakeClockFileSystem(clock: Clock) extends RawLocalFileSystem {
+    private val creationTimes = mutable.Map[Path, Long]()
 
-      // override file creation to save our custom timestamps
-      override def create(path: Path): FSDataOutputStream = create(path, false)
-      override def create(path: Path, overwrite: Boolean): FSDataOutputStream = {
-        creationTimes(path.makeQualified(this)) = clock.getTimeMillis()
-        super.create(path, overwrite)
-      }
+    // override file creation to save our custom timestamps
+    override def create(path: Path): FSDataOutputStream = create(path, false)
+    override def create(path: Path, overwrite: Boolean): FSDataOutputStream = {
+      creationTimes(path.makeQualified(this)) = clock.getTimeMillis()
+      super.create(path, overwrite)
+    }
 
-      // fill in our custom timestamps on list
-      override def listStatus(path: Path): Array[FileStatus] = {
-        super.listStatus(path).map { stat =>
-          new FileStatus(
-            stat.getLen,
-            stat.isDir,
-            0,
-            stat.getBlockSize,
-            creationTimes.getOrElse(stat.getPath, stat.getModificationTime),
-            stat.getPath)
-        }
+    // fill in our custom timestamps on list
+    override def listStatus(path: Path): Array[FileStatus] = {
+      super.listStatus(path).map { stat =>
+        new FileStatus(
+          stat.getLen,
+          stat.isDir,
+          0,
+          stat.getBlockSize,
+          creationTimes.getOrElse(stat.getPath, stat.getModificationTime),
+          stat.getPath)
       }
     }
+  }
+
+  private def makeFakeTimedFs(clock: Clock): FileSystem = {
+    val fakeFs = new FakeClockFileSystem(clock)
     fakeFs.initialize(new File("/").toURI, new Configuration())
     fakeFs
   }
 
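+  /**
+   * A [[SystemClock]] that never moves backwards: if the system clock jumps
+   * back (e.g. after an NTP adjustment), the last observed time is returned.
+   */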
+  private class MonotonicSystemClock extends SystemClock {
+    private var lastTime: Long = 0L
+
+    override def getTimeMillis(): Long = synchronized {
+      val time = super.getTimeMillis()
+      if (time < lastTime) {
+        logWarning(s"System clock went back in time: $time < $lastTime")
+        lastTime
+      } else {
+        lastTime = time
+        time
+      }
+    }
+  }
+
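+  /**
+   * Like `test(name)`, but runs the body with a [[FakeClockFileSystem]] backed
+   * by a [[MonotonicSystemClock]] installed as the read protocol's testing fs.
+   */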
+  def testWithMonotonicClockFs(name: String)(runTest: => Unit): Unit = {
+    test(name) {
+      val fakeFs = makeFakeTimedFs(new MonotonicSystemClock)
+      try {
+        DatabricksAtomicReadProtocol.testingFs = Some(fakeFs)
+        runTest
+      } finally {
+        DatabricksAtomicReadProtocol.testingFs = None
+      }
+    }
+  }
+
   def withFakeClockAndFs(f: ManualClock => Unit): Unit = {
     val clock = new ManualClock(System.currentTimeMillis)
     val fakeFs = makeFakeTimedFs(clock)
@@ -468,7 +497,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
     }
   }
 
-  test("vacuum horizon respects sql config") {
+  testWithMonotonicClockFs("vacuum horizon respects sql config") {
     withTempDir { dir =>
       val df = spark.range(10).selectExpr("id", "id as A", "id as B")
       df.write.partitionBy("A", "B").mode("overwrite").parquet(dir.getAbsolutePath)
@@ -482,7 +511,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
     }
   }
 
-  test("auto vacuum respects sql config") {
+  testWithMonotonicClockFs("auto vacuum respects sql config") {
     withTempDir { dir =>
       val df = spark.range(10).selectExpr("id", "id as A", "id as B")
       for (i <- 1 to 2) {
@@ -506,7 +535,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
     }
   }
 
-  test("vacuum table basic") {
+  testWithMonotonicClockFs("vacuum table basic") {
     withTempDir { dir =>
       withTable("testTable") {
         spark.sql(
@@ -521,7 +550,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
     }
   }
 
-  test("vacuum table works with custom locations") {
+  testWithMonotonicClockFs("vacuum table works with custom locations") {
     withTempDir { dir =>
       spark.range(10).repartition(1).write.mode("overwrite").parquet(dir.getAbsolutePath)
       spark.range(10).repartition(1).write.mode("overwrite").parquet(dir.getAbsolutePath)
@@ -679,7 +708,8 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
      * not observing them immediately (and possibly not in order).
      */
     var numDeletes = 0
-    val inconsistentFs = new RawLocalFileSystem() {
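+    // use the monotonic-clock fs so file timestamps never move backwards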
+    val inconsistentFs = new FakeClockFileSystem(new MonotonicSystemClock) {
       val deletedFiles = mutable.Set[Path]()
 
       def flushDeletes(): Unit = {