From 6ed5f330320b29bc6bd8bd1c68b2f49da07ae221 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Tue, 25 Nov 2025 12:23:33 +0100 Subject: [PATCH 01/14] Load in parallel packages from filesystem --- packages/packages.go | 106 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 86 insertions(+), 20 deletions(-) diff --git a/packages/packages.go b/packages/packages.go index 0c1f06d56..b454fbf55 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -7,11 +7,14 @@ package packages import ( "archive/zip" "context" + "errors" "fmt" "os" "path/filepath" + "runtime" "slices" "strings" + "sync" "time" "github.com/Masterminds/semver/v3" @@ -235,40 +238,103 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack name string version string } + type packageJob struct { + position int + path string + } + numWorkers := runtime.GOMAXPROCS(0) / 2 + mu := sync.Mutex{} packagesFound := make(map[packageKey]struct{}) + jobChan := make(chan packageJob) + errChan := make(chan error) - var pList Packages + count := 0 for _, basePath := range i.paths { packagePaths, err := i.getPackagePaths(basePath) if err != nil { return nil, err } + count += len(packagePaths) + } + pList := make(Packages, count) - i.logger.Info("Searching packages in " + basePath) - for _, path := range packagePaths { - p, err := NewPackage(i.logger, path, i.fsBuilder) - if err != nil { - return nil, fmt.Errorf("loading package failed (path: %s): %w", path, err) - } + var wg sync.WaitGroup - key := packageKey{name: p.Name, version: p.Version} - if _, found := packagesFound[key]; found { - i.logger.Debug("duplicated package", - zap.String("package.name", p.Name), - zap.String("package.version", p.Version), - zap.String("package.path", p.BasePath)) - continue + i.logger.Info("Searching packages in filesystem", zap.String("indexer", i.label)) + // Start workers + for range numWorkers { + wg.Add(1) + go func(logger *zap.Logger, fsBuilder FileSystemBuilder) { + defer 
wg.Done() + for pkgJob := range jobChan { + p, err := NewPackage(logger, pkgJob.path, fsBuilder) + if err != nil { + errChan <- fmt.Errorf("loading package failed (path: %s): %w", pkgJob.path, err) + continue + } + + key := packageKey{name: p.Name, version: p.Version} + func() { + mu.Lock() + defer mu.Unlock() + if _, found := packagesFound[key]; found { + i.logger.Debug("duplicated package", + zap.String("package.name", p.Name), + zap.String("package.version", p.Version), + zap.String("package.path", p.BasePath)) + return + } + + packagesFound[key] = struct{}{} + pList[pkgJob.position] = p + + i.logger.Debug("found package", + zap.String("package.name", p.Name), + zap.String("package.version", p.Version), + zap.String("package.path", p.BasePath)) + + }() } + }(i.logger, i.fsBuilder) + } - packagesFound[key] = struct{}{} - pList = append(pList, p) + count = 0 + for _, basePath := range i.paths { + packagePaths, err := i.getPackagePaths(basePath) + if err != nil { + return nil, err + } + for _, path := range packagePaths { + jobChan <- packageJob{position: count, path: path} + count++ + } + } + close(jobChan) + + wg.Wait() - i.logger.Debug("found package", - zap.String("package.name", p.Name), - zap.String("package.version", p.Version), - zap.String("package.path", p.BasePath)) + close(errChan) + + var multiErr error + for err := range errChan { + multiErr = errors.Join(multiErr, err) + } + + if multiErr != nil { + return nil, multiErr + } + + // Remove null entries in case of duplicated packages + current := 0 + for _, p := range pList { + if p != nil { + pList[current] = p + current++ } } + pList = pList[:current] + i.logger.Info("Searching packages in filesystem done", zap.String("indexer", i.label)) + return pList, nil } From c9eff98578d99455cfa5a3270b0f15c85ced639a Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Wed, 26 Nov 2025 17:06:32 +0100 Subject: [PATCH 02/14] Add benchmark --- packages/packages_test.go | 26 ++++++++++++++++++++++++++ 1 file 
changed, 26 insertions(+) diff --git a/packages/packages_test.go b/packages/packages_test.go index 7028885e4..55dfaa6a8 100644 --- a/packages/packages_test.go +++ b/packages/packages_test.go @@ -9,8 +9,34 @@ import ( "github.com/Masterminds/semver/v3" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + + "github.com/elastic/package-registry/internal/util" ) +func BenchmarkInit(b *testing.B) { + // given + packagesBasePaths := []string{"../testdata/second_package_path", "../testdata/package"} + + testLogger := util.NewTestLoggerLevel(zapcore.FatalLevel) + b.ResetTimer() + for i := 0; i < b.N; i++ { + zipIndexer := NewZipFileSystemIndexer(testLogger, "../testdata/local-storage") + dirIndexer := NewFileSystemIndexer(testLogger, packagesBasePaths...) + + err := zipIndexer.Init(b.Context()) + require.NoError(b, err) + + err = dirIndexer.Init(b.Context()) + require.NoError(b, err) + + b.StopTimer() + require.NoError(b, zipIndexer.Close(b.Context())) + require.NoError(b, dirIndexer.Close(b.Context())) + b.StartTimer() + } +} + func TestPackagesFilter(t *testing.T) { filterTestPackages := []filterTestPackage{ { From 01407b79b284ae738036fe47a5f28ff43f34df0e Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Wed, 26 Nov 2025 17:07:07 +0100 Subject: [PATCH 03/14] Refactor workers logic into functions - moved to an internal package --- internal/workers/taskpool.go | 55 +++++++++++++++++++++++++++++ packages/packages.go | 68 ++++++++++++------------------------ 2 files changed, 78 insertions(+), 45 deletions(-) create mode 100644 internal/workers/taskpool.go diff --git a/internal/workers/taskpool.go b/internal/workers/taskpool.go new file mode 100644 index 000000000..a3b35abc3 --- /dev/null +++ b/internal/workers/taskpool.go @@ -0,0 +1,55 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package workers + +import ( + "errors" + "sync" +) + +type taskPool struct { + wg sync.WaitGroup + pool chan struct{} + errC chan error + errors []error +} + +func NewTaskPool(size int) *taskPool { + p := &taskPool{ + pool: make(chan struct{}, size), + errC: make(chan error), + } + go p.errorLoop() + return p +} + +func (p *taskPool) errorLoop() { + go func() { + for err := range p.errC { + if err != nil { + p.errors = append(p.errors, err) + } + } + }() +} + +// Do runs the task in a goroutine, ensuring no more tasks are running than the size of the pool. +func (p *taskPool) Do(task func() error) { + p.pool <- struct{}{} + p.wg.Add(1) + go func() { + defer func() { <-p.pool }() + defer p.wg.Done() + p.errC <- task() + }() +} + +// Wait waits for all the tasks to finish, and joins the errors found. The pool cannot be used after calling Wait. +func (p *taskPool) Wait() error { + close(p.pool) + p.wg.Wait() + close(p.errC) + return errors.Join(p.errors...) 
+} diff --git a/packages/packages.go b/packages/packages.go index b454fbf55..8047e84d3 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -7,7 +7,6 @@ package packages import ( "archive/zip" "context" - "errors" "fmt" "os" "path/filepath" @@ -23,6 +22,7 @@ import ( "go.elastic.co/apm/v2" "go.uber.org/zap" + "github.com/elastic/package-registry/internal/workers" "github.com/elastic/package-registry/metrics" ) @@ -238,15 +238,10 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack name string version string } - type packageJob struct { - position int - path string - } - numWorkers := runtime.GOMAXPROCS(0) / 2 + + numWorkers := runtime.GOMAXPROCS(0) mu := sync.Mutex{} packagesFound := make(map[packageKey]struct{}) - jobChan := make(chan packageJob) - errChan := make(chan error) count := 0 for _, basePath := range i.paths { @@ -258,19 +253,23 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack } pList := make(Packages, count) - var wg sync.WaitGroup + taskPool := workers.NewTaskPool(numWorkers) i.logger.Info("Searching packages in filesystem", zap.String("indexer", i.label)) - // Start workers - for range numWorkers { - wg.Add(1) - go func(logger *zap.Logger, fsBuilder FileSystemBuilder) { - defer wg.Done() - for pkgJob := range jobChan { - p, err := NewPackage(logger, pkgJob.path, fsBuilder) + count = 0 + for _, basePath := range i.paths { + packagePaths, err := i.getPackagePaths(basePath) + if err != nil { + return nil, err + } + for _, p := range packagePaths { + position := count + path := p + count++ + taskPool.Do(func() error { + p, err := NewPackage(i.logger, path, i.fsBuilder) if err != nil { - errChan <- fmt.Errorf("loading package failed (path: %s): %w", pkgJob.path, err) - continue + return fmt.Errorf("loading package failed (path: %s): %w", path, err) } key := packageKey{name: p.Name, version: p.Version} @@ -286,7 +285,7 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx 
context.Context) (Pack } packagesFound[key] = struct{}{} - pList[pkgJob.position] = p + pList[position] = p i.logger.Debug("found package", zap.String("package.name", p.Name), @@ -294,34 +293,13 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack zap.String("package.path", p.BasePath)) }() - } - }(i.logger, i.fsBuilder) - } - - count = 0 - for _, basePath := range i.paths { - packagePaths, err := i.getPackagePaths(basePath) - if err != nil { - return nil, err + return nil + }) } - for _, path := range packagePaths { - jobChan <- packageJob{position: count, path: path} - count++ - } - } - close(jobChan) - - wg.Wait() - - close(errChan) - - var multiErr error - for err := range errChan { - multiErr = errors.Join(multiErr, err) } - if multiErr != nil { - return nil, multiErr + if err := taskPool.Wait(); err != nil { + return nil, err } // Remove null entries in case of duplicated packages @@ -333,7 +311,7 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack } } pList = pList[:current] - i.logger.Info("Searching packages in filesystem done", zap.String("indexer", i.label)) + i.logger.Info("Searching packages in filesystem done", zap.String("indexer", i.label), zap.Int("packages.size", len(pList))) return pList, nil } From 024d5d30fbc932397baff42f10e02f70fe339935 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Wed, 26 Nov 2025 17:58:17 +0100 Subject: [PATCH 04/14] Add changelog entries --- CHANGELOG.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 061047de9..ee86ba33c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,10 +8,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Breaking changes -### Bugfixes +### Bugfixes + +* Aded missing mapping for azure/metrics input. [#1474](https://github.com/elastic/package-registry/pull/1474) ### Added +* Load local packages in parallel. 
[#1489](https://github.com/elastic/package-registry/pull/1489) + ### Deprecated ### Known Issues From 9e0a0d0e536086459943eee5e2f52b5acd7e2d3d Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Wed, 26 Nov 2025 18:03:03 +0100 Subject: [PATCH 05/14] Rephrase changelog entry --- CHANGELOG.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee86ba33c..4b295bd13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,13 +8,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Breaking changes -### Bugfixes +### Bugfixes -* Aded missing mapping for azure/metrics input. [#1474](https://github.com/elastic/package-registry/pull/1474) +* Added missing mapping for azure/metrics input. [#1474](https://github.com/elastic/package-registry/pull/1474) ### Added -* Load local packages in parallel. [#1489](https://github.com/elastic/package-registry/pull/1489) +* Optimized package loading by reading package ZIP archives and package folders concurrently. 
[#1489](https://github.com/elastic/package-registry/pull/1489) ### Deprecated From 9241261cbc897f484af6ddc118035e2e6356461d Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Wed, 26 Nov 2025 18:56:08 +0100 Subject: [PATCH 06/14] Ensure packages kept in the final list are the ones loaded first --- packages/packages.go | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/packages/packages.go b/packages/packages.go index 8047e84d3..c353431e8 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -13,7 +13,6 @@ import ( "runtime" "slices" "strings" - "sync" "time" "github.com/Masterminds/semver/v3" @@ -240,8 +239,6 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack } numWorkers := runtime.GOMAXPROCS(0) - mu := sync.Mutex{} - packagesFound := make(map[packageKey]struct{}) count := 0 for _, basePath := range i.paths { @@ -272,19 +269,7 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack return fmt.Errorf("loading package failed (path: %s): %w", path, err) } - key := packageKey{name: p.Name, version: p.Version} func() { - mu.Lock() - defer mu.Unlock() - if _, found := packagesFound[key]; found { - i.logger.Debug("duplicated package", - zap.String("package.name", p.Name), - zap.String("package.version", p.Version), - zap.String("package.path", p.BasePath)) - return - } - - packagesFound[key] = struct{}{} pList[position] = p i.logger.Debug("found package", @@ -302,14 +287,28 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack return nil, err } - // Remove null entries in case of duplicated packages + // Loop through the packages and remove duplicates. + // Remove duplicates while preserving filesystem discovery order. + // Duplicate removal happens after initial loading to maintain the order packages + // are discovered in the filesystem. 
This ensures that when the same package version + // exists in multiple paths, we keep the version from the first path in the search order, + // not necessarily the first one loaded by the concurrent workers. current := 0 + packagesFound := make(map[packageKey]struct{}) for _, p := range pList { - if p != nil { - pList[current] = p - current++ + key := packageKey{name: p.Name, version: p.Version} + if _, found := packagesFound[key]; found { + i.logger.Debug("duplicated package", + zap.String("package.name", p.Name), + zap.String("package.version", p.Version), + zap.String("package.path", p.BasePath)) + continue } + packagesFound[key] = struct{}{} + pList[current] = p + current++ } + pList = pList[:current] i.logger.Info("Searching packages in filesystem done", zap.String("indexer", i.label), zap.Int("packages.size", len(pList))) From 4e78562cc54277f56be783dd26a0edd873fe6735 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Wed, 26 Nov 2025 19:03:57 +0100 Subject: [PATCH 07/14] Re-phrase comment --- packages/packages.go | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/packages.go b/packages/packages.go index c353431e8..c457ba410 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -287,7 +287,6 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack return nil, err } - // Loop through the packages and remove duplicates. // Remove duplicates while preserving filesystem discovery order. // Duplicate removal happens after initial loading to maintain the order packages // are discovered in the filesystem. 
This ensures that when the same package version From 21ec258d62ac2d5971ab721929291ef23c206113 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 27 Nov 2025 12:23:37 +0100 Subject: [PATCH 08/14] Fix parameter for filesystem constructors --- packages/packages_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/packages_test.go b/packages/packages_test.go index c5b6e5457..d0280ae2d 100644 --- a/packages/packages_test.go +++ b/packages/packages_test.go @@ -28,9 +28,12 @@ func BenchmarkInit(b *testing.B) { testLogger := util.NewTestLoggerLevel(zapcore.FatalLevel) b.ResetTimer() + fsOptions := FSIndexerOptions{ + Logger: testLogger, + } for i := 0; i < b.N; i++ { - zipIndexer := NewZipFileSystemIndexer(testLogger, "../testdata/local-storage") - dirIndexer := NewFileSystemIndexer(testLogger, packagesBasePaths...) + zipIndexer := NewZipFileSystemIndexer(fsOptions, "../testdata/local-storage") + dirIndexer := NewFileSystemIndexer(fsOptions, packagesBasePaths...) err := zipIndexer.Init(b.Context()) require.NoError(b, err) From 44f341984612752c05657a243dca718977bf3920 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 27 Nov 2025 13:37:30 +0100 Subject: [PATCH 09/14] Update changelog entry descriptions --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b295bd13..6766184f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,11 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Bugfixes -* Added missing mapping for azure/metrics input. [#1474](https://github.com/elastic/package-registry/pull/1474) +* Add missing mapping for azure/metrics input. [#1474](https://github.com/elastic/package-registry/pull/1474) ### Added -* Optimized package loading by reading package ZIP archives and package folders concurrently. 
[#1489](https://github.com/elastic/package-registry/pull/1489) +* Optimize package loading by reading package ZIP archives and package folders concurrently. [#1489](https://github.com/elastic/package-registry/pull/1489) ### Deprecated From 6dcc8590f56a6c1795a20329858e1761d03ff907 Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 27 Nov 2025 13:43:32 +0100 Subject: [PATCH 10/14] Assign directly GOMAXPROCS as parameter --- packages/packages.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/packages.go b/packages/packages.go index b6f085842..c19746bad 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -358,8 +358,6 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack version string } - numWorkers := runtime.GOMAXPROCS(0) - count := 0 for _, basePath := range i.paths { packagePaths, err := i.getPackagePaths(basePath) @@ -370,7 +368,7 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack } pList := make(Packages, count) - taskPool := workers.NewTaskPool(numWorkers) + taskPool := workers.NewTaskPool(runtime.GOMAXPROCS(0)) i.logger.Info("Searching packages in filesystem", zap.String("indexer", i.label)) count = 0 From 5aea1c6b2436ad873683a53c12e88021e1e2de7d Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 27 Nov 2025 19:34:08 +0100 Subject: [PATCH 11/14] Add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6766184f0..bd64cd133 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added * Optimize package loading by reading package ZIP archives and package folders concurrently. [#1489](https://github.com/elastic/package-registry/pull/1489) +* Allow empty package paths on startup. 
[#1482](https://github.com/elastic/package-registry/pull/1482) ### Deprecated From a62499dab0979d63db7f776d72dfeb47424268bd Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 27 Nov 2025 19:36:42 +0100 Subject: [PATCH 12/14] Remove unnecessary function --- packages/packages.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/packages/packages.go b/packages/packages.go index c19746bad..113c36cf4 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -387,15 +387,13 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack return fmt.Errorf("loading package failed (path: %s): %w", path, err) } - func() { - pList[position] = p + pList[position] = p - i.logger.Debug("found package", - zap.String("package.name", p.Name), - zap.String("package.version", p.Version), - zap.String("package.path", p.BasePath)) + i.logger.Debug("found package", + zap.String("package.name", p.Name), + zap.String("package.version", p.Version), + zap.String("package.path", p.BasePath)) - }() return nil }) } From e9ca9a67a716d1e3e683c1f2918c3a4472e9c3ae Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 27 Nov 2025 19:42:55 +0100 Subject: [PATCH 13/14] Rephrase comment about duplicates --- packages/packages.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/packages.go b/packages/packages.go index 113c36cf4..c97f30870 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -403,9 +403,9 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack return nil, err } - // Remove duplicates while preserving filesystem discovery order. - // Duplicate removal happens after initial loading to maintain the order packages - // are discovered in the filesystem. This ensures that when the same package version + // Remove duplicates while preserving the package discovery order in the paths set in the configuration. 
+ // Duplicate removal happens after all packages in all paths have been loaded, to maintain the order packages + // are discovered in the paths set in the configuration. This ensures that when the same package version // exists in multiple paths, we keep the version from the first path in the search order, // not necessarily the first one loaded by the concurrent workers. current := 0 From fcb290e03a968200f5b8ca09ee163c1046fbeddb Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 27 Nov 2025 20:20:42 +0100 Subject: [PATCH 14/14] Add flag to override number of workers assigned to read packages concurrently --- main.go | 11 ++++++++++- packages/packages.go | 22 +++++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index edc22940e..8ac0b2551 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( "os" "os/signal" "path/filepath" + "runtime" "strconv" "strings" "syscall" @@ -85,6 +86,7 @@ var ( serviceName = getServiceName() packagePathsEnableWatcher = false + packagePathsWorkers = 1 defaultConfig = Config{ CacheTimeIndex: 10 * time.Second, @@ -132,6 +134,7 @@ func init() { flag.StringVar(&proxyTo, "proxy-to", "https://epr.elastic.co/", "Proxy-to endpoint") flag.BoolVar(&packagePathsEnableWatcher, "package-paths-enable-watcher", false, "Enable file system watcher for package paths to automatically detect new packages.") + flag.IntVar(&packagePathsWorkers, "package-paths-workers", runtime.GOMAXPROCS(0), "Number of workers to use for reading packages concurrently from the configured paths. 
Default is the number of CPU cores returned by GOMAXPROCS.") } type Config struct { @@ -370,7 +373,6 @@ func initMetricsServer(logger *zap.Logger) { } }() } - func initIndexer(ctx context.Context, logger *zap.Logger, options serverOptions) Indexer { tx := options.apmTracer.StartTransaction("initIndexer", "backend.init") defer tx.End() @@ -400,7 +402,10 @@ func initIndexer(ctx context.Context, logger *zap.Logger, options serverOptions) Logger: logger, EnablePathsWatcher: packagePathsEnableWatcher, APMTracer: options.apmTracer, + PathsWorkers: packagePathsWorkers, } + logger.Debug("Using workers to read packages from package paths", zap.Int("workers", fsOptions.PathsWorkers)) + logger.Debug("Watching package paths for changes", zap.Bool("enabled", fsOptions.EnablePathsWatcher)) combined = append(combined, packages.NewZipFileSystemIndexer(fsOptions, packagesBasePaths...), @@ -753,5 +758,9 @@ func validateFlags() error { return fmt.Errorf("categories cache is just supported with SQL Storage indexer: feature-enable-categories-cache is enabled, but feature-sql-storage-indexer is not enabled") } + if packagePathsWorkers <= 0 { + return fmt.Errorf("package-paths-workers must be greater than 0") + } + return nil } diff --git a/packages/packages.go b/packages/packages.go index c97f30870..3e09d8614 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -121,6 +121,9 @@ type FileSystemIndexer struct { enablePathsWatcher bool + // pathsWorkers is the number of concurrent workers to use when reading packages from the filesystem. + pathsWorkers int + m sync.RWMutex apmTracer *apm.Tracer @@ -130,6 +133,9 @@ type FSIndexerOptions struct { Logger *zap.Logger EnablePathsWatcher bool APMTracer *apm.Tracer + + // PathsWorkers is the number of concurrent workers to use when reading packages from the filesystem. + PathsWorkers int } // NewFileSystemIndexer creates a new FileSystemIndexer for the given paths. 
@@ -161,6 +167,12 @@ func NewFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSystem options.Logger.Warn("ignoring unexpected file", zap.String("file.path", path)) return false, nil } + + // If PathsWorkers is not set or is less than or equal to 0, fall back to runtime.GOMAXPROCS(0). + pathWorkers := options.PathsWorkers + if options.PathsWorkers <= 0 { + pathWorkers = runtime.GOMAXPROCS(0) + } return &FileSystemIndexer{ paths: paths, label: fileSystemIndexerName, @@ -169,6 +181,7 @@ func NewFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSystem logger: options.Logger, enablePathsWatcher: options.EnablePathsWatcher, apmTracer: options.APMTracer, + pathsWorkers: pathWorkers, } } @@ -197,6 +210,12 @@ func NewZipFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSys return true, nil } + + // If PathsWorkers is not set or is less than or equal to 0, fall back to runtime.GOMAXPROCS(0). + pathWorkers := options.PathsWorkers + if options.PathsWorkers <= 0 { + pathWorkers = runtime.GOMAXPROCS(0) + } return &FileSystemIndexer{ paths: paths, label: zipFileSystemIndexerName, @@ -205,6 +224,7 @@ func NewZipFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSys logger: options.Logger, enablePathsWatcher: options.EnablePathsWatcher, apmTracer: options.APMTracer, + pathsWorkers: pathWorkers, } } @@ -368,7 +388,7 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack } pList := make(Packages, count) - taskPool := workers.NewTaskPool(runtime.GOMAXPROCS(0)) + taskPool := workers.NewTaskPool(i.pathsWorkers) i.logger.Info("Searching packages in filesystem", zap.String("indexer", i.label)) count = 0