Conversation

@xuanyuanking
Member

What changes were proposed in this pull request?

This problem was reported by @yanlin-Lynn, @ivoson, and @LiangchangZ. Thanks!

When we union two streams from Kafka or other sources, and one of them has no continuous incoming data while a task restart happens, an IllegalStateException is thrown. The root cause is the code in MicroBatchExecution: when a stream has no continuous data, its committedOffset equals its availableOffset during populateStartOffsets, and currentPartitionOffsets is not properly handled in KafkaSource. We should probably also consider this scenario in other Source implementations.

How was this patch tested?

Added a unit test in KafkaSourceSuite.scala.
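
To make the failure concrete, here is a minimal sketch (an illustration of the idea, not the verbatim patch) of how KafkaSource.getBatch can seed currentPartitionOffsets on recovery, when getBatch is called before getOffset:

    // Sketch only; the method body is elided to the relevant part.
    override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
      val untilPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(end)
      // On recovery, getBatch is called before getOffset, so for a source
      // that had no new data currentPartitionOffsets may still be None.
      // Seed it from the replayed batch's end offsets; otherwise the next
      // cycle compares against stale state and reports bogus data loss.
      if (currentPartitionOffsets.isEmpty) {
        currentPartitionOffsets = Some(untilPartitionOffsets)
      }
      // ... existing logic: compute offset ranges and build the DataFrame ...
    }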

@SparkQA

SparkQA commented Jan 4, 2018

Test build #85675 has finished for PR 20150 at commit aa3d7b7.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 4, 2018

Test build #85678 has finished for PR 20150 at commit fa64187.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

cc @zsxwing

@xuanyuanking
Member Author

cc @gatorsmile @cloud-fan

@zsxwing
Member

zsxwing commented Jan 10, 2018

@xuanyuanking could you post the full stack trace for this issue?

@xuanyuanking
Member Author

Hi Shixiong, thanks a lot for your reply.
The full stack trace below can be reproduced by running the added UT on the original code base.

Assert on query failed: Query [id = 3421db21-652e-47af-9d54-2b74a222abed, runId = cd8d7c94-1286-44a5-b000-a8d870aef6fa] terminated with exception: Partition topic-0-0's offset was changed from 10 to 5, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".

org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Caused by: Partition topic-0-0's offset was changed from 10 to 5, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".

org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:332)
org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:291)
org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:289)
scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
scala.collection.AbstractTraversable.filter(Traversable.scala:104)
org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:289)
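
For reference, the filter at the bottom of the trace (KafkaSource.scala:289-291) is where the data-loss report fires. Paraphrased (a sketch, not the verbatim source), the check looks roughly like:

    // Ranges whose untilOffset is behind fromOffset are treated as data loss.
    // With stale currentPartitionOffsets after a restart, fromOffset can be
    // wrong, producing the "changed from 10 to 5" message above.
    val validRanges = offsetRanges.filter { range =>
      if (range.untilOffset < range.fromOffset) {
        reportDataLoss(
          s"Partition ${range.topicPartition}'s offset was changed from " +
          s"${range.fromOffset} to ${range.untilOffset}, some data may have been missed")
        false
      } else {
        true
      }
    }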

@zsxwing zsxwing left a comment

Thanks for fixing this. Looks good to me. Just some nits.

batches.slice(sliceStart, sliceEnd)
}

if (newBlocks.isEmpty) {
@zsxwing
Member

nit: could you add an assert(sliceStart <= sliceEnd, s"sliceStart: $sliceStart sliceEnd: $sliceEnd") above batches.slice(sliceStart, sliceEnd) to make sure getBatch will not be called with wrong offsets.
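
For context, the suggested guard would sit directly above the slice, e.g.:

    // Fail fast with a descriptive message if getBatch ever computes an
    // inverted slice range.
    assert(sliceStart <= sliceEnd, s"sliceStart: $sliceStart sliceEnd: $sliceEnd")
    batches.slice(sliceStart, sliceEnd)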

@xuanyuanking
Member Author

DONE

)
}

test("union bug in failover") {
@zsxwing
Member

nit: test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in")

@xuanyuanking
Member Author

DONE

StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
// 5 from smaller topic, 5 from bigger one
CheckAnswer(0, 1, 2, 3, 4, 100, 101, 102, 103, 104),
@zsxwing
Member

You can clean this code up a bit using the following snippet:

    testStream(kafka)(
      StartStream(ProcessingTime(100), clock),
      waitUntilBatchProcessed,
      // 5 from smaller topic, 5 from bigger one
      CheckLastBatch((0 to 4) ++ (100 to 104): _*),
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      // 5 from smaller topic, 5 from bigger one
      CheckLastBatch((5 to 9) ++ (105 to 109): _*),
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      // smaller topic empty, 5 from bigger one
      CheckLastBatch(110 to 114: _*),
      StopStream,
      StartStream(ProcessingTime(100), clock),
      waitUntilBatchProcessed,
      // smallest now empty, 5 from bigger one
      CheckLastBatch(115 to 119: _*),
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      // smallest now empty, 5 from bigger one
      CheckLastBatch(120 to 124: _*)
    )

@xuanyuanking
Member Author

Cool, this makes the code much cleaner.

@SparkQA

SparkQA commented Jan 15, 2018

Test build #86128 has finished for PR 20150 at commit bf8af29.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Jan 16, 2018

Thanks! Merging to master and 2.3.

@xuanyuanking
Member Author

Thanks for your review, Shixiong!

asfgit pushed a commit that referenced this pull request Jan 16, 2018

Author: Yuanjian Li <[email protected]>

Closes #20150 from xuanyuanking/SPARK-22956.

(cherry picked from commit 07ae39d)
Signed-off-by: Shixiong Zhu <[email protected]>
@asfgit asfgit closed this in 07ae39d Jan 16, 2018
@xuanyuanking xuanyuanking deleted the SPARK-22956 branch January 16, 2018 08:22