
Commit 6bc4be6

[SPARK-14078] Streaming Parquet Based FileSink
This PR adds a new `Sink` implementation that writes out Parquet files. In order to correctly handle partial failures while maintaining exactly-once semantics, the files for each batch are written out to a unique directory and then atomically appended to a metadata log. When a Parquet-based `DataSource` is initialized for reading, we first check for this log directory and use it instead of file listing when present.

Unit tests are added, as well as a stress test that checks the answer after non-deterministic injected failures.

Author: Michael Armbrust <[email protected]>

Closes #11897 from marmbrus/fileSink.
1 parent 919bf32 commit 6bc4be6
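The mechanism described in the commit message can be made concrete with a small, self-contained sketch (illustrative only, not code from this PR): each batch lands in a fresh UUID-named location, and its file list is published atomically to a per-batch metadata file; readers trust only the log. All names below (`IdempotentFileSink`, `_metadata`, `part-00000.txt`, the plain-text log format) are stand-ins — the real sink uses HDFSMetadataLog and writes Parquet.

// Illustrative model of the batch commit protocol: write data under a unique directory,
// then atomically publish the batch's file list; replaying a committed batch is a no-op.
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util.UUID
import scala.collection.JavaConverters._

class IdempotentFileSink(outputDir: Path) {
  private val metadataDir = outputDir.resolve("_metadata")
  Files.createDirectories(metadataDir)

  /** Writes one batch exactly once; re-running the same batchId is a no-op. */
  def addBatch(batchId: Long, records: Seq[String]): Unit = {
    val logFile = metadataDir.resolve(batchId.toString)
    if (Files.exists(logFile)) {
      ()  // already committed; a retry after a partial failure does nothing
    } else {
      // 1. Write the data under a unique directory so retries never clobber each other.
      val batchDir = Files.createDirectories(outputDir.resolve(UUID.randomUUID().toString))
      val dataFile = batchDir.resolve("part-00000.txt")
      Files.write(dataFile, records.asJava, StandardCharsets.UTF_8)

      // 2. Atomically publish the list of files for this batch: write a temp file,
      //    then rename it into place. Readers only trust files named in the log.
      val tmp = Files.createTempFile(metadataDir, batchId.toString, ".tmp")
      Files.write(tmp, Seq(dataFile.toString).asJava, StandardCharsets.UTF_8)
      Files.move(tmp, logFile, StandardCopyOption.ATOMIC_MOVE)
    }
  }

  /** A reader consults the log instead of listing the output directory. */
  def committedFiles(): Seq[Path] =
    Files.list(metadataDir).iterator().asScala.toSeq
      .filterNot(_.getFileName.toString.endsWith(".tmp"))
      .sortBy(_.getFileName.toString)
      .flatMap(f => Files.readAllLines(f, StandardCharsets.UTF_8).asScala.map(Paths.get(_)))
}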

File tree: 14 files changed, +430 additions, -15 deletions

sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala

Lines changed: 9 additions & 0 deletions
@@ -91,6 +91,15 @@ trait ContinuousQuery {
    */
   def awaitTermination(timeoutMs: Long): Boolean
 
+  /**
+   * Blocks until all available data in the source has been processed and committed to the sink.
+   * This method is intended for testing. Note that in the case of continually arriving data, this
+   * method may block forever. Additionally, this method is only guaranteed to block until data
+   * that has been synchronously appended to a [[org.apache.spark.sql.execution.streaming.Source]]
+   * prior to invocation has been processed (i.e. `getOffset` must immediately reflect the addition).
+   */
+  def processAllAvailable(): Unit
+
   /**
    * Stops the execution of this query if it is running. This method blocks until the threads
    * performing execution has stopped.

sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala

Lines changed: 3 additions & 3 deletions
@@ -32,12 +32,12 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
  */
 @Experimental
 class ContinuousQueryException private[sql](
-    val query: ContinuousQuery,
+    @transient val query: ContinuousQuery,
     val message: String,
     val cause: Throwable,
     val startOffset: Option[Offset] = None,
-    val endOffset: Option[Offset] = None
-  ) extends Exception(message, cause) {
+    val endOffset: Option[Offset] = None)
+  extends Exception(message, cause) {
 
   /** Time when the exception occurred */
   val time: Long = System.currentTimeMillis

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 60 additions & 4 deletions
@@ -22,14 +22,15 @@ import java.util.ServiceLoader
 import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
 import org.apache.spark.util.Utils
@@ -176,14 +177,41 @@ case class DataSource(
 
   /** Returns a sink that can be used to continually write data. */
   def createSink(): Sink = {
-    val datasourceClass = providingClass.newInstance() match {
-      case s: StreamSinkProvider => s
+    providingClass.newInstance() match {
+      case s: StreamSinkProvider => s.createSink(sqlContext, options, partitionColumns)
+      case format: FileFormat =>
+        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+        val path = caseInsensitiveOptions.getOrElse("path", {
+          throw new IllegalArgumentException("'path' is not specified")
+        })
+
+        new FileStreamSink(sqlContext, path, format)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed writing")
     }
+  }
 
-    datasourceClass.createSink(sqlContext, options, partitionColumns)
+  /**
+   * Returns true if there is a single path that has a metadata log indicating which files should
+   * be read.
+   */
+  def hasMetadata(path: Seq[String]): Boolean = {
+    path match {
+      case Seq(singlePath) =>
+        try {
+          val hdfsPath = new Path(singlePath)
+          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
+          val res = fs.exists(metadataPath)
+          res
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error while looking for metadata directory.")
+            false
+        }
+      case _ => false
+    }
   }
 
   /** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */
@@ -200,6 +228,34 @@ case class DataSource(
       case (_: RelationProvider, Some(_)) =>
         throw new AnalysisException(s"$className does not allow user-specified schemas.")
 
+      // We are reading from the results of a streaming query. Load files from the metadata log
+      // instead of listing them using HDFS APIs.
+      case (format: FileFormat, _)
+          if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
+        val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
+        val fileCatalog =
+          new StreamFileCatalog(sqlContext, basePath)
+        val dataSchema = userSpecifiedSchema.orElse {
+          format.inferSchema(
+            sqlContext,
+            caseInsensitiveOptions,
+            fileCatalog.allFiles())
+        }.getOrElse {
+          throw new AnalysisException(
+            s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
+              "It must be specified manually")
+        }
+
+        HadoopFsRelation(
+          sqlContext,
+          fileCatalog,
+          partitionSchema = fileCatalog.partitionSpec().partitionColumns,
+          dataSchema = dataSchema,
+          bucketSpec = None,
+          format,
+          options)
+
+      // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
         val allPaths = caseInsensitiveOptions.get("path") ++ paths
         val globbedPaths = allPaths.flatMap { path =>
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala

Lines changed: 3 additions & 0 deletions
@@ -64,6 +64,9 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
     assert(sources.size == offsets.size)
     new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
   }
+
+  override def toString: String =
+    offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]")
 }
 
 object CompositeOffset {
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.sources.FileFormat
+
+object FileStreamSink {
+  // The name of the subdirectory that is used to store metadata about which files are valid.
+  val metadataDir = "_spark_metadata"
+}
+
+/**
+ * A sink that writes out results to parquet files. Each batch is written out to a unique
+ * directory. After all of the files in a batch have been successfully written, the list of
+ * file paths is appended to the log atomically. In the case of partial failures, some duplicate
+ * data may be present in the target directory, but only one copy of each file will be present
+ * in the log.
+ */
+class FileStreamSink(
+    sqlContext: SQLContext,
+    path: String,
+    fileFormat: FileFormat) extends Sink with Logging {
+
+  private val basePath = new Path(path)
+  private val logPath = new Path(basePath, FileStreamSink.metadataDir)
+  private val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString)
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    if (fileLog.get(batchId).isDefined) {
+      logInfo(s"Skipping already committed batch $batchId")
+    } else {
+      val files = writeFiles(data)
+      if (fileLog.add(batchId, files)) {
+        logInfo(s"Committed batch $batchId")
+      } else {
+        logWarning(s"Race while writing batch $batchId")
+      }
+    }
+  }
+
+  /** Writes the [[DataFrame]] to a UUID-named dir, returning the list of file paths. */
+  private def writeFiles(data: DataFrame): Seq[String] = {
+    val ctx = sqlContext
+    val outputDir = path
+    val format = fileFormat
+    val schema = data.schema
+
+    val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString
+    data.write.parquet(file)
+    sqlContext.read
+      .schema(data.schema)
+      .parquet(file)
+      .inputFiles
+      .map(new Path(_))
+      .filterNot(_.getName.startsWith("_"))
+      .map(_.toUri.toString)
+  }
+
+  override def toString: String = s"FileSink[$path]"
+}
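The `logWarning(s"Race while writing batch $batchId")` branch reflects a first-writer-wins commit protocol: if two attempts publish the same batch, only the entry that reaches the log first is ever visible, and the loser's files stay on disk unreferenced. A minimal in-memory stand-in for the metadata log (not the real HDFSMetadataLog) makes that property easy to see:

// Toy batch log: add() succeeds only for the first attempt to commit a given batchId.
import java.util.concurrent.ConcurrentHashMap

class InMemoryBatchLog[T] {
  private val entries = new ConcurrentHashMap[Long, T]()

  /** Returns true only for the attempt that committed this batchId first. */
  def add(batchId: Long, metadata: T): Boolean =
    Option(entries.putIfAbsent(batchId, metadata)).isEmpty

  def get(batchId: Long): Option[T] = Option(entries.get(batchId))
}

object RaceDemo extends App {
  val log = new InMemoryBatchLog[Seq[String]]()
  val attempt1 = log.add(0L, Seq("uuid-a/part-00000.parquet"))  // true: committed
  val attempt2 = log.add(0L, Seq("uuid-b/part-00000.parquet"))  // false: race lost, files orphaned
  println(s"attempt1=$attempt1 attempt2=$attempt2 visible=${log.get(0L)}")
}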

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 10 additions & 4 deletions
@@ -44,7 +44,7 @@ class FileStreamSource(
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
   private val seenFiles = new OpenHashSet[String]
-  metadataLog.get(None, maxBatchId).foreach { case (batchId, files) =>
+  metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
     files.foreach(seenFiles.add)
   }
 
@@ -114,18 +114,24 @@
     val endId = end.asInstanceOf[LongOffset].offset
 
     assert(startId <= endId)
-    val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
-    logDebug(s"Return files from batches ${startId + 1}:$endId")
+    val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
+    logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
     logDebug(s"Streaming ${files.mkString(", ")}")
     dataFrameBuilder(files)
 
   }
 
   private def fetchAllFiles(): Seq[String] = {
-    fs.listStatus(new Path(path))
+    val startTime = System.nanoTime()
+    val files = fs.listStatus(new Path(path))
       .filterNot(_.getPath.getName.startsWith("_"))
       .map(_.getPath.toUri.toString)
+    val endTime = System.nanoTime()
+    logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
+    files
   }
 
   override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
+
+  override def toString: String = s"FileSource[$path]"
 }
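The source side mirrors the sink: `fetchAllFiles()` lists the input directory, drops `_`-prefixed metadata files, and `seenFiles` keeps later batches from re-reading old input. A rough, self-contained model of that discovery step, using java.io and a mutable HashSet in place of Hadoop's FileSystem and Spark's OpenHashSet:

// Illustrative new-file discovery: list, filter out metadata files, deduplicate against seen paths.
import java.io.File
import scala.collection.mutable

class NewFileTracker(dir: File) {
  private val seenFiles = mutable.HashSet[String]()

  /** Returns files that have appeared since the last call, in sorted order. */
  def fetchNewFiles(): Seq[String] = {
    val listed = Option(dir.listFiles()).getOrElse(Array.empty[File])
      .filter(_.isFile)
      .filterNot(_.getName.startsWith("_"))
      .map(_.getAbsolutePath)
      .toSeq
    val newFiles = listed.filterNot(seenFiles.contains).sorted
    newFiles.foreach(seenFiles.add)
    newFiles
  }
}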

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 4 additions & 3 deletions
@@ -170,11 +170,12 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
     }
   }
 
-  override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = {
-    val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+  override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
+    val files = fc.util().listStatus(metadataPath, batchFilesFilter)
+    val batchIds = files
       .map(_.getPath.getName.toLong)
       .filter { batchId =>
-        batchId <= endId && (startId.isEmpty || batchId >= startId.get)
+        (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
       }
     batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
       case (batchId, metadataOption) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala

Lines changed: 2 additions & 0 deletions
@@ -30,4 +30,6 @@ case class LongOffset(offset: Long) extends Offset {
 
   def +(increment: Long): LongOffset = new LongOffset(offset + increment)
   def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+
+  override def toString: String = s"#$offset"
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ trait MetadataLog[T] {
    * Return metadata for batches between startId (inclusive) and endId (inclusive). If `startId` is
    * `None`, just return all batches before endId (inclusive).
    */
-  def get(startId: Option[Long], endId: Long): Array[(Long, T)]
+  def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)]
 
   /**
    * Return the latest batch Id and its metadata if exist.
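The signature change gives both bounds of `get` the same optional, inclusive semantics. A toy in-memory log (a simplified stand-in for the real MetadataLog/HDFSMetadataLog pair, returning Seq rather than Array and omitting the rest of the trait) shows the intended contract:

// Minimal in-memory model of get(startId: Option[Long], endId: Option[Long]).
import scala.collection.concurrent.TrieMap

class InMemoryMetadataLog[T] {
  private val batches = TrieMap.empty[Long, T]

  def add(batchId: Long, metadata: T): Boolean =
    batches.putIfAbsent(batchId, metadata).isEmpty

  def get(batchId: Long): Option[T] = batches.get(batchId)

  /** Inclusive on both ends; a missing bound means "unbounded" on that side. */
  def get(startId: Option[Long], endId: Option[Long]): Seq[(Long, T)] =
    batches.toSeq
      .filter { case (id, _) => startId.forall(id >= _) && endId.forall(id <= _) }
      .sortBy(_._1)

  def getLatest(): Option[(Long, T)] =
    if (batches.isEmpty) None else Some(batches.maxBy(_._1))
}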

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 18 additions & 0 deletions
@@ -239,6 +239,12 @@ class StreamExecution(
       logInfo(s"Committed offsets for batch $currentBatchId.")
       true
     } else {
+      noNewData = true
+      awaitBatchLock.synchronized {
+        // Wake up any threads that are waiting for the stream to progress.
+        awaitBatchLock.notifyAll()
+      }
+
       false
     }
   }
@@ -334,6 +340,18 @@ class StreamExecution(
     logDebug(s"Unblocked at $newOffset for $source")
   }
 
+  /** A flag to indicate that a batch has completed with no new data available. */
+  @volatile private var noNewData = false
+
+  override def processAllAvailable(): Unit = {
+    noNewData = false
+    while (!noNewData) {
+      awaitBatchLock.synchronized { awaitBatchLock.wait(10000) }
+      if (streamDeathCause != null) { throw streamDeathCause }
+    }
+    if (streamDeathCause != null) { throw streamDeathCause }
+  }
+
   override def awaitTermination(): Unit = {
     if (state == INITIALIZED) {
       throw new IllegalStateException("Cannot wait for termination on a query that has not started")
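`processAllAvailable()` and the `notifyAll()` added to the batch loop form a standard monitor pattern: the waiter clears a volatile flag and blocks until a later empty poll sets it, which guarantees that everything visible when the wait began has been processed. A self-contained sketch of the same pattern, with an illustrative work queue standing in for the streaming sources:

// Illustrative flag-plus-monitor coordination between a batch loop and "wait until drained" callers.
import java.util.concurrent.ConcurrentLinkedQueue

class BatchLoop {
  private val awaitBatchLock = new AnyRef
  @volatile private var noNewData = false
  private val pending = new ConcurrentLinkedQueue[String]()

  def addData(item: String): Unit = pending.add(item)

  /** One iteration of the driver loop: process a batch if there is one, else signal waiters. */
  def runOnce(): Unit = {
    val item = pending.poll()
    if (item != null) {
      println(s"processed $item")
    } else {
      noNewData = true
      awaitBatchLock.synchronized {
        awaitBatchLock.notifyAll()  // wake up processAllAvailable()
      }
    }
  }

  /** Blocks until a poll that happens after this call finds no pending data. */
  def processAllAvailable(): Unit = {
    noNewData = false
    while (!noNewData) {
      awaitBatchLock.synchronized { awaitBatchLock.wait(100) }
    }
  }
}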
