diff --git a/cmd/main/main.go b/cmd/main/main.go index 6bb14e2..36c3a0b 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -175,12 +175,10 @@ func main() { cs.Stop() - drainPeriod, err := time.ParseDuration(*config.Get().ConfigDrainPeriod) - if err != nil { log.WithError(err).Error() } else { - time.Sleep(drainPeriod) + time.Sleep(*config.Get().ConfigDrainPeriod) } configstore.StoreMap.Delete(nodeID) @@ -263,13 +261,8 @@ func startGRPC(grpcServer *grpc.Server, lis net.Listener) { // sync all endpoints in configs with endpointstore. func syncManual() { - WaitTime, err := time.ParseDuration(*config.Get().EndpointCheckPeriod) - if err != nil { - log.WithError(err).Fatal() - } - for { - time.Sleep(WaitTime) + time.Sleep(*config.Get().EndpointCheckPeriod) configstore.StoreMap.Range(func(k, v interface{}) bool { cs, ok := v.(*configstore.ConfigStore) diff --git a/config/test1-id.yaml b/config/test1-id.yaml index 628559b..3939b3d 100644 --- a/config/test1-id.yaml +++ b/config/test1-id.yaml @@ -190,10 +190,8 @@ data: pass_through_mode: false headers: - name: ":path" - exact_match: "/healthz" - # need update of github.com/envoyproxy/go-control-plane - # string_match: - # exact: "/healthz" + string_match: + exact: "/healthz" - name: envoy.filters.http.router routes: @@ -238,25 +236,24 @@ data: weight: 50 - name: local_service2 weight: 50 - # need update of github.com/envoyproxy/go-control-plane - # - match: - # prefix: "/test-exact-match" - # headers: - # - name: "create-xhprof-of-request" - # string_match: - # exact: "true" - # route: - # cluster: paket-xhprof - # - match: - # path: "/test-regex-match" - # headers: - # - name: "cookie" - # string_match: - # safe_regex: - # google_re2: {} - # regex: ".*?(CanaryUser=true).*?" - # route: - # cluster: local_service1 + - match: + prefix: "/test-exact-match" + headers: + - name: "create-xhprof-of-request" + string_match: + exact: "true" + route: + cluster: paket-xhprof + - match: + path: "/test-regex-match" + headers: + - name: "cookie" + string_match: + safe_regex: + google_re2: {} + regex: ".*?(CanaryUser=true).*?" + route: + cluster: local_service1 - match: prefix: "/no-ratelimits" route: diff --git a/go.mod b/go.mod index 95e66be..a2577cf 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.17 require ( github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect - github.com/envoyproxy/go-control-plane v0.9.9 + github.com/envoyproxy/go-control-plane v0.9.10-0.20211025154310-e7fb5d0c57d2 github.com/envoyproxy/protoc-gen-validate v0.6.1 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.0 @@ -41,7 +41,7 @@ require ( github.com/Masterminds/sprig v2.22.0+incompatible // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect - github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed // indirect + github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/getsentry/sentry-go v0.9.0 // indirect github.com/go-logr/logr v0.4.0 // indirect diff --git a/go.sum b/go.sum index 81e2ca0..c1d01ec 100644 --- a/go.sum +++ b/go.sum @@ -81,8 +81,9 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= -github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed h1:OZmjad4L3H8ncOIR8rnb5MREYqG8ixi5+WbeUsquF0c= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe h1:QJDJubh0OEcpeGjC7/8uF9tt4e39U/Ya1uyK+itnNPQ= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= @@ -107,8 +108,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= -github.com/envoyproxy/go-control-plane v0.9.9 h1:vQLjymTobffN2R0F8eTqw6q7iozfRO5Z0m+/4Vw+/uA= -github.com/envoyproxy/go-control-plane v0.9.9/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= +github.com/envoyproxy/go-control-plane v0.9.10-0.20211025154310-e7fb5d0c57d2 h1:KUDP9rxiUW3Jk91IL9ZTOH3tjfeFNy64nnkAaWe/1qQ= +github.com/envoyproxy/go-control-plane v0.9.10-0.20211025154310-e7fb5d0c57d2/go.mod h1:utjuSZ1DPHuYf0cTZ8WEsaQf5bwmT1TZiWaQjpJtBF0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.1 h1:4CF52PCseTFt4bE+Yk3dIpdVi7XWuPVMhPtm4FaIJPM= github.com/envoyproxy/protoc-gen-validate v0.6.1/go.mod h1:txg5va2Qkip90uYoSKH+nkAAmXrb2j3iq4FLwdrCbXQ= @@ -397,8 +398,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= @@ -573,6 +575,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/config/config.go b/pkg/config/config.go index a9b8b1d..fcb2f2a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -24,7 +24,12 @@ import ( "gopkg.in/yaml.v3" ) -const sslRotationPeriodDefault = 1 * time.Hour +const ( + sslRotationPeriodDefault = 1 * time.Hour + endpointCheckPeriodDefault = 60 * time.Second + endpointTTLDefault = 30 * time.Second + configDrainPeriodDefault = 5 * time.Second +) type Type struct { LogLevel *string `yaml:"logLevel"` @@ -40,8 +45,9 @@ type Type struct { WebHTTPAddress *string `yaml:"webHttpAddress"` WebHTTPSAddress *string `yaml:"webHttpsAddress"` NodeZoneLabel *string `yaml:"nodeZoneLabel"` - ConfigDrainPeriod *string `yaml:"configDrainPeriod"` - EndpointCheckPeriod *string `yaml:"endpointCheckPeriod"` + ConfigDrainPeriod *time.Duration `yaml:"configDrainPeriod"` + EndpointCheckPeriod *time.Duration `yaml:"endpointCheckPeriod"` + EndpointTTL *time.Duration `yaml:"endpointTtl"` SentryDSN *string `yaml:"sentryDsn"` SSLName *string `yaml:"sslName"` SSLCrt *string `yaml:"sslCrt"` @@ -67,8 +73,9 @@ var config = Type{ WebHTTPSAddress: flag.String("web.https.address", ":18081", "https web address"), WebHTTPAddress: flag.String("web.http.address", ":18082", "http web address"), NodeZoneLabel: flag.String("node.label.zone", "topology.kubernetes.io/zone", "node label region"), - ConfigDrainPeriod: flag.String("config.drainPeriod", "5s", "drain period"), - EndpointCheckPeriod: flag.String("endpoint.checkPeriod", "60s", "check period"), + ConfigDrainPeriod: flag.Duration("config.drainPeriod", configDrainPeriodDefault, "drain period"), + EndpointCheckPeriod: flag.Duration("endpoint.checkPeriod", endpointCheckPeriodDefault, "check period"), + EndpointTTL: flag.Duration("endpoint.ttl", endpointTTLDefault, "xDS TTL"), SentryDSN: flag.String("sentry.dsn", "", "sentry DSN"), SSLName: flag.String("ssl.name", "envoy_control_plane_default", "name of certificate in envoy secrets"), //nolint:lll SSLCrt: flag.String("ssl.crt", "", "path to CA cert"), @@ -116,14 +123,6 @@ func CheckConfig() error { } } - if _, err := time.ParseDuration(*config.ConfigDrainPeriod); err != nil { - return errors.Wrap(err, "ParseDuration="+*config.ConfigDrainPeriod) - } - - if _, err := time.ParseDuration(*config.EndpointCheckPeriod); err != nil { - return errors.Wrap(err, "ParseDuration="+*config.EndpointCheckPeriod) - } - if len(*config.SSLCrt) > 0 { if _, err := os.Stat(*config.SSLCrt); os.IsNotExist(err) { return errors.Wrap(err, "ssl certificate error") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 713cf4f..ae082e3 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -14,6 +14,7 @@ package config_test import ( "testing" + "time" "github.com/maksim-paskal/envoy-control-plane/pkg/config" ) @@ -28,4 +29,8 @@ func TestConfig(t *testing.T) { if want := "/some/test/path"; *config.Get().KubeConfigFile != want { t.Fatalf("KubeConfigFile != %s", want) } + + if want := 3 * time.Second; *config.Get().EndpointTTL != want { + t.Fatalf("EndpointTTL != %s", want) + } } diff --git a/pkg/config/config_test.yaml b/pkg/config/config_test.yaml index 21867c9..4b232ec 100644 --- a/pkg/config/config_test.yaml +++ b/pkg/config/config_test.yaml @@ -1 +1,2 @@ -kubeConfigFile: /some/test/path \ No newline at end of file +kubeConfigFile: /some/test/path +endpointTtl: 3s \ No newline at end of file diff --git a/pkg/configstore/configStore.go b/pkg/configstore/configStore.go index 7e8e197..6b78abd 100644 --- a/pkg/configstore/configStore.go +++ b/pkg/configstore/configStore.go @@ -51,7 +51,7 @@ type ConfigStore struct { ep *endpointstore.EndpointsStore KubernetesEndpoints sync.Map configEndpoints map[string][]*endpoint.LocalityLbEndpoints - lastEndpoints []types.Resource + lastEndpoints []types.ResourceWithTTL LastEndpointsArray []string ConfigStoreState int log *log.Entry @@ -155,7 +155,7 @@ func (cs *ConfigStore) Push() { return } - err = controlplane.SnapshotCache.SetSnapshot(cs.Config.ID, snap) + err = controlplane.SnapshotCache.SetSnapshot(cs.ctx, cs.Config.ID, snap) if err != nil { cs.log.WithError(err).Error() @@ -192,7 +192,7 @@ func (cs *ConfigStore) getConfigEndpoints() (map[string][]*endpoint.LocalityLbEn lbEndpoints := make(map[string][]*endpoint.LocalityLbEndpoints) for _, ep := range endpoints { - fixed, ok := ep.(*endpoint.ClusterLoadAssignment) + fixed, ok := ep.Resource.(*endpoint.ClusterLoadAssignment) if !ok { cs.log.WithError(errAssertion).Fatal("ep.(*endpoint.ClusterLoadAssignment)") } @@ -275,7 +275,7 @@ func (cs *ConfigStore) saveLastEndpoints() { }) isInvalidIP := false - publishEp := []types.Resource{} + publishEp := []types.ResourceWithTTL{} publishEpArray := []string{} // for reflect.DeepEqual for clusterName, ep := range lbEndpoints { @@ -301,9 +301,14 @@ func (cs *ConfigStore) saveLastEndpoints() { } } - publishEp = append(publishEp, &endpoint.ClusterLoadAssignment{ + clusterLoadAssignment := endpoint.ClusterLoadAssignment{ ClusterName: clusterName, Endpoints: ep, + } + + publishEp = append(publishEp, types.ResourceWithTTL{ + Resource: &clusterLoadAssignment, + TTL: appConfig.Get().EndpointTTL, }) } diff --git a/pkg/controlplane/callbacks.go b/pkg/controlplane/callbacks.go index de9ed20..36a420b 100644 --- a/pkg/controlplane/callbacks.go +++ b/pkg/controlplane/callbacks.go @@ -31,37 +31,34 @@ type callbacks struct { } func (cb *callbacks) Report() { - cb.mu.Lock() - defer cb.mu.Unlock() - if log.GetLevel() >= log.DebugLevel || *config.Get().LogAccess { - log.WithFields(log.Fields{"fetches": cb.fetches, "requests": cb.requests}).Info("cb.Report() callbacks") + log.WithFields(log.Fields{"fetches": cb.fetches, "requests": cb.requests}).Info("Report") } } -func (cb *callbacks) OnStreamOpen(ctx context.Context, id int64, typ string) error { +func (cb *callbacks) OnStreamOpen(ctx context.Context, streamID int64, typ string) error { metrics.GrpcOnStreamOpen.Inc() if log.GetLevel() >= log.DebugLevel || *config.Get().LogAccess { - log.Debugf("OnStreamOpen %d open for %s", id, typ) + log.WithField("streamID", streamID).Infof("OnStreamOpen==>%s", typ) } return nil } -func (cb *callbacks) OnStreamClosed(id int64) { +func (cb *callbacks) OnStreamClosed(streamID int64) { metrics.GrpcOnStreamClosed.Inc() if log.GetLevel() >= log.DebugLevel || *config.Get().LogAccess { - log.Debugf("OnStreamClosed %d closed", id) + log.WithField("streamID", streamID).Info("OnStreamClosed") } } -func (cb *callbacks) OnStreamRequest(id int64, r *discovery.DiscoveryRequest) error { +func (cb *callbacks) OnStreamRequest(streamID int64, r *discovery.DiscoveryRequest) error { metrics.GrpcOnStreamRequest.Inc() if log.GetLevel() >= log.DebugLevel || *config.Get().LogAccess { - log.Debugf("OnStreamRequest") + log.WithField("streamID", streamID).Info("OnStreamRequest") } cb.mu.Lock() @@ -76,15 +73,16 @@ func (cb *callbacks) OnStreamRequest(id int64, r *discovery.DiscoveryRequest) er return nil } -func (cb *callbacks) OnStreamResponse(id int64, r *discovery.DiscoveryRequest, w *discovery.DiscoveryResponse) { +func (cb *callbacks) OnStreamResponse(ctx context.Context, streamID int64, r *discovery.DiscoveryRequest, w *discovery.DiscoveryResponse) { //nolint:lll metrics.GrpcOnStreamResponse.Inc() if log.GetLevel() >= log.DebugLevel || *config.Get().LogAccess { - json, _ := protojson.Marshal(r) - log.Debugf("DiscoveryRequest=>\n%s\n", string(json)) + log := log.WithField("streamID", streamID) - json, _ = protojson.Marshal(w) - log.Debugf("DiscoveryResponse=>\n%s\n", string(json)) + discoveryRequest, _ := protojson.Marshal(r) + discoveryResponse, _ := protojson.Marshal(w) + + log.Infof("DiscoveryRequest=>%s\nDiscoveryResponse=>%s\n", string(discoveryRequest), string(discoveryResponse)) //nolint:lll } cb.Report() @@ -94,7 +92,9 @@ func (cb *callbacks) OnFetchRequest(ctx context.Context, req *discovery.Discover metrics.GrpcOnFetchRequest.Inc() if log.GetLevel() >= log.DebugLevel || *config.Get().LogAccess { - log.Debugf("OnFetchRequest...") + log := log.WithField("node", req.Node.Id) + + log.Info("OnFetchRequest") } cb.mu.Lock() @@ -113,11 +113,12 @@ func (cb *callbacks) OnFetchResponse(r *discovery.DiscoveryRequest, w *discovery metrics.GrpcOnFetchResponse.Inc() if log.GetLevel() >= log.DebugLevel || *config.Get().LogAccess { - json, _ := protojson.Marshal(r) - log.Debugf("DiscoveryRequest=>\n%s\n", string(json)) + log := log.WithField("node", r.Node.Id) - json, _ = protojson.Marshal(w) - log.Debugf("DiscoveryResponse=>\n%s\n", string(json)) + discoveryRequest, _ := protojson.Marshal(r) + discoveryResponse, _ := protojson.Marshal(w) + + log.Infof("DiscoveryRequest=>%s\nDiscoveryResponse=>%s\n", string(discoveryRequest), string(discoveryResponse)) //nolint:lll } } @@ -128,7 +129,7 @@ func (cb *callbacks) OnStreamDeltaRequest(streamID int64, req *discovery.DeltaDi log := log.WithField("streamID", streamID) json, _ := protojson.Marshal(req) - log.Debugf("DeltaDiscoveryRequest=>\n%s\n", string(json)) + log.Infof("DeltaDiscoveryRequest=>\n%s\n", string(json)) } return nil @@ -140,11 +141,10 @@ func (cb *callbacks) OnStreamDeltaResponse(streamID int64, req *discovery.DeltaD if log.GetLevel() >= log.DebugLevel || *config.Get().LogAccess { log := log.WithField("streamID", streamID) - json, _ := protojson.Marshal(req) - log.Debugf("DeltaDiscoveryRequest=>\n%s\n", string(json)) + deltaDiscoveryRequest, _ := protojson.Marshal(req) + deltaDiscoveryResponse, _ := protojson.Marshal(resp) - json, _ = protojson.Marshal(resp) - log.Debugf("DeltaDiscoveryResponse=>\n%s\n", string(json)) + log.Infof("DeltaDiscoveryRequest=>%s\nDeltaDiscoveryResponse=>%s\n", string(deltaDiscoveryRequest), string(deltaDiscoveryResponse)) //nolint:lll } } @@ -155,7 +155,7 @@ func (cb *callbacks) OnStreamDeltaRequestOnStreamDeltaRequest(streamID int64, re log := log.WithField("streamID", streamID) json, _ := protojson.Marshal(req) - log.Debugf("DeltaDiscoveryRequest=>\n%s\n", string(json)) + log.Infof("DeltaDiscoveryRequest=>\n%s\n", string(json)) } return nil @@ -167,7 +167,7 @@ func (cb *callbacks) OnDeltaStreamOpen(ctx context.Context, streamID int64, type if log.GetLevel() >= log.DebugLevel || *config.Get().LogAccess { log := log.WithField("streamID", streamID) - log.Debugf("typeURL=>\n%s\n", typeURL) + log.Infof("typeURL=>\n%s\n", typeURL) } return nil @@ -179,6 +179,6 @@ func (cb *callbacks) OnDeltaStreamClosed(streamID int64) { if log.GetLevel() >= log.DebugLevel || *config.Get().LogAccess { log := log.WithField("streamID", streamID) - log.Debugf("closed") + log.Infof("OnDeltaStreamClosed") } } diff --git a/pkg/controlplane/controlPlane.go b/pkg/controlplane/controlPlane.go index d4fa923..b193499 100644 --- a/pkg/controlplane/controlPlane.go +++ b/pkg/controlplane/controlPlane.go @@ -23,10 +23,18 @@ import ( secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" xds "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "github.com/maksim-paskal/envoy-control-plane/pkg/config" "google.golang.org/grpc" ) -var SnapshotCache cache.SnapshotCache = cache.NewSnapshotCache(false, cache.IDHash{}, &Logger{}) +// SnapshotCache create cache with heartbeat responses for resources with a TTL. +var SnapshotCache cache.SnapshotCache = cache.NewSnapshotCacheWithHeartbeating( + context.Background(), + false, + cache.IDHash{}, + &Logger{}, + *config.Get().EndpointTTL, +) type ControlPlane struct{} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index e7170a0..981344a 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -23,6 +23,7 @@ import ( tls "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/maksim-paskal/envoy-control-plane/pkg/certs" "github.com/maksim-paskal/envoy-control-plane/pkg/config" "github.com/maksim-paskal/utils-go" @@ -32,7 +33,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) -func GetConfigSnapshot(version string, configType *config.ConfigType, endpoints []types.Resource, commonSecrets []tls.Secret) (cache.Snapshot, error) { //nolint: lll +func GetConfigSnapshot(version string, configType *config.ConfigType, endpoints []types.ResourceWithTTL, commonSecrets []tls.Secret) (cache.Snapshot, error) { //nolint: lll clusters, err := YamlToResources(configType.Clusters, cluster.Cluster{}) if err != nil { return cache.Snapshot{}, err @@ -62,21 +63,21 @@ func GetConfigSnapshot(version string, configType *config.ConfigType, endpoints } for i := range commonSecrets { - secrets = append(secrets, &commonSecrets[i]) + secrets = append(secrets, types.ResourceWithTTL{Resource: &commonSecrets[i]}) } - return cache.NewSnapshot( - version, - endpoints, - clusters, - routes, - listiners, - nil, - secrets, - ), nil + resources := make(map[string][]types.ResourceWithTTL) + + resources[resource.ClusterType] = clusters + resources[resource.RouteType] = routes + resources[resource.ListenerType] = listiners + resources[resource.SecretType] = secrets + resources[resource.EndpointType] = endpoints + + return cache.NewSnapshotWithTTLs(version, resources) } -func YamlToResources(yamlObj []interface{}, outType interface{}) ([]types.Resource, error) { +func YamlToResources(yamlObj []interface{}, outType interface{}) ([]types.ResourceWithTTL, error) { if len(yamlObj) == 0 { return nil, nil } @@ -95,7 +96,7 @@ func YamlToResources(yamlObj []interface{}, outType interface{}) ([]types.Resour return nil, errors.Wrap(err, "json.Unmarshal(jsonObj, &resources)") } - results := make([]types.Resource, len(resources)) + results := make([]types.ResourceWithTTL, len(resources)) for k, v := range resources { resourcesJSON, err := utils.GetJSONfromYAML(v) @@ -114,7 +115,8 @@ func YamlToResources(yamlObj []interface{}, outType interface{}) ([]types.Resour return nil, errors.Wrap(err, "cluster.Cluster") } - results[k] = &resource + results[k] = types.ResourceWithTTL{Resource: &resource} + case route.RouteConfiguration: resource := route.RouteConfiguration{} err = protojson.Unmarshal(resourcesJSON, &resource) @@ -125,7 +127,7 @@ func YamlToResources(yamlObj []interface{}, outType interface{}) ([]types.Resour return nil, errors.Wrap(err, "route.RouteConfiguration") } - results[k] = &resource + results[k] = types.ResourceWithTTL{Resource: &resource} case endpoint.ClusterLoadAssignment: resource := endpoint.ClusterLoadAssignment{} err = protojson.Unmarshal(resourcesJSON, &resource) @@ -136,7 +138,7 @@ func YamlToResources(yamlObj []interface{}, outType interface{}) ([]types.Resour return nil, errors.Wrap(err, "endpoint.ClusterLoadAssignment") } - results[k] = &resource + results[k] = types.ResourceWithTTL{Resource: &resource} case listener.Listener: resource := listener.Listener{} err = protojson.Unmarshal(resourcesJSON, &resource) @@ -147,7 +149,7 @@ func YamlToResources(yamlObj []interface{}, outType interface{}) ([]types.Resour return nil, errors.Wrap(err, "listener.Listener") } - results[k] = &resource + results[k] = types.ResourceWithTTL{Resource: &resource} case tls.Secret: resource := tls.Secret{} err = protojson.Unmarshal(resourcesJSON, &resource) @@ -158,7 +160,7 @@ func YamlToResources(yamlObj []interface{}, outType interface{}) ([]types.Resour return nil, errors.Wrap(err, "tls.Secret") } - results[k] = &resource + results[k] = types.ResourceWithTTL{Resource: &resource} default: return nil, errUnknownClass } @@ -229,9 +231,9 @@ func NewSecrets(dnsName string, validation interface{}) ([]tls.Secret, error) { } // remove require_client_certificate from all listeners. -func filterCertificates(listiners []types.Resource) error { +func filterCertificates(listiners []types.ResourceWithTTL) error { for _, listiner := range listiners { - c, ok := listiner.(*listener.Listener) + c, ok := listiner.Resource.(*listener.Listener) if !ok { return errUnknownClass } diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go index 611318f..7793c4b 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/utils/utils_test.go @@ -27,12 +27,14 @@ func TestGetConfigSnapshot(t *testing.T) { t.Parallel() c := config.ConfigType{} - r := []types.Resource{} + r := []types.ResourceWithTTL{} s := []tls.Secret{} - r = append(r, &endpoint.ClusterLoadAssignment{ + e := endpoint.ClusterLoadAssignment{ ClusterName: "clusterName", - }) + } + + r = append(r, types.ResourceWithTTL{Resource: &e}) version := uuid.New().String()