Load packages concurrently in filesystem indexers#1489
Conversation
✅ Snyk checks have passed. No issues have been found so far.
💻 Catch issues earlier using the plugins for VS Code, JetBrains IDEs, Visual Studio, and Eclipse. |
jsoriano
left a comment
There was a problem hiding this comment.
Thanks for trying this, this can be a great improvement.
packages/packages.go
Outdated
| position int | ||
| path string | ||
| } | ||
| numWorkers := runtime.GOMAXPROCS(0) / 2 |
There was a problem hiding this comment.
Did you try directly to use GOMAXPROCS instead of half of them?
There was a problem hiding this comment.
I only used half of them because I was being conservative. But, it should not be any problem on setting GOMAXPROCS instead.
packages/packages.go
Outdated
| for range numWorkers { | ||
| wg.Add(1) | ||
| go func(logger *zap.Logger, fsBuilder FileSystemBuilder) { | ||
| defer wg.Done() |
There was a problem hiding this comment.
Nit, consider moving the workers pool logic to different functions for clarity. Something like this is done in https://github.com/elastic/package-registry/pull/1335/files#diff-8ea6771aea0aa89d25c6b29227dbc22db28f067930f7b9fc9921134c66c744b5R50.
There was a problem hiding this comment.
Updated the code to use the same functions as your link.
I added all these methods as an internal package in 01407b7
teresaromero
left a comment
There was a problem hiding this comment.
this looks like an improvement !
packages/packages.go
Outdated
| } | ||
| numWorkers := runtime.GOMAXPROCS(0) / 2 | ||
| mu := sync.Mutex{} | ||
| packagesFound := make(map[packageKey]struct{}) |
There was a problem hiding this comment.
why using here an empty struct? could we use a bool? and chech its found (by key) and true?
There was a problem hiding this comment.
Here, I just interested in whether or not the key is present. I think it is not needed to use boolean. An advantage of usingstruct{} here is that its size is also zero according to this.
There was a problem hiding this comment.
i need a 💡 for reacting , thanks! learned something today
packages/packages.go
Outdated
| return nil, err | ||
| } | ||
| for _, path := range packagePaths { | ||
| jobChan <- packageJob{position: count, path: path} |
There was a problem hiding this comment.
do we need the paths to keep the original positions? 🤔
There was a problem hiding this comment.
I thought to keep the same ordering that it was now, to ensure there are no changes in the list generated.
|
Some benchmarks run with a small subset of packages from
This shows that there is an improvement in speed ( |
5128360 to
01407b7
Compare
|
Looking at this error: https://buildkite.com/elastic/package-registry/builds/1528#019ac11a-cfdc-4fb0-9eeb-024454995a6b/118-1448 It happens some time when running |
|
After 9241261 , I re-run the benchmark and this is the result using all the CPUs available: It follows the same pattern as in #1489 (comment) |
| if _, found := packagesFound[key]; found { | ||
| i.logger.Debug("duplicated package", | ||
| zap.String("package.name", p.Name), | ||
| zap.String("package.version", p.Version), | ||
| zap.String("package.path", p.BasePath)) | ||
| continue | ||
| } |
There was a problem hiding this comment.
This process is now moved out of this loop to ensure the same ordering as before is kept.
Considering that the same package is defined in different paths, performing the load of package concurrently could lead to load first into the array the package from the second path but not the package from the first path.
The current implementation loads all the packages in the expected position (as it was before). And then once finished this loop, duplicated packages are removed following the order (keeping the first packages defined).
packages/packages.go
Outdated
| if err != nil { | ||
| return nil, fmt.Errorf("loading package failed (path: %s): %w", path, err) | ||
| } | ||
| taskPool := workers.NewTaskPool(runtime.GOMAXPROCS(0)) |
There was a problem hiding this comment.
Currently, the number of workers is hardcoded to be GOMAXPROCS number.
Should it be added some flag to be able to override this setting ? Maybe we could keep it as is for now. WDYT?
There was a problem hiding this comment.
from what i've read, this default has been "checked" at 1.25 making it more flexible, so it will take whatever is able to take, even in containers (previous 1.25 it took the machine limit, now it takes the cpu limit of the container) https://go.dev/blog/container-aware-gomaxprocs
given this, i would leave it as hardcoded GOMAXPROCS to have the max possible. perhaps we could monitor and change this in the future if it takes too many resources.
There was a problem hiding this comment.
although registry is still at 1.24 so this number is taking the cpu available on the machine when running inside a container 🤔 so until we upgrade maybe we have to limit as an option :D
There was a problem hiding this comment.
packages/packages.go
Outdated
| return nil, err | ||
| } | ||
|
|
||
| // Remove duplicates while preserving filesystem discovery order. |
There was a problem hiding this comment.
Out of curiosity, is it needed to preserve filesystem order? It could be quite random.
There was a problem hiding this comment.
This could be re-phrased. At least, it needs to be kept the same ordering of appearance of packages depending on which path they appear.
Those packages defined in the first path set by the user have precedence if they also are defined in the following paths defined by the user.
There is also a test that checks that too:
From this test:
Lines 535 to 541 in e42e823
Updated comment here e9ca9a6
| flag.StringVar(&proxyTo, "proxy-to", "https://epr.elastic.co/", "Proxy-to endpoint") | ||
|
|
||
| flag.BoolVar(&packagePathsEnableWatcher, "package-paths-enable-watcher", false, "Enable file system watcher for package paths to automatically detect new packages.") | ||
| flag.IntVar(&packagePathsWorkers, "package-paths-workers", runtime.GOMAXPROCS(0), "Number of workers to use for reading packages concurrently from the configured paths. Default is the number of CPU cores returned by GOMAXPROCS.") |
There was a problem hiding this comment.
Tested using flag and environment variable
./package-registry -log-level debug -package-paths-workers=5
EPR_PACKAGE_PATHS_WORKERS=2 ./package-registry -log-level debug
d182ee8 to
fcb290e
Compare
| logger.Debug("Using workers to read packages from package paths", zap.Int("workers", fsOptions.PathsWorkers)) | ||
| logger.Debug("Watching package paths for changes", zap.Bool("enabled", fsOptions.EnablePathsWatcher)) |
There was a problem hiding this comment.
I thought to show in debug level the values used by the service. WDYT ?
Examples of logs shown:
{
"log.level": "debug",
"@timestamp": "2025-11-27T20:17:56.815+0100",
"log.origin": {
"function": "main.initIndexer",
"file.name": "package-registry-load-packages-fsys/main.go",
"file.line": 407
},
"message": "Using workers to read packages from package paths",
"service.name": "package-registry",
"service.version": "1.33.1",
"workers": 16,
"ecs.version": "1.6.0"
}
{
"log.level": "debug",
"@timestamp": "2025-11-27T20:17:56.815+0100",
"log.origin": {
"function": "main.initIndexer",
"file.name": "package-registry-load-packages-fsys/main.go",
"file.line": 408
},
"message": "Wathching package paths for changes",
"service.name": "package-registry",
"service.version": "1.33.1",
"enabled": false,
"ecs.version": "1.6.0"
}
💚 Build Succeeded
History
cc @mrodm |
This PR updates the file system indexers to load all the packages found in the given paths in parallel using a worker pool.
Doing some local tests:
Author's checklist
main) logic.How to test this PR locally