Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Bugfixes

* Add missing mapping for azure/metrics input. [#1474](https://github.com/elastic/package-registry/pull/1474)

### Added

* Optimize package loading by reading package ZIP archives and package folders concurrently. [#1489](https://github.com/elastic/package-registry/pull/1489)
* Allow empty package paths on startup. [#1482](https://github.com/elastic/package-registry/pull/1482)

### Deprecated

### Known Issues
Expand Down
55 changes: 55 additions & 0 deletions internal/workers/taskpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package workers

import (
"errors"
"sync"
)

// taskPool runs tasks concurrently, limiting the number of tasks running at
// the same time to the size of the pool, and collecting the errors they
// return. The zero value is not usable; create pools with NewTaskPool.
type taskPool struct {
	wg sync.WaitGroup

	// pool is a counting semaphore: each running task holds one slot.
	pool chan struct{}

	// mtx guards errors, which is appended to concurrently by task goroutines.
	// (Collecting errors under a mutex instead of through a channel avoids a
	// race between the collector goroutine's last append and Wait reading the
	// slice.)
	mtx    sync.Mutex
	errors []error
}

// NewTaskPool creates a task pool that runs at most size tasks concurrently.
func NewTaskPool(size int) *taskPool {
	return &taskPool{
		pool: make(chan struct{}, size),
	}
}

// Do runs the task in a goroutine, ensuring no more tasks are running than the
// size of the pool. It blocks until a slot in the pool is available.
// Do must not be called after Wait.
func (p *taskPool) Do(task func() error) {
	p.pool <- struct{}{} // Acquire a slot, blocking while the pool is full.
	p.wg.Add(1)
	go func() {
		defer func() { <-p.pool }() // Release the slot.
		defer p.wg.Done()
		if err := task(); err != nil {
			p.mtx.Lock()
			p.errors = append(p.errors, err)
			p.mtx.Unlock()
		}
	}()
}

// Wait waits for all the tasks to finish, and joins the errors found. The pool
// cannot be used after calling Wait.
func (p *taskPool) Wait() error {
	p.wg.Wait()
	// All task goroutines have completed (happens-before via wg.Wait), so
	// errors can be read here without holding mtx.
	return errors.Join(p.errors...)
}
11 changes: 10 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"os"
"os/signal"
"path/filepath"
"runtime"
"strconv"
"strings"
"syscall"
Expand Down Expand Up @@ -85,6 +86,7 @@ var (
serviceName = getServiceName()

packagePathsEnableWatcher = false
packagePathsWorkers = 1

defaultConfig = Config{
CacheTimeIndex: 10 * time.Second,
Expand Down Expand Up @@ -132,6 +134,7 @@ func init() {
flag.StringVar(&proxyTo, "proxy-to", "https://epr.elastic.co/", "Proxy-to endpoint")

flag.BoolVar(&packagePathsEnableWatcher, "package-paths-enable-watcher", false, "Enable file system watcher for package paths to automatically detect new packages.")
flag.IntVar(&packagePathsWorkers, "package-paths-workers", runtime.GOMAXPROCS(0), "Number of workers to use for reading packages concurrently from the configured paths. Default is the number of CPU cores returned by GOMAXPROCS.")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested using flag and environment variable

./package-registry -log-level debug -package-paths-workers=5
EPR_PACKAGE_PATHS_WORKERS=2 ./package-registry -log-level debug

}

type Config struct {
Expand Down Expand Up @@ -370,7 +373,6 @@ func initMetricsServer(logger *zap.Logger) {
}
}()
}

func initIndexer(ctx context.Context, logger *zap.Logger, options serverOptions) Indexer {
tx := options.apmTracer.StartTransaction("initIndexer", "backend.init")
defer tx.End()
Expand Down Expand Up @@ -400,7 +402,10 @@ func initIndexer(ctx context.Context, logger *zap.Logger, options serverOptions)
Logger: logger,
EnablePathsWatcher: packagePathsEnableWatcher,
APMTracer: options.apmTracer,
PathsWorkers: packagePathsWorkers,
}
logger.Debug("Using workers to read packages from package paths", zap.Int("workers", fsOptions.PathsWorkers))
logger.Debug("Watching package paths for changes", zap.Bool("enabled", fsOptions.EnablePathsWatcher))
Comment on lines +407 to +408
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it would be useful to log, at debug level, the values used by the service. WDYT?

Examples of logs shown:

{
  "log.level": "debug",
  "@timestamp": "2025-11-27T20:17:56.815+0100",
  "log.origin": {
    "function": "main.initIndexer",
    "file.name": "package-registry-load-packages-fsys/main.go",
    "file.line": 407
  },
  "message": "Using workers to read packages from package paths",
  "service.name": "package-registry",
  "service.version": "1.33.1",
  "workers": 16,
  "ecs.version": "1.6.0"
}
{
  "log.level": "debug",
  "@timestamp": "2025-11-27T20:17:56.815+0100",
  "log.origin": {
    "function": "main.initIndexer",
    "file.name": "package-registry-load-packages-fsys/main.go",
    "file.line": 408
  },
  "message": "Wathching package paths for changes",
  "service.name": "package-registry",
  "service.version": "1.33.1",
  "enabled": false,
  "ecs.version": "1.6.0"
}


combined = append(combined,
packages.NewZipFileSystemIndexer(fsOptions, packagesBasePaths...),
Expand Down Expand Up @@ -753,5 +758,9 @@ func validateFlags() error {
return fmt.Errorf("categories cache is just supported with SQL Storage indexer: feature-enable-categories-cache is enabled, but feature-sql-storage-indexer is not enabled")
}

if packagePathsWorkers <= 0 {
return fmt.Errorf("package-paths-workers must be greater than 0")
}

return nil
}
90 changes: 74 additions & 16 deletions packages/packages.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"slices"
"strings"
"sync"
Expand All @@ -22,6 +23,7 @@ import (
"go.elastic.co/apm/v2"
"go.uber.org/zap"

"github.com/elastic/package-registry/internal/workers"
"github.com/elastic/package-registry/metrics"
)

Expand Down Expand Up @@ -119,6 +121,9 @@ type FileSystemIndexer struct {

enablePathsWatcher bool

// pathsWorkers is the number of concurrent workers to use when reading packages from the filesystem.
pathsWorkers int

m sync.RWMutex

apmTracer *apm.Tracer
Expand All @@ -128,6 +133,9 @@ type FSIndexerOptions struct {
Logger *zap.Logger
EnablePathsWatcher bool
APMTracer *apm.Tracer

// PathsWorkers is the number of concurrent workers to use when reading packages from the filesystem.
PathsWorkers int
}

// NewFileSystemIndexer creates a new FileSystemIndexer for the given paths.
Expand Down Expand Up @@ -159,6 +167,12 @@ func NewFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSystem
options.Logger.Warn("ignoring unexpected file", zap.String("file.path", path))
return false, nil
}

// If PathsWorkers is not set or is less than or equal to 0, use the number of CPU cores.
pathWorkers := options.PathsWorkers
if options.PathsWorkers <= 0 {
pathWorkers = runtime.GOMAXPROCS(0)
}
return &FileSystemIndexer{
paths: paths,
label: fileSystemIndexerName,
Expand All @@ -167,6 +181,7 @@ func NewFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSystem
logger: options.Logger,
enablePathsWatcher: options.EnablePathsWatcher,
apmTracer: options.APMTracer,
pathsWorkers: pathWorkers,
}
}

Expand Down Expand Up @@ -195,6 +210,12 @@ func NewZipFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSys

return true, nil
}

// If PathsWorkers is not set or is less than or equal to 0, use the number of CPU cores.
pathWorkers := options.PathsWorkers
if options.PathsWorkers <= 0 {
pathWorkers = runtime.GOMAXPROCS(0)
}
return &FileSystemIndexer{
paths: paths,
label: zipFileSystemIndexerName,
Expand All @@ -203,6 +224,7 @@ func NewZipFileSystemIndexer(options FSIndexerOptions, paths ...string) *FileSys
logger: options.Logger,
enablePathsWatcher: options.EnablePathsWatcher,
apmTracer: options.APMTracer,
pathsWorkers: pathWorkers,
}
}

Expand Down Expand Up @@ -355,40 +377,76 @@ func (i *FileSystemIndexer) getPackagesFromFileSystem(ctx context.Context) (Pack
name string
version string
}
packagesFound := make(map[packageKey]struct{})

var pList Packages
count := 0
for _, basePath := range i.paths {
packagePaths, err := i.getPackagePaths(basePath)
if err != nil {
return nil, err
}
count += len(packagePaths)
}
pList := make(Packages, count)

i.logger.Info("Searching packages in " + basePath)
for _, path := range packagePaths {
p, err := NewPackage(i.logger, path, i.fsBuilder)
if err != nil {
return nil, fmt.Errorf("loading package failed (path: %s): %w", path, err)
}
taskPool := workers.NewTaskPool(i.pathsWorkers)

i.logger.Info("Searching packages in filesystem", zap.String("indexer", i.label))
count = 0
for _, basePath := range i.paths {
packagePaths, err := i.getPackagePaths(basePath)
if err != nil {
return nil, err
}
for _, p := range packagePaths {
position := count
path := p
count++
taskPool.Do(func() error {
p, err := NewPackage(i.logger, path, i.fsBuilder)
if err != nil {
return fmt.Errorf("loading package failed (path: %s): %w", path, err)
}

pList[position] = p

key := packageKey{name: p.Name, version: p.Version}
if _, found := packagesFound[key]; found {
i.logger.Debug("duplicated package",
i.logger.Debug("found package",
zap.String("package.name", p.Name),
zap.String("package.version", p.Version),
zap.String("package.path", p.BasePath))
continue
}

packagesFound[key] = struct{}{}
pList = append(pList, p)
return nil
})
}
}

if err := taskPool.Wait(); err != nil {
return nil, err
}

i.logger.Debug("found package",
// Remove duplicates while preserving the package discovery order in the paths set in the configuration.
// Duplicate removal happens after initial loading all packages in all paths to maintain the order packages
// are discovered in the paths set in the configuration. This ensures that when the same package version
// exists in multiple paths, we keep the version from the first path in the search order,
// not necessarily the first one loaded by the concurrent workers.
current := 0
packagesFound := make(map[packageKey]struct{})
for _, p := range pList {
key := packageKey{name: p.Name, version: p.Version}
if _, found := packagesFound[key]; found {
i.logger.Debug("duplicated package",
zap.String("package.name", p.Name),
zap.String("package.version", p.Version),
zap.String("package.path", p.BasePath))
continue
}
packagesFound[key] = struct{}{}
pList[current] = p
current++
}

pList = pList[:current]
i.logger.Info("Searching packages in filesystem done", zap.String("indexer", i.label), zap.Int("packages.size", len(pList)))

return pList, nil
}

Expand Down
28 changes: 28 additions & 0 deletions packages/packages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,38 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/elastic/package-registry/archiver"
"github.com/elastic/package-registry/internal/util"
)

// BenchmarkInit measures the initialization of both indexer flavors (zip and
// directory based) over the repository test data.
func BenchmarkInit(b *testing.B) {
	dirPaths := []string{"../testdata/second_package_path", "../testdata/package"}

	// Use a quiet logger so that logging does not dominate the measurement.
	logger := util.NewTestLoggerLevel(zapcore.FatalLevel)
	b.ResetTimer()
	options := FSIndexerOptions{
		Logger: logger,
	}
	for n := 0; n < b.N; n++ {
		zipIdx := NewZipFileSystemIndexer(options, "../testdata/local-storage")
		dirIdx := NewFileSystemIndexer(options, dirPaths...)

		require.NoError(b, zipIdx.Init(b.Context()))
		require.NoError(b, dirIdx.Init(b.Context()))

		// Exclude the cleanup of the indexers from the timed section.
		b.StopTimer()
		require.NoError(b, zipIdx.Close(b.Context()))
		require.NoError(b, dirIdx.Close(b.Context()))
		b.StartTimer()
	}
}

func TestPackagesFilter(t *testing.T) {
filterTestPackages := []filterTestPackage{
{
Expand Down