@@ -20,38 +20,51 @@ package org.apache.spark.sql.streaming.sources
 import java.util.Optional
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{LongOffset, RateStreamOffset}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
 import org.apache.spark.sql.sources.v2.reader.ReadTask
 import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport, MicroBatchReadSupport, MicroBatchWriteSupport}
 import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
 import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-object FakeReader extends MicroBatchReader with ContinuousReader {
+case class FakeReader() extends MicroBatchReader with ContinuousReader {
   def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {}
   def getStartOffset: Offset = RateStreamOffset(Map())
   def getEndOffset: Offset = RateStreamOffset(Map())
   def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
   def commit(end: Offset): Unit = {}
   def readSchema(): StructType = StructType(Seq())
-  def createReadTasks(): java.util.ArrayList[ReadTask[Row]] = new java.util.ArrayList()
   def stop(): Unit = {}
   def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
   def setOffset(start: Optional[Offset]): Unit = {}
+
+  def createReadTasks(): java.util.ArrayList[ReadTask[Row]] = {
+    throw new IllegalStateException("fake source - cannot actually read")
+  }
 }
 
-class FakeStreamingMicroBatchOnly extends DataSourceRegister
-    with DataSourceV2 with MicroBatchReadSupport with MicroBatchWriteSupport {
+trait FakeMicroBatchReadSupport extends MicroBatchReadSupport {
   override def createMicroBatchReader(
       schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceV2Options): MicroBatchReader = FakeReader
+      options: DataSourceV2Options): MicroBatchReader = FakeReader()
+}
+
+trait FakeContinuousReadSupport extends ContinuousReadSupport {
+  override def createContinuousReader(
+      schema: Optional[StructType],
+      checkpointLocation: String,
+      options: DataSourceV2Options): ContinuousReader = FakeReader()
+}
 
+trait FakeMicroBatchWriteSupport extends MicroBatchWriteSupport {
   def createMicroBatchWriter(
       queryId: String,
       epochId: Long,
@@ -60,140 +73,176 @@ class FakeStreamingMicroBatchOnly extends DataSourceRegister
       options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
-
-  override def shortName(): String = "fake-microbatch-only"
 }
 
-class FakeStreamingContinuousOnly extends DataSourceRegister
-    with DataSourceV2 with ContinuousReadSupport with ContinuousWriteSupport {
-  override def createContinuousReader(
-      schema: Optional[StructType],
-      checkpointLocation: String,
-      options: DataSourceV2Options): ContinuousReader = FakeReader
-
+trait FakeContinuousWriteSupport extends ContinuousWriteSupport {
   def createContinuousWriter(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
       options: DataSourceV2Options): Optional[ContinuousWriter] = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
+}
 
-  override def shortName(): String = "fake-continuous-only"
+class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupport {
+  override def shortName(): String = "fake-read-microbatch-only"
 }
 
-class FakeStreamingBothModes extends DataSourceRegister
-    with DataSourceV2 with MicroBatchReadSupport with ContinuousReadSupport
-    with MicroBatchWriteSupport with ContinuousWriteSupport {
-  override def createMicroBatchReader(
-      schema: Optional[StructType],
-      checkpointLocation: String,
-      options: DataSourceV2Options): MicroBatchReader = FakeReader
+class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupport {
+  override def shortName(): String = "fake-read-continuous-only"
+}
 
-  def createMicroBatchWriter(
-      queryId: String,
-      epochId: Long,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
-    throw new IllegalStateException("fake sink - cannot actually write")
-  }
+class FakeReadBothModes extends DataSourceRegister
+    with FakeMicroBatchReadSupport with FakeContinuousReadSupport {
+  override def shortName(): String = "fake-read-microbatch-continuous"
+}
 
-  override def createContinuousReader(
-      schema: Optional[StructType],
-      checkpointLocation: String,
-      options: DataSourceV2Options): ContinuousReader = FakeReader
+class FakeReadNeitherMode extends DataSourceRegister {
+  override def shortName(): String = "fake-read-neither-mode"
+}
 
-  def createContinuousWriter(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceV2Options): Optional[ContinuousWriter] = {
-    throw new IllegalStateException("fake sink - cannot actually write")
-  }
+class FakeWriteMicroBatchOnly extends DataSourceRegister with FakeMicroBatchWriteSupport {
+  override def shortName(): String = "fake-write-microbatch-only"
+}
 
-  override def shortName(): String = "fake-both-modes"
+class FakeWriteContinuousOnly extends DataSourceRegister with FakeContinuousWriteSupport {
+  override def shortName(): String = "fake-write-continuous-only"
 }
 
-class FakeStreamingNeitherMode extends DataSourceRegister with DataSourceV2 {
-  override def shortName(): String = "fake-neither-mode"
+class FakeWriteBothModes extends DataSourceRegister
+    with FakeMicroBatchWriteSupport with FakeContinuousWriteSupport {
+  override def shortName(): String = "fake-write-microbatch-continuous"
 }
 
-class StreamingDataSourceV2Suite extends StreamTest {
+class FakeWriteNeitherMode extends DataSourceRegister {
+  override def shortName(): String = "fake-write-neither-mode"
+}
 
-  private def df = spark.readStream.format("rate").load()
+class StreamingDataSourceV2Suite extends StreamTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     val fakeCheckpoint = Utils.createTempDir()
     spark.conf.set("spark.sql.streaming.checkpointLocation", fakeCheckpoint.getCanonicalPath)
   }
 
-  testQuietly("create microbatch with only microbatch support") {
-    val query = df.writeStream.format("fake-microbatch-only").start()
-    query.stop()
-  }
-
-  testQuietly("create microbatch with both support") {
-    val query = df.writeStream.format("fake-both-modes").start()
-    query.stop()
-  }
-
-  testQuietly("create continuous with only continuous support") {
-    val query = df.writeStream
-      .format("fake-continuous-only")
-      .trigger(Trigger.Continuous(100))
-      .start()
-    query.stop()
-  }
-
-  testQuietly("create continuous with both support") {
-    val query = df.writeStream
-      .format("fake-both-modes")
-      .trigger(Trigger.Continuous(100))
+  val readFormats = Seq(
+    "fake-read-microbatch-only",
+    "fake-read-continuous-only",
+    "fake-read-microbatch-continuous",
+    "fake-read-neither-mode")
+  val writeFormats = Seq(
+    "fake-write-microbatch-only",
+    "fake-write-continuous-only",
+    "fake-write-microbatch-continuous",
+    "fake-write-neither-mode")
+  val triggers = Seq(
+    Trigger.Once(),
+    Trigger.ProcessingTime(1000),
+    Trigger.Continuous(1000))
+
+  private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger) = {
+    val query = spark.readStream
+      .format(readFormat)
+      .load()
+      .writeStream
+      .format(writeFormat)
+      .trigger(trigger)
       .start()
     query.stop()
   }
 
-  test("microbatch with only continuous support") {
+  private def testUnsupportedOperationCase(
+      readFormat: String,
+      writeFormat: String,
+      trigger: Trigger,
+      errorMsg: String) = {
     val ex = intercept[UnsupportedOperationException] {
-      df.writeStream.format("fake-continuous-only").start()
+      testPositiveCase(readFormat, writeFormat, trigger)
     }
-
-    assert(ex.getMessage.contains(
-      "Data source fake-continuous-only does not support streamed writing"))
+    assert(ex.getMessage.contains(errorMsg))
   }
 
-  test("microbatch with no support") {
-    val ex = intercept[UnsupportedOperationException] {
-      df.writeStream.format("fake-neither-mode").start()
+  private def testLogicalPlanCase(
+      readFormat: String,
+      writeFormat: String,
+      trigger: Trigger,
+      errorMsg: String) = {
+    val ex = intercept[StreamingQueryException] {
+      spark.readStream
+        .format(readFormat)
+        .load()
+        .writeStream
+        .format(writeFormat)
+        .trigger(trigger)
+        .start()
+        .processAllAvailable()
     }
-
-    assert(ex.getMessage.contains(
-      "Data source fake-neither-mode does not support streamed writing"))
+    assert(ex.cause != null)
+    assert(ex.cause.getMessage.contains(errorMsg))
   }
 
-  test("continuous with only microbatch support") {
-    val ex = intercept[AnalysisException] {
-      df.writeStream
-        .format("fake-microbatch-only")
-        .trigger(Trigger.Continuous(100))
-        .start()
+  // Get a list of (read, write, trigger) tuples for test cases.
+  val cases = readFormats.flatMap { read =>
+    writeFormats.flatMap { write =>
+      triggers.map(t => (write, t))
+    }.map {
+      case (write, t) => (read, write, t)
     }
-
-    assert(ex.getMessage.contains(
-      "Data source fake-microbatch-only does not support continuous writing"))
   }
 
-  test("continuous with no support") {
-    val ex = intercept[AnalysisException] {
-      df.writeStream
-        .format("fake-neither-mode")
-        .trigger(Trigger.Continuous(100))
-        .start()
+  for ((read, write, trigger) <- cases) {
+    testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
+      val readSource = DataSource.lookupDataSource(read, spark.sqlContext.conf).newInstance()
+      val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).newInstance()
+      (readSource, writeSource, trigger) match {
+        // Valid microbatch queries.
+        case (_: MicroBatchReadSupport, _: MicroBatchWriteSupport, t)
+            if !t.isInstanceOf[ContinuousTrigger] =>
+          testPositiveCase(read, write, trigger)
+
+        // Valid continuous queries.
+        case (_: ContinuousReadSupport, _: ContinuousWriteSupport, _: ContinuousTrigger) =>
+          testPositiveCase(read, write, trigger)
+
+        // Invalid - can't read at all
+        case (r, _, _)
+            if !r.isInstanceOf[MicroBatchReadSupport]
+              && !r.isInstanceOf[ContinuousReadSupport] =>
+          testUnsupportedOperationCase(read, write, trigger,
+            s"Data source $read does not support streamed reading")
+
+        // Invalid - trigger is continuous but writer is not
+        case (_, w, _: ContinuousTrigger) if !w.isInstanceOf[ContinuousWriteSupport] =>
+          testUnsupportedOperationCase(read, write, trigger,
+            s"Data source $write does not support continuous writing")
+
+        // Invalid - can't write at all
+        case (_, w, _)
+            if !w.isInstanceOf[MicroBatchWriteSupport]
+              && !w.isInstanceOf[ContinuousWriteSupport] =>
+          testUnsupportedOperationCase(read, write, trigger,
+            s"Data source $write does not support streamed writing")
+
+        // Invalid - trigger and writer are continuous but reader is not
+        case (r, _: ContinuousWriteSupport, _: ContinuousTrigger)
+            if !r.isInstanceOf[ContinuousReadSupport] =>
+          testLogicalPlanCase(read, write, trigger,
+            s"Data source $read does not support continuous processing")
+
+        // Invalid - trigger is microbatch but writer is not
+        case (_, w, t)
+            if !w.isInstanceOf[MicroBatchWriteSupport] && !t.isInstanceOf[ContinuousTrigger] =>
+          testUnsupportedOperationCase(read, write, trigger,
+            s"Data source $write does not support streamed writing")
+
+        // Invalid - trigger and writer are microbatch but reader is not
+        case (r, _, t)
+            if !r.isInstanceOf[MicroBatchReadSupport] && !t.isInstanceOf[ContinuousTrigger] =>
+          testLogicalPlanCase(read, write, trigger,
+            s"Data source $read does not support microbatch processing")
+      }
     }
-
-    assert(ex.getMessage.contains(
-      "Data source fake-neither-mode does not support continuous writing"))
   }
 }