@@ -65,7 +65,10 @@ case class KafkaStreamWriterFactory(
topic: Option[String], producerParams: Map[String, String], schema: StructType)
extends DataWriterFactory[InternalRow] {

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[InternalRow] = {
new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
}
}
@@ -22,7 +22,7 @@
import org.apache.spark.annotation.InterfaceStability;

/**
* A data writer returned by {@link DataWriterFactory#createDataWriter(int, int)} and is
* A data writer returned by {@link DataWriterFactory#createDataWriter(int, int, long)} and is
* responsible for writing data for an input RDD partition.
*
* One Spark task has one exclusive data writer, so there is no thread-safe concern.
@@ -36,8 +36,9 @@
* {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
* writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
* exception will be sent to the driver side, and Spark will retry this writing task for some times,
Review comment (Contributor): Spark may retry... (in continuous we don't retry the task)
Review comment (Contributor): for some times --> for a few times
Review comment (Contributor): Break this sentence; it's very long.
* each time {@link DataWriterFactory#createDataWriter(int, int)} gets a different `attemptNumber`,
* and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all retry fail.
* each time {@link DataWriterFactory#createDataWriter(int, int, long)} gets a different
* `attemptNumber`, and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all
* retry fail.
*
* Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
* takes too long to finish. Different from retried tasks, which are launched one by one after the
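
As an aside (not part of this PR's diff), the retry and speculation contract described above is easiest to satisfy when each task attempt keeps its output isolated until the driver-side commit. A minimal sketch, assuming a hypothetical staging-directory sink:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Hypothetical commit message: carries whatever the driver needs to finalize
// this attempt's output during DataSourceWriter.commit().
case class StagedAttemptMessage(stagingDir: String) extends WriterCommitMessage

// Minimal sketch (not a writer from this PR): each task attempt buffers rows
// and reports an attempt-scoped staging location, so a retried or speculative
// attempt never collides with another attempt of the same partition.
class StagedRowWriter(partitionId: Int, attemptNumber: Int, epochId: Long)
  extends DataWriter[Row] {

  // Made-up path layout; scoping by epoch, partition, and attempt keeps outputs apart.
  private val stagingDir =
    s"/tmp/example-sink/_staging/epoch=$epochId/part=$partitionId/attempt=$attemptNumber"
  private val buffer = ArrayBuffer.empty[Row]

  override def write(record: Row): Unit = buffer += record

  override def commit(): WriterCommitMessage = {
    // A real implementation would flush `buffer` to `stagingDir` here; the driver
    // later promotes exactly one attempt per partition when the job commits.
    StagedAttemptMessage(stagingDir)
  }

  // Called when this attempt fails or loses the race: drop attempt-local state.
  override def abort(): Unit = buffer.clear()
}
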
@@ -48,6 +48,9 @@ public interface DataWriterFactory<T> extends Serializable {
* same task id but different attempt number, which means there are multiple
* tasks with the same task id running at the same time. Implementations can
* use this attempt number to distinguish writers of different task attempts.
* @param epochId A monotonically increasing id for streaming queries that are split in to
* discrete periods of execution. For queries that execute as a single batch, this
Review comment (Contributor): For non-streaming queries, this...
Review comment (Contributor): Also, make it clear that this is the batchId for MicroBatch processing and the epochId for Continuous processing.
* id will always be zero.
*/
DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
Review comment (Contributor): Add clear lifecycle semantics.
Review comment (Contributor): Why are we using the same interface for streaming and batch here? Is there a compelling reason to do so instead of adding StreamingWriterFactory? Are the guarantees for an epoch identical to those of a single batch job?
Reply (PR author): The guarantees are identical, and in the current execution model, each epoch is in fact processed by a single batch job.
}
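
To make the new three-argument signature concrete, here is a hedged sketch of an implementation (the names are hypothetical and not from this PR). The factory is created on the driver, serialized to each executor, and createDataWriter is invoked once per (partition, attempt, epoch) combination:

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical commit message: a human-readable summary of what one writer did.
case class WriteSummary(summary: String) extends WriterCommitMessage

// Hedged sketch of a factory that does nothing but count rows, to show where
// the three ids become available to the writer it creates.
case class EpochTaggedWriterFactory(sinkName: String) extends DataWriterFactory[Row] {

  override def createDataWriter(
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long): DataWriter[Row] = new DataWriter[Row] {

    // For micro-batch queries epochId is the batch id, for continuous queries
    // it is the epoch id, and for a plain batch write it is always 0.
    private var count = 0L

    override def write(record: Row): Unit = count += 1

    override def commit(): WriterCommitMessage =
      WriteSummary(s"$sinkName partition=$partitionId attempt=$attemptNumber " +
        s"epoch=$epochId rows=$count")

    override def abort(): Unit = { count = 0L }
  }
}
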
@@ -24,10 +24,7 @@

/**
* A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
* aborts relative to an epoch ID determined by the execution engine.
*
* {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
* and so must reset any internal state after a successful commit.
* aborts relative to an epoch ID provided by the execution engine.
*/
@InterfaceStability.Evolving
public interface StreamWriter extends DataSourceWriter {
@@ -39,21 +36,21 @@ public interface StreamWriter extends DataSourceWriter {
* If this method fails (by throwing an exception), this writing job is considered to have been
* failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
*
* To support exactly-once processing, writer implementations should ensure that this method is
* idempotent. The execution engine may call commit() multiple times for the same epoch
* in some circumstances.
* The execution engine may call commit() multiple times for the same epoch in some circumstances.
Review comment (Contributor): Somewhere in this file, add docs about what epochId means for MB and C execution.
Review comment (Contributor): +1
* To support exactly-once data semantics, implementations must ensure that multiple commits for
* the same epoch are idempotent.
*/
void commit(long epochId, WriterCommitMessage[] messages);

/**
* Aborts this writing job because some data writers are failed and keep failing when retry, or
* Aborts this writing job because some data writers are failed and keep failing when retried, or
* the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
*
* If this method fails (by throwing an exception), the underlying data source may require manual
* cleanup.
*
* Unless the abort is triggered by the failure of commit, the given messages should have some
* null slots as there maybe only a few data writers that are committed before the abort
* Unless the abort is triggered by the failure of commit, the given messages will have some
* null slots, as there may be only a few data writers that were committed before the abort
* happens, or some data writers were committed but their commit messages haven't reached the
* driver when the abort is triggered. So this is just a "best effort" for data sources to
* clean up the data left by data writers.
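
The idempotency requirement in the commit() documentation above can be illustrated with a hedged sketch (not a sink from this PR). The epoch bookkeeping is kept in driver memory only to keep the example short; a real sink would track committed epochs in the external system:

import scala.collection.mutable

import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter

// Hypothetical base class: a second commit for an already-applied epoch is a no-op.
abstract class IdempotentStreamWriter extends StreamWriter {

  // In practice this would be a committed-epochs record in the external store.
  private val committedEpochs = mutable.Set.empty[Long]

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Only the first commit for an epoch is applied; replays are skipped silently.
    if (committedEpochs.add(epochId)) {
      applyEpoch(epochId, messages)
    }
  }

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Best-effort cleanup of anything staged for this epoch; some message slots
    // may be null if their writers never committed.
  }

  // Applies one epoch's output; supplied by the concrete sink.
  protected def applyEpoch(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}
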
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution}
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -132,7 +132,8 @@ object DataWritingSparkTask extends Logging {
val stageId = context.stageId()
val partId = context.partitionId()
val attemptId = context.attemptNumber()
val dataWriter = writeTask.createDataWriter(partId, attemptId)
val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0")
val dataWriter = writeTask.createDataWriter(partId, attemptId, epochId.toLong)

// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
@@ -172,17 +173,19 @@
writeTask: DataWriterFactory[InternalRow],
context: TaskContext,
iter: Iterator[InternalRow]): WriterCommitMessage = {
val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
val epochCoordinator = EpochCoordinatorRef.get(
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
SparkEnv.get)
val currentMsg: WriterCommitMessage = null
var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong

do {
var dataWriter: DataWriter[InternalRow] = null
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
try {
dataWriter = writeTask.createDataWriter(
context.partitionId(), context.attemptNumber(), currentEpoch)
iter.foreach(dataWriter.write)
Review comment (Contributor): fix this! don't use foreach.
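
One possible reading of this comment (the reviewer's exact intent is not spelled out in the thread) is to replace iter.foreach(dataWriter.write) with an explicit loop so that per-record checks can be added. A minimal sketch with a hypothetical helper:

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.DataWriter

// Hypothetical helper, not code from this PR: an explicit loop instead of
// foreach leaves room for per-record checks, such as stopping early once the
// task has been interrupted.
object WriteLoopSketch {
  def writeAll(
      context: TaskContext,
      iter: Iterator[InternalRow],
      dataWriter: DataWriter[InternalRow]): Unit = {
    while (iter.hasNext && !context.isInterrupted()) {
      dataWriter.write(iter.next())
    }
  }
}
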

logInfo(s"Writer for partition ${context.partitionId()} is committing.")
val msg = dataWriter.commit()
@@ -198,7 +201,7 @@
})(catchBlock = {
// If there is an error, abort this writer
logError(s"Writer for partition ${context.partitionId()} is aborting.")
dataWriter.abort()
if (dataWriter != null) dataWriter.abort()
logError(s"Writer for partition ${context.partitionId()} aborted.")
Review comment (Contributor): nit: add comment that the exception will be rethrown.
})
} while (!context.isInterrupted())
@@ -211,9 +214,12 @@ class InternalRowDataWriterFactory(
rowWriterFactory: DataWriterFactory[Row],
schema: StructType) extends DataWriterFactory[InternalRow] {

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[InternalRow] = {
new InternalRowDataWriter(
rowWriterFactory.createDataWriter(partitionId, attemptNumber),
rowWriterFactory.createDataWriter(partitionId, attemptNumber, epochId),
RowEncoder.apply(schema).resolveAndBind())
}
}
@@ -469,6 +469,9 @@ class MicroBatchExecution(
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}

sparkSessionToRunBatch.sparkContext.setLocalProperty(
MicroBatchExecution.BATCH_ID_KEY, currentBatchId.toString)

reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionToRunBatch,
@@ -518,3 +521,7 @@ class MicroBatchExecution(
Optional.ofNullable(scalaOption.orNull)
}
}

object MicroBatchExecution {
val BATCH_ID_KEY = "streaming.sql.batchId"
}
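
For context on how BATCH_ID_KEY reaches the writing tasks: values set with setLocalProperty on the driver thread are attached to the jobs that thread submits and are readable from TaskContext on executors, which is what the Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)) call in DataWritingSparkTask relies on. A standalone sketch with made-up values:

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

// Hedged illustration of the handoff used above: the driver sets a local
// property before running the batch, and every task launched for that batch
// can read it back through its TaskContext.
object BatchIdPropagationSketch {
  val BatchIdKey = "streaming.sql.batchId"  // mirrors MicroBatchExecution.BATCH_ID_KEY

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("batch-id-demo").getOrCreate()
    spark.sparkContext.setLocalProperty(BatchIdKey, "42")

    val seen = spark.sparkContext.parallelize(1 to 4, numSlices = 2).map { _ =>
      // On the executor side: a missing property means a non-streaming write,
      // which is why the data-writing task falls back to "0".
      Option(TaskContext.get().getLocalProperty(BatchIdKey)).getOrElse("0").toLong
    }.collect()

    seen.foreach(id => println(s"task saw epochId=$id"))
    spark.stop()
  }
}
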
@@ -31,7 +31,10 @@ import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, Dat
* for production-quality sinks. It's intended for use in tests.
*/
case object PackedRowWriterFactory extends DataWriterFactory[Row] {
def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[Row] = {
new PackedRowDataWriter()
}
}
@@ -147,7 +147,10 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)
}

case class MemoryWriterFactory(outputMode: OutputMode) extends DataWriterFactory[Row] {
def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[Row] = {
new MemoryDataWriter(partitionId, outputMode)
}
}
@@ -207,7 +207,10 @@ private[v2] object SimpleCounter {
class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
extends DataWriterFactory[Row] {

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[Row] = {
val jobPath = new Path(new Path(path, "_temporary"), jobId)
val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
val fs = filePath.getFileSystem(conf.value)
@@ -240,7 +243,10 @@ class SimpleCSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[Row] {
class InternalRowCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
extends DataWriterFactory[InternalRow] {

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[InternalRow] = {
val jobPath = new Path(new Path(path, "_temporary"), jobId)
val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
val fs = filePath.getFileSystem(conf.value)