[SPARK-23559][SS] Create StreamingDataWriterFactory for epoch ID. #20752
Conversation
Test build #88013 has finished for PR 20752 at commit
Test build #88014 has finished for PR 20752 at commit
Test build #88015 has finished for PR 20752 at commit
```java
DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
```

```java
@Override default DataWriter<T> createDataWriter(int partitionId, int attemptNumber) {
  throw new IllegalStateException(
      "Streaming data writer factory cannot create data writers without epoch.");
}
```
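Pieced together, the quoted diff lines describe roughly the shape below. This is only a sketch using simplified Scala stand-ins (the actual interfaces are Java, and member names other than createDataWriter are assumptions):

```scala
// Sketch only: simplified stand-ins for the DataSourceV2 writer interfaces.
trait DataWriter[T] {
  def write(record: T): Unit
  def commit(): AnyRef
  def abort(): Unit
}

trait DataWriterFactory[T] extends Serializable {
  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[T]
}

// The epoch-aware factory this PR adds: it still satisfies the batch factory
// interface, but refuses to create a writer without an epoch ID.
trait StreamingDataWriterFactory[T] extends DataWriterFactory[T] {
  def createDataWriter(partitionId: Int, attemptNumber: Int, epochId: Long): DataWriter[T]

  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[T] =
    throw new IllegalStateException(
      "Streaming data writer factory cannot create data writers without epoch.")
}
```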
Why extend DataWriterFactory if this method is going to throw an exception? Why not make them independent interfaces?
If there's no common interface, DataSourceRDD would need to take a java.util.List[Any] instead of java.util.List[DataWriterFactory[T]]. This kind of pattern is present in a lot of DataSourceV2 interfaces, and I think it's endemic to the general design.
I suppose we could have it take a (partition, attempt number, epoch) => DataWriter lambda instead of Any if we really don't want to extend DataWriterFactory.
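For illustration, a rough sketch of that lambda-based alternative. Everything here is a hypothetical stand-in (not the real DataWritingSparkTask or DataWriter), and commit coordination is left out:

```scala
// Sketch only: a lambda-based alternative to sharing a factory interface.
trait DataWriter[T] {
  def write(record: T): Unit
  def commit(): AnyRef
  def abort(): Unit
}

object DataWritingSparkTaskSketch {
  // Instead of taking a DataWriterFactory, the task could take a
  // (partitionId, attemptNumber, epochId) => DataWriter function.
  // A batch caller would just ignore the epoch argument (e.g. pass 0).
  def run[T](
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long,
      createWriter: (Int, Int, Long) => DataWriter[T],
      rows: Iterator[T]): Unit = {
    val writer = createWriter(partitionId, attemptNumber, epochId)
    try {
      rows.foreach(writer.write)
      writer.commit()
    } catch {
      case t: Throwable =>
        writer.abort()
        throw t
    }
  }
}
```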
Can you point me to the code where this would need to change? I don't see it here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
Sorry, wrong side of the query. I meant DataWritingSparkTask.run().
```java
 * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
 *
 * Note that StreamWriter implementations should provide instances of
 * {@link StreamingDataWriterFactory}.
```
What about adding createStreamWriterFactory that returns the streaming interface? That would make it easier for implementations and prevent throwing cast exceptions because a StreamingDataWriterFactory is expected.
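A rough sketch of that idea with simplified stand-in traits (createStreamWriterFactory is the name being proposed here, not an existing method; the real interfaces are Java, non-generic on the writer, and have more members):

```scala
// Sketch only: stand-ins illustrating a streaming-typed factory method.
trait DataWriterFactory[T] extends Serializable
trait StreamingDataWriterFactory[T] extends DataWriterFactory[T]

trait DataSourceWriter[T] {
  def createWriterFactory(): DataWriterFactory[T]
}

trait StreamWriter[T] extends DataSourceWriter[T] {
  // Streaming-typed factory method, so the streaming execution path never has
  // to cast the result of createWriterFactory.
  def createStreamWriterFactory(): StreamingDataWriterFactory[T]

  // The batch-typed method can simply delegate to the streaming one.
  override def createWriterFactory(): DataWriterFactory[T] = createStreamWriterFactory()
}
```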
That wouldn't be compatible with SupportsWriteInternalRow. We could add a StreamingSupportsWriteInternalRow, but that seems much more confusing both for Spark developers and for data source implementers.
What do you think about removing the SupportsWriteInternalRow and always using InternalRow? For the read side, I think using Row and UnsafeRow is a problem: https://issues.apache.org/jira/browse/SPARK-23325
I don't see the value of using Row instead of InternalRow for readers, so maybe we should just simplify on both the read and write paths.
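A rough sketch of what that simplification might look like, again with stand-in traits rather than the real interfaces (commit/abort and the read-side changes are omitted):

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Sketch only: simplified stand-ins.
trait DataWriter[T]
trait DataWriterFactory[T] extends Serializable

// If InternalRow were the only row representation on the write path, the
// SupportsWriteInternalRow mix-in would disappear and the base writer
// interface would hand out InternalRow writers directly.
trait DataSourceWriter {
  def createWriterFactory(): DataWriterFactory[InternalRow]
}
```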
I'm broadly supportive. I'll detail my thoughts in the jira.
```scala
    new StreamingInternalRowDataWriterFactory(w.createWriterFactory(), query.schema)
  case w: StreamWriter =>
    new StreamingInternalRowDataWriterFactory(
      w.createWriterFactory().asInstanceOf[StreamingDataWriterFactory[Row]],
```
This will cause a cast exception, right? I think it is better to use a separate create method.
I'm not very familiar with the streaming side, but here are my 2 cents: I agree with @rdblue that it's unnecessary to introduce the epoch ID to data sources that don't care about streaming. However, I think it's natural to say that a batch data source is a special case of a streaming data source that only needs to deal with one epoch. So it's a tradeoff: do we want to make it easier to implement a batch data source, or to make it easier to implement a data source that supports both batch and streaming? To be clear, I'm not talking about code complexity, as the extra
Thanks for the clear summary, @cloud-fan. I think we want to make it easy to support batch, and then easy to reuse those internals to support streaming by adding new mix-in interfaces. Streaming is more complicated for implementers, and I'd like to help people ramp up conceptually instead of requiring a lot of understanding to get the simple cases working. I think we may also want to put a design for the streaming side on the dev list; if the batch side warranted a design discussion, then the streaming side does as well. Changing the batch side to accommodate streaming changes as they become necessary doesn't seem like a good way to arrive at a solid design.
I agree that we should put a design for the streaming side on the dev list, and I intend to do so. The streaming interfaces will remain evolving until a design discussion about them has happened. Right now, we're still at the point where we aren't quite sure what a streaming API needs to look like. We're starting from basically step zero; the V1 streaming API just throws a DataFrame at the sink and tells it to catch. So we need to iterate towards something that works at all before a meaningful design discussion is possible.
Thanks for the context. This aligns with the impression I've gotten, and it makes sense. My push for separation between the batch and streaming sides comes from wanting to keep that evolution from making too many changes to the batch side, which is better understood. I also think that streaming is different enough that we might be heading in the wrong direction by trying to combine the interfaces too early on.
Sounds fair to me. I'll continue iterating on the read side, and send out a design proposal for the write side incorporating this discussion in the next few days.
Test build #88019 has finished for PR 20752 at commit
Doc sent to the dev list: https://docs.google.com/document/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE
There've been no comments on the doc. Should we move forward with this PR?
Can one of the admins verify this patch?
What changes were proposed in this pull request?
Create StreamingDataWriterFactory for epoch ID.
How was this patch tested?
Existing unit tests.