
2.x: Releasing pooled objects in queues upon cancellation #5839

Closed
RikkiGibson opened this issue Feb 4, 2018 · 6 comments
@RikkiGibson
I'm using RxJava 2.1.9.

I'm emitting pooled buffer objects in a Flowable to perform actions like file and socket reads. When the subscription is active until completion, it works great. However, if the subscription is disposed at some arbitrary point while it's running, intermediate operators like observeOn() may drop some buffers which were stored in an internal queue, resulting in a memory leak.

I considered creating an operator to work around this which requests modest numbers of items from upstream, and upon cancellation, waits for all requested items to be delivered before cancelling the upstream. I don’t know if this would be sound in the face of a misbehaving upstream which takes a long time or never emits the requested items. It's also less than ideal to have to remember to put this operator at the end of every workflow involving pooled buffers or risk a leak.

I'm wondering if perhaps it would be reasonable to do something in RxJava like allow the installation of an optional queue drain handler which could release the buffers, or even allow items which should be automatically disposed upon cancellation to conform to some interface.
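To make the second idea concrete, something like this hypothetical sketch — none of these names (ReleasableOnDrop, drainAndRelease) exist in RxJava; they're invented purely to illustrate what an operator's queue-clear path could do on cancellation:

```java
import java.util.Arrays;

// Hypothetical sketch: items implementing ReleasableOnDrop would be
// released automatically when an operator clears its internal queue.
public class AutoReleaseSketch {

    public interface ReleasableOnDrop {
        void releaseOnDrop();
    }

    public static class PooledBuffer implements ReleasableOnDrop {
        public boolean released;
        @Override public void releaseOnDrop() { released = true; }
    }

    // What an operator's cancellation cleanup might look like if it
    // honored the interface while draining its queue:
    public static void drainAndRelease(Iterable<?> queuedItems) {
        for (Object item : queuedItems) {
            if (item instanceof ReleasableOnDrop) {
                ((ReleasableOnDrop) item).releaseOnDrop();
            }
        }
    }

    public static void main(String[] args) {
        PooledBuffer buf = new PooledBuffer();
        drainAndRelease(Arrays.asList(buf, "an ordinary item"));
        System.out.println("released = " + buf.released); // released = true
    }
}
```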

Please let me know what you think.

@akarnokd (Member)

akarnokd commented Feb 4, 2018

Your use case is what I'd call a resource-aware flow, which is not supported by the RxJava architecture. The closest tools are the using() operator and doFinally, but with those you have to manage the resources entering the flow manually.
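For illustration, a minimal sketch of those two operators (PooledBuffer is a stand-in for whatever pooled object the flow wraps; this assumes RxJava 2 on the classpath):

```java
import io.reactivex.Flowable;

public class UsingExample {

    public static class PooledBuffer {
        public boolean released;
        public String read() { return "contents"; }
        public void release() { released = true; }
    }

    // using() ties the resource's lifetime to the flow: the disposer runs
    // whether the flow completes, errors, or is cancelled downstream.
    public static Flowable<String> readViaPool(PooledBuffer buffer) {
        return Flowable.using(
            () -> buffer,                      // acquire (here: passed in)
            buf -> Flowable.just(buf.read()),  // build the flow around it
            PooledBuffer::release);            // release on terminate/cancel
    }

    public static void main(String[] args) {
        PooledBuffer buf = new PooledBuffer();
        readViaPool(buf)
            // doFinally runs after termination or cancellation as well
            .doFinally(() -> System.out.println("released: " + buf.released))
            .subscribe(System.out::println);
    }
}
```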

Beyond these, you'll have to implement a custom operator for each stage that could lose items. I have experimented with such special flows here, but developing such operators and accounting for all possible cleanup points was extremely tedious.

@davidmoten (Collaborator)

I don't understand your use case fully, but since you mention pools in a reactive context, you might be interested to know that rxjava2-jdbc uses a reactive pool (of JDBC Connection objects). The pool side is abstracted for general use. I had considered extracting this code into its own artifact (rxjava2-pool) but lacked a secondary use case, so I didn't (though now I might!). In the meantime, the rxjava2-jdbc artifact is on Maven Central if you'd like to use it.

The reactive pool is presented as Single<Member<T>>. You subscribe to that Single as many times as you like (concurrently if desired) and when finished with a T you call checkin() on the Member.

To create a pool use the NonBlockingPool static builder methods (specifying factory, healthCheck, maxSize, maxIdleTime, disposal action, etc.).

If it seems useful then I'd be happy to get feedback/review.

@RikkiGibson (Author)

RikkiGibson commented Feb 4, 2018

I think understanding how you achieved pooling with Rx could be very helpful for what I've been working on. I'll have a look at the code.

I'm also curious how the RxNetty folks have solved this problem of managing the pooled Netty ByteBufs. There has been a significant amount of discussion on their side about it. It's RxJava 1 still, I think, but I wonder if they are actually susceptible to the same queue clearing on cancellation issue I mentioned in my original post.

@jamesgorman2

@RikkiGibson RxNetty 0.5.x has a background cleanup thread looking for unreleased ByteBufs over a certain age (I've been out of the core of the codebase for a while, so it'd take me a while to find the code). If I recall correctly, this is predominantly for unread streams (e.g. ignoring the content of an HTTP message).

You can also leak memory by switching threads before handling and manually releasing ByteBufs, if the subscription is cancelled in between. We found this when HAProxy was dropping connections, leaving about 0.5% of ByteBufs unreleased. We didn't propose a systematic fix; we just moved the map (with its release() call) ahead of the thread switch. The problematic version looked like this:

  @Override
  public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    return response.writeString(
      request.getContent()
        .observeOn(Schedulers.io()) // problematic thread switch
        .map(
          bb -> {
            String s = bb.toString(Charset.defaultCharset());
            bb.release();
            return s;
          }
        )
    );
  }
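The ordering issue is independent of Netty. As a minimal stdlib sketch (a bounded queue standing in for observeOn's internal buffer; all names invented), releasing before the hand-off means cancellation can only drop items that need no cleanup:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class HandoffLeakDemo {

    static class PooledBuf {
        static final AtomicInteger live = new AtomicInteger();
        PooledBuf() { live.incrementAndGet(); }
        void release() { live.decrementAndGet(); }
        String readAndRelease() { String s = "payload"; release(); return s; }
    }

    // Queue raw buffers, then "cancel" by clearing the queue: the queued
    // buffers are dropped without release() ever being called.
    static int leakyOrder() {
        PooledBuf.live.set(0);
        ArrayBlockingQueue<PooledBuf> queue = new ArrayBlockingQueue<>(8);
        for (int i = 0; i < 4; i++) queue.offer(new PooledBuf());
        queue.clear();
        return PooledBuf.live.get(); // 4 buffers leaked
    }

    // Release each buffer before the hand-off: only plain Strings are queued,
    // so clearing the queue on cancellation drops nothing that needs freeing.
    static int safeOrder() {
        PooledBuf.live.set(0);
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
        for (int i = 0; i < 4; i++) queue.offer(new PooledBuf().readAndRelease());
        queue.clear();
        return PooledBuf.live.get(); // 0 leaked
    }

    public static void main(String[] args) {
        System.out.println("leaky order leaks: " + leakyOrder()); // 4
        System.out.println("safe order leaks:  " + safeOrder());  // 0
    }
}
```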

@RikkiGibson (Author)

Thank you very much @akarnokd, @davidmoten, and @jamesgorman2 for giving context and showing some of the ways you've attempted to solve this problem. It seems that dealing with this kind of problem just requires working case by case within the limitations of RxJava.

@akarnokd (Member)

akarnokd commented Feb 8, 2018

Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
