Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Telemetry collector structure with some collected resources #1497

Merged
merged 43 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
951a631
WIP
bjee19 Jan 19, 2024
55320c6
Add graph resource count but still WIP
bjee19 Jan 22, 2024
3b43d1f
Add small formatting
bjee19 Jan 22, 2024
d6181a6
Add review feedback
bjee19 Jan 23, 2024
f51d0ee
Add version+name and adjust graphGetter
bjee19 Jan 23, 2024
0e467db
Change k8sClient to client Reader
bjee19 Jan 24, 2024
f7a2b08
Add small feedback review
bjee19 Jan 24, 2024
266e713
Add more feedback from review
bjee19 Jan 24, 2024
5a4f0c7
Add feedback and cfg
bjee19 Jan 25, 2024
e400f5e
Change ReferencedServices and add generic counting function
bjee19 Jan 25, 2024
cfc0519
Add collector tests
bjee19 Jan 26, 2024
f4e16a7
Refactor style on collector tests
bjee19 Jan 26, 2024
4372871
Add small style changes and documentation
bjee19 Jan 26, 2024
7eec0df
Add error case for latest graph being nil
bjee19 Jan 26, 2024
a1e230f
Add small feedback for error message
bjee19 Jan 26, 2024
d92cd83
Add latest configuration and tests
bjee19 Jan 26, 2024
130ce47
Add nodes resource to RBAC
bjee19 Jan 29, 2024
8b11f1d
Add additional documentation on exported types and functions
bjee19 Jan 29, 2024
905f476
Add lock to eventHandlerImpl
bjee19 Jan 30, 2024
1e455e5
Add GetLatestGraph into change processor tests
bjee19 Jan 30, 2024
4062c72
Add expData to job test
bjee19 Jan 30, 2024
c9749a6
Add GetLatestConfiguration to handler tests
bjee19 Jan 31, 2024
73e390d
Revert ReferencedServices to no longer store services
bjee19 Jan 31, 2024
1991643
Add feedback for collector tests
bjee19 Feb 1, 2024
89eb5ce
Add count for ignored gateway classes and gateways
bjee19 Feb 1, 2024
7213348
Add HealthChecker and blocking on job until NGF is ready
bjee19 Feb 1, 2024
32be3b7
Add job tests for waiting on health checker
bjee19 Feb 1, 2024
c07cb17
Add review feedback
bjee19 Feb 2, 2024
57805ad
Refactor CreateTelemetryJobWorker and job tests
bjee19 Feb 2, 2024
4ed9948
Add FIXME and and small comment
bjee19 Feb 2, 2024
49da1d3
Update manifests
bjee19 Feb 2, 2024
63f3a76
Add back createTelemetryJob
bjee19 Feb 2, 2024
a091bde
Add job worker name change and test changes
bjee19 Feb 2, 2024
6caf023
Change job worker tests to be unit tests
bjee19 Feb 2, 2024
9a1f99b
Add generic ReadyChannel to cronjob
bjee19 Feb 3, 2024
8119c21
Refactor collector tests to add list calls helper function
bjee19 Feb 3, 2024
455512a
Remove health checker interface
bjee19 Feb 4, 2024
0025558
Add setLatestConfiguration
bjee19 Feb 4, 2024
36fae13
Add small fixes
bjee19 Feb 4, 2024
e3a71e9
Add check for context canceled
bjee19 Feb 5, 2024
9ca0236
Add small line fix
bjee19 Feb 5, 2024
de6e3bc
Add review feedback
bjee19 Feb 5, 2024
89c41d5
Move test case around in collector tests
bjee19 Feb 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/gateway/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func createStaticModeCommand() *cobra.Command {
},
Plus: plus,
TelemetryReportPeriod: period,
Version: version,
}

if err := static.StartManager(conf); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions deploy/helm-chart/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ rules:
- namespaces
- services
- secrets
# FIXME(bjee19): make nodes permission dependent on telemetry being enabled.
# https://github.com/nginxinc/nginx-gateway-fabric/issues/1317.
- nodes
bjee19 marked this conversation as resolved.
Show resolved Hide resolved
verbs:
- list
- watch
Expand Down
3 changes: 3 additions & 0 deletions deploy/manifests/nginx-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ rules:
- namespaces
- services
- secrets
# FIXME(bjee19): make nodes permission dependent on telemetry being enabled.
# https://github.com/nginxinc/nginx-gateway-fabric/issues/1317.
- nodes
verbs:
- list
- watch
Expand Down
9 changes: 9 additions & 0 deletions internal/framework/runnables/cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type CronJobConfig struct {
// Worker is the function that will be run for every cronjob iteration.
Worker func(context.Context)
// ReadyCh delays the start of the job until the channel is closed.
ReadyCh <-chan struct{}
// Logger is the logger.
Logger logr.Logger
// Period defines the period of the cronjob. The cronjob will run every Period.
Expand All @@ -37,6 +39,13 @@ func NewCronJob(cfg CronJobConfig) *CronJob {
// Start starts the cronjob.
// Implements controller-runtime manager.Runnable
func (j *CronJob) Start(ctx context.Context) error {
select {
case <-j.cfg.ReadyCh:
case <-ctx.Done():
j.cfg.Logger.Info("Context canceled, failed to start cronjob")
return ctx.Err()
}

j.cfg.Logger.Info("Starting cronjob")

sliding := true // This means the period with jitter will be calculated after each worker call.
Expand Down
36 changes: 33 additions & 3 deletions internal/framework/runnables/cronjob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
func TestCronJob(t *testing.T) {
g := NewWithT(t)

readyChannel := make(chan struct{})

timeout := 10 * time.Second
var callCount int

Expand All @@ -22,9 +24,10 @@ func TestCronJob(t *testing.T) {
}

cfg := CronJobConfig{
Worker: worker,
Logger: zap.New(),
Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times
Worker: worker,
Logger: zap.New(),
Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times
ReadyCh: readyChannel,
}
job := NewCronJob(cfg)

Expand All @@ -35,6 +38,7 @@ func TestCronJob(t *testing.T) {
errCh <- job.Start(ctx)
close(errCh)
}()
close(readyChannel)

minReports := 2 // ensure that the CronJob reports more than once: it doesn't exit after the first run

Expand All @@ -44,3 +48,29 @@ func TestCronJob(t *testing.T) {
g.Eventually(errCh).Should(Receive(BeNil()))
g.Eventually(errCh).Should(BeClosed())
}

func TestCronJob_ContextCanceled(t *testing.T) {
g := NewWithT(t)

readyChannel := make(chan struct{})

cfg := CronJobConfig{
Worker: func(ctx context.Context) {},
Logger: zap.New(),
Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the CronJob should run a few times
ReadyCh: readyChannel,
}
job := NewCronJob(cfg)

ctx, cancel := context.WithCancel(context.Background())

errCh := make(chan error)
go func() {
errCh <- job.Start(ctx)
close(errCh)
}()

cancel()
g.Eventually(errCh).Should(Receive(MatchError(context.Canceled)))
g.Eventually(errCh).Should(BeClosed())
}
2 changes: 2 additions & 0 deletions internal/mode/static/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
)

type Config struct {
// Version is the running NGF version.
Version string
// AtomicLevel is an atomically changeable, dynamic logging level.
AtomicLevel zap.AtomicLevel
// GatewayNsName is the namespaced name of a Gateway resource that the Gateway will use.
Expand Down
50 changes: 39 additions & 11 deletions internal/mode/static/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package static
import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -58,8 +59,8 @@ type eventHandlerConfig struct {
logLevelSetter logLevelSetter
// metricsCollector collects metrics for this controller.
metricsCollector handlerMetricsCollector
// healthChecker sets the health of the Pod to Ready once we've written out our initial config
healthChecker *healthChecker
// nginxConfiguredOnStartChecker sets the health of the Pod to Ready once we've written out our initial config.
nginxConfiguredOnStartChecker *nginxConfiguredOnStartChecker
// controlConfigNSName is the NamespacedName of the NginxGateway config for this controller.
controlConfigNSName types.NamespacedName
// version is the current version number of the nginx config.
Expand All @@ -72,7 +73,10 @@ type eventHandlerConfig struct {
// (2) Keeping the statuses of the Gateway API resources updated.
// (3) Updating control plane configuration.
type eventHandlerImpl struct {
cfg eventHandlerConfig
// latestConfiguration is the latest Configuration generation.
latestConfiguration *dataplane.Configuration
bjee19 marked this conversation as resolved.
Show resolved Hide resolved
bjee19 marked this conversation as resolved.
Show resolved Hide resolved
cfg eventHandlerConfig
lock sync.Mutex
bjee19 marked this conversation as resolved.
Show resolved Hide resolved
}

// newEventHandlerImpl creates a new eventHandlerImpl.
Expand Down Expand Up @@ -105,36 +109,44 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
switch changeType {
case state.NoChange:
logger.Info("Handling events didn't result into NGINX configuration changes")
if !h.cfg.healthChecker.ready && h.cfg.healthChecker.firstBatchError == nil {
h.cfg.healthChecker.setAsReady()
if !h.cfg.nginxConfiguredOnStartChecker.ready && h.cfg.nginxConfiguredOnStartChecker.firstBatchError == nil {
h.cfg.nginxConfiguredOnStartChecker.setAsReady()
}
return
case state.EndpointsOnlyChange:
h.cfg.version++
cfg := dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version)

h.setLatestConfiguration(&cfg)

err = h.updateUpstreamServers(
ctx,
logger,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
cfg,
)
case state.ClusterStateChange:
h.cfg.version++
cfg := dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version)

h.setLatestConfiguration(&cfg)

err = h.updateNginxConf(
ctx,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
cfg,
)
}

var nginxReloadRes nginxReloadResult
if err != nil {
logger.Error(err, "Failed to update NGINX configuration")
nginxReloadRes.error = err
if !h.cfg.healthChecker.ready {
h.cfg.healthChecker.firstBatchError = err
if !h.cfg.nginxConfiguredOnStartChecker.ready {
h.cfg.nginxConfiguredOnStartChecker.firstBatchError = err
}
} else {
logger.Info("NGINX configuration was successfully updated")
if !h.cfg.healthChecker.ready {
h.cfg.healthChecker.setAsReady()
if !h.cfg.nginxConfiguredOnStartChecker.ready {
h.cfg.nginxConfiguredOnStartChecker.setAsReady()
}
}

Expand Down Expand Up @@ -384,3 +396,19 @@ func getGatewayAddresses(

return gwAddresses, nil
}

// GetLatestConfiguration gets the latest configuration.
func (h *eventHandlerImpl) GetLatestConfiguration() *dataplane.Configuration {
bjee19 marked this conversation as resolved.
Show resolved Hide resolved
h.lock.Lock()
defer h.lock.Unlock()

return h.latestConfiguration
}

// setLatestConfiguration sets the latest configuration.
func (h *eventHandlerImpl) setLatestConfiguration(cfg *dataplane.Configuration) {
h.lock.Lock()
defer h.lock.Unlock()

h.latestConfiguration = cfg
}
Loading
Loading