How best to release elements in a ReplaySubject? #1794
I think for-each is your best option for now. Neither multicast nor refCount supports an unsubscribe action, and ReplaySubject doesn't expose its buffer in any safe manner.
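A minimal sketch (not from the thread) of what that for-each release could look like in RxJava 1.x, assuming the subject has already completed and no other consumer still needs the buffered elements; the Data type with a release() method mirrors the example further down:

```java
import rx.subjects.ReplaySubject;

public class ForEachRelease {

    public static void main(String[] args) {
        ReplaySubject<Data> subject = ReplaySubject.create();
        subject.onNext(new Data(1));
        subject.onNext(new Data(2));
        subject.onCompleted();

        // normal consumers process the replayed sequence first...
        subject.subscribe(d -> System.out.println("processing " + d));

        // ...then, once everyone is done, a final blocking pass releases each element.
        // toBlocking().forEach blocks until onCompleted, so only do this on a completed subject.
        subject.toBlocking().forEach(Data::release);
    }

    static class Data {
        final int id;

        Data(int id) {
            this.id = id;
        }

        void release() {
            System.out.println("releasing " + this);
        }

        @Override
        public String toString() {
            return "Data => " + id;
        }
    }
}
```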
I see... Would it be worth the effort to try to implement and expose that?
Using Subjects directly is almost never the desired solution, as they don't compose with backpressure or resource cleanup (unsubscription). Subjects are very imperative in use and "hot" in how data flows through them.

Your use case, however, sounds like you should use Observable.create, so that releasing the data can be tied to unsubscription. Thus, I suggest something like the following:

```java
import java.util.Arrays;
import java.util.Collection;

import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

public class ObservableCreateWithCleanup {

    public static void main(String[] args) {
        System.out.println("fetch slow data...");
        getData().toBlocking().forEach(System.out::println);
        // sleep so the JVM doesn't shut down before we see cleanup
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
    }

    public static Observable<Data> getData() {
        return Observable.<Data> create(subscriber -> {
            try {
                Collection<Data> data = slowSynchronousCall();
                // completely ignoring backpressure and not using setProducer for simplicity here
                for (Data d : data) {
                    subscriber.onNext(d);
                }
                subscriber.onCompleted();
                // register subscription
                subscriber.add(Subscriptions.create(() -> {
                    try {
                        // do the manual refcount release
                        for (Data d : data) {
                            d.release();
                        }
                    } catch (Throwable e) {
                        // never throw while unsubscribing
                        e.printStackTrace();
                    }
                }));
            } catch (Throwable e) {
                subscriber.onError(e);
            }
        }).subscribeOn(Schedulers.io());
    }

    /**
     * Assuming the slow call is synchronous. I'm making this a single collection
     * rather than a stream to the Observable.
     */
    public static Collection<Data> slowSynchronousCall() {
        try {
            // simulate slow
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return Arrays.asList(new Data(1), new Data(2), new Data(3));
    }

    public static class Data {
        private final int id;

        public Data(int id) {
            this.id = id;
        }

        public void release() {
            System.out.println("Releasing " + toString());
        }

        @Override
        public String toString() {
            return "Data => " + id;
        }
    }
}
```

This outputs:
@benjchristensen @simonbasle is working with me on the Java client now. The reason we deferred to using the Subject is that we need to create it, pass it over to Netty, and then fill it in from the event loop as server chunks come in. I implemented a JSON streaming parser that writes chunks into the subject. Since we're giving the Subject (cast to an Observable) back to the user, we can't influence their behavior, but we can wrap or extend those subjects as needed since they are never exposed that way to the user. The main problem is that we push Netty ByteBufs into the subject, and if we don't clean them up properly, Netty's leak detector gets unhappy.
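A rough sketch of how that feeding could be wired, purely illustrative rather than the actual client code; the handler name, the HTTP content types, and completing on LastHttpContent are all assumptions:

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import rx.subjects.ReplaySubject;

// Hypothetical handler: the Netty event loop feeds each chunk into the subject,
// which is later handed back to the user as a plain Observable<ByteBuf>.
public class ChunkToSubjectHandler extends SimpleChannelInboundHandler<HttpContent> {

    private final ReplaySubject<ByteBuf> rows = ReplaySubject.create();

    public ReplaySubject<ByteBuf> rows() {
        return rows;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpContent chunk) {
        // retain: the buffer now lives beyond this handler call, which would otherwise release it
        rows.onNext(chunk.content().retain());
        if (chunk instanceof LastHttpContent) {
            rows.onCompleted();
        }
    }
}
```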
Hi Michael, I figured it was Netty ByteBufs. Does the solution I offered not work? Subjects are not good to use because they break the subscription and backpressure composition. If my example doesn't work, can you modify the code to show how it doesn't?
@benjchristensen hmm, maybe I didn't grasp the whole idea. Are you suggesting that (a) we should remove the subject and use Observable.create directly (that may or may not work in our case, but it would require a larger rewrite of that code path), or (b) we wrap the subject with Observable.create, return the wrapper to the user, and handle the releasing in the wrapper once elements are emitted?
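A minimal sketch of option (b), assuming the consumer reads the bytes synchronously inside onNext; none of this is the actual SDK code, and it deliberately shows the limitation that comes with it:

```java
import io.netty.buffer.ByteBuf;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.ReplaySubject;

public class ReleasingWrapper {

    // Option (b): the user never sees the subject itself, only this wrapper, and the
    // wrapper releases each buffer right after forwarding it downstream. Caveat: with a
    // ReplaySubject this only works for a single subscription, since a later replay
    // would hand out already-released buffers.
    public static Observable<ByteBuf> wrap(ReplaySubject<ByteBuf> subject) {
        return Observable.create(child -> subject.subscribe(new Subscriber<ByteBuf>(child) {
            @Override
            public void onNext(ByteBuf buf) {
                try {
                    child.onNext(buf);   // let the consumer read the bytes...
                } finally {
                    buf.release();       // ...then free the refcounted buffer
                }
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onCompleted() {
                child.onCompleted();
            }
        }));
    }
}
```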
@benjchristensen the problem is that we are not dealing with just a blocking operation. The request has been sent to the server and it is already responding, in a streaming fashion, by sending back chunks. This is all done inside the core layer, then used by the Java SDK layer. Upon receiving the first chunk from the server, we return a response with metadata and an Observable to the Java SDK layer (in fact a ReplaySubject; let's call it rows). The next server chunk will probably be our first ByteBuf (first row), which will be fed into the ReplaySubject. At this point in time, client code may not yet have subscribed to the rows Observable, yet we need to process and store the beginning of the response, hence the ReplaySubject (1).

Also, down the road the final client code (using the Java SDK layer) may want to process the sequence several times, without incurring the cost of calling the server again. It would do that by resubscribing on a different flow starting from rows. Once again the ReplaySubject kicks in (2).

I see that there is a BufferUntilSubscriber subject implementation. Maybe this could be a better match for the first part of our use case, (1) above?

```java
//this is java sdk layer code, we adapt the response from core and pass it to final client code
//coreResponse contains metadata like total size of sequence + rows observable
Observable<ByteBuf> rows = coreResponse.rows(); //this was ReplaySubject, now BufferUntilSubscriber subject (1)
int rowCount = coreResponse.rowCount();

Observable<JsonObject> clientRows = rows
        .map(ByteBufToJsonObject::transcode) //this also releases the ByteBuf
        .cache(rowCount); //here we cache on the client side (2)

//calling code of the client library will be able to subscribe to clientRows multiple times and process rows as JSON
return new ClientResponse(ResultEnum.SUCCESS, rowCount, clientRows);
```

Does that give you a better grasp of what we're trying to achieve? Do you think it makes more sense to do things this way?
Yes, use of
The use of (note that backpressure is still not supported on
You can model your solution after

I also caution you against automatically caching things to allow multiple subscribers, as that means you will never know when to release the ByteBufs (unless you use ConnectableObservable.refCount(), but that doesn't solve your second use case of subscribing later, only the case where multiple subscribers are interested at the same time). The user should opt into that with operators like

I've been working with ByteBufs on the RxNetty project and it's honestly a mess to deal with in user space. Mixing a GC'd programming model with a refcounting strategy is not elegant at all. There are use cases that require the user to increment/decrement the refCount themselves. That is why in RxNetty we automatically release the ByteBuf at the end of the chain, and if a user wants to cache it for later, they need to opt in by incrementing the count and are then responsible for releasing it later.
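To make that opt-in concrete, here is an illustrative sketch (not RxNetty's or the client's actual API); it assumes the retain() runs before whatever point in the chain performs the automatic release:

```java
import io.netty.buffer.ByteBuf;
import rx.Observable;

public class OptInCaching {

    // The pipeline releases each buffer once the chain has run, so a consumer that wants
    // to replay later must retain() first and then owns a matching release() per buffer.
    public static Observable<ByteBuf> cacheRetained(Observable<ByteBuf> autoReleasingRows) {
        return autoReleasingRows
                .doOnNext(ByteBuf::retain) // bump the refCount before the automatic release happens
                .cache();
    }

    // Once the cached, completed sequence is no longer needed, balance the retain() calls.
    public static void releaseAll(Observable<ByteBuf> cached) {
        cached.subscribe(ByteBuf::release);
    }
}
```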
Interesting, thanks for the insight. I'll explore the Observable.create approach.
Feel free to keep discussing here. This isn't an easy thing to solve gracefully and I'm interested in figuring out a good solution with you.
Potentially of interest to you regarding releasing ByteBufs: ReactiveX/RxNetty#264
This is a very interesting topic :) I agree with @benjchristensen that auto-caching isn't a clean solution and the responsibility should lie in the hands of the user to cache if required, i.e. when they cannot eagerly subscribe to the content. OTOH, if the user eagerly subscribes to the content, then there is no need for caching. We have taken this approach of caching in Zuul 2.x, which is a proxy. From the point of view of auto-releasing the
@NiteshKant thanks for chiming in here, I agree it's an interesting topic and hard to get right for a variety of scenarios. I'd say we are currently in a position where we can assume everywhere that the subscribers consuming the ByteBufs are trusted (namely, the language bindings built on the core package), but of course we need to solve this "what happens if no one subscribes" issue. I'm curious to see how you did that in RxNetty; I'm sure we can apply something similar.
@daschl I replied to your comment on the RxNetty issue
I'm closing this due to inactivity. If you have further questions, please don't hesitate to reopen this issue or post a new one. |
Hi,
We're using ReplaySubject to keep a streamed response from a server in a buffer, in order to be able to pass the whole (finite) sequence to subscribers that subscribe to the subject.
The whole sequence may take a bit of time to complete, but we want to pass the Observable to client code right away. Furthermore, client code may not immediately subscribe to the flow, or it could subscribe several times to process the sequence more than once.
Hence the use of ReplaySubject, to avoid replaying the original request each time and to avoid blocking until the server has responded with the whole sequence.
However, said elements are resources that are reference counted and should be manually freed before being garbage collected. What do you think is the best approach to cope with that?
Maybe what we need is to add semantics to ReplaySubject to do cleanup and clear the buffer (like a dispose() or release() method)? This would probably also be needed in the bounded ReplaySubject, to plug in cleanup behavior at eviction time... Maybe there is a better alternative to ReplaySubject? Or a simple solution like for-eaching over the items and cleaning them up that way?
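For illustration only, a rough sketch of how such release semantics could look as a thin wrapper around the subject rather than as a change to ReplaySubject itself; the class and method names here are invented:

```java
import io.netty.buffer.ByteBuf;
import rx.Observable;
import rx.subjects.ReplaySubject;

// Hypothetical wrapper: exposes the subject as an Observable and adds an explicit
// release() that replays the completed buffer and frees each refcounted element.
public class ReleasableReplay {

    private final ReplaySubject<ByteBuf> subject = ReplaySubject.create();

    public void onNext(ByteBuf buf) {
        subject.onNext(buf);
    }

    public void onCompleted() {
        subject.onCompleted();
    }

    public Observable<ByteBuf> asObservable() {
        return subject;
    }

    // Call only after the subject has completed and no consumer still needs the buffers.
    public void release() {
        subject.subscribe(ByteBuf::release);
    }
}
```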
I hope someone will be able to make a suggestion, and I'd be delighted to contribute if it makes sense ;) Thanks!