Commit b0f422c

jose-torres authored and tdas committed
[SPARK-23559][SS] Add epoch ID to DataWriterFactory.
## What changes were proposed in this pull request?

Add an epoch ID argument to DataWriterFactory for use in streaming. As a side effect of passing in this value, DataWriter will now have a consistent lifecycle; commit() or abort() ends the lifecycle of a DataWriter instance in any execution mode.

I considered making a separate streaming interface and adding the epoch ID only to that one, but I think it requires a lot of extra work for no real gain. I think it makes sense to define epoch 0 as the one and only epoch of a non-streaming query.

## How was this patch tested?

Existing unit tests.

Author: Jose Torres <[email protected]>

Closes #20710 from jose-torres/api2.
1 parent ba622f4 commit b0f422c

File tree: 9 files changed (+65 / -28 lines)
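Before the per-file diffs, a minimal sketch of what a sink looks like against the new API. It is not part of this commit; ExampleCommitMessage, ExampleDataWriter and ExampleWriterFactory are hypothetical names, and only the three-argument createDataWriter signature and the "epoch 0 for non-streaming queries" convention come from this change.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical commit message: just reports how many rows this writer buffered.
case class ExampleCommitMessage(partitionId: Int, epochId: Long, rowCount: Long)
  extends WriterCommitMessage

// Hypothetical writer: commit() or abort() now ends its lifecycle in every execution mode,
// so it does not need to reset internal state for reuse across epochs.
class ExampleDataWriter(partitionId: Int, epochId: Long) extends DataWriter[Row] {
  private val buffer = new ArrayBuffer[Row]

  override def write(record: Row): Unit = buffer += record

  override def commit(): WriterCommitMessage = {
    // A real sink would flush `buffer` durably here before acknowledging.
    ExampleCommitMessage(partitionId, epochId, buffer.size.toLong)
  }

  override def abort(): Unit = buffer.clear()
}

// Hypothetical factory: receives the new epochId argument. For batch queries it is always 0;
// for streaming queries it identifies the epoch this writer belongs to.
class ExampleWriterFactory extends DataWriterFactory[Row] {
  override def createDataWriter(
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long): DataWriter[Row] = new ExampleDataWriter(partitionId, epochId)
}
```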

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala

Lines changed: 4 additions & 1 deletion
@@ -65,7 +65,10 @@ case class KafkaStreamWriterFactory(
     topic: Option[String], producerParams: Map[String, String], schema: StructType)
   extends DataWriterFactory[InternalRow] {
 
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[InternalRow] = {
     new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
   }
 }

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java

Lines changed: 8 additions & 4 deletions
@@ -22,7 +22,7 @@
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int)} and is
+ * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int, long)} and is
  * responsible for writing data for an input RDD partition.
  *
  * One Spark task has one exclusive data writer, so there is no thread-safe concern.
@@ -31,13 +31,17 @@
  * the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
  * not be processed. If all records are successfully written, {@link #commit()} is called.
  *
+ * Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, its lifecycle
+ * is over and Spark will not use it again.
+ *
  * If this data writer succeeds(all records are successfully written and {@link #commit()}
 * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
 * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
 * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
- * exception will be sent to the driver side, and Spark will retry this writing task for some times,
- * each time {@link DataWriterFactory#createDataWriter(int, int)} gets a different `attemptNumber`,
- * and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all retry fail.
+ * exception will be sent to the driver side, and Spark may retry this writing task a few times.
+ * In each retry, {@link DataWriterFactory#createDataWriter(int, int, long)} will receive a
+ * different `attemptNumber`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])}
+ * when the configured number of retries is exhausted.
  *
 * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
 * takes too long to finish. Different from retried tasks, which are launched one by one after the
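The new paragraph above guarantees that a writer is never touched again after a successful commit() or abort(). An implementation can make that contract explicit; a hedged sketch, where LifecycleCheckedWriter and its doWrite/doCommit/doAbort hooks are hypothetical and not Spark API:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Hypothetical guard illustrating the documented contract: once commit() or abort()
// returns successfully, the writer is done, so any further call indicates a misbehaving caller.
abstract class LifecycleCheckedWriter extends DataWriter[Row] {
  private var finished = false

  protected def doWrite(record: Row): Unit
  protected def doCommit(): WriterCommitMessage
  protected def doAbort(): Unit

  final override def write(record: Row): Unit = {
    require(!finished, "write() called after commit()/abort(); the writer's lifecycle is over")
    doWrite(record)
  }

  final override def commit(): WriterCommitMessage = {
    require(!finished, "commit() called on a writer that already finished")
    val msg = doCommit()
    finished = true // Spark will not use this instance again
    msg
  }

  final override def abort(): Unit = {
    require(!finished, "abort() called on a writer that already finished")
    doAbort()
    finished = true
  }
}
```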

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java

Lines changed: 4 additions & 1 deletion
@@ -48,6 +48,9 @@ public interface DataWriterFactory<T> extends Serializable {
   *                      same task id but different attempt number, which means there are multiple
   *                      tasks with the same task id running at the same time. Implementations can
   *                      use this attempt number to distinguish writers of different task attempts.
+  * @param epochId A monotonically increasing id for streaming queries that are split in to
+  *                discrete periods of execution. For non-streaming queries,
+  *                this ID will always be 0.
   */
-  DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
+  DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
 }

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java

Lines changed: 9 additions & 10 deletions
@@ -23,11 +23,10 @@
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
 /**
- * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
- * aborts relative to an epoch ID determined by the execution engine.
+ * A {@link DataSourceWriter} for use with structured streaming.
  *
- * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
- * and so must reset any internal state after a successful commit.
+ * Streaming queries are divided into intervals of data called epochs, with a monotonically
+ * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
  */
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
@@ -39,21 +38,21 @@ public interface StreamWriter extends DataSourceWriter {
   * If this method fails (by throwing an exception), this writing job is considered to have been
   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
   *
-  * To support exactly-once processing, writer implementations should ensure that this method is
-  * idempotent. The execution engine may call commit() multiple times for the same epoch
-  * in some circumstances.
+  * The execution engine may call commit() multiple times for the same epoch in some circumstances.
+  * To support exactly-once data semantics, implementations must ensure that multiple commits for
+  * the same epoch are idempotent.
   */
  void commit(long epochId, WriterCommitMessage[] messages);
 
  /**
-  * Aborts this writing job because some data writers are failed and keep failing when retry, or
+  * Aborts this writing job because some data writers are failed and keep failing when retried, or
   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
   *
   * If this method fails (by throwing an exception), the underlying data source may require manual
   * cleanup.
   *
-  * Unless the abort is triggered by the failure of commit, the given messages should have some
-  * null slots as there maybe only a few data writers that are committed before the abort
+  * Unless the abort is triggered by the failure of commit, the given messages will have some
+  * null slots, as there may be only a few data writers that were committed before the abort
   * happens, or some data writers were committed but their commit messages haven't reached the
   * driver when the abort is triggered. So this is just a "best effort" for data sources to
   * clean up the data left by data writers.
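The reworded contract asks sinks to treat repeated commit() calls for the same epoch as idempotent. One way a sink might track that, sketched with hypothetical names (IdempotentEpochCommits, commitOnce) that are not part of Spark:

```scala
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Hypothetical mixin for a StreamWriter implementation: remembers the highest epoch it has
// published so that a repeated commit() for that epoch becomes a no-op.
trait IdempotentEpochCommits {
  private var lastCommittedEpoch: Long = -1L

  /** Publish `messages` for `epochId` exactly once, even if the engine retries the commit. */
  protected def commitOnce(epochId: Long, messages: Array[WriterCommitMessage])
                          (publish: Array[WriterCommitMessage] => Unit): Unit = synchronized {
    if (epochId > lastCommittedEpoch) {
      publish(messages)            // make this epoch's output durable
      lastCommittedEpoch = epochId // a duplicate commit for this epoch is now skipped
    }
  }
}
```

A StreamWriter implementation would call commitOnce(epochId, messages)(publish) from its commit(epochId, messages); a sink whose driver can fail over would need a durable record of the last committed epoch rather than an in-memory variable.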

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala

Lines changed: 17 additions & 8 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution}
 import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -132,7 +132,8 @@ object DataWritingSparkTask extends Logging {
     val stageId = context.stageId()
     val partId = context.partitionId()
     val attemptId = context.attemptNumber()
-    val dataWriter = writeTask.createDataWriter(partId, attemptId)
+    val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0")
+    val dataWriter = writeTask.createDataWriter(partId, attemptId, epochId.toLong)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
@@ -172,18 +173,22 @@
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
     val epochCoordinator = EpochCoordinatorRef.get(
       context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
       SparkEnv.get)
     val currentMsg: WriterCommitMessage = null
     var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
 
     do {
+      var dataWriter: DataWriter[InternalRow] = null
       // write the data and commit this writer.
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
         try {
-          iter.foreach(dataWriter.write)
+          dataWriter = writeTask.createDataWriter(
+            context.partitionId(), context.attemptNumber(), currentEpoch)
+          while (iter.hasNext) {
+            dataWriter.write(iter.next())
+          }
           logInfo(s"Writer for partition ${context.partitionId()} is committing.")
           val msg = dataWriter.commit()
           logInfo(s"Writer for partition ${context.partitionId()} committed.")
@@ -196,9 +201,10 @@
           // Continuous shutdown always involves an interrupt. Just finish the task.
         }
       })(catchBlock = {
-        // If there is an error, abort this writer
+        // If there is an error, abort this writer. We enter this callback in the middle of
+        // rethrowing an exception, so runContinuous will stop executing at this point.
         logError(s"Writer for partition ${context.partitionId()} is aborting.")
-        dataWriter.abort()
+        if (dataWriter != null) dataWriter.abort()
        logError(s"Writer for partition ${context.partitionId()} aborted.")
       })
     } while (!context.isInterrupted())
@@ -211,9 +217,12 @@ class InternalRowDataWriterFactory(
     rowWriterFactory: DataWriterFactory[Row],
     schema: StructType) extends DataWriterFactory[InternalRow] {
 
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[InternalRow] = {
     new InternalRowDataWriter(
-      rowWriterFactory.createDataWriter(partitionId, attemptNumber),
+      rowWriterFactory.createDataWriter(partitionId, attemptNumber, epochId),
       RowEncoder.apply(schema).resolveAndBind())
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 7 additions & 0 deletions
@@ -469,6 +469,9 @@ class MicroBatchExecution(
         case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
       }
 
+      sparkSessionToRunBatch.sparkContext.setLocalProperty(
+        MicroBatchExecution.BATCH_ID_KEY, currentBatchId.toString)
+
       reportTimeTaken("queryPlanning") {
         lastExecution = new IncrementalExecution(
           sparkSessionToRunBatch,
@@ -518,3 +521,7 @@
     Optional.ofNullable(scalaOption.orNull)
   }
 }
+
+object MicroBatchExecution {
+  val BATCH_ID_KEY = "streaming.sql.batchId"
+}
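The handoff above uses Spark's local-property mechanism: the driver sets a thread-local property before planning the batch, and the tasks launched for that batch read it back through their TaskContext. A small self-contained demonstration of that mechanism, where the key demo.epochId and the LocalPropertyDemo object are made up for illustration; only BATCH_ID_KEY above is real:

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object LocalPropertyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("local-property-demo").setMaster("local[2]"))
    try {
      // Driver side: set the property on the submitting thread, as MicroBatchExecution
      // does with BATCH_ID_KEY before planning each batch.
      sc.setLocalProperty("demo.epochId", "42")

      // Executor side: each task reads it back, defaulting to "0" when it is absent,
      // mirroring the fallback used for non-streaming queries.
      val epochs = sc.parallelize(1 to 4, 2).map { _ =>
        Option(TaskContext.get().getLocalProperty("demo.epochId")).getOrElse("0").toLong
      }.collect()

      println(epochs.mkString(", ")) // 42, 42, 42, 42
    } finally {
      sc.stop()
    }
  }
}
```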

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala

Lines changed: 4 additions & 1 deletion
@@ -31,7 +31,10 @@ import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, Dat
  * for production-quality sinks. It's intended for use in tests.
  */
 case object PackedRowWriterFactory extends DataWriterFactory[Row] {
-  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[Row] = {
     new PackedRowDataWriter()
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala

Lines changed: 4 additions & 1 deletion
@@ -147,7 +147,10 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)
 }
 
 case class MemoryWriterFactory(outputMode: OutputMode) extends DataWriterFactory[Row] {
-  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[Row] = {
     new MemoryDataWriter(partitionId, outputMode)
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala

Lines changed: 8 additions & 2 deletions
@@ -207,7 +207,10 @@ private[v2] object SimpleCounter {
 class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
   extends DataWriterFactory[Row] {
 
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[Row] = {
     val jobPath = new Path(new Path(path, "_temporary"), jobId)
     val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
     val fs = filePath.getFileSystem(conf.value)
@@ -240,7 +243,10 @@ class SimpleCSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[Row] {
 class InternalRowCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
   extends DataWriterFactory[InternalRow] {
 
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[InternalRow] = {
     val jobPath = new Path(new Path(path, "_temporary"), jobId)
     val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
     val fs = filePath.getFileSystem(conf.value)
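The test factories above still name their files by jobId, partitionId and attemptNumber only. A streaming-aware variant could fold the new epochId into the path so a re-executed epoch overwrites its own output instead of duplicating it. A hedged sketch modelled on SimpleCSVDataWriterFactory: EpochAwareCSVDataWriterFactory is hypothetical and assumes it sits next to the test classes above, since SimpleCSVDataWriter and SerializableConfiguration are internal/test helpers.

```scala
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
import org.apache.spark.util.SerializableConfiguration

// Hypothetical variant of SimpleCSVDataWriterFactory that keys output files by epoch, so a
// re-executed epoch rewrites its own files rather than adding duplicates.
class EpochAwareCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
  extends DataWriterFactory[Row] {

  override def createDataWriter(
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long): DataWriter[Row] = {
    val jobPath = new Path(new Path(path, "_temporary"), jobId)
    // Epoch first, then partition and attempt, e.g. .../42/3-0
    val filePath = new Path(jobPath, s"$epochId/$partitionId-$attemptNumber")
    val fs = filePath.getFileSystem(conf.value)
    new SimpleCSVDataWriter(fs, filePath)
  }
}
```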
