Skip to content

Commit 87d80e9

Browse files
authored
Merge branch 'grpc:master' into benchmark-client-context-cancellation
2 parents 123e0d4 + 9ff80a7 commit 87d80e9

File tree

5 files changed

+322
-141
lines changed

5 files changed

+322
-141
lines changed

internal/transport/http2_server.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@ import (
3535

3636
"golang.org/x/net/http2"
3737
"golang.org/x/net/http2/hpack"
38+
"google.golang.org/protobuf/proto"
39+
3840
"google.golang.org/grpc/internal"
3941
"google.golang.org/grpc/internal/grpclog"
4042
"google.golang.org/grpc/internal/grpcutil"
4143
"google.golang.org/grpc/internal/pretty"
4244
istatus "google.golang.org/grpc/internal/status"
4345
"google.golang.org/grpc/internal/syscall"
4446
"google.golang.org/grpc/mem"
45-
"google.golang.org/protobuf/proto"
4647

4748
"google.golang.org/grpc/codes"
4849
"google.golang.org/grpc/credentials"
@@ -1304,15 +1305,16 @@ func (t *http2Server) Close(err error) {
13041305
// deleteStream deletes the stream s from transport's active streams.
13051306
func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
13061307
t.mu.Lock()
1307-
if _, ok := t.activeStreams[s.id]; ok {
1308+
_, isActive := t.activeStreams[s.id]
1309+
if isActive {
13081310
delete(t.activeStreams, s.id)
13091311
if len(t.activeStreams) == 0 {
13101312
t.idle = time.Now()
13111313
}
13121314
}
13131315
t.mu.Unlock()
13141316

1315-
if channelz.IsOn() {
1317+
if isActive && channelz.IsOn() {
13161318
if eosReceived {
13171319
t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
13181320
} else {

internal/transport/transport_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ import (
3939
"github.com/google/go-cmp/cmp"
4040
"golang.org/x/net/http2"
4141
"golang.org/x/net/http2/hpack"
42+
4243
"google.golang.org/grpc/attributes"
4344
"google.golang.org/grpc/codes"
4445
"google.golang.org/grpc/credentials"
46+
"google.golang.org/grpc/internal"
4547
"google.golang.org/grpc/internal/channelz"
4648
"google.golang.org/grpc/internal/grpctest"
4749
"google.golang.org/grpc/internal/leakcheck"
@@ -3260,3 +3262,125 @@ func (s) TestClientTransport_Handle1xxHeaders(t *testing.T) {
32603262
})
32613263
}
32623264
}
3265+
3266+
func (s) TestDeleteStreamMetricsIncrementedOnlyOnce(t *testing.T) {
3267+
// Enable channelz for metrics collection
3268+
defer internal.ChannelzTurnOffForTesting()
3269+
if !channelz.IsOn() {
3270+
channelz.TurnOn()
3271+
}
3272+
3273+
for _, test := range []struct {
3274+
name string
3275+
eosReceived bool
3276+
wantStreamSucceeded int64
3277+
wantStreamFailed int64
3278+
}{
3279+
{
3280+
name: "StreamsSucceeded",
3281+
eosReceived: true,
3282+
wantStreamSucceeded: 1,
3283+
wantStreamFailed: 0,
3284+
},
3285+
{
3286+
name: "StreamsFailed",
3287+
eosReceived: false,
3288+
wantStreamSucceeded: 0,
3289+
wantStreamFailed: 1,
3290+
},
3291+
} {
3292+
t.Run(test.name, func(t *testing.T) {
3293+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3294+
defer cancel()
3295+
3296+
// Setup server configuration with channelz support
3297+
serverConfig := &ServerConfig{
3298+
ChannelzParent: channelz.RegisterServer(t.Name()),
3299+
}
3300+
defer channelz.RemoveEntry(serverConfig.ChannelzParent.ID)
3301+
3302+
// Create server and client with normal handler (not notifyCall)
3303+
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{})
3304+
defer func() {
3305+
client.Close(fmt.Errorf("test cleanup"))
3306+
server.stop()
3307+
cancel()
3308+
}()
3309+
3310+
// Wait for connection to be established
3311+
waitWhileTrue(t, func() (bool, error) {
3312+
server.mu.Lock()
3313+
defer server.mu.Unlock()
3314+
if len(server.conns) == 0 {
3315+
return true, fmt.Errorf("timed-out while waiting for connection")
3316+
}
3317+
return false, nil
3318+
})
3319+
3320+
// Get the server transport
3321+
server.mu.Lock()
3322+
var serverTransport *http2Server
3323+
for st := range server.conns {
3324+
serverTransport = st.(*http2Server)
3325+
break
3326+
}
3327+
server.mu.Unlock()
3328+
3329+
if serverTransport == nil {
3330+
t.Fatal("Server transport not found")
3331+
}
3332+
3333+
clientStream, err := client.NewStream(ctx, &CallHdr{})
3334+
if err != nil {
3335+
t.Fatalf("Failed to create stream: %v", err)
3336+
}
3337+
3338+
// Wait for the stream to be created on the server side
3339+
var serverStream *ServerStream
3340+
waitWhileTrue(t, func() (bool, error) {
3341+
serverTransport.mu.Lock()
3342+
defer serverTransport.mu.Unlock()
3343+
for _, v := range serverTransport.activeStreams {
3344+
if v.id == clientStream.id {
3345+
serverStream = v
3346+
return false, nil
3347+
}
3348+
}
3349+
return true, nil
3350+
})
3351+
3352+
if serverStream == nil {
3353+
t.Fatalf("Server stream not found for client stream ID %d", clientStream.id)
3354+
}
3355+
3356+
// First call to deleteStream should remove the stream from activeStreams and update metrics
3357+
serverTransport.deleteStream(serverStream, test.eosReceived)
3358+
3359+
// Check metrics after first deleteStream call
3360+
streamsSucceeded := serverTransport.channelz.SocketMetrics.StreamsSucceeded.Load()
3361+
streamsFailed := serverTransport.channelz.SocketMetrics.StreamsFailed.Load()
3362+
3363+
if streamsSucceeded != test.wantStreamSucceeded {
3364+
t.Errorf("After first deleteStream - StreamsSucceeded: got %d, want %d", streamsSucceeded, test.wantStreamSucceeded)
3365+
}
3366+
if streamsFailed != test.wantStreamFailed {
3367+
t.Errorf("After first deleteStream - StreamsFailed: got %d, want %d", streamsFailed, test.wantStreamFailed)
3368+
}
3369+
3370+
// Additional calls to deleteStream should not change metrics (stream already deleted)
3371+
serverTransport.deleteStream(serverStream, test.eosReceived)
3372+
serverTransport.deleteStream(serverStream, test.eosReceived)
3373+
3374+
// Verify metrics haven't changed after subsequent calls
3375+
additionalStreamsSucceeded := serverTransport.channelz.SocketMetrics.StreamsSucceeded.Load()
3376+
additionalStreamsFailed := serverTransport.channelz.SocketMetrics.StreamsFailed.Load()
3377+
3378+
if additionalStreamsSucceeded != test.wantStreamSucceeded {
3379+
t.Errorf("After multiple deleteStream calls - StreamsSucceeded changed: got %d, want %d", additionalStreamsSucceeded, test.wantStreamSucceeded)
3380+
}
3381+
if additionalStreamsFailed != test.wantStreamFailed {
3382+
t.Errorf("After multiple deleteStream calls - StreamsFailed changed: got %d, want %d", additionalStreamsFailed, test.wantStreamFailed)
3383+
}
3384+
})
3385+
}
3386+
}

internal/xds/balancer/clusterimpl/balancer_test.go

Lines changed: 3 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ package clusterimpl
2020

2121
import (
2222
"context"
23-
"encoding/json"
2423
"errors"
25-
"strings"
2624
"testing"
2725
"time"
2826

@@ -38,7 +36,6 @@ import (
3836
"google.golang.org/grpc/internal/xds/testutils/fakeclient"
3937
"google.golang.org/grpc/internal/xds/xdsclient"
4038
"google.golang.org/grpc/resolver"
41-
"google.golang.org/grpc/serviceconfig"
4239
)
4340

4441
const (
@@ -70,9 +67,6 @@ func init() {
7067
// are handled in the run() goroutine, which exits before Close() returns, we
7168
// expect the above picker update to be dropped.
7269
func (s) TestPickerUpdateAfterClose(t *testing.T) {
73-
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
74-
xdsC := fakeclient.NewClient()
75-
7670
builder := balancer.Get(Name)
7771
cc := testutils.NewBalancerClientConn(t)
7872
b := builder.Build(cc, balancer.BuildOptions{})
@@ -107,6 +101,7 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
107101
})
108102

109103
var maxRequest uint32 = 50
104+
xdsC := fakeclient.NewClient()
110105
if err := b.UpdateClientConnState(balancer.ClientConnState{
111106
ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
112107
BalancerConfig: &LBConfig{
@@ -143,14 +138,12 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
143138
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
144139
defer cancel()
145140

146-
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
147-
xdsC := fakeclient.NewClient()
148-
149141
builder := balancer.Get(Name)
150142
cc := testutils.NewBalancerClientConn(t)
151143
b := builder.Build(cc, balancer.BuildOptions{})
152144
defer b.Close()
153145

146+
xdsC := fakeclient.NewClient()
154147
if err := b.UpdateClientConnState(balancer.ClientConnState{
155148
ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
156149
BalancerConfig: &LBConfig{
@@ -212,108 +205,6 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
212205
}
213206
}
214207

215-
// Test verifies that child policies was updated on receipt of
216-
// configuration update.
217-
func (s) TestChildPolicyUpdatedOnConfigUpdate(t *testing.T) {
218-
xdsC := fakeclient.NewClient()
219-
220-
builder := balancer.Get(Name)
221-
cc := testutils.NewBalancerClientConn(t)
222-
b := builder.Build(cc, balancer.BuildOptions{})
223-
defer b.Close()
224-
225-
// Keep track of which child policy was updated
226-
updatedChildPolicy := ""
227-
228-
// Create stub balancers to track config updates
229-
const (
230-
childPolicyName1 = "stubBalancer1"
231-
childPolicyName2 = "stubBalancer2"
232-
)
233-
234-
stub.Register(childPolicyName1, stub.BalancerFuncs{
235-
UpdateClientConnState: func(_ *stub.BalancerData, _ balancer.ClientConnState) error {
236-
updatedChildPolicy = childPolicyName1
237-
return nil
238-
},
239-
})
240-
241-
stub.Register(childPolicyName2, stub.BalancerFuncs{
242-
UpdateClientConnState: func(_ *stub.BalancerData, _ balancer.ClientConnState) error {
243-
updatedChildPolicy = childPolicyName2
244-
return nil
245-
},
246-
})
247-
248-
// Initial config update with childPolicyName1
249-
if err := b.UpdateClientConnState(balancer.ClientConnState{
250-
ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
251-
BalancerConfig: &LBConfig{
252-
Cluster: testClusterName,
253-
ChildPolicy: &internalserviceconfig.BalancerConfig{
254-
Name: childPolicyName1,
255-
},
256-
},
257-
}); err != nil {
258-
t.Fatalf("Error updating the config: %v", err)
259-
}
260-
261-
if updatedChildPolicy != childPolicyName1 {
262-
t.Fatal("Child policy 1 was not updated on initial configuration update.")
263-
}
264-
265-
// Second config update with childPolicyName2
266-
if err := b.UpdateClientConnState(balancer.ClientConnState{
267-
ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
268-
BalancerConfig: &LBConfig{
269-
Cluster: testClusterName,
270-
ChildPolicy: &internalserviceconfig.BalancerConfig{
271-
Name: childPolicyName2,
272-
},
273-
},
274-
}); err != nil {
275-
t.Fatalf("Error updating the config: %v", err)
276-
}
277-
278-
if updatedChildPolicy != childPolicyName2 {
279-
t.Fatal("Child policy 2 was not updated after child policy name change.")
280-
}
281-
}
282-
283-
// Test verifies that config update fails if child policy config
284-
// failed to parse.
285-
func (s) TestFailedToParseChildPolicyConfig(t *testing.T) {
286-
xdsC := fakeclient.NewClient()
287-
288-
builder := balancer.Get(Name)
289-
cc := testutils.NewBalancerClientConn(t)
290-
b := builder.Build(cc, balancer.BuildOptions{})
291-
defer b.Close()
292-
293-
// Create a stub balancer which fails to ParseConfig.
294-
const parseConfigError = "failed to parse config"
295-
const childPolicyName = "stubBalancer-FailedToParseChildPolicyConfig"
296-
stub.Register(childPolicyName, stub.BalancerFuncs{
297-
ParseConfig: func(_ json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
298-
return nil, errors.New(parseConfigError)
299-
},
300-
})
301-
302-
err := b.UpdateClientConnState(balancer.ClientConnState{
303-
ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
304-
BalancerConfig: &LBConfig{
305-
Cluster: testClusterName,
306-
ChildPolicy: &internalserviceconfig.BalancerConfig{
307-
Name: childPolicyName,
308-
},
309-
},
310-
})
311-
312-
if err == nil || !strings.Contains(err.Error(), parseConfigError) {
313-
t.Fatalf("Got error: %v, want error: %s", err, parseConfigError)
314-
}
315-
}
316-
317208
// Test verify that the case picker is updated synchronously on receipt of
318209
// configuration update.
319210
func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
@@ -342,9 +233,6 @@ func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
342233
}
343234
defer func() { clientConnUpdateHook = origClientConnUpdateHook }()
344235

345-
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
346-
xdsC := fakeclient.NewClient()
347-
348236
builder := balancer.Get(Name)
349237
cc := testutils.NewBalancerClientConn(t)
350238
b := builder.Build(cc, balancer.BuildOptions{})
@@ -362,6 +250,7 @@ func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
362250
},
363251
})
364252

253+
xdsC := fakeclient.NewClient()
365254
if err := b.UpdateClientConnState(balancer.ClientConnState{
366255
ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
367256
BalancerConfig: &LBConfig{

0 commit comments

Comments
 (0)