
KPL back off is kind of bush league #48

Open

rdifalco opened this issue Apr 4, 2016 · 4 comments

rdifalco commented Apr 4, 2016

The KPL authors suggest writing something like this when records are being produced at a higher rate than the KPL can submit them to Kinesis:

while (producer.getOutstandingRecordsCount() > MAX_BUFFERED) {
    Thread.sleep(1);
}

Using a busy loop to keep the KPL queue from getting too big seems like a novice approach. Since the Java code already has a callback and a ConcurrentHashMap, it should allow us to specify a max and then offer a "blocking" version of the addUserRecord call. I am doing something like this to work around the issue:

import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;

import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

  // Fair semaphore that caps the number of records in flight inside the KPL.
  private static final Semaphore backPressure = new Semaphore(MAX_BUFFERED_SIZE, true);

  private static final FutureCallback<UserRecordResult> CALLBACK = new FutureCallback<UserRecordResult>() {
    @Override
    public void onSuccess(UserRecordResult result) {
      backPressure.release();
    }

    @Override
    public void onFailure(Throwable t) {
      backPressure.release();
      // . . . failure handling elided
    }
  };

  public void submit(String stream, String partitionKey, ByteBuffer data) throws InterruptedException {
    // Block the caller until a permit is free, i.e. until the KPL has drained a record.
    backPressure.acquire();
    try {
      ListenableFuture<UserRecordResult> future = producer.addUserRecord(stream, partitionKey, data);
      Futures.addCallback(future, CALLBACK);
    } catch (RuntimeException e) {
      // addUserRecord failed synchronously, so return the permit here.
      backPressure.release();
      throw e;
    }
  }

It seems kind of lame that I have to do this with a semaphore when the KPL Java code could do it more efficiently and provide it built in.

samcday commented Jun 7, 2016

Worse still, the internal retry logic in the KPL is quite aggressive (see #12 and #14, for example). So if you don't follow the user-level throttling, things get linearly worse in the KPL daemon.

Pushing the backoff logic onto consumers means everyone is forced to solve the problem themselves, and most users are going to do it badly.

pfifer (Contributor) commented Feb 15, 2017

Thanks for reporting this; we're looking at improving the experience of using the KPL. I agree that producer threads shouldn't need to call sleep; instead they should be parked via a wait call. Adding simpler support for a maximum number of in-flight records is something we will definitely look at providing.
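For illustration, here is a minimal sketch of what parking producer threads via wait/notify (rather than sleeping) could look like as a wrapper around today's KPL. The ParkingProducer class and maxOutstanding parameter are made up for this example, and the cap is only approximate, since several threads can pass the check before the outstanding count updates:

import java.nio.ByteBuffer;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

// Hypothetical wrapper: parks producer threads on a monitor instead of spinning/sleeping.
public class ParkingProducer {
  private final KinesisProducer producer;
  private final int maxOutstanding;
  private final Object lock = new Object();

  public ParkingProducer(KinesisProducer producer, int maxOutstanding) {
    this.producer = producer;
    this.maxOutstanding = maxOutstanding;
  }

  public ListenableFuture<UserRecordResult> put(String stream, String key, ByteBuffer data)
      throws InterruptedException {
    synchronized (lock) {
      // Wait (not spin) until the KPL's outstanding count drops below the cap.
      while (producer.getOutstandingRecordsCount() >= maxOutstanding) {
        lock.wait();
      }
    }
    ListenableFuture<UserRecordResult> future = producer.addUserRecord(stream, key, data);
    // Wake any parked threads once this record is resolved, successfully or not.
    Futures.addCallback(future, new FutureCallback<UserRecordResult>() {
      @Override public void onSuccess(UserRecordResult result) { wake(); }
      @Override public void onFailure(Throwable t) { wake(); }
    }, MoreExecutors.directExecutor());
    return future;
  }

  private void wake() {
    synchronized (lock) {
      lock.notifyAll();
    }
  }
}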

Can others who are impacted by this please leave a comment or reaction to help prioritize these changes?

kwinter commented Mar 12, 2018

We've similarly implemented our own backoff, but in a different way: we skip Kinesis altogether, log an error, and put the record into a local file that gets uploaded separately (which spawned #76).
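A rough sketch of that kind of non-blocking fallback, assuming a made-up SpillingSubmitter class and ignoring record framing, partition-key persistence, and real logging, could look like this:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import com.amazonaws.services.kinesis.producer.KinesisProducer;

// Hypothetical "spill to disk" fallback: if the KPL is already holding too many records,
// append the payload to a local file instead of queueing it.
public class SpillingSubmitter {
  private final KinesisProducer producer;
  private final Path spillFile;        // uploaded later by a separate process
  private final int maxOutstanding;

  public SpillingSubmitter(KinesisProducer producer, Path spillFile, int maxOutstanding) {
    this.producer = producer;
    this.spillFile = spillFile;
    this.maxOutstanding = maxOutstanding;
  }

  public void submit(String stream, String partitionKey, ByteBuffer data) throws IOException {
    if (producer.getOutstandingRecordsCount() >= maxOutstanding) {
      // Never block the caller: log an error (logging omitted here) and spill to disk.
      try (FileChannel channel = FileChannel.open(spillFile,
          StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
        channel.write(data.duplicate());
      }
      return;
    }
    producer.addUserRecord(stream, partitionKey, data);
  }
}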

I would certainly welcome the inclusion of abstractions and choices around backoff, but I'm wary of the above being the default or the only choice. It would introduce blocking into an otherwise non-blocking client, which would likely cause a lot of pain in the Reactive/Scala world. Using something like the above is likely to put a bunch of threads/tasks into waiting/blocked states if records are not being sent fast enough, which may be desirable in some scenarios but not in others (depending on how critical the data is and on the system architecture at hand). It would also prevent real backpressure strategies from being implemented, like slowing down the rate at which messages come in.

So I would suggest an abstract BackpressureStrategy and a concrete semaphore-based waiting strategy that can be chosen optionally.
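A sketch of the kind of pluggable abstraction being suggested here; all names are hypothetical, not an existing KPL API:

import java.util.concurrent.Semaphore;

// Pluggable strategy invoked around each addUserRecord call.
public interface BackpressureStrategy {
  // Called before addUserRecord; may block, drop, or do nothing.
  void beforeAdd() throws InterruptedException;

  // Called when a record completes, successfully or not.
  void afterComplete();
}

// One concrete choice: block callers on a fair semaphore, as in the workaround above.
class SemaphoreBackpressureStrategy implements BackpressureStrategy {
  private final Semaphore permits;

  SemaphoreBackpressureStrategy(int maxInFlight) {
    this.permits = new Semaphore(maxInFlight, true);
  }

  @Override
  public void beforeAdd() throws InterruptedException {
    permits.acquire();
  }

  @Override
  public void afterComplete() {
    permits.release();
  }
}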

buremba commented Apr 13, 2020

What's the recommended strategy right now?
