Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bidi Blocking Stub #10318

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open

Bidi Blocking Stub #10318

wants to merge 14 commits into from

Conversation

larry-safran
Copy link
Contributor

@larry-safran larry-safran commented Jun 28, 2023

Created a BlockingClientCall class that does blocking streams for all 3 streaming types.

@larry-safran larry-safran marked this pull request as ready for review June 28, 2023 00:33
Copy link
Contributor

@YifeiZhuang YifeiZhuang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not done yet, put what I have.

stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
if (!origWriteClosed && closedStatus == null) {
call.halfClose();
} else if (origWriteClosed) {
throw new IllegalStateException("halfClose cannot be called after stream terminated");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should be the behaviour if closeStatus is non null?

the code looks like halfClose can not be called twice or called after cancel, but the error message seems the situation where the call is terminated from listener first, then calling halfClose is IlegalStateException.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the listener closed the stream, but the client hadn't called halfClose or cancel, then we want to just ignore it.
It is a timing condition whether the halfClose happens before or after the stream is closed, the caller isn't going to do anything in response to the error and the stream is already in the desired state (closed); therefore it is better to do nothing than throw an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it can be simplified to:

if (writeClosed) {
  throw...
} else if (getClosedStatus() == null) {
  call.halfClose();
}
 writeClosed = true;

Is it to prevent halfclose when call is already cancelled? Then this is still seems not correct. We would need an atomicInteger in both places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice readability improvement. Done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you think that we would need an AtomicInteger?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I thought the API in this class should be thread safe - if user has one thread to call halfClose and the other thread to call cancel, then the code won't be able to handle the timing condition.
However, if thread safety is not a concern, (ClientCall itself is not thread safe as publicly documented), then there is no need for atomicInteger.

Also the getClosedStatus() seem can be removed. Because there is an issue:

t1: halfclose is called
t2: stream is closed and closedStatus=ok
t3: check closedSatus(L271)

then halfClose in t1 is lost.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one thread calls halfClose and the other cancel then it is valid both for the halfClose to succeed or to throw an exception. The exception isn't thrown because it is causing the system problems, but rather because it likely indicates that there is a problem in the user's logic which they should look into.
If the client calls halfClose after the server sends close it won't cause a problem, it is just inefficient. In the example above, it is fine for halfClose not to be sent to the server because the server is by design no longer listening since it sent its own close.

stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
Copy link
Member

@ejona86 ejona86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only scratched the surface. I think I'm following along well enough. I think the comments are all small except the one about ThreadlessExecutor, but that one I also said I wasn't looking at more at the moment.

if (!origWriteClosed && closedStatus == null) {
call.halfClose();
} else if (origWriteClosed) {
throw new IllegalStateException("halfClose cannot be called after stream terminated");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it can be simplified to:

if (writeClosed) {
  throw...
} else if (getClosedStatus() == null) {
  call.halfClose();
}
 writeClosed = true;

Is it to prevent halfclose when call is already cancelled? Then this is still seems not correct. We would need an atomicInteger in both places.

stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
private static final Logger log =
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName());

private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final here and the condition. Since it is multi-threading, the final is particularly helpful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs final

stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
if (client_streaming && server_streaming) {
p->Print(
*vars,
"$BlockingClientCall$<$input_type$, $output_type$>\n"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's go ahead and mark these @ExperimentalApi for at least one release? The code here is pretty tight. The main thing I could see that could change is BlockingClientCall<?, RespT> becoming BlockingClientCall<ReqT, RespT>, because that seems the only choice we have.

Although maybe we could also remove the throws InterruptedException from server streaming, since we "know" it won't block. Dunno if that is worth it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Created #10918

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generated method is not ExperimentalApi.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that I misunderstood. I added the ExperimentalApi on ClientCalls and BlockingClientCall in our code. When we talked about it a long time ago I thought you were against putting the experimental annotation in the generated code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added experimental annotation to generated code.

Copy link
Contributor Author

@larry-safran larry-safran left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need a decision on whether should pass ClientCall instead of channel, method and call options. Then need to update generated code.

if (client_streaming && server_streaming) {
p->Print(
*vars,
"$BlockingClientCall$<$input_type$, $output_type$>\n"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Created #10918

stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
private static final Logger log =
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName());

private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/BlockingClientCall.java Outdated Show resolved Hide resolved
stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
private static final Logger log =
Logger.getLogger(ThreadSafeThreadlessExecutor.class.getName());

private Lock waiterLock = new java.util.concurrent.locks.ReentrantLock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs final

@@ -340,7 +413,7 @@ private abstract static class StartableListener<T> extends ClientCall.Listener<T
abstract void onStart();
}

private static final class CallToStreamObserverAdapter<ReqT>
private static class CallToStreamObserverAdapter<ReqT>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, the static and final are separate. This should still be final.

p->Print(
*vars,
"$BlockingClientCall$<?, $output_type$>\n"
" $lower_method_name$($input_type$ request) throws java.lang.InterruptedException,\n"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should discuss whether we remove InterruptedException here. (We know it won't throw.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still an open item.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had put it in the ClientCalls method that this calls that we would rethrow it if we got interrupted while writing a request. Should we be handling it there and not rethrowing it?

stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
Copy link
Contributor Author

@larry-safran larry-safran left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ready for rereview

stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
@@ -340,7 +413,7 @@ private abstract static class StartableListener<T> extends ClientCall.Listener<T
abstract void onStart();
}

private static final class CallToStreamObserverAdapter<ReqT>
private static class CallToStreamObserverAdapter<ReqT>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

stub/src/main/java/io/grpc/stub/ClientCalls.java Outdated Show resolved Hide resolved
Copy link
Member

@ejona86 ejona86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can remove InterruptedException later and not break people (no error like "catching exception not thrown in try"), then we just need to experimental API annotation on the generated code before shipping.

if (client_streaming && server_streaming) {
p->Print(
*vars,
"$BlockingClientCall$<$input_type$, $output_type$>\n"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generated method is not ExperimentalApi.

p->Print(
*vars,
"$BlockingClientCall$<?, $output_type$>\n"
" $lower_method_name$($input_type$ request) throws java.lang.InterruptedException,\n"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still an open item.

@ejona86
Copy link
Member

ejona86 commented Oct 17, 2024

FWIW, I do have a really hard time understanding what the state of the code was when I last reviewed, so that I can see the changes.

@larry-safran
Copy link
Contributor Author

Sorry about making such a mess with the rebase. I'll wait until you're ready to approve before trying to rebase again.

@ejona86
Copy link
Member

ejona86 commented Oct 18, 2024

Sorry about making such a mess with the rebase. I'll wait until you're ready to approve before trying to rebase again.

It was less that you did the rebase and more that I can't tell where I should start reading. If you had squashed all the commits before the rebase and fixes were on top, I'd have actually been fine. Right now the first commit is "Fix a bunch of things" so I get lost immediately.

I see now that if I keep looking at commits, they make sense again, so the first commit just had the wrong message. But also the commits are repeated a second time. Looks like it was just a bad rebase.

@larry-safran
Copy link
Contributor Author

larry-safran commented Oct 19, 2024 via email

@ejona86
Copy link
Member

ejona86 commented Oct 23, 2024

that's why I called it "Fix a bunch of
things". I see that adding "Squashed" would have been a good idea.

A better name would have been "Bidi Blocking Stub" or similar, since it had all the implementation in it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants