
@jose-torres
Contributor

@jose-torres jose-torres commented Mar 2, 2018

What changes were proposed in this pull request?

Add an epoch ID argument to DataWriterFactory for use in streaming. As a side effect of passing in this value, DataWriter will now have a consistent lifecycle; commit() or abort() ends the lifecycle of a DataWriter instance in any execution mode.

I considered making a separate streaming interface and adding the epoch ID only to that one, but I think it would require a lot of extra work for no real gain. I think it makes sense to define epoch 0 as the one and only epoch of a non-streaming query.
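For illustration (not part of the patch itself), a sink implementing the updated factory might look like the sketch below. The class names and the buffering behavior are made up; only the createDataWriter(partitionId, attemptNumber, epochId) shape comes from this change.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical commit message identifying which epoch/partition a task wrote.
case class EpochPartitionCommit(epochId: Long, partitionId: Int) extends WriterCommitMessage

// Sketch: the factory threads the new epochId through to each writer, so the
// sink can tell which slice of a streaming query a commit message belongs to.
class BufferingWriterFactory extends DataWriterFactory[InternalRow] {
  override def createDataWriter(
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long): DataWriter[InternalRow] =
    new BufferingDataWriter(partitionId, epochId)
}

class BufferingDataWriter(partitionId: Int, epochId: Long) extends DataWriter[InternalRow] {
  private val buffer = new ArrayBuffer[InternalRow]()

  override def write(record: InternalRow): Unit = buffer += record.copy()

  // commit() ends this writer's lifecycle; its message is sent back to the driver.
  override def commit(): WriterCommitMessage = EpochPartitionCommit(epochId, partitionId)

  // abort() also ends the lifecycle; discard anything buffered so far.
  override def abort(): Unit = buffer.clear()
}
```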

How was this patch tested?

existing unit tests

@jose-torres
Contributor Author

@tdas @rdblue @cloud-fan

I haven't forgotten that we need a design doc before finalization; SPARK-23556 tracks that.

@SparkQA

SparkQA commented Mar 2, 2018

Test build #87862 has finished for PR 20710 at commit 5bbd497.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 2, 2018

Test build #87909 has finished for PR 20710 at commit cb6b2cf.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

* id will always be zero.
*/
DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
Contributor

Add clear lifecycle semantics.

logError(s"Writer for partition ${context.partitionId()} is aborting.")
dataWriter.abort()
if (dataWriter != null) dataWriter.abort()
logError(s"Writer for partition ${context.partitionId()} aborted.")
Contributor

nit: add comment that the exception will be rethrown.

try {
dataWriter = writeTask.createDataWriter(
context.partitionId(), context.attemptNumber(), currentEpoch)
iter.foreach(dataWriter.write)
Contributor

Fix this! Don't use foreach.

* succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
* {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
* writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
* exception will be sent to the driver side, and Spark will retry this writing task for some times,
Contributor

Spark may retry... (in continuous mode we don't retry the task)

Contributor

for some times --> for a few times

Contributor

Break this sentence; it's very long.

* tasks with the same task id running at the same time. Implementations can
* use this attempt number to distinguish writers of different task attempts.
* @param epochId A monotonically increasing id for streaming queries that are split in to
* discrete periods of execution. For queries that execute as a single batch, this
Contributor

For non-streaming queries, this...

Contributor

Also, make it clear that this is the batchId for MicroBatch processing and the epochId for Continuous processing.

* To support exactly-once processing, writer implementations should ensure that this method is
* idempotent. The execution engine may call commit() multiple times for the same epoch
* in some circumstances.
* The execution engine may call commit() multiple times for the same epoch in some circumstances.
Contributor

Somewhere in this file, add docs about what epochId means for MicroBatch and Continuous execution.

Contributor

+1
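To make the idempotence requirement above concrete: a sink can keep per-epoch bookkeeping along these lines (a minimal sketch; the tracker class is hypothetical, not a Spark API).

```scala
import scala.collection.mutable

// Hypothetical sink-side bookkeeping: remember which epochs have already been
// committed so a repeated commit() for the same epoch becomes a no-op.
class EpochCommitTracker {
  private val committed = mutable.Set[Long]()

  def commitOnce(epochId: Long)(publish: => Unit): Unit = synchronized {
    if (committed.add(epochId)) {
      publish() // first commit for this epoch: actually publish the data
    }           // otherwise: a duplicate commit, safely ignored
  }
}
```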

@SparkQA

SparkQA commented Mar 3, 2018

Test build #87912 has finished for PR 20710 at commit 544eb1b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 3, 2018

Test build #87915 has finished for PR 20710 at commit 79495b1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 3, 2018

Test build #87918 has finished for PR 20710 at commit 215c225.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 3, 2018

Test build #87917 has finished for PR 20710 at commit 9fb74e2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all retry fail.
* exception will be sent to the driver side, and Spark may retry this writing task a few times.
* In each retry, {@link DataWriterFactory#createDataWriter(int, int, long)} will receive a
* different `attemptNumber`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])}
Contributor

This is not clear to me. Isn't it the case that abort will be called every time a task attempt ends in an error?
This seems to give the impression that abort is called only after N failed attempts have been made.

Contributor Author

The local abort will be called every time a task attempt fails. The global abort referenced here is called only when the job fails.
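To make the two levels concrete, here is a sketch of the control flow being described (hypothetical helper, not the actual Spark source):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Executor side: a failing task attempt aborts only its own DataWriter (the
// "local" abort), then rethrows so Spark can retry the attempt.
def runTaskAttempt(
    writer: DataWriter[InternalRow],
    records: Iterator[InternalRow]): WriterCommitMessage = {
  try {
    while (records.hasNext) {
      writer.write(records.next())
    }
    writer.commit()
  } catch {
    case t: Throwable =>
      writer.abort()
      throw t
  }
}
// Driver side (not shown): DataSourceWriter.abort(messages) -- the "global"
// abort -- runs only if the job as a whole fails, e.g. once task retries are exhausted.
```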

iter.foreach(dataWriter.write)
dataWriter = writeTask.createDataWriter(
context.partitionId(), context.attemptNumber(), currentEpoch)
while (iter.hasNext) {
Contributor

Is there a reason to change foreach to a while loop?

Contributor Author

IIUC (/cc @tdas) foreach can be problematic in tight loops, because it introduces a lambda that isn't always optimized away.

@cloud-fan
Contributor

LGTM

* this ID will always be 0.
*/
DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
Contributor

Why are we using the same interface for streaming and batch here? Is there a compelling reason to do so instead of adding StreamingWriterFactory? Are the guarantees for an epoch identical to those of a single batch job?

Contributor Author

The guarantees are identical, and in the current execution model, each epoch is in fact processed by a single batch job.

@asfgit asfgit closed this in b0f422c Mar 5, 2018
@tdas
Contributor

tdas commented Mar 5, 2018

@rdblue @jose-torres Arrgh... I didn't notice that you guys were still commenting before I merged it.
Feel free to continue the discussion, and if any change is needed we will deal with it accordingly. Sorry about that!

@rdblue
Contributor

rdblue commented Mar 5, 2018

@tdas, thanks for letting us know. I'm really wondering if we should be using the same interfaces between batch and streaming. The epoch id strikes me as strange for data sources that won't support streaming. What do you think?

@jose-torres
Contributor Author

My primary concern with splitting the interfaces is that it makes it easy for Spark changes to accidentally do the wrong thing. Callers of DataWriterFactory.createDataWriter() won't necessarily notice that there's a StreamingDataWriterFactory which needs to be supported; they'd likely just end up writing code which will break with an opaque internal error in a streaming query.

@rdblue
Contributor

rdblue commented Mar 5, 2018

@jose-torres, can you explain that more for me? Why would callers only use one interface but not the other? Wouldn't streaming use one and batch the other? Why would batch need to know about streaming and vice versa? The simplification is for implementers. It seems odd for implementations to deal with parameters that are for something else (e.g., don't worry about this for batch).

@jose-torres
Contributor Author

jose-torres commented Mar 5, 2018

There isn't currently a distinction between streaming and batch in the places where this interface is called (except in the experimental continuous processing streaming mode). The streaming engine executes a sequence of WriteToDataSourceV2Exec plans, in the same way that a sequence of unrelated batch queries would be executed. The only thing distinguishing streaming queries is that they have a custom DataSourceWriter implementation, which forwards each individual epoch to the StreamWriter.
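Roughly, the forwarding described here looks like the sketch below (a simplified stand-in with a made-up class name, not the exact adapter in the codebase):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter

// Sketch: a per-epoch DataSourceWriter that delegates to the long-lived
// StreamWriter, tagging commit/abort calls with its epoch.
class EpochForwardingWriter(epochId: Long, streamWriter: StreamWriter) extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[Row] = streamWriter.createWriterFactory()

  override def commit(messages: Array[WriterCommitMessage]): Unit =
    streamWriter.commit(epochId, messages)

  override def abort(messages: Array[WriterCommitMessage]): Unit =
    streamWriter.abort(epochId, messages)
}
```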

@rdblue
Contributor

rdblue commented Mar 6, 2018

Could the non-continuous streaming mode just use the batch interface, since each write is basically separate?

@jose-torres
Contributor Author

jose-torres commented Mar 6, 2018

I'm not certain I understand the question.

From the perspective of query plan execution, the non-continuous streaming mode does just use the batch interface. The motivation of adding epoch ID to DataWriterFactory is to allow it to continue using the batch interface, rather than adding a StreamingDataWriterFactory which it must use instead of the batch interface.

From the perspective of the writer, the batch interface isn't sufficient. Epoch ID is relevant for the same reason that partition ID is; Spark may need to distinguish between different segments of the data when talking to the remote sink.

@rdblue
Contributor

rdblue commented Mar 6, 2018

My question is: why can't we use a batch interface for batch and micro-batch (which behaves like batch) and add a separate streaming interface for continuous streaming? I see no reason to have epoch ID for batch, and it seems janky to add options that implementers should know to ignore.

Spark may need to distinguish between different segments of the data when talking to the remote sink.

For which case, continuous or micro-batch?

@jose-torres
Contributor Author

jose-torres commented Mar 6, 2018

For either case. Any streaming writer has to know that epoch 1 and epoch 2 are part of the same query, for the same reasons it has to know that task attempt 0 and task attempt 1 are iterations of the same task.

@jose-torres
Contributor Author

Partitions are a better example than task attempts, but it's still roughly the same idea. Data source writers need to be able to reason about what progress they've made, which is impossible in the streaming case if each epoch is its own disconnected query.

@rdblue
Contributor

rdblue commented Mar 6, 2018

Data source writers need to be able to reason about what progress they've made, which is impossible in the streaming case if each epoch is its own disconnected query.

I don't think the writers necessarily need to reason about progress. Are you saying that there are guarantees the writers need to make, like ordering how data appears?

I'm thinking of an implementation that creates a file for each task commit and the driver's commit operation makes those available. That doesn't require any progress tracking on tasks.

As far as a writer knowing that different epochs are part of the same query: why? Is there something the writer needs to do? If so, then I think that is more of an argument for a separate streaming interface, or else batch implementations that ignore the epoch might do the wrong thing.

@jose-torres
Contributor Author

As you say, there's no strict semantic need to have createDataWriter() take arguments. We could simply have each DataWriter identify itself by a random UUID, and require upstream components to keep track of which UUIDs map to which of the writers they care about. But the current API design is to enable each data writer to identify its logical place in the query, and epoch ID is an important part of that. (I expect it would be infeasible to migrate existing sources to an API which didn't provide things like partition ID or attempt number.)

StreamWriter is the separate streaming interface, and DataWriterFactory implementations in streaming queries will always come from a StreamWriter.
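As a small illustration of "logical place": the arguments a writer receives are already enough to key its output deterministically (hypothetical helper, not part of the API).

```scala
// Hypothetical key a sink could derive from the createDataWriter arguments,
// identifying a writer's logical place in the query without random UUIDs.
case class WriterSlot(epochId: Long, partitionId: Int, attemptNumber: Int) {
  def stagingPath(baseDir: String): String =
    s"$baseDir/epoch=$epochId/part=$partitionId/attempt=$attemptNumber"
}
```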

@rdblue
Contributor

rdblue commented Mar 6, 2018

Epoch ID is not a valid part of the logical place in a query for batch. I think we should separate batch and streaming, as they are already coming from different interfaces. There's no need to pass useless information to a batch writer or committer.

Implementations can choose to use the same logic if they want, but we should keep the API focused on what is needed, to keep it reasonable for implementers.

@jose-torres
Contributor Author

I still maintain that it's sensible to say a batch query is a query that has only one epoch, and that the ship has sailed on passing useless information. But I'm bikeshedding here. Created #20752 to split the interfaces.
