Resource not re-sent if clients unsubscribe and then resubscribe #431

Open
menghanl opened this issue May 11, 2021 · 15 comments

Comments

@menghanl

The scenario:

Client is watching resource A

req(A, "")
resp(A, v1)
req(A, v1) // this is ACK

User unsubscribes from A

req([], v1) // this removes A from the requested resources

User resubscribes to A

req(A, v1)

It's expected that the control plane will resend resource A to the client because of the unsubscribe and resubscribe, even though the resource itself has not changed.

But go-control-plane doesn't re-send the resource.

@menghanl menghanl changed the title Resource not re-sent after unsubscribe and resubscribe Resource not re-sent if clients unsubscribe and then resubscribe May 11, 2021
@menghanl
Author

menghanl commented May 11, 2021

The spec doesn't talk about this scenario explicitly, but it does mention that the client can update resource_names.

In the Unsubscribing From Resources section

unsubscribing to a set of resources is done by sending a new request containing all resource names that are still being subscribed to but not containing the resource names being unsubscribed to

In the Resource updates section

Envoy may update the list of resource_names it presents to the management server in each DiscoveryRequest that ACK/NACKs a specific DiscoveryResponse. In addition, Envoy may later issue additional DiscoveryRequests at a given version_info to update the management server with new resource hints
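
A minimal sketch of what the two quoted passages imply for a server: subscribes and unsubscribes can be detected by comparing the resource names of consecutive requests on a stream. The function name `diffSubscriptions` is hypothetical and not part of go-control-plane, and wildcard requests (where an empty list means "everything") are deliberately not modeled.

```go
package main

import (
	"fmt"
	"sort"
)

// diffSubscriptions compares the resource names of two consecutive SotW
// requests for the same type URL on one stream. Names present only in next
// are new subscriptions; names present only in prev are unsubscriptions.
// Hypothetical helper for illustration, not a go-control-plane API.
func diffSubscriptions(prev, next []string) (added, removed []string) {
	prevSet := make(map[string]bool, len(prev))
	for _, name := range prev {
		prevSet[name] = true
	}
	nextSet := make(map[string]bool, len(next))
	for _, name := range next {
		nextSet[name] = true
	}
	for name := range nextSet {
		if !prevSet[name] {
			added = append(added, name)
		}
	}
	for name := range prevSet {
		if !nextSet[name] {
			removed = append(removed, name)
		}
	}
	// Sort so the result does not depend on map iteration order.
	sort.Strings(added)
	sort.Strings(removed)
	return added, removed
}

func main() {
	// req(A) followed by req([]) unsubscribes from A.
	added, removed := diffSubscriptions([]string{"A"}, nil)
	fmt.Println(added, removed) // prints "[] [A]"

	// req([]) followed by req(A) subscribes to A again.
	added, removed = diffSubscriptions(nil, []string{"A"})
	fmt.Println(added, removed) // prints "[A] []"
}
```

Under this reading, the resubscribe in the scenario shows up as a non-empty `added` list even though the version in the request is unchanged.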

@menghanl
Author

menghanl commented May 11, 2021

Code to reproduce:

package repro // hypothetical package name; the original snippet omitted the package clause

import (
	"context"
	"fmt"
	"net"
	"testing"

	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/wrapperspb"

	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	v3discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	v3server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
)

func testEndpoint(clusterName string, host string, port uint32) *v3endpointpb.ClusterLoadAssignment {
	return &v3endpointpb.ClusterLoadAssignment{
		ClusterName: clusterName,
		Endpoints: []*v3endpointpb.LocalityLbEndpoints{{
			Locality: &v3corepb.Locality{SubZone: "subzone"},
			LbEndpoints: []*v3endpointpb.LbEndpoint{{
				HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
					Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
						SocketAddress: &v3corepb.SocketAddress{
							Protocol:      v3corepb.SocketAddress_TCP,
							Address:       host,
							PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: uint32(port)}},
					}},
				}},
			}},
			LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
			Priority:            0,
		}},
	}
}

func TestGCPFailure(t *testing.T) {
	// Start go-control-plane server.
	cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, nil)
	fmt.Println("Created new snapshot cache...")
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		panic(err.Error())
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	xs := v3server.NewServer(ctx, cache, v3server.CallbackFuncs{})
	gs := grpc.NewServer()
	v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, xs)
	fmt.Println("Registered Aggregated Discovery Service (ADS)...")
	go gs.Serve(lis)
	defer gs.Stop()

	// Start a client and an ADS stream.
	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		panic(err.Error())
	}
	adsStream, err := v3discoverypb.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(context.Background(), grpc.WaitForReady(true))
	if err != nil {
		panic(err.Error())
	}

	const (
		nodeID         = "node-ID"
		clusterName    = "cluster-name"
		V3EndpointsURL = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
	)

	// Send the first EDS req.
	fmt.Println("sending first EDS req")
	if err := adsStream.Send(&v3discoverypb.DiscoveryRequest{
		Node:          &v3corepb.Node{Id: nodeID},
		TypeUrl:       V3EndpointsURL,
		ResourceNames: []string{clusterName},
		VersionInfo:   "",
		ResponseNonce: "",
	}); err != nil {
		panic(err.Error())
	}

	version1 := "version-1"
	endpoints1 := []types.Resource{
		testEndpoint(clusterName, "127.0.0.1", 9527),
	}
	snapshot1 := v3cache.NewSnapshot(version1, endpoints1, nil, nil, nil, nil, nil)
	if err := cache.SetSnapshot(nodeID, snapshot1); err != nil {
		panic(err.Error())
	}

	// Receive the first resp.
	resp, err := adsStream.Recv()
	if err != nil {
		panic(err.Error())
	}
	fmt.Println(resp)
	nonce1 := resp.Nonce

	// Send a request to unsubscribe the EDS resource.
	fmt.Println("unsubscribe")
	if err := adsStream.Send(&v3discoverypb.DiscoveryRequest{
		Node:          &v3corepb.Node{Id: nodeID},
		TypeUrl:       V3EndpointsURL,
		ResourceNames: []string{},
		VersionInfo:   version1,
		ResponseNonce: nonce1,
	}); err != nil {
		panic(err.Error())
	}

	// Send a request to resubscribe the EDS resource.
	fmt.Println("resubscribe")
	if err := adsStream.Send(&v3discoverypb.DiscoveryRequest{
		Node:          &v3corepb.Node{Id: nodeID},
		TypeUrl:       V3EndpointsURL,
		ResourceNames: []string{clusterName},
		VersionInfo:   version1,
		ResponseNonce: nonce1,
	}); err != nil {
		panic(err.Error())
	}

	// This recv blocks forever.
	//
	// But it's expected that the server will send the EDS resource to the
	// client again, even though there's no update to the resource. Because
	// there's unsubscribe and resubscribe.
	resp2, err := adsStream.Recv()
	if err != nil {
		panic(err.Error())
	}
	fmt.Println(resp2)
}

The last Recv() on the stream after resubscribing blocks forever.

@menghanl
Author

cc @markdroth @dfawley

@snowp
Contributor

snowp commented May 19, 2021

This seems like a bug to me. I assume this happens because the code right now assumes that if the requested version matches the server version then nothing needs to be done, but it sounds like we need to track subscriptions even in SotW to re-push resources for this kind of subscription change.
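
To make the difference concrete, here is a rough sketch that replays the request sequence from the issue against two decision rules: one that looks only at versions, and one that also tracks which names have been sent on the stream. All types and names here (`request`, `versionOnly`, `subscriptionAware`) are hypothetical stand-ins and do not reflect how go-control-plane is actually structured.

```go
package main

import "fmt"

// request is a simplified stand-in for a SotW DiscoveryRequest of one type.
type request struct {
	names   []string
	version string
}

// versionOnly responds only when the client's version differs from the
// server's, which is the behaviour suspected in the comment above.
func versionOnly(req request, serverVersion string) bool {
	return req.version != serverVersion
}

// subscriptionAware additionally remembers, per stream, which names have been
// sent and are still subscribed.
type subscriptionAware struct {
	sent map[string]bool
}

// respond reports whether the server should send a response to req.
func (s *subscriptionAware) respond(req request, serverVersion string) bool {
	requested := make(map[string]bool, len(req.names))
	for _, name := range req.names {
		requested[name] = true
	}
	// Forget names the client has dropped, so a resubscribe counts as new.
	for name := range s.sent {
		if !requested[name] {
			delete(s.sent, name)
		}
	}
	if len(requested) == 0 {
		return false // nothing is subscribed, so there is nothing to send
	}
	respond := req.version != serverVersion
	for name := range requested {
		if !s.sent[name] {
			respond = true // a name that this stream has not been sent
		}
	}
	if respond {
		for name := range requested {
			s.sent[name] = true
		}
	}
	return respond
}

func main() {
	const serverVersion = "v1"
	reqs := []request{
		{names: []string{"A"}, version: ""},   // initial subscribe
		{names: []string{"A"}, version: "v1"}, // ACK
		{names: nil, version: "v1"},           // unsubscribe
		{names: []string{"A"}, version: "v1"}, // resubscribe
	}
	tracker := &subscriptionAware{sent: map[string]bool{}}
	for _, r := range reqs {
		fmt.Println(versionOnly(r, serverVersion), tracker.respond(r, serverVersion))
	}
	// The last line printed is "false true": only the subscription-aware
	// rule re-sends A after the resubscribe.
}
```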

@htuch can you confirm that what this issue talks about is the expected behavior?

@alecholmez We might need some kind of subscription state for SotW as well based on this issue. Maybe there are options for consolidating the new delta state with SotW.

@markdroth

Yes, I think this is a bug. The xDS server definitely needs to explicitly track the set of resources that the client is subscribed to.

Another relevant part of the spec is Knowing When a Requested Resource Does Not Exist:

Note that even if a requested resource does not exist at the moment when the client requests it, that resource could be created at any time. Management servers must remember the set of resources being requested by the client, and if one of those resources springs into existence later, the server must send an update to the client informing it of the new resource. Clients that initially see a resource that does not exist must be prepared for the resource to be created at any time.

The way I think of an xDS server, its job is to basically match up the states of a database containing a set of resources with the states of the individual client streams. Each client stream has an associated list of resource names that it is subscribed to, along with the corresponding version of each resource that has already been sent to the client. Whenever the client changes the set of resources it is subscribing to, new resources may need to be sent to the client. Whenever a resource in the database gets updated (where creating and removing a resource are just special cases), that change needs to be propagated to any streams that are subscribing to that resource. (In this sense, the version tracking in SotW is IMHO actually harder to implement than the state tracking for the incremental protocol, because the semantics around the resource type instance version are actually much harder to understand.)
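
The model described above can be sketched roughly as follows, assuming a single resource type and per-resource versions. The `server` and `stream` types and their methods are hypothetical and are not taken from go-control-plane. Resource removal and the SotW requirement to send full state for some types are left out to keep the sketch short.

```go
package main

import (
	"fmt"
	"sort"
)

// stream holds, for one client stream, the subscribed resource names and the
// version of each resource last sent on it ("" means nothing sent yet).
type stream struct {
	sentVersion map[string]string
}

// server pairs a resource database (name -> current version) with its streams.
type server struct {
	db      map[string]string
	streams []*stream
}

// sync returns the subscribed resources whose database version differs from
// what the stream has been sent, and records them as sent.
func (s *server) sync(st *stream) []string {
	var toSend []string
	for name, sent := range st.sentVersion {
		if cur, ok := s.db[name]; ok && cur != sent {
			st.sentVersion[name] = cur
			toSend = append(toSend, name)
		}
	}
	sort.Strings(toSend)
	return toSend
}

// subscribe replaces the stream's subscription set. State for dropped names
// is discarded, so a later resubscribe is treated as a new subscription.
func (s *server) subscribe(st *stream, names []string) []string {
	next := make(map[string]string, len(names))
	for _, name := range names {
		next[name] = st.sentVersion[name] // "" if not previously subscribed
	}
	st.sentVersion = next
	return s.sync(st)
}

// update creates or changes a resource and returns, per stream, what to send.
func (s *server) update(name, version string) [][]string {
	s.db[name] = version
	out := make([][]string, len(s.streams))
	for i, st := range s.streams {
		out[i] = s.sync(st)
	}
	return out
}

func main() {
	st := &stream{}
	srv := &server{db: map[string]string{"A": "v1"}, streams: []*stream{st}}

	fmt.Println(srv.subscribe(st, []string{"A"}))      // [A]
	fmt.Println(srv.subscribe(st, nil))                // []
	fmt.Println(srv.subscribe(st, []string{"A"}))      // [A] again after resubscribe
	fmt.Println(srv.subscribe(st, []string{"A", "B"})) // [] because B does not exist yet
	fmt.Println(srv.update("B", "v1"))                 // [[B]] once B is created
}
```

Both triggers from the paragraph above go through the same `sync` step: a subscription change and a database update each compare what the stream has been sent against what the database currently holds.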

Note that in general, the server cannot rely on the version sent by the client to decide what set of resources it needs to send; it should always pay attention to changes in the set of resources that the client is subscribing to in its requests. The only case where the server may not send a resource requested by the client is upon stream reconnection, and even then this optimization works only in a very narrow set of circumstances, as per the ACK/NACK and resource type instance version section of the spec:

Note that the version for a resource type is not a property of an individual xDS stream but rather a property of the resources themselves. If the stream becomes broken and the client creates a new stream, the client’s initial request on the new stream should indicate the most recent version seen by the client on the previous stream. Servers may decide to optimize by not resending resources that the client had already seen on the previous stream, but only if they know that the client is not subscribing to a new resource that it was not previously subscribed to. For example, it is generally safe for servers to do this optimization for wildcard LDS and CDS requests, and it is safe to do in environments where the clients will always subscribe to exactly the same set of resources.
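
One possible way to express the reconnection rule quoted above, as a sketch only: the server skips the resend on the first request of a new stream when the versions match and every requested name is known to have been subscribed before. The function `canSkipResend` is hypothetical, and how a server would retain the previous subscription set across streams is an assumption this sketch does not address. Wildcard requests are not modeled.

```go
package main

import "fmt"

// canSkipResend decides whether the first request on a new stream can go
// unanswered. previous holds the names the server knows the client subscribed
// to on its earlier stream, or nil when the server has no such knowledge.
// Hypothetical helper for illustration only.
func canSkipResend(clientVersion, serverVersion string, previous map[string]bool, requested []string) bool {
	// The client has seen nothing, or has seen an older version.
	if clientVersion == "" || clientVersion != serverVersion {
		return false
	}
	// Without knowledge of the earlier subscriptions the optimization is unsafe.
	if previous == nil {
		return false
	}
	// Any name that was not subscribed before must be sent.
	for _, name := range requested {
		if !previous[name] {
			return false
		}
	}
	return true
}

func main() {
	previous := map[string]bool{"A": true}
	fmt.Println(canSkipResend("v1", "v1", previous, []string{"A"}))      // true
	fmt.Println(canSkipResend("v1", "v1", previous, []string{"A", "B"})) // false
	fmt.Println(canSkipResend("v1", "v1", nil, []string{"A"}))           // false
}
```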

@alecholmez
Contributor

alecholmez commented May 20, 2021

@snowp I think we can certainly abstract a pattern out of the delta logic that we could apply within SOTW. It might even be safe to say that if this is a bug in SOTW gRPC code, it's probably a bug in the REST logic too since they both follow the same protocol.

Luckily I defined the StreamState as a global server package so we could internally reference that pattern throughout the code.

@alecholmez
Contributor

This is a big bug and I think I will go ahead and take this

@steeling
Contributor

steeling commented Aug 2, 2022

any update on this?

@tony612
Contributor

tony612 commented Oct 21, 2022

@alecholmez Any plan to fix this?

@bayoumymac

@alecholmez any update on this?

@duxin40

duxin40 commented Mar 5, 2024

@alecholmez any update on this?

@kkalin68
Contributor

I know there is a PR that will address the issue. Meanwhile, I made a patch for v0.12.0 that seems to solve the issue on the current codebase:

diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go
index ebf63f5b6..6dbcf6398 100644
--- a/pkg/cache/v3/simple.go
+++ b/pkg/cache/v3/simple.go
@@ -411,11 +411,19 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
 	if exists {
 		knownResourceNames := streamState.GetKnownResourceNames(request.GetTypeUrl())
 		diff := []string{}
+		subscribed := make(map[string]bool)
 		for _, r := range request.GetResourceNames() {
+			subscribed[r] = true
 			if _, ok := knownResourceNames[r]; !ok {
 				diff = append(diff, r)
 			}
 		}
+		for k := range knownResourceNames {
+			if _, ok := subscribed[k]; !ok {
+				delete(knownResourceNames, k)
+			}
+		}
+		streamState.SetKnownResourceNames(request.GetTypeUrl(), knownResourceNames)
 
 		cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID,
 			request.GetTypeUrl(), request.GetResourceNames(), knownResourceNames, diff)

@carsonoid

carsonoid commented Mar 27, 2024

@kkalin68 thanks for this!

I can confirm that your patch works. I am able to reliably recreate this issue using Go gRPC xDS clients, and applying this patch fixes it! I'm wondering if there is a downside to using this fix over the fixes and changes proposed in the PRs attached to this issue. This seems like a much smaller and more focused fix.


For those who are curious, it's actually fairly easy to recreate this locally:

  • Create two gRPC clients in one process
  • The first uses ResourceA and has a very short idle timeout (I used 250ms)
  • The second uses ResourceB and has a normal timeout

Then have each client make a gRPC call every second. The first client becomes blocked almost immediately and never recovers; it always reports "Received error from the name resolver: produced zero addresses". The broken client stays in this state until an xDS update happens and the watches are recreated.

@valerian-roche
Contributor

This PR in our fork has been implemented to address this issue. We are currently validating the impact on grpc and envoy clients.
Other issues are also being addressed in those PRs to properly handle gRPC clients. There is also a code fix in upstream gRPC (for Go) to address an improper wildcard subscription, which will be released in grpc-go 1.63.0.

@easwars
Contributor

easwars commented Nov 4, 2024

gRPC-Go uses go-control-plane as the management server for our tests, and we are seeing multiple tests flake because of this bug. Could a fix for this be prioritized, please? Thanks.
