-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-23099][SS] Migrate foreach sink to DataSourceV2 #20951
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #88764 has finished for PR 20951 at commit
|
| * @tparam T The expected type of the sink. | ||
| */ | ||
| class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { | ||
| case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename the file accordingly. and Add docs. Clarify why this is not a DataSource but still extends StreamWriteSupport
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, why not make it extend DataSourceV2 for consistency sake? Then it is easier to find all data sources in code by looking at who extends DataSourceV2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a DataSourceV2 - that interface extends it
| val input = MemoryStream[Int] | ||
| val query = input.toDS().repartition(1).writeStream | ||
| .option("checkpointLocation", checkpointDir.getCanonicalPath) | ||
| .foreach(new TestForeachWriter() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe rename this to ForeachWriterSuite?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And move this to streaming.sources package similar ConsoleWriterSuite
| } | ||
| assert(e.getCause.isInstanceOf[SparkException]) | ||
| assert(e.getCause.getCause.getMessage === "error") | ||
| assert(e.getCause.getCause.getCause.getMessage === "ForeachSinkSuite error") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why 3 levels? Can you paste the levels here in the PR comments?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[info] org.apache.spark.sql.streaming.StreamingQueryException: Query [id = c80c8860-d4f5-47c6-9a2b-33b5172e1735, runId = 81acd408-9028-41ee-9349-866ae2d67615] terminated with exception: Writing job aborted.
[info] at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
[info] Cause: org.apache.spark.SparkException: Writing job aborted.
[info] at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:117)
[info] Cause: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.RuntimeException: ForeachSinkSuite error
[info] at org.apache.spark.sql.execution.streaming.sources.ForeachWriterSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anon$1.process(ForeachWriterSuite.scala:135)
[info] Cause: java.lang.RuntimeException: ForeachSinkSuite error
[info] at org.apache.spark.sql.execution.streaming.sources.ForeachWriterSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anon$1.process(ForeachWriterSuite.scala:135)
|
LGTM. Just one comment. |
|
Test build #88833 has finished for PR 20951 at commit
|
|
Test build #88832 has finished for PR 20951 at commit
|
## What changes were proposed in this pull request? Migrate foreach sink to DataSourceV2. Since the previous attempt at this PR apache#20552, we've changed and strictly defined the lifecycle of writer components. This means we no longer need the complicated lifecycle shim from that PR; it just naturally works. ## How was this patch tested? existing tests Author: Jose Torres <[email protected]> Closes apache#20951 from jose-torres/foreach.
## What changes were proposed in this pull request? Migrate foreach sink to DataSourceV2. Since the previous attempt at this PR apache#20552, we've changed and strictly defined the lifecycle of writer components. This means we no longer need the complicated lifecycle shim from that PR; it just naturally works. ## How was this patch tested? existing tests Author: Jose Torres <[email protected]> Closes apache#20951 from jose-torres/foreach.
What changes were proposed in this pull request?
Migrate foreach sink to DataSourceV2.
Since the previous attempt at this PR #20552, we've changed and strictly defined the lifecycle of writer components. This means we no longer need the complicated lifecycle shim from that PR; it just naturally works.
How was this patch tested?
existing tests