Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CP/DP split: Add leader election #3092

Merged
merged 12 commits into from
Feb 10, 2025
Binary file modified docs/proposals/control-data-plane-split/graph-conns.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions internal/framework/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ type DeleteEvent struct {
// NamespacedName is the namespace & name of the deleted resource.
NamespacedName types.NamespacedName
}

// NewLeaderEvent represents an NGF Pod becoming leader. This is used to trigger the event handler to batch process
// events and update nginx conf when no resource has been changed.
type NewLeaderEvent struct{}
21 changes: 12 additions & 9 deletions internal/framework/runnables/runnables.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,32 @@ func (r *LeaderOrNonLeader) NeedLeaderElection() bool {
return false
}

// EnableAfterBecameLeader is a Runnable that will call the enable function when the current instance becomes
// CallFunctionsAfterBecameLeader is a Runnable that will call the given functions when the current instance becomes
// the leader.
type EnableAfterBecameLeader struct {
type CallFunctionsAfterBecameLeader struct {
enable func(context.Context)
leader func()
}

var (
_ manager.LeaderElectionRunnable = &EnableAfterBecameLeader{}
_ manager.Runnable = &EnableAfterBecameLeader{}
_ manager.LeaderElectionRunnable = &CallFunctionsAfterBecameLeader{}
_ manager.Runnable = &CallFunctionsAfterBecameLeader{}
)

// NewEnableAfterBecameLeader creates a new EnableAfterBecameLeader Runnable.
func NewEnableAfterBecameLeader(enable func(context.Context)) *EnableAfterBecameLeader {
return &EnableAfterBecameLeader{
// NewCallFunctionsAfterBecameLeader creates a new CallFunctionsAfterBecameLeader Runnable.
func NewCallFunctionsAfterBecameLeader(enable func(context.Context), leader func()) *CallFunctionsAfterBecameLeader {
return &CallFunctionsAfterBecameLeader{
enable: enable,
leader: leader,
}
}

func (j *EnableAfterBecameLeader) Start(ctx context.Context) error {
func (j *CallFunctionsAfterBecameLeader) Start(ctx context.Context) error {
j.enable(ctx)
j.leader()
return nil
}

func (j *EnableAfterBecameLeader) NeedLeaderElection() bool {
func (j *CallFunctionsAfterBecameLeader) NeedLeaderElection() bool {
return true
}
16 changes: 10 additions & 6 deletions internal/framework/runnables/runnables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@ func TestLeaderOrNonLeader(t *testing.T) {
g.Expect(leaderOrNonLeader.NeedLeaderElection()).To(BeFalse())
}

func TestEnableAfterBecameLeader(t *testing.T) {
func TestCallFunctionsAfterBecameLeader(t *testing.T) {
t.Parallel()
enabled := false
enableAfterBecameLeader := NewEnableAfterBecameLeader(func(_ context.Context) {
enabled = true
})
leader := false

callFunctionsAfterBecameLeader := NewCallFunctionsAfterBecameLeader(
func(_ context.Context) { enabled = true },
func() { leader = true },
)

g := NewWithT(t)
g.Expect(enableAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
g.Expect(callFunctionsAfterBecameLeader.NeedLeaderElection()).To(BeTrue())
g.Expect(enabled).To(BeFalse())

err := enableAfterBecameLeader.Start(context.Background())
err := callFunctionsAfterBecameLeader.Start(context.Background())
g.Expect(err).ToNot(HaveOccurred())

g.Expect(enabled).To(BeTrue())
g.Expect(leader).To(BeTrue())
}
21 changes: 20 additions & 1 deletion internal/mode/static/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,31 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
h.cfg.metricsCollector.ObserveLastEventBatchProcessTime(duration)
}()

var newLeader bool
for _, event := range batch {
h.parseAndCaptureEvent(ctx, logger, event)
switch event.(type) {
case *events.NewLeaderEvent:
newLeader = true
default:
h.parseAndCaptureEvent(ctx, logger, event)
}
}

changeType, gr := h.cfg.processor.Process()

// if there is a newLeader event in the EventBatch, we want to generate and update nginx conf,
// so regardless of what came back from Process(), we want to update the nginx conf with the latest graph
if newLeader {
changeType = state.ClusterStateChange
gr = h.cfg.processor.GetLatestGraph()
}

// if this Pod is not the leader or does not have the leader lease yet,
// the nginx conf should not be updated.
if !h.cfg.graphBuiltHealthChecker.leader {
return
}

// Once we've processed resources on startup and built our first graph, mark the Pod as ready.
if !h.cfg.graphBuiltHealthChecker.ready {
h.cfg.graphBuiltHealthChecker.setAsReady()
Expand Down
35 changes: 34 additions & 1 deletion internal/mode/static/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ var _ = Describe("eventHandler", func() {
updateGatewayClassStatus: true,
})
Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeFalse())

handler.cfg.graphBuiltHealthChecker.leader = true
})

AfterEach(func() {
Expand Down Expand Up @@ -193,6 +195,17 @@ var _ = Describe("eventHandler", func() {
expectReconfig(dcfg, fakeCfgFiles)
Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())
})

It("should process a NewLeaderEvent", func() {
e := &events.NewLeaderEvent{}

batch := []interface{}{e}

handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1)
Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())
})
})

When("a batch has multiple events", func() {
Expand All @@ -202,7 +215,8 @@ var _ = Describe("eventHandler", func() {
Type: &gatewayv1.HTTPRoute{},
NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"},
}
batch := []interface{}{upsertEvent, deleteEvent}
newLeaderEvent := &events.NewLeaderEvent{}
batch := []interface{}{upsertEvent, deleteEvent, newLeaderEvent}

handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expand Down Expand Up @@ -502,6 +516,25 @@ var _ = Describe("eventHandler", func() {
Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).To(Succeed())
})

It("should not update nginx conf if NGF is not leader", func() {
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
batch := []interface{}{e}
readyChannel := handler.cfg.graphBuiltHealthChecker.getReadyCh()

fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})

handler.cfg.graphBuiltHealthChecker.leader = false

Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.GetLatestConfiguration()).To(BeNil())

Expect(readyChannel).ShouldNot(BeClosed())

Expect(handler.cfg.graphBuiltHealthChecker.readyCheck(nil)).ToNot(Succeed())
})

It("should panic for an unknown event type", func() {
e := &struct{}{}

Expand Down
31 changes: 29 additions & 2 deletions internal/mode/static/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"net/http"
"sync"

"github.com/nginx/nginx-gateway-fabric/internal/framework/events"
)

// newGraphBuiltHealthChecker creates a new graphBuiltHealthChecker.
Expand All @@ -13,20 +15,36 @@ func newGraphBuiltHealthChecker() *graphBuiltHealthChecker {
}
}

// graphBuiltHealthChecker is used to check if the initial graph is built and the NGF Pod is ready.
// graphBuiltHealthChecker is used to check if the initial graph is built, if the NGF Pod is leader, and if the
// NGF Pod is ready.
type graphBuiltHealthChecker struct {
// readyCh is a channel that is initialized in newGraphBuiltHealthChecker and represents if the NGF Pod is ready.
readyCh chan struct{}
// eventCh is a channel that a NewLeaderEvent gets sent to when the NGF Pod becomes leader.
eventCh chan interface{}
lock sync.RWMutex
ready bool
leader bool
}

func (h *graphBuiltHealthChecker) readyHandler(resp http.ResponseWriter, req *http.Request) {
if err := h.readyCheck(req); err != nil {
resp.WriteHeader(http.StatusServiceUnavailable)
} else {
resp.WriteHeader(http.StatusOK)
}
}

// readyCheck returns the ready-state of the Pod. It satisfies the controller-runtime Checker type.
// We are considered ready after the first graph is built.
// We are considered ready after the first graph is built and if the NGF Pod is leader.
func (h *graphBuiltHealthChecker) readyCheck(_ *http.Request) error {
h.lock.RLock()
defer h.lock.RUnlock()

if !h.leader {
return errors.New("this NGF Pod is not currently leader")
}

if !h.ready {
return errors.New("control plane is not yet ready")
}
Expand All @@ -47,3 +65,12 @@ func (h *graphBuiltHealthChecker) setAsReady() {
func (h *graphBuiltHealthChecker) getReadyCh() <-chan struct{} {
return h.readyCh
}

// setAsLeader marks the health check as leader and sends an empty event to the event channel.
func (h *graphBuiltHealthChecker) setAsLeader() {
h.lock.Lock()
defer h.lock.Unlock()

h.leader = true
h.eventCh <- &events.NewLeaderEvent{}
}
47 changes: 46 additions & 1 deletion internal/mode/static/health_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package static

import (
"errors"
"net/http"
"net/http/httptest"
"testing"

. "github.com/onsi/gomega"
Expand All @@ -10,8 +13,50 @@ func TestReadyCheck(t *testing.T) {
t.Parallel()
g := NewWithT(t)
healthChecker := newGraphBuiltHealthChecker()
g.Expect(healthChecker.readyCheck(nil)).ToNot(Succeed())

g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this NGF Pod is not currently leader")))

healthChecker.ready = true
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("this NGF Pod is not currently leader")))

healthChecker.ready = false
healthChecker.leader = true
g.Expect(healthChecker.readyCheck(nil)).To(MatchError(errors.New("control plane is not yet ready")))

healthChecker.ready = true
g.Expect(healthChecker.readyCheck(nil)).To(Succeed())
}

func TestSetAsLeader(t *testing.T) {
t.Parallel()
g := NewWithT(t)
healthChecker := newGraphBuiltHealthChecker()
healthChecker.eventCh = make(chan interface{}, 1)

g.Expect(healthChecker.leader).To(BeFalse())
g.Expect(healthChecker.eventCh).ShouldNot(Receive())

healthChecker.setAsLeader()

g.Expect(healthChecker.leader).To(BeTrue())
g.Expect(healthChecker.eventCh).Should(Receive())
}

func TestReadyHandler(t *testing.T) {
t.Parallel()
g := NewWithT(t)
healthChecker := newGraphBuiltHealthChecker()

r := httptest.NewRequest(http.MethodGet, "/readyz", nil)
w := httptest.NewRecorder()

healthChecker.readyHandler(w, r)
g.Expect(w.Result().StatusCode).To(Equal(http.StatusServiceUnavailable))

healthChecker.ready = true
healthChecker.leader = true

w = httptest.NewRecorder()
healthChecker.readyHandler(w, r)
g.Expect(w.Result().StatusCode).To(Equal(http.StatusOK))
}
Loading
Loading