Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4238,6 +4238,7 @@ func (a *Agent) registerCache() {
a.cache.RegisterType(cachetype.CompiledDiscoveryChainName, &cachetype.CompiledDiscoveryChain{RPC: a})

a.cache.RegisterType(cachetype.GatewayServicesName, &cachetype.GatewayServices{RPC: a})

a.cache.RegisterType(cachetype.ServiceGatewaysName, &cachetype.ServiceGateways{RPC: a})

a.cache.RegisterType(cachetype.ConfigEntryListName, &cachetype.ConfigEntryList{RPC: a})
Expand All @@ -4257,6 +4258,8 @@ func (a *Agent) registerCache() {

a.cache.RegisterType(cachetype.PeeredUpstreamsName, &cachetype.PeeredUpstreams{RPC: a})

a.cache.RegisterType(cachetype.PeeringListName, &cachetype.Peerings{Client: a.rpcClientPeering})

a.registerEntCache()
}

Expand Down Expand Up @@ -4371,6 +4374,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
InternalServiceDump: proxycfgglue.CacheInternalServiceDump(a.cache),
LeafCertificate: proxycfgglue.CacheLeafCertificate(a.cache),
PeeredUpstreams: proxycfgglue.CachePeeredUpstreams(a.cache),
PeeringList: proxycfgglue.CachePeeringList(a.cache),
PreparedQuery: proxycfgglue.CachePrepraredQuery(a.cache),
ResolvedServiceConfig: proxycfgglue.CacheResolvedServiceConfig(a.cache),
ServiceList: proxycfgglue.CacheServiceList(a.cache),
Expand Down Expand Up @@ -4399,6 +4403,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
sources.IntentionUpstreamsDestination = proxycfgglue.ServerIntentionUpstreamsDestination(deps)
sources.InternalServiceDump = proxycfgglue.ServerInternalServiceDump(deps, proxycfgglue.CacheInternalServiceDump(a.cache))
sources.PeeringList = proxycfgglue.ServerPeeringList(deps)
sources.PeeredUpstreams = proxycfgglue.ServerPeeredUpstreams(deps)
sources.ResolvedServiceConfig = proxycfgglue.ServerResolvedServiceConfig(deps, proxycfgglue.CacheResolvedServiceConfig(a.cache))
sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache))
Expand Down
63 changes: 63 additions & 0 deletions agent/cache-types/mock_PeeringLister_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 107 additions & 0 deletions agent/cache-types/peerings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package cachetype

import (
"context"
"fmt"
"strconv"
"time"

external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/mitchellh/hashstructure"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

// PeeringListName is the recommended name for registration.
const PeeringListName = "peers"

type PeeringListRequest struct {
Request *pbpeering.PeeringListRequest
structs.QueryOptions
}

func (r *PeeringListRequest) CacheInfo() cache.RequestInfo {
info := cache.RequestInfo{
Token: r.Token,
Datacenter: "",
MinIndex: 0,
Timeout: 0,
MustRevalidate: false,

// OPTIMIZE(peering): Cache.notifyPollingQuery polls at this interval. We need to revisit how that polling works.
// Using an exponential backoff when the result hasn't changed may be preferable.
MaxAge: 1 * time.Second,
}

v, err := hashstructure.Hash([]interface{}{
r.Request.Partition,
}, nil)
if err == nil {
// If there is an error, we don't set the key. A blank key forces
// no cache for this request so the request is forwarded directly
// to the server.
info.Key = strconv.FormatUint(v, 10)
}

return info
}

// Peerings supports fetching the list of peers for a given partition or wildcard-specifier.
type Peerings struct {
RegisterOptionsNoRefresh
Client PeeringLister
}

//go:generate mockery --name PeeringLister --inpackage --filename mock_PeeringLister_test.go
type PeeringLister interface {
PeeringList(
ctx context.Context, in *pbpeering.PeeringListRequest, opts ...grpc.CallOption,
) (*pbpeering.PeeringListResponse, error)
}

func (t *Peerings) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult

// The request should be a PeeringListRequest.
// We do not need to make a copy of this request type like in other cache types
// because the RequestInfo is synthetic.
reqReal, ok := req.(*PeeringListRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}

// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.QueryOptions.SetAllowStale(true)

ctx, err := external.ContextWithQueryOptions(context.Background(), reqReal.QueryOptions)
if err != nil {
return result, err
}

// Fetch
reply, err := t.Client.PeeringList(ctx, reqReal.Request)
if err != nil {
// Return an empty result if the error is due to peering being disabled.
// This allows mesh gateways to receive an update and confirm that the watch is set.
if e, ok := status.FromError(err); ok && e.Code() == codes.FailedPrecondition {
result.Index = 1
result.Value = &pbpeering.PeeringListResponse{}
return result, nil
}
return result, err
}

result.Value = reply
result.Index = reply.Index

return result, nil
}
131 changes: 131 additions & 0 deletions agent/cache-types/peerings_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package cachetype

import (
"context"
"testing"
"time"

"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/proto/pbpeering"
)

func TestPeerings(t *testing.T) {
client := NewMockPeeringLister(t)
typ := &Peerings{Client: client}

resp := &pbpeering.PeeringListResponse{
Index: 48,
Peerings: []*pbpeering.Peering{
{
Name: "peer1",
ID: "8ac403cf-6834-412f-9dfe-0ac6e69bd89f",
PeerServerAddresses: []string{"1.2.3.4"},
State: pbpeering.PeeringState_ACTIVE,
},
},
}

// Expect the proper call.
// This also returns the canned response above.
client.On("PeeringList", mock.Anything, mock.Anything).
Return(resp, nil)

// Fetch and assert against the result.
result, err := typ.Fetch(cache.FetchOptions{}, &PeeringListRequest{
Request: &pbpeering.PeeringListRequest{},
})
require.NoError(t, err)
require.Equal(t, cache.FetchResult{
Value: resp,
Index: 48,
}, result)
}

func TestPeerings_PeeringDisabled(t *testing.T) {
client := NewMockPeeringLister(t)
typ := &Peerings{Client: client}

var resp *pbpeering.PeeringListResponse

// Expect the proper call, but return the peering disabled error
client.On("PeeringList", mock.Anything, mock.Anything).
Return(resp, grpcstatus.Error(codes.FailedPrecondition, "peering must be enabled to use this endpoint"))

// Fetch and assert against the result.
result, err := typ.Fetch(cache.FetchOptions{}, &PeeringListRequest{
Request: &pbpeering.PeeringListRequest{},
})
require.NoError(t, err)
require.NotNil(t, result)
require.EqualValues(t, 1, result.Index)
require.NotNil(t, result.Value)
}

func TestPeerings_badReqType(t *testing.T) {
client := pbpeering.NewPeeringServiceClient(nil)
typ := &Peerings{Client: client}

// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(t, err)
require.Contains(t, err.Error(), "wrong type")
}

// This test asserts that we can continuously poll this cache type, given that it doesn't support blocking.
func TestPeerings_MultipleUpdates(t *testing.T) {
c := cache.New(cache.Options{})

client := NewMockPeeringLister(t)

// On each mock client call to PeeringList we will increment the index by 1
// to simulate new data arriving.
resp := &pbpeering.PeeringListResponse{
Index: uint64(0),
}

client.On("PeeringList", mock.Anything, mock.Anything).
Return(func(ctx context.Context, in *pbpeering.PeeringListRequest, opts ...grpc.CallOption) *pbpeering.PeeringListResponse {
resp.Index++
// Avoids triggering the race detection by copying the output
copyResp, err := copystructure.Copy(resp)
require.NoError(t, err)
output := copyResp.(*pbpeering.PeeringListResponse)
return output
}, nil)

c.RegisterType(PeeringListName, &Peerings{Client: client})

ch := make(chan cache.UpdateEvent)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)

require.NoError(t, c.Notify(ctx, PeeringListName, &PeeringListRequest{
Request: &pbpeering.PeeringListRequest{},
}, "updates", ch))

i := uint64(1)
for {
select {
case <-ctx.Done():
t.Fatal("context deadline exceeded")
return
case update := <-ch:
// Expect to receive updates for increasing indexes serially.
actual := update.Result.(*pbpeering.PeeringListResponse)
require.Equal(t, i, actual.Index)
i++

if i > 3 {
return
}
}
}
}
3 changes: 1 addition & 2 deletions agent/cache-types/trust_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ func (t *TrustBundle) Fetch(_ cache.FetchOptions, req cache.Request) (cache.Fetc
reqReal.QueryOptions.SetAllowStale(true)

// Fetch
options := structs.QueryOptions{Token: reqReal.Token}
ctx, err := external.ContextWithQueryOptions(context.Background(), options)
ctx, err := external.ContextWithQueryOptions(context.Background(), reqReal.QueryOptions)
if err != nil {
return result, err
}
Expand Down
10 changes: 6 additions & 4 deletions agent/cache-types/trust_bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/proto/pbpeering"
)

func TestTrustBundle(t *testing.T) {
Expand Down Expand Up @@ -93,11 +94,12 @@ func TestTrustBundle_MultipleUpdates(t *testing.T) {
for {
select {
case <-ctx.Done():
t.Fatal("context deadline exceeded")
return
case update := <-ch:
// Expect to receive updates for increasing indexes serially.
resp := update.Result.(*pbpeering.TrustBundleReadResponse)
require.Equal(t, i, resp.Index)
actual := update.Result.(*pbpeering.TrustBundleReadResponse)
require.Equal(t, i, actual.Index)
i++

if i > 3 {
Expand Down
3 changes: 1 addition & 2 deletions agent/cache-types/trust_bundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func (t *TrustBundles) Fetch(_ cache.FetchOptions, req cache.Request) (cache.Fet
reqReal.QueryOptions.SetAllowStale(true)

// Fetch
options := structs.QueryOptions{Token: reqReal.Token}
ctx, err := external.ContextWithQueryOptions(context.Background(), options)
ctx, err := external.ContextWithQueryOptions(context.Background(), reqReal.QueryOptions)
if err != nil {
return result, err
}
Expand Down
1 change: 1 addition & 0 deletions agent/cache-types/trust_bundles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func TestTrustBundles_MultipleUpdates(t *testing.T) {
for {
select {
case <-ctx.Done():
t.Fatal("context deadline exceeded")
return
case update := <-ch:
// Expect to receive updates for increasing indexes serially.
Expand Down
Loading