@@ -69,7 +69,7 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)
       taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
     val filename = getFilename(taskContext, ext)
     val finalPath = new Path(absoluteDir, filename)
-    val fs = finalPath.getFileSystem(taskContext.getConfiguration)
+    val fs = testingFs.getOrElse(finalPath.getFileSystem(taskContext.getConfiguration))
     val startMarker = new Path(finalPath.getParent, new Path(s"_started_$txnId"))
     if (!fs.exists(startMarker)) {
       fs.create(startMarker, true).close()
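This hunk, like the matching ones in `commitJob` and `abortTask` below, routes every `FileSystem` lookup through the `testingFs` override so tests can inject a fake filesystem. A minimal standalone sketch of the pattern, assuming `testingFs` is an `Option[FileSystem]` hook on the companion object (the hook's declaration is not part of this diff):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical standalone version of the injection pattern used in this diff:
// production code resolves the real FileSystem unless a test installed a fake.
object FsOverride {
  @volatile var testingFs: Option[FileSystem] = None

  def fsFor(path: Path, conf: Configuration): FileSystem =
    testingFs.getOrElse(path.getFileSystem(conf))
}
```

A test would set `FsOverride.testingFs = Some(fakeFs)` before exercising the protocol and reset it to `None` afterwards.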
@@ -83,7 +83,7 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)
     val fs = testingFs.getOrElse(_fs)
     val sparkSession = SparkSession.getActiveSession.get
     if (!sparkSession.sqlContext.getConf(
-        "com.databricks.sql.enableLogicalDelete", "true").toBoolean) {
+        "spark.databricks.sql.enableLogicalDelete", "true").toBoolean) {
       return super.deleteWithJob(fs, path, recursive)
     }
     if (recursive && fs.getFileStatus(path).isFile) {
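This hunk and the auto-vacuum one below move the flags into the `spark.databricks.sql.` namespace. A minimal sketch of how such a gate reads, assuming only a live `SparkSession` (the conf key and default are taken from the diff):

```scala
import org.apache.spark.sql.SparkSession

object LogicalDeleteGate {
  // Returns true when logical deletes are enabled; "true" is the default used above,
  // so the feature is on unless explicitly disabled.
  def enabled(spark: SparkSession): Boolean =
    spark.sqlContext.getConf("spark.databricks.sql.enableLogicalDelete", "true").toBoolean
}
```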
@@ -135,7 +135,7 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     logInfo("Committing job " + jobId)
     val root = new Path(path)
-    val fs = root.getFileSystem(jobContext.getConfiguration)
+    val fs = testingFs.getOrElse(root.getFileSystem(jobContext.getConfiguration))
     def qualify(path: Path): Path = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
 
     // Collects start markers and staged task files.
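The `qualify` helper (unchanged context here) is what lets marker paths collected from different sources compare equal. A small illustration of `Path.makeQualified`, runnable against the local filesystem:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object QualifyDemo extends App {
  val fs = FileSystem.getLocal(new Configuration())
  // A bare path gains the filesystem's scheme, so "/tmp/out" and "file:/tmp/out"
  // resolve to the same key when used in maps or sets of staged files.
  val q = new Path("/tmp/out").makeQualified(fs.getUri, fs.getWorkingDirectory)
  println(q) // file:/tmp/out
}
```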
@@ -170,7 +170,7 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)
     // Optional auto-vacuum.
     val sparkSession = SparkSession.getActiveSession.get
     if (sparkSession.sqlContext.getConf(
-        "com.databricks.sql.autoVacuumOnCommit", "true").toBoolean) {
+        "spark.databricks.sql.autoVacuumOnCommit", "true").toBoolean) {
       logInfo("Auto-vacuuming directories updated by " + jobId)
       try {
         vacuum(sparkSession, dirs.seq.toSeq, None)
@@ -197,7 +197,7 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)
     // We must leave the start markers since other stray tasks may be writing to this same
     // directory, and we need to ensure their files stay hidden.
     stagedFiles.map(new Path(_)).foreach { f =>
-      val fs = f.getFileSystem(taskContext.getConfiguration)
+      val fs = testingFs.getOrElse(f.getFileSystem(taskContext.getConfiguration))
       fs.delete(f, false)
     }
   }
@@ -215,25 +215,38 @@ object DatabricksAtomicCommitProtocol extends Logging {
215215 */
216216 def vacuum (
217217 sparkSession : SparkSession , paths : Seq [Path ], horizonHours : Option [Double ]): List [Path ] = {
218- val defaultHours = sparkSession.sqlContext.getConf(
219- " com.databricks.sql.defaultVacuumRetentionHours" , " 48.0" ).toDouble
220- val hours = horizonHours.getOrElse(defaultHours)
221- val horizon = clock.getTimeMillis - (hours * 60 * 60 * 1000 ).toLong
222-
223- logInfo(s " Started VACUUM on $paths with $horizon (now - $hours hours) " )
218+ val now = clock.getTimeMillis
219+ val defaultDataHorizonHours = sparkSession.sqlContext.getConf(
220+ " spark.databricks.sql.vacuum.dataHorizonHours" , " 48.0" ).toDouble
221+ val dataHorizonHours = horizonHours.getOrElse(defaultDataHorizonHours)
222+ val dataHorizon = now - (dataHorizonHours * 60 * 60 * 1000 ).toLong
223+
224+ // Vacuum will start removing commit markers after this time has passed, so this should be
225+ // greater than the max amount of time we think a zombie executor can hang around and write
226+ // output after the job has finished. TODO(ekl) move to spark edge conf
227+ val metadataHorizonHours = sparkSession.sqlContext.getConf(
228+ " spark.databricks.sql.vacuum.metadataHorizonHours" , " 0.5" ).toDouble
229+ val metadataHorizon = math.max(
230+ dataHorizon,
231+ now - metadataHorizonHours * 60 * 60 * 1000 ).toLong
232+
233+ logInfo(
234+ s " Started VACUUM on $paths with data horizon $dataHorizon (now - $dataHorizonHours hours), " +
235+ s " metadata horizon $metadataHorizon (now - $metadataHorizonHours hours) " )
224236
225237 if (paths.length == 1 ) {
226- vacuum0(paths(0 ), horizon , sparkSession.sparkContext.hadoopConfiguration)
238+ vacuum0(paths(0 ), dataHorizon, metadataHorizon , sparkSession.sparkContext.hadoopConfiguration)
227239 } else {
228240 val pseq = paths.par
229241 pseq.tasksupport = tasksupport
230242 pseq.map { p =>
231- vacuum0(p, horizon , sparkSession.sparkContext.hadoopConfiguration)
243+ vacuum0(p, dataHorizon, metadataHorizon , sparkSession.sparkContext.hadoopConfiguration)
232244 }.reduce(_ ++ _)
233245 }
234246 }
235247
236- private [transaction] def vacuum0 (path : Path , horizon : Long , conf : Configuration ): List [Path ] = {
248+ private [transaction] def vacuum0 (
249+ path : Path , dataHorizon : Long , metadataHorizon : Long , conf : Configuration ): List [Path ] = {
237250 val fs = testingFs.getOrElse(path.getFileSystem(conf))
238251 val (dirs, initialFiles) = fs.listStatus(path).partition(_.isDirectory)
239252
@@ -246,51 +259,66 @@ object DatabricksAtomicCommitProtocol extends Logging {
246259
247260 val (state, resolvedFiles) = resolveCommitState(fs, path, initialFiles)
248261
249- // remove uncommitted and timed-out file outputs
262+ def canDelete (name : String ): Boolean = {
263+ val deletionTime = state.getDeletionTime(name)
264+ (isMetadataFile(name) && deletionTime <= metadataHorizon) ||
265+ (! isMetadataFile(name) && deletionTime <= dataHorizon)
266+ }
267+
250268 for (file <- resolvedFiles) {
251269 file.getPath.getName match {
252- // we wait for a horizon to avoid killing Spark jobs using those files
253- case name if state.getDeletionTime (name) > 0 && state.getDeletionTime (name) <= horizon =>
254- logInfo(s " Garbage collecting ${file.getPath} since it is marked as deleted . " )
270+ // Files marked for deletion, either by an overwrite transaction or previous VACUUM.
271+ case name if state.isDeleted (name) && canDelete (name) =>
272+ logInfo(s " Garbage collecting file ${file.getPath} marked for deletion . " )
255273 delete(file.getPath)
256274
275+ // Data files from failed tasks. They can be removed immediately since it is guaranteed
276+ // no one will read them.
257277 case name @ FILE_WITH_TXN_ID (txnId) if state.isCommitted(txnId) &&
258278 ! state.isFileCommitted(txnId, name) =>
259279 logInfo(s " Garbage collecting ${file.getPath} since it was written by a failed task. " )
260280 delete(file.getPath)
261281
282+ // Data files from timed out transactions can be removed, since we assume the job has
283+ // aborted by this time.
262284 case name @ FILE_WITH_TXN_ID (txnId) if ! state.isCommitted(txnId) &&
263- checkPositive(state.getStartTime(txnId)) <= horizon =>
285+ checkPositive(state.getStartTime(txnId)) <= dataHorizon =>
264286 logWarning(s " Garbage collecting ${file.getPath} since its job has timed out " +
265- s " ( ${state.getStartTime(txnId)} <= $horizon ). " )
287+ s " ( ${state.getStartTime(txnId)} <= $dataHorizon ). " )
266288 delete(file.getPath)
267289
268- // always safe to delete since the commit marker is present
290+ // Start markers from committed transactions can be removed after a short grace period,
291+ // since if we see the commit marker, the start marker is irrelevant.
269292 case STARTED_MARKER (txnId) if state.isCommitted(txnId) &&
270- checkPositive(file.getModificationTime) <= horizon =>
293+ checkPositive(state.getCommitTime(txnId)) <= metadataHorizon =>
271294 logInfo(s " Garbage collecting start marker ${file.getPath} of committed job. " )
272295 delete(file.getPath)
273296
274297 case _ =>
275298 }
276299 }
277300
278- // Queue up stale markers for deletion. We do this by writing out a _committed file that
279- // will cause them to be garbage collected in the next cycle.
301+ // There are some files we'd like to delete, but cannot safely do in this pass since we must
302+ // ensure the deletes here are observed only after the deletes above are observed. To work
303+ // around this we queue them up for deletion by writing out a _committed file that will cause
304+ // them to be garbage collected in the next call to vacuum.
280305 var deleteLater : List [Path ] = Nil
281306 for (file <- resolvedFiles) {
282307 file.getPath.getName match {
283- case name @ COMMITTED_MARKER (txnId) if state.getDeletionTime(name) == 0 &&
284- checkPositive(file.getModificationTime) <= horizon =>
285- val startMarker = new Path (file.getPath.getParent, s " _started_ $txnId" )
286- if (fs.exists(startMarker)) {
287- delete(startMarker) // make sure we delete it just in case
308+ // Commit markers from committed transactions.
309+ case name @ COMMITTED_MARKER (txnId) if ! state.isDeleted(name) =>
310+ // When a commit does not remove any data files, we can delete it earlier. Otherwise we
311+ // have to wait for the data horizon to pass, since those files must be purged first.
312+ val dataFilesAlreadyPurged = checkPositive(file.getModificationTime) <= dataHorizon
313+ if (dataFilesAlreadyPurged || (! state.removesDataFiles(txnId) &&
314+ checkPositive(file.getModificationTime) <= metadataHorizon)) {
315+ // Corresponding start marker and data files are guaranteed to be purged already.
316+ deleteLater ::= file.getPath
288317 }
289- deleteLater ::= file.getPath
290318
291- // the data files were deleted above, but we need to delay marker deletion
319+ // Start markers from timed out transactions.
292320 case STARTED_MARKER (txnId) if ! state.isCommitted(txnId) &&
293- checkPositive(file.getModificationTime) <= horizon =>
321+ checkPositive(file.getModificationTime) <= dataHorizon =>
294322 deleteLater ::= file.getPath
295323
296324 case _ =>
@@ -310,7 +338,7 @@ object DatabricksAtomicCommitProtocol extends Logging {
 
     // recurse
     for (d <- dirs) {
-      deletedPaths :::= vacuum0(d.getPath, horizon, conf)
+      deletedPaths :::= vacuum0(d.getPath, dataHorizon, metadataHorizon, conf)
       if (fs.listStatus(d.getPath).isEmpty) {
         logInfo(s"Garbage collecting empty directory ${d.getPath}")
         delete(d.getPath)
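One consequence of the two-pass structure above is that markers queued via `deleteLater` are only reclaimed on a *subsequent* call, so callers (and tests) typically need to run vacuum twice to fully clean a directory. A hypothetical driver-side invocation, assuming the protocol object is on the classpath; the `Some(0.0)` horizon is an illustration only and clamps both horizons to "now":

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object VacuumDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("vacuum-demo").getOrCreate()
  // horizonHours = Some(0.0) reclaims everything eligible immediately;
  // the returned list names the paths that were deleted in this pass.
  val deleted: List[Path] = DatabricksAtomicCommitProtocol.vacuum(
    spark, Seq(new Path("/tmp/demo-table")), horizonHours = Some(0.0))
  deleted.foreach(p => println(s"reclaimed: $p"))
}
```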