diff --git a/docs/proposals/control-data-plane-split/graph-conns.png b/docs/proposals/control-data-plane-split/graph-conns.png index bb41cd488e..b383363917 100644 Binary files a/docs/proposals/control-data-plane-split/graph-conns.png and b/docs/proposals/control-data-plane-split/graph-conns.png differ diff --git a/internal/framework/runnables/runnables.go b/internal/framework/runnables/runnables.go index d960475008..4c8aac5460 100644 --- a/internal/framework/runnables/runnables.go +++ b/internal/framework/runnables/runnables.go @@ -34,29 +34,33 @@ func (r *LeaderOrNonLeader) NeedLeaderElection() bool { return false } -// EnableAfterBecameLeader is a Runnable that will call the enable function when the current instance becomes +// CallFunctionsAfterBecameLeader is a Runnable that will call the given functions when the current instance becomes // the leader. -type EnableAfterBecameLeader struct { - enable func(context.Context) +type CallFunctionsAfterBecameLeader struct { + enableFunctions []func(context.Context) } var ( - _ manager.LeaderElectionRunnable = &EnableAfterBecameLeader{} - _ manager.Runnable = &EnableAfterBecameLeader{} + _ manager.LeaderElectionRunnable = &CallFunctionsAfterBecameLeader{} + _ manager.Runnable = &CallFunctionsAfterBecameLeader{} ) -// NewEnableAfterBecameLeader creates a new EnableAfterBecameLeader Runnable. -func NewEnableAfterBecameLeader(enable func(context.Context)) *EnableAfterBecameLeader { - return &EnableAfterBecameLeader{ - enable: enable, +// NewCallFunctionsAfterBecameLeader creates a new CallFunctionsAfterBecameLeader Runnable. +func NewCallFunctionsAfterBecameLeader( + enableFunctions []func(context.Context), +) *CallFunctionsAfterBecameLeader { + return &CallFunctionsAfterBecameLeader{ + enableFunctions: enableFunctions, } } -func (j *EnableAfterBecameLeader) Start(ctx context.Context) error { - j.enable(ctx) +func (j *CallFunctionsAfterBecameLeader) Start(ctx context.Context) error { + for _, f := range j.enableFunctions { + f(ctx) + } return nil } -func (j *EnableAfterBecameLeader) NeedLeaderElection() bool { +func (j *CallFunctionsAfterBecameLeader) NeedLeaderElection() bool { return true } diff --git a/internal/framework/runnables/runnables_test.go b/internal/framework/runnables/runnables_test.go index 9f34d9ccba..7a9b8968ba 100644 --- a/internal/framework/runnables/runnables_test.go +++ b/internal/framework/runnables/runnables_test.go @@ -23,19 +23,25 @@ func TestLeaderOrNonLeader(t *testing.T) { g.Expect(leaderOrNonLeader.NeedLeaderElection()).To(BeFalse()) } -func TestEnableAfterBecameLeader(t *testing.T) { +func TestCallFunctionsAfterBecameLeader(t *testing.T) { t.Parallel() - enabled := false - enableAfterBecameLeader := NewEnableAfterBecameLeader(func(_ context.Context) { - enabled = true + statusUpdaterEnabled := false + healthCheckEnableLeader := false + eventHandlerEnabled := false + + callFunctionsAfterBecameLeader := NewCallFunctionsAfterBecameLeader([]func(ctx context.Context){ + func(_ context.Context) { statusUpdaterEnabled = true }, + func(_ context.Context) { healthCheckEnableLeader = true }, + func(_ context.Context) { eventHandlerEnabled = true }, }) g := NewWithT(t) - g.Expect(enableAfterBecameLeader.NeedLeaderElection()).To(BeTrue()) - g.Expect(enabled).To(BeFalse()) + g.Expect(callFunctionsAfterBecameLeader.NeedLeaderElection()).To(BeTrue()) - err := enableAfterBecameLeader.Start(context.Background()) + err := callFunctionsAfterBecameLeader.Start(context.Background()) g.Expect(err).ToNot(HaveOccurred()) - 
g.Expect(enabled).To(BeTrue()) + g.Expect(statusUpdaterEnabled).To(BeTrue()) + g.Expect(healthCheckEnableLeader).To(BeTrue()) + g.Expect(eventHandlerEnabled).To(BeTrue()) } diff --git a/internal/mode/static/handler.go b/internal/mode/static/handler.go index 1020f11e03..84a658ae4e 100644 --- a/internal/mode/static/handler.go +++ b/internal/mode/static/handler.go @@ -167,13 +167,33 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log changeType, gr := h.cfg.processor.Process() - // Once we've processed resources on startup and built our first graph, mark the Pod as ready. - if !h.cfg.graphBuiltHealthChecker.ready { - h.cfg.graphBuiltHealthChecker.setAsReady() + // Once we've processed resources on startup and built our first graph, mark the Pod as having built the graph. + if !h.cfg.graphBuiltHealthChecker.graphBuilt { + h.cfg.graphBuiltHealthChecker.setGraphBuilt() } - // TODO(sberman): hardcode this deployment name until we support provisioning data planes - // If no deployments exist, we should just return without doing anything. + // if this Pod is not the leader or does not have the leader lease yet, + // the nginx conf should not be updated. + if !h.cfg.graphBuiltHealthChecker.leader { + return + } + + h.sendNginxConfig(ctx, logger, gr, changeType) +} + +func (h *eventHandlerImpl) eventHandlerEnable(ctx context.Context) { + // Latest graph is guaranteed to not be nil since the leader election process takes longer than + // the initial call to HandleEventBatch when NGF starts up. And GatewayClass will typically always exist which + // triggers an event. + h.sendNginxConfig(ctx, h.cfg.logger, h.cfg.processor.GetLatestGraph(), state.ClusterStateChange) +} + +func (h *eventHandlerImpl) sendNginxConfig( + ctx context.Context, + logger logr.Logger, + gr *graph.Graph, + changeType state.ChangeType, +) { deploymentName := types.NamespacedName{ Name: "tmp-nginx-deployment", Namespace: h.cfg.gatewayPodConfig.Namespace, diff --git a/internal/mode/static/handler_test.go b/internal/mode/static/handler_test.go index fcde4e7f25..ed6414731c 100644 --- a/internal/mode/static/handler_test.go +++ b/internal/mode/static/handler_test.go @@ -126,7 +126,9 @@ var _ = Describe("eventHandler", func() { metricsCollector: collectors.NewControllerNoopCollector(), updateGatewayClassStatus: true, }) - Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeFalse()) + Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeFalse()) + + handler.cfg.graphBuiltHealthChecker.leader = true }) AfterEach(func() { @@ -161,7 +163,7 @@ var _ = Describe("eventHandler", func() { }) AfterEach(func() { - Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeTrue()) + Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeTrue()) }) When("a batch has one event", func() { @@ -484,22 +486,36 @@ var _ = Describe("eventHandler", func() { Expect(gr.LatestReloadResult.Error.Error()).To(Equal("status error")) }) - It("should set the health checker status properly", func() { + It("should update nginx conf only when leader", func() { + ctx := context.Background() + handler.cfg.graphBuiltHealthChecker.leader = false + e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}} batch := []interface{}{e} readyChannel := handler.cfg.graphBuiltHealthChecker.getReadyCh() fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{}) - Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed()) handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) + // graph is built, 
but since the graphBuiltHealthChecker.leader is false, configuration isn't created and + // the readyCheck fails + Expect(handler.cfg.graphBuiltHealthChecker.graphBuilt).To(BeTrue()) + Expect(handler.GetLatestConfiguration()).To(BeNil()) + Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed()) + Expect(readyChannel).ShouldNot(BeClosed()) + + // Once the pod becomes leader, these two functions will be called through the runnables we set in the manager + handler.cfg.graphBuiltHealthChecker.setAsLeader(ctx) + handler.eventHandlerEnable(ctx) + + // nginx conf has been set dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1) Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty()) - Expect(readyChannel).To(BeClosed()) - + // ready check is also set Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).To(Succeed()) + Expect(handler.cfg.graphBuiltHealthChecker.getReadyCh()).To(BeClosed()) }) It("should panic for an unknown event type", func() { diff --git a/internal/mode/static/health.go b/internal/mode/static/health.go index a0fe4e9b59..4993b0b40e 100644 --- a/internal/mode/static/health.go +++ b/internal/mode/static/health.go @@ -1,9 +1,17 @@ package static import ( + "context" "errors" + "fmt" + "net" "net/http" "sync" + "time" + + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/nginx/nginx-gateway-fabric/internal/mode/static/config" ) // newGraphBuiltHealthChecker creates a new graphBuiltHealthChecker. @@ -13,37 +21,94 @@ func newGraphBuiltHealthChecker() *graphBuiltHealthChecker { } } -// graphBuiltHealthChecker is used to check if the initial graph is built and the NGF Pod is ready. +// graphBuiltHealthChecker is used to check if the NGF Pod is ready. The NGF Pod is ready if the initial graph has +// been built and if it is leader. type graphBuiltHealthChecker struct { // readyCh is a channel that is initialized in newGraphBuiltHealthChecker and represents if the NGF Pod is ready. - readyCh chan struct{} - lock sync.RWMutex - ready bool + readyCh chan struct{} + lock sync.RWMutex + graphBuilt bool + leader bool +} + +// createHealthProbe creates a Server runnable to serve as our health and readiness checker. +func createHealthProbe(cfg config.Config, healthChecker *graphBuiltHealthChecker) (manager.Server, error) { + // we chose to create our own health probe server instead of using the controller-runtime one because + // of repetitive log which would flood our logs on non-ready non-leader NGF Pods. This health probe is + // similar to the controller-runtime's health probe. + + mux := http.NewServeMux() + + // copy of controller-runtime sane defaults for new http.Server + s := &http.Server{ + Handler: mux, + MaxHeaderBytes: 1 << 20, + IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout + ReadHeaderTimeout: 32 * time.Second, + } + + mux.HandleFunc(readinessEndpointName, healthChecker.readyHandler) + + ln, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.HealthConfig.Port)) + if err != nil { + return manager.Server{}, + fmt.Errorf("error listening on %s: %w", fmt.Sprintf(":%d", cfg.HealthConfig.Port), err) + } + + return manager.Server{ + Name: "health probe", + Server: s, + Listener: ln, + }, nil +} + +func (h *graphBuiltHealthChecker) readyHandler(resp http.ResponseWriter, req *http.Request) { + if err := h.readyCheck(req); err != nil { + resp.WriteHeader(http.StatusServiceUnavailable) + } else { + resp.WriteHeader(http.StatusOK) + } } // readyCheck returns the ready-state of the Pod. 
It satisfies the controller-runtime Checker type. -// We are considered ready after the first graph is built. +// We are considered ready after the first graph is built and if the NGF Pod is leader. func (h *graphBuiltHealthChecker) readyCheck(_ *http.Request) error { h.lock.RLock() defer h.lock.RUnlock() - if !h.ready { - return errors.New("control plane is not yet ready") + if !h.leader { + return errors.New("this Pod is not currently leader") + } + + if !h.graphBuilt { + return errors.New("control plane initial graph has not been built") } return nil } -// setAsReady marks the health check as ready. -func (h *graphBuiltHealthChecker) setAsReady() { +// setGraphBuilt marks the health check as having the initial graph built. +func (h *graphBuiltHealthChecker) setGraphBuilt() { h.lock.Lock() defer h.lock.Unlock() - h.ready = true - close(h.readyCh) + h.graphBuilt = true } // getReadyCh returns a read-only channel, which determines if the NGF Pod is ready. func (h *graphBuiltHealthChecker) getReadyCh() <-chan struct{} { return h.readyCh } + +// setAsLeader marks the health check as leader. +func (h *graphBuiltHealthChecker) setAsLeader(_ context.Context) { + h.lock.Lock() + defer h.lock.Unlock() + + h.leader = true + + // setGraphBuilt should already have been called when processing the resources on startup because the leader + // election process takes longer than the initial call to HandleEventBatch. Thus, the NGF Pod should be marked as + // ready and have this channel be closed. + close(h.readyCh) +} diff --git a/internal/mode/static/health_test.go b/internal/mode/static/health_test.go index 7246283ed9..3505479d7d 100644 --- a/internal/mode/static/health_test.go +++ b/internal/mode/static/health_test.go @@ -1,17 +1,99 @@ package static import ( + "context" + "errors" + "net" + "net/http" + "net/http/httptest" "testing" . "github.com/onsi/gomega" + + "github.com/nginx/nginx-gateway-fabric/internal/mode/static/config" ) func TestReadyCheck(t *testing.T) { t.Parallel() g := NewWithT(t) healthChecker := newGraphBuiltHealthChecker() - g.Expect(healthChecker.readyCheck(nil)).ToNot(Succeed()) - healthChecker.ready = true + g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this Pod is not currently leader"))) + + healthChecker.graphBuilt = true + g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this Pod is not currently leader"))) + + healthChecker.graphBuilt = false + healthChecker.leader = true + g.Expect(healthChecker.readyCheck(nil)). 
+ To(MatchError(errors.New("control plane initial graph has not been built"))) + + healthChecker.graphBuilt = true g.Expect(healthChecker.readyCheck(nil)).To(Succeed()) } + +func TestSetAsLeader(t *testing.T) { + t.Parallel() + g := NewWithT(t) + healthChecker := newGraphBuiltHealthChecker() + + g.Expect(healthChecker.leader).To(BeFalse()) + g.Expect(healthChecker.readyCh).ShouldNot(BeClosed()) + + healthChecker.setAsLeader(context.Background()) + + g.Expect(healthChecker.leader).To(BeTrue()) + g.Expect(healthChecker.readyCh).To(BeClosed()) +} + +func TestSetGraphBuilt(t *testing.T) { + t.Parallel() + g := NewWithT(t) + healthChecker := newGraphBuiltHealthChecker() + + g.Expect(healthChecker.graphBuilt).To(BeFalse()) + + healthChecker.setGraphBuilt() + + g.Expect(healthChecker.graphBuilt).To(BeTrue()) +} + +func TestReadyHandler(t *testing.T) { + t.Parallel() + g := NewWithT(t) + healthChecker := newGraphBuiltHealthChecker() + + r := httptest.NewRequest(http.MethodGet, "/readyz", nil) + w := httptest.NewRecorder() + + healthChecker.readyHandler(w, r) + g.Expect(w.Result().StatusCode).To(Equal(http.StatusServiceUnavailable)) + + healthChecker.graphBuilt = true + healthChecker.leader = true + + w = httptest.NewRecorder() + healthChecker.readyHandler(w, r) + g.Expect(w.Result().StatusCode).To(Equal(http.StatusOK)) +} + +func TestCreateHealthProbe(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + healthChecker := newGraphBuiltHealthChecker() + + cfg := config.Config{HealthConfig: config.HealthConfig{Port: 100000}} + _, err := createHealthProbe(cfg, healthChecker) + g.Expect(err).To(MatchError("error listening on :100000: listen tcp: address 100000: invalid port")) + + cfg = config.Config{HealthConfig: config.HealthConfig{Port: 8081}} + hp, err := createHealthProbe(cfg, healthChecker) + g.Expect(err).ToNot(HaveOccurred()) + + addr, ok := (hp.Listener.Addr()).(*net.TCPAddr) + g.Expect(ok).To(BeTrue()) + + g.Expect(addr.Port).To(Equal(cfg.HealthConfig.Port)) + g.Expect(hp.Server).ToNot(BeNil()) +} diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index e6959e6609..31574a9f64 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -75,6 +75,8 @@ const ( plusClientCertField = "tls.crt" plusClientKeyField = "tls.key" grpcServerPort = 8443 + // defined in our deployment.yaml. 
+ readinessEndpointName = "/readyz" ) var scheme = runtime.NewScheme() @@ -245,8 +247,12 @@ func StartManager(cfg config.Config) error { return fmt.Errorf("cannot register event loop: %w", err) } - if err = mgr.Add(runnables.NewEnableAfterBecameLeader(groupStatusUpdater.Enable)); err != nil { - return fmt.Errorf("cannot register status updater: %w", err) + if err = mgr.Add(runnables.NewCallFunctionsAfterBecameLeader([]func(context.Context){ + groupStatusUpdater.Enable, + healthChecker.setAsLeader, + eventHandler.eventHandlerEnable, + })); err != nil { + return fmt.Errorf("cannot register functions that get called after Pod becomes leader: %w", err) } if cfg.ProductTelemetryConfig.Enabled { @@ -274,6 +280,7 @@ func StartManager(cfg config.Config) error { } cfg.Logger.Info("Starting manager") + cfg.Logger.Info("NGINX Gateway Fabric Pod will be marked as unready until it has the leader lease") go func() { <-ctx.Done() cfg.Logger.Info("Shutting down") @@ -326,10 +333,6 @@ func createManager(cfg config.Config, healthChecker *graphBuiltHealthChecker) (m }, } - if cfg.HealthConfig.Enabled { - options.HealthProbeBindAddress = fmt.Sprintf(":%d", cfg.HealthConfig.Port) - } - clusterCfg := ctlr.GetConfigOrDie() clusterCfg.Timeout = clusterTimeout @@ -339,8 +342,13 @@ func createManager(cfg config.Config, healthChecker *graphBuiltHealthChecker) (m } if cfg.HealthConfig.Enabled { - if err := mgr.AddReadyzCheck("readyz", healthChecker.readyCheck); err != nil { - return nil, fmt.Errorf("error adding ready check: %w", err) + healthProbeServer, err := createHealthProbe(cfg, healthChecker) + if err != nil { + return nil, fmt.Errorf("error creating health probe: %w", err) + } + + if err := mgr.Add(&healthProbeServer); err != nil { + return nil, fmt.Errorf("error adding health probe: %w", err) } }
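
For reference, the readiness behavior this patch introduces: the /readyz endpoint reports ready only after the first graph has been built and the Pod holds the leader lease, and the custom probe server replaces controller-runtime's built-in one so non-ready, non-leader Pods don't flood the logs. Below is a minimal, self-contained sketch of that gating, not the NGF implementation; the readiness type, the main wiring, and the hard-coded :8081 are illustrative (8081 is the port exercised in TestCreateHealthProbe, while the real server reads HealthConfig.Port).

package main

import (
	"net/http"
	"sync"
)

// readiness is an illustrative stand-in for graphBuiltHealthChecker: the Pod is
// ready only when the initial graph is built AND it holds the leader lease.
type readiness struct {
	mu         sync.RWMutex
	graphBuilt bool
	leader     bool
}

// handler mirrors readyHandler/readyCheck from the patch: leadership is checked
// first, then the graph, and anything short of both yields a 503.
func (r *readiness) handler(w http.ResponseWriter, _ *http.Request) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if !r.leader || !r.graphBuilt {
		w.WriteHeader(http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	r := &readiness{}
	mux := http.NewServeMux()
	mux.HandleFunc("/readyz", r.handler) // same path as the readinessEndpointName constant
	_ = http.ListenAndServe(":8081", mux) // illustrative port; NGF listens on HealthConfig.Port
}

In a multi-replica deployment only the leaseholder reports ready, which is why StartManager now logs that the NGINX Gateway Fabric Pod will be marked as unready until it has the leader lease.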