Skip to content

Commit

Permalink
xds: change the DumpResources API to return proto message containing …
Browse files Browse the repository at this point in the history
…the resource dump (#7240)
  • Loading branch information
easwars authored May 22, 2024
1 parent 48b6b11 commit a75dfa6
Show file tree
Hide file tree
Showing 6 changed files with 428 additions and 306 deletions.
64 changes: 5 additions & 59 deletions xds/csds/csds.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ import (
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/timestamppb"

v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
v3statusgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)
Expand Down Expand Up @@ -77,7 +74,7 @@ func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) {
return s, nil
}

// StreamClientStatus implementations interface ClientStatusDiscoveryServiceServer.
// StreamClientStatus implements interface ClientStatusDiscoveryServiceServer.
func (s *ClientStatusDiscoveryServer) StreamClientStatus(stream v3statusgrpc.ClientStatusDiscoveryService_StreamClientStatusServer) error {
for {
req, err := stream.Recv()
Expand All @@ -97,13 +94,13 @@ func (s *ClientStatusDiscoveryServer) StreamClientStatus(stream v3statusgrpc.Cli
}
}

// FetchClientStatus implementations interface ClientStatusDiscoveryServiceServer.
// FetchClientStatus implements interface ClientStatusDiscoveryServiceServer.
func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
return s.buildClientStatusRespForReq(req)
}

// buildClientStatusRespForReq fetches the status from the client, and returns
// the response to be sent back to xdsclient.
// buildClientStatusRespForReq fetches the status of xDS resources from the
// xdsclient, and returns the response to be sent back to the csds client.
//
// If it returns an error, the error is a status error.
func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
Expand All @@ -119,16 +116,7 @@ func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statusp
return nil, status.Errorf(codes.InvalidArgument, "node_matchers are not supported, request contains node_matchers: %v", req.NodeMatchers)
}

dump := s.xdsClient.DumpResources()
ret := &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: s.xdsClient.BootstrapConfig().NodeProto,
GenericXdsConfigs: dumpToGenericXdsConfig(dump),
},
},
}
return ret, nil
return s.xdsClient.DumpResources()
}

// Close cleans up the resources.
Expand All @@ -137,45 +125,3 @@ func (s *ClientStatusDiscoveryServer) Close() {
s.xdsClientClose()
}
}

func dumpToGenericXdsConfig(dump map[string]map[string]xdsresource.UpdateWithMD) []*v3statuspb.ClientConfig_GenericXdsConfig {
var ret []*v3statuspb.ClientConfig_GenericXdsConfig
for typeURL, updates := range dump {
for name, update := range updates {
config := &v3statuspb.ClientConfig_GenericXdsConfig{
TypeUrl: typeURL,
Name: name,
VersionInfo: update.MD.Version,
XdsConfig: update.Raw,
LastUpdated: timestamppb.New(update.MD.Timestamp),
ClientStatus: serviceStatusToProto(update.MD.Status),
}
if errState := update.MD.ErrState; errState != nil {
config.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
ret = append(ret, config)
}
}
return ret
}

func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
switch serviceStatus {
case xdsresource.ServiceStatusUnknown:
return v3adminpb.ClientResourceStatus_UNKNOWN
case xdsresource.ServiceStatusRequested:
return v3adminpb.ClientResourceStatus_REQUESTED
case xdsresource.ServiceStatusNotExist:
return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
case xdsresource.ServiceStatusACKed:
return v3adminpb.ClientResourceStatus_ACKED
case xdsresource.ServiceStatusNACKed:
return v3adminpb.ClientResourceStatus_NACKED
default:
return v3adminpb.ClientResourceStatus_UNKNOWN
}
}
48 changes: 40 additions & 8 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/transport"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"

v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)

type watchState int
Expand Down Expand Up @@ -586,26 +590,54 @@ func (a *authority) reportLoad() (*load.Store, func()) {
return a.transport.ReportLoad()
}

func (a *authority) dumpResources() map[string]map[string]xdsresource.UpdateWithMD {
func (a *authority) dumpResources() ([]*v3statuspb.ClientConfig_GenericXdsConfig, error) {
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()

dump := make(map[string]map[string]xdsresource.UpdateWithMD)
var ret []*v3statuspb.ClientConfig_GenericXdsConfig
for rType, resourceStates := range a.resources {
states := make(map[string]xdsresource.UpdateWithMD)
typeURL := rType.TypeURL()
for name, state := range resourceStates {
var raw *anypb.Any
if state.cache != nil {
raw = state.cache.Raw()
}
states[name] = xdsresource.UpdateWithMD{
MD: state.md,
Raw: raw,
config := &v3statuspb.ClientConfig_GenericXdsConfig{
TypeUrl: typeURL,
Name: name,
VersionInfo: state.md.Version,
XdsConfig: raw,
LastUpdated: timestamppb.New(state.md.Timestamp),
ClientStatus: serviceStatusToProto(state.md.Status),
}
if errState := state.md.ErrState; errState != nil {
config.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
ret = append(ret, config)
}
dump[rType.TypeURL()] = states
}
return dump
return ret, nil
}

func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
switch serviceStatus {
case xdsresource.ServiceStatusUnknown:
return v3adminpb.ClientResourceStatus_UNKNOWN
case xdsresource.ServiceStatusRequested:
return v3adminpb.ClientResourceStatus_REQUESTED
case xdsresource.ServiceStatusNotExist:
return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
case xdsresource.ServiceStatusACKed:
return v3adminpb.ClientResourceStatus_ACKED
case xdsresource.ServiceStatusNACKed:
return v3adminpb.ClientResourceStatus_NACKED
default:
return v3adminpb.ClientResourceStatus_UNKNOWN
}
}

func combineErrors(rType string, topLevelErrors []error, perResourceErrors map[string]error) error {
Expand Down
4 changes: 3 additions & 1 deletion xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"

v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)

// XDSClient is a full fledged gRPC client which queries a set of discovery APIs
Expand All @@ -48,7 +50,7 @@ type XDSClient interface {

// DumpResources returns the status of the xDS resources. Returns a map of
// resource type URLs to a map of resource names to resource state.
DumpResources() map[string]map[string]xdsresource.UpdateWithMD
DumpResources() (*v3statuspb.ClientStatusResponse, error)

ReportLoad(*bootstrap.ServerConfig) (*load.Store, func())

Expand Down
43 changes: 19 additions & 24 deletions xds/internal/xdsclient/clientimpl_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,30 @@
package xdsclient

import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)

func appendMaps(dst, src map[string]map[string]xdsresource.UpdateWithMD) {
// Iterate through the resource types.
for rType, srcResources := range src {
// Lookup/create the resource type specific map in the destination.
dstResources := dst[rType]
if dstResources == nil {
dstResources = make(map[string]xdsresource.UpdateWithMD)
dst[rType] = dstResources
}

// Iterate through the resources within the resource type in the source,
// and copy them over to the destination.
for name, update := range srcResources {
dstResources[name] = update
}
}
}

// DumpResources returns the status and contents of all xDS resources.
func (c *clientImpl) DumpResources() map[string]map[string]xdsresource.UpdateWithMD {
func (c *clientImpl) DumpResources() (*v3statuspb.ClientStatusResponse, error) {
c.authorityMu.Lock()
defer c.authorityMu.Unlock()
dumps := make(map[string]map[string]xdsresource.UpdateWithMD)

var retCfg []*v3statuspb.ClientConfig_GenericXdsConfig
for _, a := range c.authorities {
dump := a.dumpResources()
appendMaps(dumps, dump)
cfg, err := a.dumpResources()
if err != nil {
return nil, err
}
retCfg = append(retCfg, cfg...)
}
return dumps

return &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
// TODO: Populate ClientScope. Need to update go-control-plane dependency.
Node: c.config.NodeProto,
GenericXdsConfigs: retCfg,
},
},
}, nil
}
Loading

0 comments on commit a75dfa6

Please sign in to comment.