Skip to content

Commit 227ff64

Browse files
committed
[SC-5557] Initial implementation of directory level atomicity (omits delete hook)
## What changes were proposed in this pull request? This implements a file commit protocol optimized for cloud storage. Files are written directly to their final locations. Their commit status is determined by the presence of specially named marker files. Job commit proceeds as follows: 1) When tasks request a file to write, we create a `_started_$txnId` marker in the output directory. The output files are hidden from readers while the start transaction marker is present. 2) We commit the job by replacing the _started marker with a `_committed_$txnId` file that contains a list of files added and removed in that directory. The protocol is fail-open. That is, if a start marker is not present, we assume the file is committed. Note that this is only atomic per-directory, and may suffer from update anomalies if there are multiple concurrent writers. To clean up garbage files, we provide the sql VACUUM command that takes a time horizon. `VACUUM '/path/to/directory' [RETAIN <num> HOURS]` will remove garbage / uncommitted files after the specified number of hours, defaulting to 48. ### Config flags: spark.sql.sources.commitProtocolClass -- commit protocol (default DatabricksAtomicCommitProtocol) com.databricks.sql.enableFilterUncommitted -- whether to enable the read protocol (default true) com.databricks.sql.ignoreCorruptCommitMarkers -- whether to ignore unreadable commit markers rather than raising error (default false) ## How was this patch tested? Unit tests. TODO(ekl): randomized tests in a follow-up to verify correctness Author: Eric Liang <[email protected]> Author: Eric Liang <[email protected]> Closes apache#164 from ericl/directory-atomicity.
1 parent dae6c6f commit 227ff64

File tree

15 files changed

+1048
-103
lines changed

15 files changed

+1048
-103
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ statement
150150
| SET ROLE .*? #failNativeCommand
151151
| SET .*? #setConfiguration
152152
| RESET #resetConfiguration
153+
| VACUUM path=STRING (RETAIN number HOURS)? #vacuumPath
153154
| unsupportedHiveNativeCommands .*? #failNativeCommand
154155
;
155156

@@ -701,6 +702,7 @@ nonReserved
701702
| AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN
702703
| UNBOUNDED | WHEN
703704
| DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | CURRENT_DATE | CURRENT_TIMESTAMP
705+
| VACUUM | RETAIN | HOURS
704706
;
705707

706708
SELECT: 'SELECT';
@@ -810,6 +812,9 @@ START: 'START';
810812
TRANSACTION: 'TRANSACTION';
811813
COMMIT: 'COMMIT';
812814
ROLLBACK: 'ROLLBACK';
815+
VACUUM: 'VACUUM';
816+
RETAIN: 'RETAIN';
817+
HOURS: 'HOURS';
813818
MACRO: 'MACRO';
814819

815820
IF: 'IF';
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
/* Copyright © 2016 Databricks, Inc.
2+
*
3+
* Portions of this software incorporate or are derived from software contained within Apache Spark,
4+
* and this modified software differs from the Apache Spark software provided under the Apache
5+
* License, Version 2.0, a copy of which you may obtain at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*/
8+
9+
package org.apache.spark.sql.transaction
10+
11+
import java.io._
12+
import java.nio.charset.StandardCharsets
13+
14+
import scala.collection.mutable
15+
import scala.util.control.NonFatal
16+
17+
import org.apache.hadoop.fs._
18+
import org.apache.hadoop.mapreduce._
19+
import org.json4s.NoTypeHints
20+
import org.json4s.jackson.Serialization
21+
22+
import org.apache.spark.SparkEnv
23+
import org.apache.spark.internal.Logging
24+
import org.apache.spark.internal.io.FileCommitProtocol
25+
import org.apache.spark.sql.SparkSession
26+
27+
/**
28+
* File commit protocol optimized for cloud storage. Files are written directly to their final
29+
* locations. Their commit status is determined by the presence of specially named marker files.
30+
*
31+
* Job commit proceeds as follows:
32+
*
33+
* 1) When tasks request a file to write, we create a `_started_$txnId` marker in the output
34+
* directory. The output files, which have $txnId embedded in their name, are hidden from
35+
* readers while the start marker is present.
36+
* 2) We commit the job by creating a new `_committed_$txnId` marker that contains a list of
37+
* files added and removed in that directory.
38+
*
39+
* Note that this is only atomic per-directory, and that we only provide snapshot isolation and
40+
* not serializability.
41+
*/
42+
class DatabricksAtomicCommitProtocol(jobId: String, path: String)
43+
extends FileCommitProtocol with Serializable with Logging {
44+
45+
import FileCommitProtocol._
46+
import DatabricksAtomicReadProtocol._
47+
import DatabricksAtomicCommitProtocol._
48+
49+
// Globally unique alphanumeric string. We decouple this from jobId for possible future use.
50+
private val txnId: TxnId = math.abs(scala.util.Random.nextLong).toString
51+
52+
// The list of files staged by this committer. These are collected to the driver on task commit.
53+
private val stagedFiles = mutable.Set[String]()
54+
55+
// The list of files staged for deletion by the driver.
56+
@transient private val stagedDeletions = mutable.Set[Path]()
57+
58+
override def newTaskTempFile(
59+
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
60+
if (dir.isDefined) {
61+
newTaskTempFileAbsPath(taskContext, new Path(path, dir.get).toString, ext)
62+
} else {
63+
newTaskTempFileAbsPath(taskContext, new Path(path).toString, ext)
64+
}
65+
}
66+
67+
override def newTaskTempFileAbsPath(
68+
taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
69+
val filename = getFilename(taskContext, ext)
70+
val finalPath = new Path(absoluteDir, filename)
71+
val fs = finalPath.getFileSystem(taskContext.getConfiguration)
72+
val startMarker = new Path(finalPath.getParent, new Path(s"_started_$txnId"))
73+
if (!fs.exists(startMarker)) {
74+
fs.create(startMarker, true).close()
75+
logDebug("Created start marker: " + startMarker)
76+
}
77+
stagedFiles += finalPath.toString
78+
finalPath.toString
79+
}
80+
81+
private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
82+
// Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
83+
// the file name is fine and won't overflow.
84+
val split = taskContext.getTaskAttemptID.getTaskID.getId
85+
86+
// Include the job and task attempt ids so that file writes never collide.
87+
val taskAttemptId = taskContext.getTaskAttemptID.getId
88+
89+
// e.g. part-00001-tid-177723428-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.gz.parquet
90+
f"part-$split%05d-tid-$txnId-$jobId-$taskAttemptId$ext"
91+
}
92+
93+
override def setupJob(jobContext: JobContext): Unit = {
94+
val root = new Path(path)
95+
root.getFileSystem(jobContext.getConfiguration).mkdirs(root)
96+
}
97+
98+
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
99+
logInfo("Committing job " + jobId)
100+
val root = new Path(path)
101+
val fs = root.getFileSystem(jobContext.getConfiguration)
102+
def qualify(path: Path): Path = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
103+
104+
// Collects start markers and staged task files.
105+
taskCommits.foreach { t =>
106+
val task = t.obj.asInstanceOf[this.type]
107+
stagedFiles ++= task.stagedFiles
108+
}
109+
110+
val addedByDir = stagedFiles.toSeq.map(new Path(_)).map(qualify)
111+
.groupBy(_.getParent).map(kv => (kv._1, kv._2.map(_.getName.toString)))
112+
113+
val removedByDir = stagedDeletions.toSeq.map(qualify)
114+
.groupBy(_.getParent).map(kv => (kv._1, kv._2.map(_.getName.toString)))
115+
116+
// Commit each updated directory in parallel.
117+
val dirs = (addedByDir.keys ++ removedByDir.keys).toSet.par
118+
dirs.tasksupport = DatabricksAtomicCommitProtocol.tasksupport
119+
dirs.foreach { dir =>
120+
val commitMarker = new Path(dir, s"_committed_$txnId")
121+
val output = fs.create(commitMarker)
122+
try {
123+
serializeFileChanges(
124+
addedByDir.getOrElse(dir, Nil), removedByDir.getOrElse(dir, Nil), output)
125+
} finally {
126+
output.close()
127+
}
128+
// We don't delete the start marker here since from a correctness perspective, it is
129+
// possible a concurrent reader sees neither the start nor end marker even with a re-list
130+
}
131+
logInfo("Job commit completed for " + jobId)
132+
}
133+
134+
override def abortJob(jobContext: JobContext): Unit = {
135+
/* no-op */
136+
}
137+
138+
override def setupTask(taskContext: TaskAttemptContext): Unit = {
139+
/* no-op */
140+
}
141+
142+
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
143+
new TaskCommitMessage(this)
144+
}
145+
146+
override def abortTask(taskContext: TaskAttemptContext): Unit = {
147+
// We must leave the start markers since other stray tasks may be writing to this same
148+
// directory, and we need to ensure their files stay hidden.
149+
stagedFiles.map(new Path(_)).foreach { f =>
150+
val fs = f.getFileSystem(taskContext.getConfiguration)
151+
fs.delete(f, false)
152+
}
153+
}
154+
}
155+
156+
object DatabricksAtomicCommitProtocol extends Logging {
157+
import DatabricksAtomicReadProtocol._
158+
159+
private val sparkSession = SparkSession.builder.getOrCreate()
160+
161+
import scala.collection.parallel.ThreadPoolTaskSupport
162+
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
163+
164+
private lazy val tasksupport = new ThreadPoolTaskSupport({
165+
val pool = new ThreadPoolExecutor(
166+
100,
167+
100,
168+
100L,
169+
TimeUnit.SECONDS,
170+
new LinkedBlockingQueue[Runnable])
171+
pool.setThreadFactory(new ThreadFactory {
172+
override def newThread(task: Runnable): Thread = {
173+
val thread = new Thread(task, "DatabricksAtomicCommitProtocolWorker")
174+
thread.setDaemon(true)
175+
thread
176+
}
177+
})
178+
pool
179+
})
180+
181+
/**
182+
* Traverses the given directories and cleans up uncommitted or garbage files and markers. A
183+
* horizon may be specified beyond which we assume pending jobs have failed. Files written by
184+
* those jobs will be removed as well. Vacuuming will be done in parallel if possible.
185+
*
186+
* @return the list of deleted files
187+
*/
188+
def vacuum(path: Path, horizon: Long): List[Path] = {
189+
val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
190+
val (dirs, initialFiles) = fs.listStatus(path).partition(_.isDirectory)
191+
192+
def checkPositive(time: Long): Long = { assert(time > 0); time }
193+
var deletedPaths: List[Path] = Nil
194+
def delete(p: Path): Unit = {
195+
deletedPaths ::= p
196+
fs.delete(p, false)
197+
}
198+
199+
val (state, resolvedFiles) = resolveCommitState(fs, path, initialFiles)
200+
201+
// remove uncommitted and timed-out file outputs
202+
for (file <- resolvedFiles) {
203+
file.getPath.getName match {
204+
// we wait for a horizon to avoid killing Spark jobs using those files
205+
case name if state.getDeletionTime(name) > 0 && state.getDeletionTime(name) < horizon =>
206+
logInfo(s"Garbage collecting ${file.getPath} since it is marked as deleted.")
207+
delete(file.getPath)
208+
209+
case name @ FILE_WITH_TXN_ID(txnId) if state.isCommitted(txnId) &&
210+
!state.isFileCommitted(txnId, name) =>
211+
logInfo(s"Garbage collecting ${file.getPath} since it was written by a failed task.")
212+
delete(file.getPath)
213+
214+
case name @ FILE_WITH_TXN_ID(txnId) if !state.isCommitted(txnId) &&
215+
checkPositive(state.getStartTime(txnId)) < horizon =>
216+
logInfo(s"Garbage collecting ${file.getPath} since its job has timed out " +
217+
s"(${state.getStartTime(txnId)} < $horizon).")
218+
delete(file.getPath)
219+
220+
case STARTED_MARKER(txnId) if state.isCommitted(txnId) &&
221+
checkPositive(file.getModificationTime) < horizon =>
222+
logInfo(s"Garbage collecting start marker ${file.getPath} of committed job.")
223+
delete(file.getPath)
224+
225+
case _ =>
226+
}
227+
}
228+
229+
// recurse
230+
for (d <- dirs) {
231+
deletedPaths :::= vacuum(d.getPath, horizon)
232+
if (fs.listStatus(d.getPath).isEmpty) {
233+
logInfo(s"Garbage collecting empty directory ${d.getPath}")
234+
delete(d.getPath)
235+
}
236+
}
237+
238+
deletedPaths
239+
}
240+
}

0 commit comments

Comments
 (0)