Skip to content

Commit

Permalink
Add event processing histogram metric (#1134)
Browse files Browse the repository at this point in the history
Problem: NGF does not measure how long it takes to process an event batch.

Solution: Add a new histogram metric, event_batch_processing_milliseconds,
which measures the time it takes to process an event batch. Also add a debug
log statement with the same information so we can debug spikes in processing time.
  • Loading branch information
kate-osborn authored Oct 12, 2023
1 parent 4f40fca commit 567f27e
Show file tree
Hide file tree
Showing 17 changed files with 234 additions and 152 deletions.
3 changes: 3 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ NGINX Gateway Fabric exports the following metrics:
- nginx_stale_config. 1 means NGF failed to configure NGINX with the latest version of the configuration, which means
NGINX is running with a stale version.
- nginx_last_reload_milliseconds. Duration in milliseconds of NGINX reloads (histogram).
- event_batch_processing_milliseconds. Duration in milliseconds of event batch processing (histogram), which is the
time it takes NGF to process batches of Kubernetes events (changes to cluster resources). Note that NGF processes
events in batches, and while processing the current batch, it accumulates events for the next batch.
- These metrics have the namespace `nginx_gateway_fabric`, and include the label `class` which is set to the
Gateway class of NGF. For example, `nginx_gateway_fabric_nginx_reloads_total{class="nginx"}`.

Expand Down
23 changes: 13 additions & 10 deletions internal/framework/events/eventsfakes/fake_event_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion internal/framework/events/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package events

import (
"context"

"github.com/go-logr/logr"
)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . EventHandler
Expand All @@ -10,5 +12,5 @@ import (
type EventHandler interface {
// HandleEventBatch handles a batch of events.
// EventBatch can include duplicated events.
HandleEventBatch(ctx context.Context, batch EventBatch)
HandleEventBatch(ctx context.Context, logger logr.Logger, batch EventBatch)
}
12 changes: 9 additions & 3 deletions internal/framework/events/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type EventLoop struct {
// The batches are swapped before starting the handler goroutine.
currentBatch EventBatch
nextBatch EventBatch

// the ID of the current batch
currentBatchID int
}

// NewEventLoop creates a new EventLoop.
Expand Down Expand Up @@ -63,11 +66,14 @@ func (el *EventLoop) Start(ctx context.Context) error {

handleBatch := func() {
go func(batch EventBatch) {
el.logger.Info("Handling events from the batch", "total", len(batch))
el.currentBatchID++
batchLogger := el.logger.WithName("eventHandler").WithValues("batchID", el.currentBatchID)

batchLogger.Info("Handling events from the batch", "total", len(batch))

el.handler.HandleEventBatch(ctx, batch)
el.handler.HandleEventBatch(ctx, batchLogger, batch)

el.logger.Info("Finished handling the batch")
batchLogger.Info("Finished handling the batch")
handlingDone <- struct{}{}
}(el.currentBatch)
}
Expand Down
11 changes: 6 additions & 5 deletions internal/framework/events/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -47,7 +48,7 @@ var _ = Describe("EventLoop", func() {

// Ensure the first batch is handled
Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(1))
_, batch = fakeHandler.HandleEventBatchArgsForCall(0)
_, _, batch = fakeHandler.HandleEventBatchArgsForCall(0)

var expectedBatch events.EventBatch = []interface{}{"event0"}
Expect(batch).Should(Equal(expectedBatch))
Expand All @@ -70,7 +71,7 @@ var _ = Describe("EventLoop", func() {
eventCh <- e

Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(2))
_, batch := fakeHandler.HandleEventBatchArgsForCall(1)
_, _, batch := fakeHandler.HandleEventBatchArgsForCall(1)

var expectedBatch events.EventBatch = []interface{}{e}
Expect(batch).Should(Equal(expectedBatch))
Expand All @@ -82,7 +83,7 @@ var _ = Describe("EventLoop", func() {

// The func below will pause the handler goroutine while it is processing the batch with e1 until
// sentSecondAndThirdEvents is closed. This way we can add e2 and e3 to the current batch in the meantime.
fakeHandler.HandleEventBatchCalls(func(ctx context.Context, batch events.EventBatch) {
fakeHandler.HandleEventBatchCalls(func(ctx context.Context, logger logr.Logger, batch events.EventBatch) {
close(firstHandleEventBatchCallInProgress)
<-sentSecondAndThirdEvents
})
Expand All @@ -106,14 +107,14 @@ var _ = Describe("EventLoop", func() {
close(sentSecondAndThirdEvents)

Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(3))
_, batch := fakeHandler.HandleEventBatchArgsForCall(1)
_, _, batch := fakeHandler.HandleEventBatchArgsForCall(1)

var expectedBatch events.EventBatch = []interface{}{e1}

// the first HandleEventBatch() call must have handled a batch with e1
Expect(batch).Should(Equal(expectedBatch))

_, batch = fakeHandler.HandleEventBatchArgsForCall(2)
_, _, batch = fakeHandler.HandleEventBatchArgsForCall(2)

expectedBatch = []interface{}{e2, e3}
// the second HandleEventBatch() call must have handled a batch with e2 and e3
Expand Down
13 changes: 5 additions & 8 deletions internal/mode/provisioner/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type eventHandler struct {

statusUpdater status.Updater
k8sClient client.Client
logger logr.Logger

staticModeDeploymentYAML []byte

Expand All @@ -38,7 +37,6 @@ func newEventHandler(
gcName string,
statusUpdater status.Updater,
k8sClient client.Client,
logger logr.Logger,
staticModeDeploymentYAML []byte,
) *eventHandler {
return &eventHandler{
Expand All @@ -47,7 +45,6 @@ func newEventHandler(
statusUpdater: statusUpdater,
gcName: gcName,
k8sClient: k8sClient,
logger: logger,
staticModeDeploymentYAML: staticModeDeploymentYAML,
gatewayNextID: 1,
}
Expand Down Expand Up @@ -80,7 +77,7 @@ func (h *eventHandler) setGatewayClassStatuses(ctx context.Context) {
h.statusUpdater.Update(ctx, statuses)
}

func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {
func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context, logger logr.Logger) {
var gwsWithoutDeps, removedGwsWithDeps []types.NamespacedName

for nsname, gw := range h.store.gateways {
Expand Down Expand Up @@ -116,7 +113,7 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {

h.provisions[nsname] = deployment

h.logger.Info(
logger.Info(
"Created deployment",
"deployment", client.ObjectKeyFromObject(deployment),
"gateway", nsname,
Expand All @@ -134,18 +131,18 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {

delete(h.provisions, nsname)

h.logger.Info(
logger.Info(
"Deleted deployment",
"deployment", client.ObjectKeyFromObject(deployment),
"gateway", nsname,
)
}
}

func (h *eventHandler) HandleEventBatch(ctx context.Context, batch events.EventBatch) {
func (h *eventHandler) HandleEventBatch(ctx context.Context, logger logr.Logger, batch events.EventBatch) {
h.store.update(batch)
h.setGatewayClassStatuses(ctx)
h.ensureDeploymentsMatchGateways(ctx)
h.ensureDeploymentsMatchGateways(ctx, logger)
}

func (h *eventHandler) generateDeploymentID() string {
Expand Down
23 changes: 10 additions & 13 deletions internal/mode/provisioner/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ var _ = Describe("handler", func() {
Resource: gc,
},
}
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

// Ensure GatewayClass is accepted

Expand Down Expand Up @@ -126,7 +126,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

depNsName := types.NamespacedName{
Namespace: "nginx-gateway",
Expand Down Expand Up @@ -156,7 +156,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
Expand All @@ -179,7 +179,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
embeddedfiles.StaticModeDeploymentYAML,
)
})
Expand Down Expand Up @@ -217,7 +216,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
deps := &v1.DeploymentList{}

err := k8sclient.List(context.Background(), deps)
Expand All @@ -237,7 +236,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

deps := &v1.DeploymentList{}

Expand Down Expand Up @@ -266,7 +265,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

deps := &v1.DeploymentList{}
err := k8sclient.List(context.Background(), deps)
Expand Down Expand Up @@ -295,7 +294,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

unknownGC := &v1beta1.GatewayClass{}
err = k8sclient.Get(context.Background(), client.ObjectKeyFromObject(gc), unknownGC)
Expand Down Expand Up @@ -330,7 +329,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
embeddedfiles.StaticModeDeploymentYAML,
)
})
Expand All @@ -340,7 +338,7 @@ var _ = Describe("handler", func() {
batch := []interface{}{e}

handle := func() {
handler.HandleEventBatch(context.TODO(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
Expand Down Expand Up @@ -408,7 +406,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
Expand All @@ -429,7 +427,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
Expand All @@ -442,7 +440,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
[]byte("broken YAML"),
)

Expand Down
1 change: 0 additions & 1 deletion internal/mode/provisioner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func StartManager(cfg Config) error {
cfg.GatewayClassName,
statusUpdater,
mgr.GetClient(),
cfg.Logger.WithName("eventHandler"),
embeddedfiles.StaticModeDeploymentYAML,
)

Expand Down
Loading

0 comments on commit 567f27e

Please sign in to comment.