From 916a2ce2c289f42e502a60bc54347b6a5653a396 Mon Sep 17 00:00:00 2001 From: Lorenz Bauer Date: Wed, 9 Aug 2023 17:59:28 +0100 Subject: [PATCH] health: only launch /hello after host datapath is ready Delay starting the /hello endpoint until we've loaded the host datapath at least once. This means that the presence of /health can be used to infer not only that the cilium unix socket API is up but also that the datapath can do basic packet processing. Signed-off-by: Lorenz Bauer --- cilium-health/launch/launcher.go | 9 ++++++--- daemon/cmd/health.go | 2 +- pkg/datapath/fake/datapath.go | 4 ++++ pkg/datapath/loader/loader.go | 15 ++++++++++++++- pkg/datapath/loader/loader_test.go | 8 ++++---- pkg/datapath/types/loader.go | 1 + pkg/recorder/recorder.go | 2 +- 7 files changed, 31 insertions(+), 10 deletions(-) diff --git a/cilium-health/launch/launcher.go b/cilium-health/launch/launcher.go index d21369f680f3e..1b2ddcc3ae1dd 100644 --- a/cilium-health/launch/launcher.go +++ b/cilium-health/launch/launcher.go @@ -41,7 +41,7 @@ const ( ) // Launch starts the cilium-health server and returns a handle to obtain its status -func Launch(spec *healthApi.Spec) (*CiliumHealth, error) { +func Launch(spec *healthApi.Spec, initialized <-chan struct{}) (*CiliumHealth, error) { var ( err error ch = &CiliumHealth{} @@ -65,12 +65,15 @@ func Launch(spec *healthApi.Spec) (*CiliumHealth, error) { return nil, fmt.Errorf("failed to instantiate cilium-health client: %s", err) } - go ch.runServer() + go ch.runServer(initialized) return ch, nil } -func (ch *CiliumHealth) runServer() { +func (ch *CiliumHealth) runServer(initialized <-chan struct{}) { + // Wait until the agent has initialized sufficiently + <-initialized + // Wait until Cilium API is available for { cli, err := ciliumPkg.NewDefaultClient() diff --git a/daemon/cmd/health.go b/daemon/cmd/health.go index b043a24a5c457..25af5a29fcdc1 100644 --- a/daemon/cmd/health.go +++ b/daemon/cmd/health.go @@ -23,7 +23,7 @@ import ( func (d *Daemon) initHealth(spec *healthApi.Spec, cleaner *daemonCleanup) { // Launch cilium-health in the same process (and namespace) as cilium. log.Info("Launching Cilium health daemon") - if ch, err := health.Launch(spec); err != nil { + if ch, err := health.Launch(spec, d.datapath.Loader().HostDatapathInitialized()); err != nil { log.WithError(err).Fatal("Failed to launch cilium-health") } else { d.ciliumHealth = ch diff --git a/pkg/datapath/fake/datapath.go b/pkg/datapath/fake/datapath.go index 302ed88d73d1f..5629f149f6241 100644 --- a/pkg/datapath/fake/datapath.go +++ b/pkg/datapath/fake/datapath.go @@ -153,3 +153,7 @@ func (f *fakeLoader) CustomCallsMapPath(id uint16) string { func (f *fakeLoader) Reinitialize(ctx context.Context, o datapath.BaseProgramOwner, deviceMTU int, iptMgr datapath.IptablesManager, p datapath.Proxy) error { return nil } + +func (f *fakeLoader) HostDatapathInitialized() <-chan struct{} { + return nil +} diff --git a/pkg/datapath/loader/loader.go b/pkg/datapath/loader/loader.go index ec87285bee85f..48b4cf3f8f22f 100644 --- a/pkg/datapath/loader/loader.go +++ b/pkg/datapath/loader/loader.go @@ -71,11 +71,14 @@ type Loader struct { templateCache *objectCache ipsecMu lock.Mutex // guards reinitializeIPSec + + hostDpInitializedOnce sync.Once + hostDpInitialized chan struct{} } // NewLoader returns a new loader. func NewLoader() *Loader { - return &Loader{} + return &Loader{hostDpInitialized: make(chan struct{})} } // Init initializes the datapath cache with base program hashes derived from @@ -383,6 +386,10 @@ func (l *Loader) reloadHostDatapath(ctx context.Context, ep datapath.Endpoint, o log.WithError(err).Error("Failed to remove obsolete netdev programs") } + l.hostDpInitializedOnce.Do(func() { + close(l.hostDpInitialized) + }) + return nil } @@ -619,3 +626,9 @@ func (l *Loader) CallsMapPath(id uint16) string { func (l *Loader) CustomCallsMapPath(id uint16) string { return bpf.LocalMapPath(callsmap.CustomCallsMapName, id) } + +// HostDatapathInitialized returns a channel which is closed when the +// host datapath has been loaded for the first time. +func (l *Loader) HostDatapathInitialized() <-chan struct{} { + return l.hostDpInitialized +} diff --git a/pkg/datapath/loader/loader_test.go b/pkg/datapath/loader/loader_test.go index d37425e3239fd..d10b047f7650b 100644 --- a/pkg/datapath/loader/loader_test.go +++ b/pkg/datapath/loader/loader_test.go @@ -149,7 +149,7 @@ func (s *LoaderTestSuite) testCompileAndLoad(c *C, ep *testutils.TestEndpoint) { defer cancel() stats := &metrics.SpanStat{} - l := &Loader{} + l := NewLoader() err := l.compileAndLoad(ctx, ep, dirInfo, stats) c.Assert(err, IsNil) } @@ -216,7 +216,7 @@ func (s *LoaderTestSuite) testCompileFailure(c *C, ep *testutils.TestEndpoint) { } }() - l := &Loader{} + l := NewLoader() timeout := time.Now().Add(contextTimeout) var err error stats := &metrics.SpanStat{} @@ -257,7 +257,7 @@ func BenchmarkCompileAndLoad(b *testing.B) { ctx, cancel := context.WithTimeout(context.Background(), benchTimeout) defer cancel() - l := &Loader{} + l := NewLoader() b.ResetTimer() for i := 0; i < b.N; i++ { @@ -332,7 +332,7 @@ func BenchmarkCompileOrLoad(b *testing.B) { } defer os.RemoveAll(epDir) - l := &Loader{} + l := NewLoader() l.templateCache = newObjectCache(&config.HeaderfileWriter{}, nil, tmpDir) if err := l.CompileOrLoad(ctx, &ep, nil); err != nil { log.Warningf("Failure in %s: %s", tmpDir, err) diff --git a/pkg/datapath/types/loader.go b/pkg/datapath/types/loader.go index 09dfb3e13cb08..32793d1322398 100644 --- a/pkg/datapath/types/loader.go +++ b/pkg/datapath/types/loader.go @@ -22,6 +22,7 @@ type Loader interface { EndpointHash(cfg EndpointConfiguration) (string, error) Unload(ep Endpoint) Reinitialize(ctx context.Context, o BaseProgramOwner, deviceMTU int, iptMgr IptablesManager, p Proxy) error + HostDatapathInitialized() <-chan struct{} } // BaseProgramOwner is any type for which a loader is building base programs. diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go index a90f93ad4832b..81bd7cb9123e3 100644 --- a/pkg/recorder/recorder.go +++ b/pkg/recorder/recorder.go @@ -218,7 +218,7 @@ func (r *Recorder) orderedMaskSets() ([]*RecMask, []*RecMask) { func (r *Recorder) triggerDatapathRegenerate() error { var masks4, masks6 string - l := &loader.Loader{} + l := loader.NewLoader() extraCArgs := []string{} if len(r.recMask) == 0 { extraCArgs = append(extraCArgs, "-Dcapture_enabled=0")