diff --git a/CHANGELOG.md b/CHANGELOG.md index 061047de9..bd64cd133 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Bugfixes +* Add missing mapping for azure/metrics input. [#1474](https://github.com/elastic/package-registry/pull/1474) + ### Added +* Optimize package loading by reading package ZIP archives and package folders concurrently. [#1489](https://github.com/elastic/package-registry/pull/1489) +* Allow empty package paths on startup. [#1482](https://github.com/elastic/package-registry/pull/1482) + ### Deprecated ### Known Issues diff --git a/internal/workers/taskpool.go b/internal/workers/taskpool.go new file mode 100644 index 000000000..a3b35abc3 --- /dev/null +++ b/internal/workers/taskpool.go @@ -0,0 +1,55 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package workers + +import ( + "errors" + "sync" +) + +type taskPool struct { + wg sync.WaitGroup + pool chan struct{} + errC chan error + errors []error +} + +func NewTaskPool(size int) *taskPool { + p := &taskPool{ + pool: make(chan struct{}, size), + errC: make(chan error), + } + go p.errorLoop() + return p +} + +func (p *taskPool) errorLoop() { + go func() { + for err := range p.errC { + if err != nil { + p.errors = append(p.errors, err) + } + } + }() +} + +// Do runs the task in a goroutine, ensuring no more tasks are running than the size of the pool. +func (p *taskPool) Do(task func() error) { + p.pool <- struct{}{} + p.wg.Add(1) + go func() { + defer func() { <-p.pool }() + defer p.wg.Done() + p.errC <- task() + }() +} + +// Wait waits for all the tasks to finish, and joins the errors found. The pool cannot be used after calling Wait. 
+func (p *taskPool) Wait() error { + close(p.pool) + p.wg.Wait() + close(p.errC) + return errors.Join(p.errors...) +} diff --git a/main.go b/main.go index edc22940e..8ac0b2551 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( "os" "os/signal" "path/filepath" + "runtime" "strconv" "strings" "syscall" @@ -85,6 +86,7 @@ var ( serviceName = getServiceName() packagePathsEnableWatcher = false + packagePathsWorkers = 1 defaultConfig = Config{ CacheTimeIndex: 10 * time.Second, @@ -132,6 +134,7 @@ func init() { flag.StringVar(&proxyTo, "proxy-to", "https://epr.elastic.co/", "Proxy-to endpoint") flag.BoolVar(&packagePathsEnableWatcher, "package-paths-enable-watcher", false, "Enable file system watcher for package paths to automatically detect new packages.") + flag.IntVar(&packagePathsWorkers, "package-paths-workers", runtime.GOMAXPROCS(0), "Number of workers to use for reading packages concurrently from the configured paths. Default is the number of CPU cores returned by GOMAXPROCS.") } type Config struct { @@ -370,7 +373,6 @@ func initMetricsServer(logger *zap.Logger) { } }() } - func initIndexer(ctx context.Context, logger *zap.Logger, options serverOptions) Indexer { tx := options.apmTracer.StartTransaction("initIndexer", "backend.init") defer tx.End() @@ -400,7 +402,10 @@ func initIndexer(ctx context.Context, logger *zap.Logger, options serverOptions) Logger: logger, EnablePathsWatcher: packagePathsEnableWatcher, APMTracer: options.apmTracer, + PathsWorkers: packagePathsWorkers, } + logger.Debug("Using workers to read packages from package paths", zap.Int("workers", fsOptions.PathsWorkers)) + logger.Debug("Watching package paths for changes", zap.Bool("enabled", fsOptions.EnablePathsWatcher)) combined = append(combined, packages.NewZipFileSystemIndexer(fsOptions, packagesBasePaths...), @@ -753,5 +758,9 @@ func validateFlags() error { return fmt.Errorf("categories cache is just supported with SQL Storage indexer: feature-enable-categories-cache is enabled, 
but feature-sql-storage-indexer is not enabled") } + if packagePathsWorkers <= 0 { + return fmt.Errorf("package-paths-workers must be greater than 0") + } + return nil } diff --git a/packages/packages.go b/packages/packages.go index defc96048..3e09d8614 100644 --- a/packages/packages.go +++ b/packages/packages.go @@ -10,6 +10,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "slices" "strings" "sync" @@ -22,6 +23,7 @@ import ( "go.elastic.co/apm/v2" "go.uber.org/zap" + "github.com/elastic/package-registry/internal/workers" "github.com/elastic/package-registry/metrics" ) @@ -119,6 +121,9 @@ type FileSystemIndexer struct { enablePathsWatcher bool + // pathsWorkers is the number of concurrent workers to use when reading packages from the filesystem. + pathsWorkers int + m sync.RWMutex apmTracer *apm.Tracer @@ -128,6 +133,9 @@ type FSIndexerOptions struct { Logger *zap.Logger EnablePathsWatcher bool APMTracer *apm.Tracer + + // PathsWorkers is the number of concurrent workers to use when reading packages from the filesystem. + PathsWorkers int } // NewFileSystemIndexer creates a new FileSystemIndexer for the given paths. @@ -159,6 +167,12 @@ func NewFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSystem options.Logger.Warn("ignoring unexpected file", zap.String("file.path", path)) return false, nil } + + // If PathsWorkers is not set or is less than or equal to 0, use the number of CPU cores. 
+ pathWorkers := options.PathsWorkers + if options.PathsWorkers <= 0 { + pathWorkers = runtime.GOMAXPROCS(0) + } return &FileSystemIndexer{ paths: paths, label: fileSystemIndexerName, @@ -167,6 +181,7 @@ func NewFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSystem logger: options.Logger, enablePathsWatcher: options.EnablePathsWatcher, apmTracer: options.APMTracer, + pathsWorkers: pathWorkers, } } @@ -195,6 +210,12 @@ func NewZipFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSys return true, nil } + + // If PathsWorkers is not set or is less than or equal to 0, use the number of CPU cores. + pathWorkers := options.PathsWorkers + if options.PathsWorkers <= 0 { + pathWorkers = runtime.GOMAXPROCS(0) + } return &FileSystemIndexer{ paths: paths, label: zipFileSystemIndexerName, @@ -203,6 +224,7 @@ func NewZipFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSys logger: options.Logger, enablePathsWatcher: options.EnablePathsWatcher, apmTracer: options.APMTracer, + pathsWorkers: pathWorkers, } } @@ -355,40 +377,76 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack name string version string } - packagesFound := make(map[packageKey]struct{}) - var pList Packages + count := 0 for _, basePath := range i.paths { packagePaths, err := i.getPackagePaths(basePath) if err != nil { return nil, err } + count += len(packagePaths) + } + pList := make(Packages, count) - i.logger.Info("Searching packages in " + basePath) - for _, path := range packagePaths { - p, err := NewPackage(i.logger, path, i.fsBuilder) - if err != nil { - return nil, fmt.Errorf("loading package failed (path: %s): %w", path, err) - } + taskPool := workers.NewTaskPool(i.pathsWorkers) + + i.logger.Info("Searching packages in filesystem", zap.String("indexer", i.label)) + count = 0 + for _, basePath := range i.paths { + packagePaths, err := i.getPackagePaths(basePath) + if err != nil { + return nil, err + } + for _, p := range 
packagePaths { + position := count + path := p + count++ + taskPool.Do(func() error { + p, err := NewPackage(i.logger, path, i.fsBuilder) + if err != nil { + return fmt.Errorf("loading package failed (path: %s): %w", path, err) + } + + pList[position] = p - key := packageKey{name: p.Name, version: p.Version} - if _, found := packagesFound[key]; found { - i.logger.Debug("duplicated package", + i.logger.Debug("found package", zap.String("package.name", p.Name), zap.String("package.version", p.Version), zap.String("package.path", p.BasePath)) - continue - } - packagesFound[key] = struct{}{} - pList = append(pList, p) + return nil + }) + } + } + + if err := taskPool.Wait(); err != nil { + return nil, err + } - i.logger.Debug("found package", + // Remove duplicates while preserving the package discovery order in the paths set in the configuration. + // Duplicate removal happens after initially loading all packages in all paths to maintain the order packages + // are discovered in the paths set in the configuration. This ensures that when the same package version + // exists in multiple paths, we keep the version from the first path in the search order, + // not necessarily the first one loaded by the concurrent workers. 
+ current := 0 + packagesFound := make(map[packageKey]struct{}) + for _, p := range pList { + key := packageKey{name: p.Name, version: p.Version} + if _, found := packagesFound[key]; found { + i.logger.Debug("duplicated package", zap.String("package.name", p.Name), zap.String("package.version", p.Version), zap.String("package.path", p.BasePath)) + continue } + packagesFound[key] = struct{}{} + pList[current] = p + current++ } + + pList = pList[:current] + i.logger.Info("Searching packages in filesystem done", zap.String("indexer", i.label), zap.Int("packages.size", len(pList))) + return pList, nil } diff --git a/packages/packages_test.go b/packages/packages_test.go index 19909bd08..d0280ae2d 100644 --- a/packages/packages_test.go +++ b/packages/packages_test.go @@ -16,10 +16,38 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "github.com/elastic/package-registry/archiver" + "github.com/elastic/package-registry/internal/util" ) +func BenchmarkInit(b *testing.B) { + // given + packagesBasePaths := []string{"../testdata/second_package_path", "../testdata/package"} + + testLogger := util.NewTestLoggerLevel(zapcore.FatalLevel) + b.ResetTimer() + fsOptions := FSIndexerOptions{ + Logger: testLogger, + } + for i := 0; i < b.N; i++ { + zipIndexer := NewZipFileSystemIndexer(fsOptions, "../testdata/local-storage") + dirIndexer := NewFileSystemIndexer(fsOptions, packagesBasePaths...) + + err := zipIndexer.Init(b.Context()) + require.NoError(b, err) + + err = dirIndexer.Init(b.Context()) + require.NoError(b, err) + + b.StopTimer() + require.NoError(b, zipIndexer.Close(b.Context())) + require.NoError(b, dirIndexer.Close(b.Context())) + b.StartTimer() + } +} + func TestPackagesFilter(t *testing.T) { filterTestPackages := []filterTestPackage{ {