
Conversation

@brkyvz
Contributor

@brkyvz brkyvz commented Jan 29, 2020

What changes were proposed in this pull request?

We propose to add two new interfaces, SupportsAdmissionControl and ReadLimit. A ReadLimit defines how much data should be read in the next micro-batch. SupportsAdmissionControl specifies that a source can rate limit its ingest into the system. The source can tell the system what the user specified as a read limit, and the system can enforce this limit within each micro-batch, or impose its own limit when, for example, the Trigger is Trigger.Once().

We then use this interface in FileStreamSource, KafkaSource, and KafkaMicroBatchStream.
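
For illustration, here is a minimal Scala sketch of a source implementing the new interface. The CountOffset class, the RateLimitedStream name, and the available() callback are hypothetical stand-ins; only the SupportsAdmissionControl and ReadLimit methods reflect the API proposed here:

import org.apache.spark.sql.connector.read.streaming.{
  Offset, ReadLimit, ReadMaxRows, SupportsAdmissionControl}

// Hypothetical offset that simply counts records; not part of this PR.
case class CountOffset(count: Long) extends Offset {
  override def json: String = count.toString
}

// Sketch of a source honoring a user-configured per-batch row cap, where
// available() returns the total number of records currently readable.
class RateLimitedStream(available: () => Long, maxRowsPerBatch: Option[Long])
  extends SupportsAdmissionControl {

  // Surface the user-specified limit; the engine may replace it,
  // e.g. with ReadLimit.allAvailable() under Trigger.Once().
  override def getDefaultReadLimit: ReadLimit =
    maxRowsPerBatch.map(ReadLimit.maxRows(_)).getOrElse(ReadLimit.allAvailable())

  // Advance the end offset of the next batch no further than the limit allows.
  override def latestOffset(start: Offset, limit: ReadLimit): Offset = {
    val from = start.asInstanceOf[CountOffset].count
    limit match {
      case rows: ReadMaxRows => CountOffset(math.min(available(), from + rows.maxRows()))
      case _ => CountOffset(available())
    }
  }

  override def initialOffset(): Offset = CountOffset(0L)
  override def deserializeOffset(json: String): Offset = CountOffset(json.toLong)
  override def commit(end: Offset): Unit = ()
  override def stop(): Unit = ()
}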

Why are the changes needed?

Sources currently have no information about execution semantics, such as whether the stream is being executed in Trigger.Once() mode. This interface passes that information to the sources as part of planning. With a trigger like Trigger.Once(), the semantics are to process all of the data available to the data source in a single micro-batch. Without this interface, however, those semantics can be broken when data source options such as maxOffsetsPerTrigger (in the Kafka source) rate limit the amount of data read in that micro-batch.
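
For example, a query like the following (a sketch; the broker address, topic, and paths are placeholders) combines maxOffsetsPerTrigger with Trigger.Once(); before this change, the single micro-batch would be capped at 10000 offsets, leaving the rest of the available data unprocessed:

import org.apache.spark.sql.streaming.Trigger

// maxOffsetsPerTrigger caps each micro-batch, but Trigger.Once() runs
// exactly one micro-batch, so the two settings conflict.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "events")                    // placeholder
  .option("maxOffsetsPerTrigger", "10000")
  .load()

df.writeStream
  .format("parquet")
  .option("path", "/data/out")               // placeholder
  .option("checkpointLocation", "/data/chk") // placeholder
  .trigger(Trigger.Once())
  .start()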

Does this PR introduce any user-facing change?

DataSource developers can extend this interface for their streaming sources to add admission control into their system and correctly support Trigger.Once().

How was this patch tested?

Existing tests, as this API is mostly internal.

@SparkQA

SparkQA commented Jan 29, 2020

Test build #117500 has finished for PR 27380 at commit 980719a.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@brkyvz brkyvz requested a review from zsxwing January 29, 2020 02:45
@brkyvz
Contributor Author

brkyvz commented Jan 29, 2020

cc @tdas and @zsxwing

@SparkQA

SparkQA commented Jan 29, 2020

Test build #117503 has finished for PR 27380 at commit 4ed77e6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}.toMap
case (s, _) =>
// for some reason, the compiler is unhappy
Member


Do you mean Match is not exhaustive?

@SparkQA

SparkQA commented Jan 29, 2020

Test build #117502 has finished for PR 27380 at commit 9563c5b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 29, 2020

Test build #117505 has finished for PR 27380 at commit 06256bb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 29, 2020

Test build #117508 has finished for PR 27380 at commit a9c6897.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@zsxwing zsxwing left a comment


LGTM except one nit.

}
}

test("maxFilesPerTrigger: ignored when using Trigger.Once") {
Member


super nit: missing the JIRA id.

@HeartSaVioR
Contributor

I'm seeing that the indentation of the Java files here is done with 4 spaces, whereas we seem to use 2 spaces for Java files as well (I checked a sample of Java files in sql/core).

https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/connector/read/V1Scan.java
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/connector/write/V1WriteBuilder.java

While it would be even better if we could catch this in checkstyle (the linter), could we make the change here for now?

@brkyvz
Contributor Author

brkyvz commented Jan 30, 2020

Thanks @HeartSaVioR. Addressed.

Contributor

@HeartSaVioR HeartSaVioR left a comment


Found a couple of broken indentations.

Btw, I haven't looked at the details of the changes yet; I'll catch up on them soon, but please go ahead once the indentation is fixed.


def latestOffset(start: Offset): Offset
static ReadLimit allAvailable() {
    return ReadAllAvailable.SINGLETON;
Contributor


nit: still have 4 spaces

 * the data source.
 */
default ReadLimit getDefaultReadLimit() {
    return ReadLimit.allAvailable();
Contributor


nit: still have 4 spaces

}

override def latestOffset(): Offset = {
  throw new IllegalStateException(
Member


Is UnsupportedOperationException better in this context?


/** Returns the maximum available offset for this source. */
override def getOffset: Option[Offset] = {
  throw new IllegalStateException(
Member


UnsupportedOperationException?

 */
@Evolving
public final class ReadAllAvailable implements ReadLimit {
  static final ReadAllAvailable SINGLETON = new ReadAllAvailable();
Member


If you don't mind, shall we use INSTANCE instead of SINGLETON?

$ git grep " SINGLETON =" | wc -l
       0
$ git grep " INSTANCE =" | wc -l
       5

Contributor Author


private static final First SINGLETON = new First();

Member


I believe that we can change that, too. I'll make a follow-up for that.

Member


On the master, that is the only one, right?

 */
@Evolving
public class ReadMaxFiles implements ReadLimit {
  private int files;
Member

@dongjoon-hyun dongjoon-hyun Jan 31, 2020


files -> maxFiles? Or, can we have a better name?

Contributor Author


I think it's consistent this way with rows and maxRows.

val batchFiles = limit match {
  case files: ReadMaxFiles =>
    newFiles.take(files.maxFiles())
  case all: ReadAllAvailable =>
Member


nit.

-      case all: ReadAllAvailable =>
+      case _: ReadAllAvailable =>


override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.logOffset == -1)
override def getOffset: Option[Offset] = {
  throw new IllegalStateException(
Member


UnsupportedOperationException?

case source: SupportsAdmissionControl =>
  val limit = source.getDefaultReadLimit
  if (trigger == OneTimeTrigger && limit != ReadLimit.allAvailable()) {
    logWarning(s"The read limit $limit for $source is ignored when Trigger.Once() is used.")
Member


I'm wondering if we can do this in the Analyzer?

Contributor Author


Triggers are a property of the system, not the query, so I don't think this fits into analysis.

@SparkQA

SparkQA commented Jan 31, 2020

Test build #117592 has finished for PR 27380 at commit 0548ba9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 31, 2020

Test build #117597 has finished for PR 27380 at commit 895fb87.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz
Contributor Author

brkyvz commented Jan 31, 2020

Thanks all for the review. Merging to master!

@brkyvz
Contributor Author

brkyvz commented Jan 31, 2020

Wait, it seems the latest tests haven't finished running. Holding off for now.

@dongjoon-hyun
Member

Thank you for updating, @brkyvz .

@SparkQA

SparkQA commented Jan 31, 2020

Test build #117606 has finished for PR 27380 at commit bdbfa11.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 31, 2020

Test build #117607 has finished for PR 27380 at commit de5f486.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 1cd19ad Jan 31, 2020
@aminh73

aminh73 commented Nov 25, 2020

We need to use maxOffsetsPerTrigger in the Kafka source with Trigger.Once(), but it seems to read allAvailable in Spark 3. Is there a way to achieve rate limiting in this situation?

@GrigorievNick

Hi,
I know that these changes are already in Spark 3, but I have a question: how can I configure backpressure for my job when I want to use Trigger.Once()?
In Spark 2.4 I had a use case: backfill some data and then start the stream. So I used Trigger.Once(), but my backfill scenario can be very, very big, and it sometimes puts too big a load on my disks (because of shuffles) and on driver memory (because the FileIndex is cached there). So I used maxOffsetsPerTrigger and maxFilesPerTrigger to control how much data Spark processes; that's how I configured backpressure.

Now that this ability is removed, can you suggest a better way to go?

@lior-k

lior-k commented Jul 20, 2021

Joining the question above: how can we achieve rate limiting with Trigger.Once()?

