[SPARK-24127][SS] Continuous text socket source #21199

Closed
arunmahadevan wants to merge 9 commits into apache:master from arunmahadevan:SPARK-24127

@arunmahadevan
Contributor

@arunmahadevan arunmahadevan commented Apr 30, 2018

What changes were proposed in this pull request?

Adds support for a text socket stream in Spark Structured Streaming "continuous" mode. This is roughly based on the idea of ContinuousMemoryStream, where the executor queries the data from the driver over an RPC endpoint.

This makes it possible to create a Structured Streaming continuous pipeline that ingests data via "nc", and to run the examples.

How was this patch tested?

Unit tests; also ran the Spark examples in Structured Streaming continuous mode.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@arunmahadevan
Contributor Author

ping @jerryshao @tdas @jose-torres @HeartSaVioR for inputs.

@HyukjinKwon
Member

ok to test

@HyukjinKwon
Member

add to whitelist

@SparkQA

SparkQA commented May 8, 2018

Test build #90349 has finished for PR 21199 at commit f010943.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres
Contributor

I won't be able to look at this in detail until next week.

In general, I think this is a great source to have available. I wonder if it'd be worthwhile to try and abstract the record forwarding RPCs from here and ContinuousMemoryStream together.

Member

context.reply(record.map(r => if (includeTimestamp) Row(r) else Row(r._1)))

I think it's fine to write this on one line.

Member

If I understand correctly, this commit is never reached in the test case you added.

@jerryshao
Contributor

I wonder whether it is overkill to receive data on the driver side and publish it to the executors via RPC. This might give users the wrong impression that data should be received on the driver side and then published to the executors.

Just my two cents.

@jose-torres
Contributor

I think that's unavoidable if we want to have a socket source. The microbatch socket source has the same thing going on. I'd expect most people looking into implementation details of data sources will understand that they ought to read from executors in general.

@arunmahadevan
Contributor Author

Yes, this is similar to the micro-batch socket source, where the driver opens a single socket connection to read data from "nc". I would expect this pattern to be used only for debug and test sources, not for real ones. We can add some code comments to clarify this.
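
For readers following the driver-side pattern discussed in this thread, here is a minimal sketch without Spark or RPC. It is a simplified model under stated assumptions, not the code in this PR: `RecordBuckets`, `append`, and `get` are hypothetical names, the round-robin distribution is an assumption, and a direct method call stands in for the RPC request. Only `ContinuousRecordPartitionOffset(partitionId, offset)` mirrors a class named in the build reports here.

```scala
import scala.collection.mutable.ListBuffer

// Mirrors the case class listed in this PR's build report.
case class ContinuousRecordPartitionOffset(partitionId: Int, offset: Int)

// Hypothetical stand-in for the driver-side endpoint: one buffer ("bucket") per partition.
class RecordBuckets(numPartitions: Int) {
  private val buckets = Vector.fill(numPartitions)(ListBuffer.empty[String])
  private var next = 0 // round-robin cursor (an assumption for this sketch)

  // Called by the single driver thread that reads lines from the socket.
  def append(line: String): Unit = synchronized {
    buckets(next) += line
    next = (next + 1) % numPartitions
  }

  // Called on behalf of an executor asking for the record at a given offset.
  // None means no record has arrived at that offset yet.
  def get(o: ContinuousRecordPartitionOffset): Option[String] = synchronized {
    val bucket = buckets(o.partitionId)
    if (o.offset < bucket.size) Some(bucket(o.offset)) else None
  }
}
```

All data passes through the one object on the driver, which is why this shape suits debug and test sources rather than production ones.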

@SparkQA

SparkQA commented May 14, 2018

Test build #90606 has finished for PR 21199 at commit b962c3d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 14, 2018

Test build #90603 has finished for PR 21199 at commit b3a42f0.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • case class ContinuousRecordPartitionOffset(partitionId: Int, offset: Int) extends PartitionOffset
  • case class GetRecord(offset: ContinuousRecordPartitionOffset)
  • class ContinuousRecordEndpoint(buckets: Seq[Seq[Any]], lock: Object)

@arunmahadevan
Contributor Author

A gentle ping for review @jose-torres , @jerryshao , @xuanyuanking

Contributor

nit: it's probably better to elaborate on what these are for; Seq[Seq[Any]] and Object aren't very informative types.

Contributor Author

Added more comments to clarify.

Contributor

Can't we use testStream for these tests?

Contributor Author

We probably could, but addSocketData does not work for the continuous source, and I thought the reader offsets could be validated better this way (I followed the approach in RateStreamProviderSuite).

@SparkQA

SparkQA commented May 17, 2018

Test build #90744 has finished for PR 21199 at commit 242bcdb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class TextSocketContinuousInputPartition(
  • class TextSocketContinuousInputPartitionReader(

@SparkQA

SparkQA commented May 18, 2018

Test build #90755 has finished for PR 21199 at commit 68c5eed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@arunmahadevan
Contributor Author

ping @tdas @jose-torres

@HyukjinKwon
Member

ok to test

@SparkQA

SparkQA commented Jun 9, 2018

Test build #91615 has finished for PR 21199 at commit 68c5eed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

@arunmahadevan
Sorry, I forgot to review this until now. Could you fix the merge conflicts? I'd like to pull the code locally and review it there, since the diff is not small.

@SparkQA

SparkQA commented Jul 30, 2018

Test build #93797 has finished for PR 21199 at commit 76512d8.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@arunmahadevan
Contributor Author

@HeartSaVioR , rebased with master.

ping @jose-torres @tdas @zsxwing for review.

@SparkQA

SparkQA commented Jul 30, 2018

Test build #93801 has finished for PR 21199 at commit a069d01.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader

@HeartSaVioR
Contributor

@arunmahadevan Thanks for rebasing. I'll take a look.

Contributor

@HeartSaVioR HeartSaVioR left a comment

The code change looks good overall.

I left comments mostly suggesting deduplication of code between micro-batch and continuous, plus some defensive programming. Some comments may just reflect personal preference, and I'm happy to follow Spark's preference.

Contributor

While it is good to place these values in a companion object, it looks redundant to have them in both micro-batch and continuous, so it might be better to have a common object for them.

We may need to find more spots to deduplicate between micro-batch and continuous for the socket source.

Contributor Author

The companion object can be shared. But overall I think we need to come up with better interfaces so that the micro-batch and continuous sources can share more code. I would investigate this outside the scope of this PR.

Contributor

I'd rather make it safer via one of two approaches:

  1. assert that the partition offsets cover all partition ids, 0 ~ numPartitions - 1
  2. add the partition id to each list element of TextSocketOffset, as RateStreamContinuousReader and RateStreamOffset do

Personally I prefer option 2, but either is fine with me. I'm not sure which one committers and the Spark community would prefer.

Contributor Author

There is an assertion above, assert(offsets.length == numPartitions) (option 1). RateSource also uses similar validation. I am not sure adding the index adds any value here, since the socket source does not support recovery. Even in the Rate source the partition values stored are 0...numPartitions-1, which can already be inferred from the index of the offset in the array.
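
A minimal sketch of the length-based check (option 1), assuming the offsets are kept as a plain list indexed by partition. `OffsetCheck` and `toPartitionOffsets` are hypothetical names for illustration and are not taken from this PR.

```scala
// Hypothetical helper: offsets(i) is the offset for partition i, so the
// partition id is implied by position and only the length needs checking.
object OffsetCheck {
  def toPartitionOffsets(offsets: List[Int], numPartitions: Int): Map[Int, Int] = {
    assert(offsets.length == numPartitions,
      s"expected $numPartitions partition offsets, got ${offsets.length}")
    // Pair each offset with its position, which doubles as the partition id.
    offsets.zipWithIndex.map { case (off, id) => id -> off }.toMap
  }
}
```

Because the ids are derived from list positions, a list of the right length always covers exactly the ids 0 to numPartitions - 1, which is why storing the id explicitly adds little here.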

Contributor

@HeartSaVioR HeartSaVioR Aug 2, 2018

Yeah, agreed. I'm OK with it if the same convention is used in other places.

Contributor

The micro-batch socket reader validates the type of end and calls sys.error with an informational message; it would be better to give a meaningful message like that here.

Btw, my 2 cents: a more specific exception is always better, so I'm +1 on throwing IllegalStateException, as the lines below do, rather than calling sys.error, which throws RuntimeException.
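
A minimal sketch of that suggestion, assuming a simplified offset hierarchy. The `Offset` trait, the fields of `TextSocketOffset`, `OtherOffset`, and `asSocketOffset` are all hypothetical stand-ins for illustration, not Spark's actual types.

```scala
// Simplified stand-ins for the offset types (assumptions for this sketch).
sealed trait Offset
case class TextSocketOffset(offsets: List[Int]) extends Offset
case class OtherOffset(value: Long) extends Offset

object OffsetValidation {
  // Validate the offset type and fail with a specific exception and a
  // message naming what was expected and what was received.
  def asSocketOffset(end: Offset): TextSocketOffset = end match {
    case o: TextSocketOffset => o
    case other =>
      throw new IllegalStateException(
        s"Unexpected offset type: expected TextSocketOffset, got $other")
  }
}
```

Callers can then catch IllegalStateException specifically, which is not possible when the failure surfaces as a bare RuntimeException from sys.error.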

Contributor Author

Will add

Contributor

Ideally we could deduplicate the code between continuous and micro-batch by modifying the read thread to accept a handler for each new line, and letting each reader handle the line accordingly under the proper lock. With this change, the same read thread could serve both the continuous and micro-batch readers.
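
A minimal sketch of what such a shared read loop could look like, assuming the line handler is passed in as a function. `LineReadLoop` and `readLines` are hypothetical names, and a plain `java.io.Reader` stands in for the socket's input stream; neither reflects the code in this PR.

```scala
import java.io.{BufferedReader, Reader}

object LineReadLoop {
  // Reads lines until end of stream and passes each one to the handler.
  // Each reader supplies its own handler and does its own locking inside it.
  def readLines(source: Reader)(onLine: String => Unit): Unit = {
    val reader = new BufferedReader(source)
    try {
      var line = reader.readLine()
      while (line != null) {
        onLine(line)
        line = reader.readLine()
      }
    } finally {
      reader.close()
    }
  }
}
```

A micro-batch reader could pass a handler that appends to a single buffer, while a continuous reader could pass one that distributes lines across per-partition buckets.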

Contributor Author

We could probably refactor and use common code, but the usages are slightly different and I would like to do this outside the scope of this PR. I would like to identify and pull out some generic APIs that both micro-batch and continuous sources can implement, so that such duplication can be avoided in general. With the current approach there are always two separate implementations for each type, and the chance of duplication is higher.

Contributor

Yeah, if you're planning to investigate and touch the APIs, that sounds really good. Maybe it's worth filing a new issue?

Contributor Author

Contributor

nit: according to the style guide, this may need to be written as follows

.map { rec => 
  if (includeTimestamp) {
...

or even

ContinuousRecordPartitionOffset(partitionId, currentOffset))).map { rec => 
  if (includeTimestamp) {
...

https://github.com/databricks/scala-style-guide#anonymous-methods

Contributor

Looks like an unused import.

Contributor

Adding more line comments in the code block might make the test code easier to understand, e.g. noting where it intentionally commits in the middle of a range.

@arunmahadevan
Contributor Author

@HeartSaVioR, I addressed your comments. Let me know if I missed something. I also rebased and had to change more code to use the new interfaces.

I hope we can speed up review cycles in general rather than leaving PRs to hibernate for a while; by then the developer loses context and other things will have changed in the meantime.

@jose-torres
Contributor

The change looks broadly good (and important) to me. I'll defer to @HeartSaVioR wrt the in-depth review; let me know if there are any specific parts I should take a look at.

@SparkQA

SparkQA commented Aug 2, 2018

Test build #93921 has finished for PR 21199 at commit f4a39d9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging

@HyukjinKwon
Member

retest this please

Contributor

@HeartSaVioR HeartSaVioR left a comment

LGTM, given that we are planning to tackle deduplication of the codebase between micro-batch and continuous later.

@SparkQA

SparkQA commented Aug 2, 2018

Test build #93939 has finished for PR 21199 at commit f4a39d9.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging

@arunmahadevan
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 2, 2018

Test build #94011 has finished for PR 21199 at commit f4a39d9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging

@arunmahadevan
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 3, 2018

Test build #94148 has finished for PR 21199 at commit f4a39d9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging

@arunmahadevan
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 7, 2018

Test build #94321 has finished for PR 21199 at commit f4a39d9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging

@arunmahadevan
Contributor Author

@HyukjinKwon this has been open for a while, would you mind taking this forward?

@HyukjinKwon
Member

retest this please

@HyukjinKwon
Member

@jose-torres and @HeartSaVioR, is it good to go?

@SparkQA

SparkQA commented Aug 8, 2018

Test build #94411 has finished for PR 21199 at commit f4a39d9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader with Logging

@HyukjinKwon
Member

Merged to master.

@asfgit asfgit closed this in 9abe09b Aug 10, 2018