13
13
// limitations under the License.
14
14
package com .google .devtools .build .lib .remote ;
15
15
16
+ import com .google .devtools .build .lib .remote .grpc .ChannelConnectionFactory ;
17
+ import com .google .devtools .build .lib .remote .grpc .ChannelConnectionFactory .ChannelConnection ;
18
+ import com .google .devtools .build .lib .remote .grpc .DynamicConnectionPool ;
19
+ import com .google .devtools .build .lib .remote .grpc .SharedConnectionFactory .SharedConnection ;
16
20
import io .grpc .CallOptions ;
21
+ import io .grpc .Channel ;
17
22
import io .grpc .ClientCall ;
18
- import io .grpc .ManagedChannel ;
23
+ import io .grpc .ForwardingClientCall ;
24
+ import io .grpc .ForwardingClientCallListener ;
25
+ import io .grpc .Metadata ;
19
26
import io .grpc .MethodDescriptor ;
27
+ import io .grpc .Status ;
20
28
import io .netty .util .AbstractReferenceCounted ;
21
29
import io .netty .util .ReferenceCounted ;
22
- import java .util . concurrent . TimeUnit ;
30
+ import java .io . IOException ;
23
31
24
32
/**
25
- * A wrapper around a {@link io.grpc.ManagedChannel } exposing a reference count. When instantiated
26
- * the reference count is 1. {@link ManagedChannel#shutdown ()} will be called on the wrapped channel
27
- * when the reference count reaches 0.
33
+ * A wrapper around a {@link DynamicConnectionPool } exposing {@link Channel} and a reference count.
34
+ * When instantiated the reference count is 1. {@link DynamicConnectionPool#close ()} will be called
35
+ * on the wrapped channel when the reference count reaches 0.
28
36
*
29
37
* <p>See {@link ReferenceCounted} for more information about reference counting.
30
38
*/
31
- public class ReferenceCountedChannel extends ManagedChannel implements ReferenceCounted {
32
-
33
- private final ManagedChannel channel ;
34
- private final AbstractReferenceCounted referenceCounted ;
35
-
36
- public ReferenceCountedChannel (ManagedChannel channel ) {
37
- this (
38
- channel ,
39
- new AbstractReferenceCounted () {
40
- @ Override
41
- protected void deallocate () {
42
- channel .shutdown ();
39
+ public class ReferenceCountedChannel extends Channel implements ReferenceCounted {
40
+ private final DynamicConnectionPool dynamicConnectionPool ;
41
+ private final AbstractReferenceCounted referenceCounted =
42
+ new AbstractReferenceCounted () {
43
+ @ Override
44
+ protected void deallocate () {
45
+ try {
46
+ dynamicConnectionPool .close ();
47
+ } catch (IOException e ) {
48
+ throw new AssertionError (e .getMessage (), e );
43
49
}
50
+ }
44
51
45
- @ Override
46
- public ReferenceCounted touch (Object o ) {
47
- return this ;
48
- }
49
- });
50
- }
51
-
52
- protected ReferenceCountedChannel (
53
- ManagedChannel channel , AbstractReferenceCounted referenceCounted ) {
54
- this .channel = channel ;
55
- this .referenceCounted = referenceCounted ;
56
- }
52
+ @ Override
53
+ public ReferenceCounted touch (Object o ) {
54
+ return this ;
55
+ }
56
+ };
57
57
58
- @ Override
59
- public ManagedChannel shutdown () {
60
- throw new UnsupportedOperationException ("Don't call shutdown() directly, but use release() "
61
- + "instead." );
58
+ public ReferenceCountedChannel (ChannelConnectionFactory connectionFactory ) {
59
+ this .dynamicConnectionPool =
60
+ new DynamicConnectionPool (connectionFactory , connectionFactory .maxConcurrency ());
62
61
}
63
62
64
- @ Override
65
63
public boolean isShutdown () {
66
- return channel .isShutdown ();
67
- }
68
-
69
- @ Override
70
- public boolean isTerminated () {
71
- return channel .isTerminated ();
72
- }
73
-
74
- @ Override
75
- public ManagedChannel shutdownNow () {
76
- throw new UnsupportedOperationException ("Don't call shutdownNow() directly, but use release() "
77
- + "instead." );
78
- }
79
-
80
- @ Override
81
- public boolean awaitTermination (long timeout , TimeUnit timeUnit ) throws InterruptedException {
82
- return channel .awaitTermination (timeout , timeUnit );
64
+ return dynamicConnectionPool .isClosed ();
65
+ }
66
+
67
+ /** A {@link ClientCall} which call {@link SharedConnection#close()} after the RPC is closed. */
68
+ static class ConnectionCleanupCall <ReqT , RespT >
69
+ extends ForwardingClientCall .SimpleForwardingClientCall <ReqT , RespT > {
70
+ private final SharedConnection connection ;
71
+
72
+ protected ConnectionCleanupCall (ClientCall <ReqT , RespT > delegate , SharedConnection connection ) {
73
+ super (delegate );
74
+ this .connection = connection ;
75
+ }
76
+
77
+ @ Override
78
+ public void start (Listener <RespT > responseListener , Metadata headers ) {
79
+ super .start (
80
+ new ForwardingClientCallListener .SimpleForwardingClientCallListener <RespT >(
81
+ responseListener ) {
82
+ @ Override
83
+ public void onClose (Status status , Metadata trailers ) {
84
+ super .onClose (status , trailers );
85
+
86
+ try {
87
+ connection .close ();
88
+ } catch (IOException e ) {
89
+ throw new AssertionError (e .getMessage (), e );
90
+ }
91
+ }
92
+ },
93
+ headers );
94
+ }
83
95
}
84
96
85
97
@ Override
86
98
public <RequestT , ResponseT > ClientCall <RequestT , ResponseT > newCall (
87
99
MethodDescriptor <RequestT , ResponseT > methodDescriptor , CallOptions callOptions ) {
88
- return channel .<RequestT , ResponseT >newCall (methodDescriptor , callOptions );
100
+ SharedConnection sharedConnection = dynamicConnectionPool .create ().blockingGet ();
101
+ ChannelConnection connection = (ChannelConnection ) sharedConnection .getUnderlyingConnection ();
102
+ return new ConnectionCleanupCall <>(
103
+ connection .getChannel ().newCall (methodDescriptor , callOptions ), sharedConnection );
89
104
}
90
105
91
106
@ Override
92
107
public String authority () {
93
- return channel .authority ();
108
+ SharedConnection sharedConnection = dynamicConnectionPool .create ().blockingGet ();
109
+ ChannelConnection connection = (ChannelConnection ) sharedConnection .getUnderlyingConnection ();
110
+ return connection .getChannel ().authority ();
94
111
}
95
112
96
113
@ Override
@@ -131,4 +148,4 @@ public boolean release() {
131
148
public boolean release (int decrement ) {
132
149
return referenceCounted .release (decrement );
133
150
}
134
- }
151
+ }
0 commit comments