Skip to content

Commit

Permalink
xdsclient: correct logic used to suppress empty ADS requests on new s…
Browse files Browse the repository at this point in the history
…treams (#7026)
  • Loading branch information
dfawley authored Mar 7, 2024
1 parent f7c5e6a commit 55341d7
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 20 deletions.
36 changes: 16 additions & 20 deletions xds/internal/xdsclient/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,29 +363,21 @@ func (t *Transport) send(ctx context.Context) {
// The xDS protocol only requires that we send the node proto in the first
// discovery request on every stream. Sending the node proto in every
// request message wastes CPU resources on the client and the server.
sendNodeProto := true
sentNodeProto := false
for {
select {
case <-ctx.Done():
return
case stream = <-t.adsStreamCh:
// We have a new stream and we've to ensure that the node proto gets
// sent out in the first request on the stream. At this point, we
// might not have any registered watches. Setting this field to true
// here will ensure that the node proto gets sent out along with the
// discovery request when the first watch is registered.
if len(t.resources) == 0 {
sendNodeProto = true
continue
}

if !t.sendExisting(stream) {
// sent out in the first request on the stream.
var err error
if sentNodeProto, err = t.sendExisting(stream); err != nil {
// Send failed, clear the current stream. Attempt to resend will
// only be made after a new stream is created.
stream = nil
continue
}
sendNodeProto = false
case u, ok := <-t.adsRequestCh.Get():
if !ok {
// No requests will be sent after the adsRequestCh buffer is closed.
Expand Down Expand Up @@ -416,12 +408,12 @@ func (t *Transport) send(ctx context.Context) {
// sending response back).
continue
}
if err := t.sendAggregatedDiscoveryServiceRequest(stream, sendNodeProto, resources, url, version, nonce, nackErr); err != nil {
if err := t.sendAggregatedDiscoveryServiceRequest(stream, !sentNodeProto, resources, url, version, nonce, nackErr); err != nil {
t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, version, nonce, err)
// Send failed, clear the current stream.
stream = nil
}
sendNodeProto = false
sentNodeProto = true
}
}
}
Expand All @@ -433,7 +425,9 @@ func (t *Transport) send(ctx context.Context) {
// that here because the stream has just started and Send() usually returns
// quickly (once it pushes the message onto the transport layer) and is only
// ever blocked if we don't have enough flow control quota.
func (t *Transport) sendExisting(stream adsStream) bool {
//
// Returns true if the node proto was sent.
func (t *Transport) sendExisting(stream adsStream) (sentNodeProto bool, err error) {
t.mu.Lock()
defer t.mu.Unlock()

Expand All @@ -450,16 +444,18 @@ func (t *Transport) sendExisting(stream adsStream) bool {
t.nonces = make(map[string]string)

// Send node proto only in the first request on the stream.
sendNodeProto := true
for url, resources := range t.resources {
if err := t.sendAggregatedDiscoveryServiceRequest(stream, sendNodeProto, mapToSlice(resources), url, t.versions[url], "", nil); err != nil {
if len(resources) == 0 {
continue
}
if err := t.sendAggregatedDiscoveryServiceRequest(stream, !sentNodeProto, mapToSlice(resources), url, t.versions[url], "", nil); err != nil {
t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, t.versions[url], "", err)
return false
return false, err
}
sendNodeProto = false
sentNodeProto = true
}

return true
return sentNodeProto, nil
}

// recv receives xDS responses on the provided ADS stream and branches out to
Expand Down
191 changes: 191 additions & 0 deletions xds/internal/xdsclient/transport/transport_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package transport_test

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -217,3 +218,193 @@ func (s) TestHandleResponseFromManagementServer(t *testing.T) {
})
}
}

func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
nodeProto := &v3corepb.Node{Id: uuid.New().String()}
tr, err := transport.New(transport.Options{
ServerCfg: *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
OnRecvHandler: func(update transport.ResourceUpdate) error {
return nil
},
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
OnErrorHandler: func(error) {}, // No stream error handling.
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
NodeProto: nodeProto,
})
if err != nil {
t.Fatalf("Failed to create xDS transport: %v", err)
}
defer tr.Close()

// Send a request for a listener resource.
const resource = "some-resource"
tr.SendRequest(version.V3ListenerURL, []string{resource})

// Ensure the proper request was sent.
val, err := mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq := val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Remove the subscription by requesting an empty list.
tr.SendRequest(version.V3ListenerURL, []string{})

// Ensure the proper request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
ResourceNames: []string{},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Cause the stream to restart.
mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}

// Ensure no request is sent since there are no resources.
ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer cancel()
if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
}

tr.SendRequest(version.V3ListenerURL, []string{resource})

// Ensure the proper request was sent with the node proto.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

}

func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
nodeProto := &v3corepb.Node{Id: uuid.New().String()}
tr, err := transport.New(transport.Options{
ServerCfg: *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
OnRecvHandler: func(update transport.ResourceUpdate) error {
return nil
},
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
OnErrorHandler: func(error) {}, // No stream error handling.
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
NodeProto: nodeProto,
})
if err != nil {
t.Fatalf("Failed to create xDS transport: %v", err)
}
defer tr.Close()

// Send a request for a listener resource.
const resource = "some-resource"
tr.SendRequest(version.V3ListenerURL, []string{resource})

// Ensure the proper request was sent.
val, err := mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq := val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Send a request for a cluster resource.
tr.SendRequest(version.V3ClusterURL, []string{resource})

// Ensure the proper request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Remove the cluster subscription by requesting an empty list.
tr.SendRequest(version.V3ClusterURL, []string{})

// Ensure the proper request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
ResourceNames: []string{},
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Cause the stream to restart.
mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}

// Ensure the proper LDS request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Ensure no cluster request is sent since there are no cluster resources.
ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer cancel()
if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
}
}

0 comments on commit 55341d7

Please sign in to comment.