Skip to content
Merged
83 changes: 57 additions & 26 deletions internal/provider/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -25,33 +26,45 @@
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/filewatcher"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/provider/kubernetes"
"github.com/envoyproxy/gateway/internal/utils/path"
)

type Provider struct {
paths []string
logger logr.Logger
watcher filewatcher.FileWatcher
resourcesStore *resourcesStore
extensionManagerEnabled bool
paths []string
logger logr.Logger
watcher filewatcher.FileWatcher
resources *message.ProviderResources
reconciler *kubernetes.OfflineGatewayAPIReconciler
store *resourcesStore
status *StatusHandler

// ready indicates whether the provider can start watching filesystem events.
ready atomic.Bool
}

func New(svr *config.Server, resources *message.ProviderResources) (*Provider, error) {
func New(ctx context.Context, svr *config.Server, resources *message.ProviderResources) (*Provider, error) {
logger := svr.Logger.Logger
paths := sets.New[string]()
if svr.EnvoyGateway.Provider.Custom.Resource.File != nil {
paths.Insert(svr.EnvoyGateway.Provider.Custom.Resource.File.Paths...)
}

// Create gateway-api offline reconciler.
statusHandler := NewStatusHandler(logger)
reconciler, err := kubernetes.NewOfflineGatewayAPIController(ctx, svr, statusHandler.Writer(), resources)
if err != nil {
return nil, fmt.Errorf("failed to create offline gateway-api controller")
}

Check warning on line 58 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L57-L58

Added lines #L57 - L58 were not covered by tests

return &Provider{
paths: paths.UnsortedList(),
logger: logger,
watcher: filewatcher.NewWatcher(),
resourcesStore: newResourcesStore(svr.EnvoyGateway.Gateway.ControllerName, resources, logger),
extensionManagerEnabled: svr.EnvoyGateway.ExtensionManager != nil,
paths: paths.UnsortedList(),
logger: logger,
watcher: filewatcher.NewWatcher(),
resources: resources,
reconciler: reconciler,
store: newResourcesStore(svr.EnvoyGateway.Gateway.ControllerName, reconciler.Client, resources, logger),
status: statusHandler,
}, nil
}

Expand All @@ -73,13 +86,18 @@
}
go p.startHealthProbeServer(ctx, readyzChecker)

// Subscribe resources status.
p.subscribeAndUpdateStatus(ctx)
// Offline controller should be started before initial resources load.
// Nor we may lose some messages from controller.
wg := new(sync.WaitGroup)
wg.Add(2)
go p.startReconciling(ctx, wg)
go p.status.Start(ctx, wg)
wg.Wait()

initDirs, initFiles := path.ListDirsAndFiles(p.paths)
// Initially load resources from paths on host.
if err := p.resourcesStore.LoadAndStore(initFiles.UnsortedList(), initDirs.UnsortedList()); err != nil {
return fmt.Errorf("failed to load resources into store: %w", err)
// Initially load resources.
if err := p.store.ReloadAll(ctx, initFiles.UnsortedList(), initDirs.UnsortedList()); err != nil {
p.logger.Error(err, "failed to reload resources initially")

Check warning on line 100 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L100

Added line #L100 was not covered by tests
}

// Add paths to the watcher, and aggregate all path channels into one.
Expand Down Expand Up @@ -150,18 +168,31 @@
}
p.logger.Info("file changed", "op", event.Op, "name", event.Name, "dir", filepath.Dir(event.Name))

switch event.Op {
case fsnotify.Create, fsnotify.Write, fsnotify.Remove:
// Since we do not watch any events in the subdirectories, any events involving files
// modifications in current directory will trigger the event handling.
goto handle
default:
// do nothing
continue
handle:
if err := p.store.ReloadAll(ctx, curFiles.UnsortedList(), curDirs.UnsortedList()); err != nil {
p.logger.Error(err, "error when reload resources", "op", event.Op, "name", event.Name)
}
}
}
}

handle:
p.resourcesStore.HandleEvent(curFiles.UnsortedList(), curDirs.UnsortedList())
// startReconciling starts reconcile on offline controller when receiving signal from resources store.
func (p *Provider) startReconciling(ctx context.Context, ready *sync.WaitGroup) {
p.logger.Info("start reconciling")
defer p.logger.Info("stop reconciling")
ready.Done()

for {
select {
case rid := <-p.store.reconcile:
p.logger.Info("start reconcile", "id", rid, "time", time.Now())
if err := p.reconciler.Reconcile(ctx); err != nil {
p.logger.Error(err, "failed to reconcile", "id", rid)
}

Check warning on line 191 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L190-L191

Added lines #L190 - L191 were not covered by tests
p.logger.Info("reconcile finished", "id", rid, "time", time.Now())

case <-ctx.Done():
return
}
}
}
Expand Down
Loading
Loading