Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions internal/extproc/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ type ConfigReceiver interface {
}

type configWatcher struct {
lastMod time.Time
path string
rcv ConfigReceiver
l *slog.Logger
current string
lastMod time.Time
path string
rcv ConfigReceiver
l *slog.Logger
current string
usingDefaultCfg bool
}

// StartConfigWatcher starts a watcher for the given path and Receiver.
Expand Down Expand Up @@ -73,21 +74,26 @@ func (cw *configWatcher) loadConfig(ctx context.Context) error {
stat, err := os.Stat(cw.path)
switch {
case err != nil && os.IsNotExist(err):
// If the file does not exist, do not fail (which could lead to the extproc process to terminate)
// Instead, load the default configuration and keep running unconfigured
// If the file does not exist, do not fail (which could lead to the extproc process to terminate).
// Instead, load the default configuration and keep running unconfigured.
cfg, raw = filterapi.MustLoadDefaultConfig()
case err != nil:
return err
}

if cfg != nil {
if cw.usingDefaultCfg { // Do not re-reload the same thing on every tick.
return nil
}
cw.l.Info("config file does not exist; loading default config", slog.String("path", cw.path))
cw.lastMod = time.Now()
cw.usingDefaultCfg = true
} else {
cw.l.Info("loading a new config", slog.String("path", cw.path))
cw.usingDefaultCfg = false
if stat.ModTime().Sub(cw.lastMod) <= 0 {
return nil
}
cw.l.Info("loading a new config", slog.String("path", cw.path))
cw.lastMod = stat.ModTime()
cfg, raw, err = filterapi.UnmarshalConfigYaml(cw.path)
if err != nil {
Expand Down
31 changes: 21 additions & 10 deletions internal/extproc/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ import (

// mockReceiver is a mock implementation of Receiver.
type mockReceiver struct {
cfg *filterapi.Config
mux sync.Mutex
cfg *filterapi.Config
mux sync.Mutex
loadCount int
}

// LoadConfig implements ConfigReceiver.
func (m *mockReceiver) LoadConfig(_ context.Context, cfg *filterapi.Config) error {
m.mux.Lock()
defer m.mux.Unlock()
m.cfg = cfg
m.loadCount++
return nil
}

Expand Down Expand Up @@ -77,8 +79,9 @@ func TestStartConfigWatcher(t *testing.T) {
path := tmpdir + "/config.yaml"
rcv := &mockReceiver{}

const tickInterval = time.Millisecond * 100
logger, buf := newTestLoggerWithBuffer()
err := StartConfigWatcher(t.Context(), path, rcv, logger, time.Millisecond*100)
err := StartConfigWatcher(t.Context(), path, rcv, logger, tickInterval)
require.NoError(t, err)

defaultCfg, _ := filterapi.MustLoadDefaultConfig()
Expand All @@ -87,12 +90,16 @@ func TestStartConfigWatcher(t *testing.T) {
// Verify the default config has been loaded.
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, defaultCfg, rcv.getConfig())
}, 1*time.Second, 100*time.Millisecond)
}, 1*time.Second, tickInterval)

// Verify the buffer contains the default config loading.
require.Eventually(t, func() bool {
return strings.Contains(buf.String(), "config file does not exist; loading default config")
}, 1*time.Second, 100*time.Millisecond, buf.String())
}, 1*time.Second, tickInterval, buf.String())

// Wait for a couple ticks to verify default config is not reloaded.
time.Sleep(2 * tickInterval)
require.Equal(t, 1, rcv.loadCount)

// Create the initial config file.
cfg := `
Expand Down Expand Up @@ -126,7 +133,7 @@ rules:
// Initial loading should have happened.
require.Eventually(t, func() bool {
return rcv.getConfig() != defaultCfg
}, 1*time.Second, 100*time.Millisecond)
}, 1*time.Second, tickInterval)
firstCfg := rcv.getConfig()
require.NotNil(t, firstCfg)

Expand All @@ -151,18 +158,22 @@ rules:
// Verify the config has been updated.
require.Eventually(t, func() bool {
return rcv.getConfig() != firstCfg
}, 1*time.Second, 100*time.Millisecond)
}, 1*time.Second, tickInterval)
require.NotEqual(t, firstCfg, rcv.getConfig())

// Verify the buffer contains the updated loading.
require.Eventually(t, func() bool {
return strings.Contains(buf.String(), "loading a new config")
}, 1*time.Second, 100*time.Millisecond, buf.String())
}, 1*time.Second, tickInterval, buf.String())

// Verify the buffer contains the config line changed
// Verify the buffer contains the config line changed.
require.Eventually(t, func() bool {
return strings.Contains(buf.String(), "config line changed")
}, 1*time.Second, 100*time.Millisecond, buf.String())
}, 1*time.Second, tickInterval, buf.String())

// Wait for a couple ticks to verify config is not reloaded if file does not change.
time.Sleep(2 * tickInterval)
require.Equal(t, 3, rcv.loadCount)
}

func TestDiff(t *testing.T) {
Expand Down