diff --git a/categories_test.go b/categories_test.go index a5ec861ad..3b2024987 100644 --- a/categories_test.go +++ b/categories_test.go @@ -38,7 +38,11 @@ func TestCategoriesWithProxyMode(t *testing.T) { })) defer webServer.Close() - indexerProxy := packages.NewFileSystemIndexer(testLogger, "./testdata/second_package_path") + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } + + indexerProxy := packages.NewFileSystemIndexer(fsOpts, "./testdata/second_package_path") defer indexerProxy.Close(t.Context()) err := indexerProxy.Init(t.Context()) diff --git a/go.mod b/go.mod index c71a9a334..089050be2 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/elastic/go-windows v1.0.2 // indirect github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect + github.com/fsnotify/fsnotify v1.9.0 github.com/go-jose/go-jose/v4 v4.1.2 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index f3f230301..0e0abb766 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,8 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/fsouza/fake-gcs-server v1.52.3 h1:hXddOPMGDKq5ENmttw6xkodVJy0uVhf7HhWvQgAOH6g= github.com/fsouza/fake-gcs-server v1.52.3/go.mod h1:A0XtSRX+zz5pLRAt88j9+Of0omQQW+RMqipFbvdNclQ= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= diff --git a/main.go b/main.go index 74d0476a7..edc22940e 100644 --- a/main.go +++ b/main.go @@ -84,6 +84,8 @@ var ( proxyTo string serviceName = getServiceName() + packagePathsEnableWatcher = false + defaultConfig = Config{ CacheTimeIndex: 10 * time.Second, CacheTimeSearch: 10 * time.Minute, @@ -128,6 +130,8 @@ func init() { // The following proxy-indexer related flags are technical preview and might be removed in the future or renamed flag.BoolVar(&featureProxyMode, "feature-proxy-mode", false, "Enable proxy mode to include packages from other endpoint (technical preview).") flag.StringVar(&proxyTo, "proxy-to", "https://epr.elastic.co/", "Proxy-to endpoint") + + flag.BoolVar(&packagePathsEnableWatcher, "package-paths-enable-watcher", false, "Enable file system watcher for package paths to automatically detect new packages.") } type Config struct { @@ -138,9 +142,9 @@ type Config struct { CacheTimeCatchAll time.Duration `config:"cache_time.catch_all"` SQLIndexerDatabaseFolderPath string `config:"sql_indexer.database_folder_path"` // technical preview, used by the SQL storage indexer SearchCacheSize int `config:"search.cache_size"` // technical preview, used by the SQL storage indexer - SearchCacheTTL time.Duration `config:"search.cache_ttl"` // technical preview, used by the SQL storage indexe^ + SearchCacheTTL time.Duration `config:"search.cache_ttl"` // technical preview, used by the SQL storage indexer CategoriesCacheSize int `config:"categories.cache_size"` // technical preview, used by the SQL storage indexer - CategoriesCacheTTL time.Duration `config:"categories.cache_ttl"` // technical preview, used by the SQL storage indexe^ + CategoriesCacheTTL time.Duration `config:"categories.cache_ttl"` // technical preview, used by the SQL storage indexer } func main() { @@ -179,6 +183,10 @@ func main() { config := mustLoadConfig(logger) + if len(config.PackagePaths) == 0 && packagePathsEnableWatcher { + logger.Fatal("no package paths configured, cannot enable watcher") + } + options := serverOptions{ apmTracer: apmTracer, config: config, @@ -388,9 +396,15 @@ func initIndexer(ctx context.Context, logger *zap.Logger, options serverOptions) combined = append(combined, indexer) } + fsOptions := packages.FSIndexerOptions{ + Logger: logger, + EnablePathsWatcher: packagePathsEnableWatcher, + APMTracer: options.apmTracer, + } + combined = append(combined, - packages.NewZipFileSystemIndexer(logger, packagesBasePaths...), - packages.NewFileSystemIndexer(logger, packagesBasePaths...), + packages.NewZipFileSystemIndexer(fsOptions, packagesBasePaths...), + packages.NewFileSystemIndexer(fsOptions, packagesBasePaths...), ) ensurePackagesAvailable(ctx, logger, combined) return combined @@ -574,6 +588,7 @@ func getConfig(logger *zap.Logger) (*Config, error) { } config.CategoriesCacheTTL = cacheTTL } + return &config, nil } @@ -619,6 +634,8 @@ func ensurePackagesAvailable(ctx context.Context, logger *zap.Logger, indexer In logger.Info(fmt.Sprintf("%v local package manifests loaded.", len(packages))) } else if featureProxyMode { logger.Info("No local packages found, but the proxy mode can access remote ones.") + } else if packagePathsEnableWatcher { + logger.Warn("No packages found at startup. The registry is running but no content is available yet.") } else { logger.Fatal("No local packages found.") } diff --git a/main_test.go b/main_test.go index 1dc98a8ab..8418de800 100644 --- a/main_test.go +++ b/main_test.go @@ -60,11 +60,14 @@ func TestRouter(t *testing.T) { func TestEndpoints(t *testing.T) { t.Parallel() + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } packagesBasePaths := []string{"./testdata/second_package_path", "./testdata/package"} indexer := NewCombinedIndexer( - packages.NewZipFileSystemIndexer(testLogger, "./testdata/local-storage"), - packages.NewFileSystemIndexer(testLogger, packagesBasePaths...), + packages.NewZipFileSystemIndexer(fsOpts, "./testdata/local-storage"), + packages.NewFileSystemIndexer(fsOpts, packagesBasePaths...), ) t.Cleanup(func() { indexer.Close(context.Background()) }) @@ -171,8 +174,12 @@ func TestEndpoints(t *testing.T) { func TestArtifacts(t *testing.T) { t.Parallel() + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } + packagesBasePaths := []string{"./testdata/package"} - indexer := packages.NewFileSystemIndexer(testLogger, packagesBasePaths...) + indexer := packages.NewFileSystemIndexer(fsOpts, packagesBasePaths...) t.Cleanup(func() { indexer.Close(context.Background()) }) err := indexer.Init(t.Context()) @@ -203,7 +210,10 @@ func TestArtifacts(t *testing.T) { func TestSignatures(t *testing.T) { t.Parallel() - indexer := packages.NewZipFileSystemIndexer(testLogger, "./testdata/local-storage") + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } + indexer := packages.NewZipFileSystemIndexer(fsOpts, "./testdata/local-storage") t.Cleanup(func() { indexer.Close(context.Background()) }) err := indexer.Init(t.Context()) @@ -232,8 +242,11 @@ func TestSignatures(t *testing.T) { func TestStatics(t *testing.T) { t.Parallel() + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } packagesBasePaths := []string{"./testdata/package"} - indexer := packages.NewFileSystemIndexer(testLogger, packagesBasePaths...) + indexer := packages.NewFileSystemIndexer(fsOpts, packagesBasePaths...) t.Cleanup(func() { indexer.Close(context.Background()) }) err := indexer.Init(t.Context()) @@ -324,9 +337,13 @@ func TestStaticsModifiedTime(t *testing.T) { }, } + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } + indexer := NewCombinedIndexer( - packages.NewZipFileSystemIndexer(testLogger, "./testdata/local-storage"), - packages.NewFileSystemIndexer(testLogger, "./testdata/package"), + packages.NewZipFileSystemIndexer(fsOpts, "./testdata/local-storage"), + packages.NewFileSystemIndexer(fsOpts, "./testdata/package"), ) t.Cleanup(func() { indexer.Close(context.Background()) }) @@ -361,7 +378,11 @@ func TestStaticsModifiedTime(t *testing.T) { func TestZippedArtifacts(t *testing.T) { t.Parallel() - indexer := packages.NewZipFileSystemIndexer(testLogger, "./testdata/local-storage") + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } + + indexer := packages.NewZipFileSystemIndexer(fsOpts, "./testdata/local-storage") t.Cleanup(func() { indexer.Close(context.Background()) }) err := indexer.Init(t.Context()) @@ -395,10 +416,12 @@ func TestZippedArtifacts(t *testing.T) { func TestPackageIndex(t *testing.T) { t.Parallel() - + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } indexer := NewCombinedIndexer( - packages.NewZipFileSystemIndexer(testLogger, "./testdata/local-storage"), - packages.NewFileSystemIndexer(testLogger, "./testdata/package"), + packages.NewZipFileSystemIndexer(fsOpts, "./testdata/local-storage"), + packages.NewFileSystemIndexer(fsOpts, "./testdata/package"), ) t.Cleanup(func() { indexer.Close(context.Background()) }) @@ -435,8 +458,11 @@ func TestPackageIndex(t *testing.T) { func TestZippedPackageIndex(t *testing.T) { t.Parallel() + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } packagesBasePaths := []string{"./testdata/local-storage"} - indexer := packages.NewZipFileSystemIndexer(testLogger, packagesBasePaths...) + indexer := packages.NewZipFileSystemIndexer(fsOpts, packagesBasePaths...) t.Cleanup(func() { indexer.Close(context.Background()) }) err := indexer.Init(t.Context()) @@ -468,10 +494,14 @@ func TestZippedPackageIndex(t *testing.T) { func TestAllPackageIndex(t *testing.T) { t.Parallel() + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } + testPackagePath := filepath.Join("testdata", "package") secondPackagePath := filepath.Join("testdata", "second_package_path") packagesBasePaths := []string{secondPackagePath, testPackagePath} - indexer := packages.NewFileSystemIndexer(testLogger, packagesBasePaths...) + indexer := packages.NewFileSystemIndexer(fsOpts, packagesBasePaths...) t.Cleanup(func() { indexer.Close(context.Background()) }) err := indexer.Init(t.Context()) @@ -528,9 +558,13 @@ func TestContentTypes(t *testing.T) { {"/package/example/1.0.1/img/kibana-envoyproxy.jpg", "image/jpeg"}, } + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } + indexer := NewCombinedIndexer( - packages.NewZipFileSystemIndexer(testLogger, "./testdata/local-storage"), - packages.NewFileSystemIndexer(testLogger, "./testdata/package"), + packages.NewZipFileSystemIndexer(fsOpts, "./testdata/local-storage"), + packages.NewFileSystemIndexer(fsOpts, "./testdata/package"), ) t.Cleanup(func() { indexer.Close(context.Background()) }) @@ -562,9 +596,13 @@ func TestContentTypes(t *testing.T) { func TestRangeDownloads(t *testing.T) { t.Parallel() + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } + indexer := NewCombinedIndexer( - packages.NewZipFileSystemIndexer(testLogger, "./testdata/local-storage"), - packages.NewFileSystemIndexer(testLogger, "./testdata/package"), + packages.NewZipFileSystemIndexer(fsOpts, "./testdata/local-storage"), + packages.NewFileSystemIndexer(fsOpts, "./testdata/package"), ) t.Cleanup(func() { indexer.Close(context.Background()) }) diff --git a/packages/marshaler_test.go b/packages/marshaler_test.go index accf8396d..db3ec8daa 100644 --- a/packages/marshaler_test.go +++ b/packages/marshaler_test.go @@ -23,7 +23,10 @@ var generateFlag = flag.Bool("generate", false, "Write golden files") func TestMarshalJSON(t *testing.T) { // given packagesBasePaths := []string{"../testdata/second_package_path", "../testdata/package"} - indexer := NewFileSystemIndexer(util.NewTestLogger(), packagesBasePaths...) + fsOpts := FSIndexerOptions{ + Logger: util.NewTestLogger(), + } + indexer := NewFileSystemIndexer(fsOpts, packagesBasePaths...) defer indexer.Close(t.Context()) err := indexer.Init(t.Context()) @@ -39,8 +42,12 @@ func TestMarshalJSON(t *testing.T) { func TestUnmarshalJSON(t *testing.T) { // given + + fsOpts := FSIndexerOptions{ + Logger: util.NewTestLogger(), + } packagesBasePaths := []string{"../testdata/second_package_path", "../testdata/package"} - indexer := NewFileSystemIndexer(util.NewTestLogger(), packagesBasePaths...) + indexer := NewFileSystemIndexer(fsOpts, packagesBasePaths...) defer indexer.Close(t.Context()) err := indexer.Init(t.Context()) diff --git a/packages/packages.go b/packages/packages.go index 0c1f06d56..defc96048 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -12,9 +12,11 @@ import ( "path/filepath" "slices" "strings" + "sync" "time" "github.com/Masterminds/semver/v3" + "github.com/fsnotify/fsnotify" "github.com/prometheus/client_golang/prometheus" "go.elastic.co/apm/v2" @@ -23,6 +25,11 @@ import ( "github.com/elastic/package-registry/metrics" ) +const ( + zipFileSystemIndexerName = "ZipFileSystemIndexer" + fileSystemIndexerName = "FileSystemIndexer" +) + // ValidationDisabled is a flag which can disable package content validation (package, data streams, assets, etc.). var ValidationDisabled bool @@ -109,10 +116,22 @@ type FileSystemIndexer struct { fsBuilder FileSystemBuilder logger *zap.Logger + + enablePathsWatcher bool + + m sync.RWMutex + + apmTracer *apm.Tracer +} + +type FSIndexerOptions struct { + Logger *zap.Logger + EnablePathsWatcher bool + APMTracer *apm.Tracer } // NewFileSystemIndexer creates a new FileSystemIndexer for the given paths. -func NewFileSystemIndexer(logger *zap.Logger, paths ...string) *FileSystemIndexer { +func NewFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSystemIndexer { walkerFn := func(basePath, path string, info os.DirEntry) (bool, error) { relativePath, err := filepath.Rel(basePath, path) if err != nil { @@ -128,7 +147,7 @@ func NewFileSystemIndexer(logger *zap.Logger, paths ...string) *FileSystemIndexe versionDir := dirs[1] _, err := semver.StrictNewVersion(versionDir) if err != nil { - logger.Warn("ignoring unexpected directory", + options.Logger.Warn("ignoring unexpected directory", zap.String("file.path", path)) return false, filepath.SkipDir } @@ -137,15 +156,17 @@ func NewFileSystemIndexer(logger *zap.Logger, paths ...string) *FileSystemIndexe // Unexpected file, return nil in order to continue processing sibling directories // Fixes an annoying problem when the .DS_Store file is left behind and the package // is not loading without any error information - logger.Warn("ignoring unexpected file", zap.String("file.path", path)) + options.Logger.Warn("ignoring unexpected file", zap.String("file.path", path)) return false, nil } return &FileSystemIndexer{ - paths: paths, - label: "FileSystemIndexer", - walkerFn: walkerFn, - fsBuilder: ExtractedFileSystemBuilder, - logger: logger, + paths: paths, + label: fileSystemIndexerName, + walkerFn: walkerFn, + fsBuilder: ExtractedFileSystemBuilder, + logger: options.Logger, + enablePathsWatcher: options.EnablePathsWatcher, + apmTracer: options.APMTracer, } } @@ -154,7 +175,7 @@ var ExtractedFileSystemBuilder = func(p *Package) (PackageFileSystem, error) { } // NewZipFileSystemIndexer creates a new ZipFileSystemIndexer for the given paths. -func NewZipFileSystemIndexer(logger *zap.Logger, paths ...string) *FileSystemIndexer { +func NewZipFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSystemIndexer { walkerFn := func(basePath, path string, info os.DirEntry) (bool, error) { if info.IsDir() { return false, nil @@ -166,7 +187,7 @@ func NewZipFileSystemIndexer(logger *zap.Logger, paths ...string) *FileSystemInd // Check if the file is actually a zip file. r, err := zip.OpenReader(path) if err != nil { - logger.Warn("ignoring invalid zip file", + options.Logger.Warn("ignoring invalid zip file", zap.String("file.path", path), zap.Error(err)) return false, nil } @@ -175,11 +196,13 @@ func NewZipFileSystemIndexer(logger *zap.Logger, paths ...string) *FileSystemInd return true, nil } return &FileSystemIndexer{ - paths: paths, - label: "ZipFileSystemIndexer", - walkerFn: walkerFn, - fsBuilder: ZipFileSystemBuilder, - logger: logger, + paths: paths, + label: zipFileSystemIndexerName, + walkerFn: walkerFn, + fsBuilder: ZipFileSystemBuilder, + logger: options.Logger, + enablePathsWatcher: options.EnablePathsWatcher, + apmTracer: options.APMTracer, } } @@ -189,10 +212,104 @@ var ZipFileSystemBuilder = func(p *Package) (PackageFileSystem, error) { // Init initializes the indexer. func (i *FileSystemIndexer) Init(ctx context.Context) (err error) { - i.packageList, err = i.getPackagesFromFileSystem(ctx) + if err := i.updatePackageFileSystemIndex(ctx); err != nil { + i.logger.Error("initializing package filesystem index failed", + zap.Error(err), + zap.String("indexer", i.label)) + return err + } + + if i.enablePathsWatcher { + // removing current transaction as we are starting a new one at watcher + go i.watchPackageFileSystem(apm.ContextWithTransaction(ctx, nil)) + } + return nil +} + +func (i *FileSystemIndexer) watchPackageFileSystem(ctx context.Context) { + // TODO: https://github.com/elastic/package-registry/issues/1488 + watcher, err := fsnotify.NewWatcher() if err != nil { - return fmt.Errorf("reading packages from filesystem failed: %w", err) + i.logger.Error("failed to create fsnotify watcher", zap.String("indexer", i.label), zap.Error(err)) + return + } + defer watcher.Close() + + if i.apmTracer == nil { + i.apmTracer = apm.DefaultTracer() } + + for _, path := range i.paths { + if err := watcher.Add(path); err != nil { + i.logger.Error("failed to watch path", zap.String("path", path), zap.String("indexer", i.label), zap.Error(err)) + return + } + i.logger.Debug("watching path for changes", zap.String("path", path), zap.String("indexer", i.label)) + } + + debouncer := time.NewTimer(0) + debouncer.Stop() + defer debouncer.Stop() + + for { + select { + case <-ctx.Done(): + i.logger.Info("stopping filesystem watcher", zap.String("indexer", i.label)) + return + case event, ok := <-watcher.Events: + if !ok { + return + } + + // Watching for create, write, rename and remove events + // https://pkg.go.dev/github.com/fsnotify/fsnotify@v1.9.0#Watcher + if !event.Has(fsnotify.Create) && !event.Has(fsnotify.Write) && + !event.Has(fsnotify.Rename) && !event.Has(fsnotify.Remove) { + continue + } + // skip events that are not relevant for this indexer + if (i.label == zipFileSystemIndexerName && !strings.HasSuffix(event.Name, ".zip")) || + (i.label == fileSystemIndexerName && strings.HasSuffix(event.Name, ".zip")) { + i.logger.Debug("skipping event at indexer", zap.String("indexer", i.label)) + continue + } + + i.logger.Debug("filesystem change detected", zap.String("event", event.String()), zap.String("indexer", i.label)) + const debounceDelay = 1 * time.Second + debouncer.Reset(debounceDelay) + case <-debouncer.C: + tx := i.apmTracer.StartTransaction("updateFSIndex", "backend.watcher") + defer tx.End() + + ctx := apm.ContextWithTransaction(ctx, tx) + // only when debouncer fires, we update the index + // debouncer only fires when no new events arrive during the debounceDelay + if err := i.updatePackageFileSystemIndex(ctx); err != nil { + i.logger.Error("updating package filesystem index failed", + zap.Error(err), + zap.String("indexer", i.label)) + } else { + i.logger.Info("package filesystem index updated", + zap.String("indexer", i.label)) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + i.logger.Error("fsnotify watcher error", zap.Error(err), zap.String("indexer", i.label)) + } + } +} + +func (i *FileSystemIndexer) updatePackageFileSystemIndex(ctx context.Context) error { + i.m.Lock() + defer i.m.Unlock() + + newPackageList, err := i.getPackagesFromFileSystem(ctx) + if err != nil { + return err + } + i.packageList = newPackageList return nil } @@ -211,6 +328,9 @@ func (i *FileSystemIndexer) Get(ctx context.Context, opts *GetOptions) (Packages span.Context.SetLabel("indexer", i.label) defer span.End() + i.m.RLock() + defer i.m.RUnlock() + if opts == nil { return i.packageList, nil } diff --git a/packages/packages_test.go b/packages/packages_test.go index 7028885e4..19909bd08 100644 --- a/packages/packages_test.go +++ b/packages/packages_test.go @@ -5,10 +5,19 @@ package packages import ( + "context" + "fmt" + "os" + "path/filepath" "testing" + "time" "github.com/Masterminds/semver/v3" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/elastic/package-registry/archiver" ) func TestPackagesFilter(t *testing.T) { @@ -779,6 +788,246 @@ func TestPackagesSpecMinMaxFilter(t *testing.T) { } } +func TestFileSystemIndexer_watchPackageFileSystem(t *testing.T) { + t.Run("context cancellation stops watcher", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ValidationDisabled = true + + tmpDir := t.TempDir() + logger := zap.NewNop() + + indexer := NewFileSystemIndexer(FSIndexerOptions{ + Logger: logger, + EnablePathsWatcher: true, + }, tmpDir) + + indexer.packageList = make(Packages, 0) + + done := make(chan struct{}) + go func() { + indexer.watchPackageFileSystem(ctx) + close(done) + }() + + // Cancel context and verify watcher stops + cancel() + + select { + case <-done: + // Success - watcher stopped + case <-time.After(2 * time.Second): + t.Fatal("watcher did not stop after context cancellation") + } + }) + + t.Run("file indexer debouncing filesystem events", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ValidationDisabled = true + tmpDir := t.TempDir() + logger := zap.NewNop() + + indexer := NewFileSystemIndexer(FSIndexerOptions{ + Logger: logger, + EnablePathsWatcher: true, + }, tmpDir) + + indexer.packageList = make(Packages, 0) + + done := make(chan struct{}) + go func() { + indexer.watchPackageFileSystem(ctx) + close(done) + }() + + // Give watcher time to initialize + time.Sleep(100 * time.Millisecond) + + for i := 0; i < 25; i++ { + createMockPackage(t, tmpDir, fmt.Sprintf("mypackage%d", i)) + } + + // Wait for debounce period plus buffer + time.Sleep(1500 * time.Millisecond) + + cancel() + <-done + + assert.Len(t, indexer.packageList, 25) + }) + + t.Run("zip indexer debouncing filesystem events", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ValidationDisabled = true + tmpDir := t.TempDir() + logger := zap.NewNop() + + indexer := NewZipFileSystemIndexer(FSIndexerOptions{ + Logger: logger, + EnablePathsWatcher: true, + }, tmpDir) + + indexer.packageList = make(Packages, 0) + + done := make(chan struct{}) + go func() { + indexer.watchPackageFileSystem(ctx) + close(done) + }() + + // Give watcher time to initialize + time.Sleep(100 * time.Millisecond) + + for i := 0; i < 25; i++ { + path := createMockZipPackage(t, tmpDir, fmt.Sprintf("mypackage%d", i)) + defer func() { + err := os.Remove(path) + require.NoError(t, err) + }() + } + + // Wait for debounce period plus buffer + time.Sleep(1500 * time.Millisecond) + + cancel() + <-done + + assert.Len(t, indexer.packageList, 25) + }) + + t.Run("zip indexer ignores non-zip files", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ValidationDisabled = true + + tmpDir := t.TempDir() + logger := zap.NewNop() + + indexer := NewZipFileSystemIndexer(FSIndexerOptions{ + Logger: logger, + EnablePathsWatcher: true, + }, tmpDir) + + indexer.packageList = make(Packages, 0) + + done := make(chan struct{}) + go func() { + indexer.watchPackageFileSystem(ctx) + close(done) + }() + + time.Sleep(100 * time.Millisecond) + + // Create non-zip file (should be ignored by zip indexer) + createMockPackage(t, tmpDir, "mypackage") + time.Sleep(1500 * time.Millisecond) + assert.Empty(t, indexer.packageList) + + path := createMockZipPackage(t, tmpDir, "mypackage") + defer func() { + err := os.Remove(path) + require.NoError(t, err) + }() + time.Sleep(1500 * time.Millisecond) + + cancel() + <-done + + assert.Len(t, indexer.packageList, 1) + }) + + t.Run("file system indexer ignores zip files", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ValidationDisabled = true + + tmpDir := t.TempDir() + logger := zap.NewNop() + + indexer := NewFileSystemIndexer(FSIndexerOptions{ + Logger: logger, + EnablePathsWatcher: true, + }, tmpDir) + + indexer.packageList = make(Packages, 0) + + done := make(chan struct{}) + go func() { + indexer.watchPackageFileSystem(ctx) + close(done) + }() + + time.Sleep(100 * time.Millisecond) + + // Create zip file (should be ignored by regular file system indexer) + path := createMockZipPackage(t, tmpDir, "mypackage") + defer func() { + err := os.Remove(path) + require.NoError(t, err) + }() + time.Sleep(1500 * time.Millisecond) + assert.Empty(t, indexer.packageList) + + createMockPackage(t, tmpDir, "mypackage") + time.Sleep(1500 * time.Millisecond) + + cancel() + <-done + + assert.Len(t, indexer.packageList, 1) + }) +} + +func createMockPackage(t *testing.T, dest, pkgName string) { + t.Helper() + + pkgDir := filepath.Join(dest, pkgName, "1.0.0") + err := os.MkdirAll(pkgDir, 0755) + require.NoError(t, err) + + manifestContent := `name: ` + pkgName + ` +version: 1.0.0 +type: integration +format_version: 1.0.0 +` + err = os.WriteFile(filepath.Join(pkgDir, "manifest.yml"), []byte(manifestContent), 0644) + require.NoError(t, err) + + docsDir := filepath.Join(pkgDir, "docs") + err = os.MkdirAll(docsDir, 0755) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(docsDir, "README.md"), []byte("# My Package\nThis is a test package."), 0644) + require.NoError(t, err) +} + +func createMockZipPackage(t *testing.T, dest, pkgName string) string { + t.Helper() + + tmpMockFSDir := t.TempDir() + createMockPackage(t, tmpMockFSDir, pkgName) + + zipFilepath := filepath.Join(dest, pkgName+"-1.0.0.zip") + file, err := os.Create(zipFilepath) + require.NoError(t, err) + + err = archiver.ArchivePackage(file, archiver.PackageProperties{ + Name: pkgName, + Version: "1.0.0", + Path: filepath.Join(tmpMockFSDir, pkgName, "1.0.0"), + }) + require.NoError(t, err) + + err = file.Close() + require.NoError(t, err) + + return zipFilepath +} + func mustBuildDiscoveryFilter(filters []string) discoveryFilters { discoveryFilters := make([]*discoveryFilter, 0, len(filters)) for _, filter := range filters { diff --git a/search_test.go b/search_test.go index 2e1d8e36c..fd3d4a76f 100644 --- a/search_test.go +++ b/search_test.go @@ -94,10 +94,14 @@ func TestSearchWithProxyMode(t *testing.T) { })) defer webServer.Close() + fsOpts := packages.FSIndexerOptions{ + Logger: testLogger, + } + packagesBasePaths := []string{"./testdata/second_package_path", "./testdata/package"} indexer := NewCombinedIndexer( - packages.NewZipFileSystemIndexer(testLogger, "./testdata/local-storage"), - packages.NewFileSystemIndexer(testLogger, packagesBasePaths...), + packages.NewZipFileSystemIndexer(fsOpts, "./testdata/local-storage"), + packages.NewFileSystemIndexer(fsOpts, packagesBasePaths...), ) defer indexer.Close(t.Context())