-
Notifications
You must be signed in to change notification settings - Fork 3.2k
cache, goroutine and unbounded workers management #6420
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bf86910
a0a6df4
0a8c5ad
4bc77ff
4a92b15
23a61ea
3c877b8
85fd30f
fc720c5
f54c094
2a1ff38
2a27dd0
4442040
beda45f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -48,8 +48,15 @@ func (e *Engine) executeAllSelfContained(ctx context.Context, alltemplates []*te | |||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // executeTemplateWithTargets executes a given template on x targets (with a internal targetpool(i.e concurrency)) | ||||||||||||||||||||||||||||
| func (e *Engine) executeTemplateWithTargets(ctx context.Context, template *templates.Template, target provider.InputProvider, results *atomic.Bool) { | ||||||||||||||||||||||||||||
| // this is target pool i.e max target to execute | ||||||||||||||||||||||||||||
| wg := e.workPool.InputPool(template.Type()) | ||||||||||||||||||||||||||||
| if e.workPool == nil { | ||||||||||||||||||||||||||||
| e.workPool = e.GetWorkPool() | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| // Bounded worker pool using input concurrency | ||||||||||||||||||||||||||||
| pool := e.workPool.InputPool(template.Type()) | ||||||||||||||||||||||||||||
| workerCount := 1 | ||||||||||||||||||||||||||||
| if pool != nil && pool.Size > 0 { | ||||||||||||||||||||||||||||
| workerCount = pool.Size | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| var ( | ||||||||||||||||||||||||||||
| index uint32 | ||||||||||||||||||||||||||||
|
|
@@ -78,6 +85,41 @@ func (e *Engine) executeTemplateWithTargets(ctx context.Context, template *templ | |||||||||||||||||||||||||||
| currentInfo.Unlock() | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // task represents a single target execution unit | ||||||||||||||||||||||||||||
| type task struct { | ||||||||||||||||||||||||||||
| index uint32 | ||||||||||||||||||||||||||||
| skip bool | ||||||||||||||||||||||||||||
| value *contextargs.MetaInput | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| tasks := make(chan task) | ||||||||||||||||||||||||||||
| var workersWg sync.WaitGroup | ||||||||||||||||||||||||||||
| workersWg.Add(workerCount) | ||||||||||||||||||||||||||||
| for i := 0; i < workerCount; i++ { | ||||||||||||||||||||||||||||
| go func() { | ||||||||||||||||||||||||||||
| defer workersWg.Done() | ||||||||||||||||||||||||||||
| for t := range tasks { | ||||||||||||||||||||||||||||
| func() { | ||||||||||||||||||||||||||||
| defer cleanupInFlight(t.index) | ||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||
| case <-ctx.Done(): | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| if t.skip { | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| match, err := e.executeTemplateOnInput(ctx, template, t.value) | ||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||
| e.options.Logger.Warning().Msgf("[%s] Could not execute step on %s: %s\n", e.executerOpts.Colorizer.BrightBlue(template.ID), t.value.Input, err) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| results.CompareAndSwap(false, match) | ||||||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| target.Iterate(func(scannedValue *contextargs.MetaInput) bool { | ||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||
| case <-ctx.Done(): | ||||||||||||||||||||||||||||
|
|
@@ -128,43 +170,13 @@ func (e *Engine) executeTemplateWithTargets(ctx context.Context, template *templ | |||||||||||||||||||||||||||
| return true | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| wg.Add() | ||||||||||||||||||||||||||||
| go func(index uint32, skip bool, value *contextargs.MetaInput) { | ||||||||||||||||||||||||||||
| defer wg.Done() | ||||||||||||||||||||||||||||
| defer cleanupInFlight(index) | ||||||||||||||||||||||||||||
| if skip { | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| var match bool | ||||||||||||||||||||||||||||
| var err error | ||||||||||||||||||||||||||||
| ctxArgs := contextargs.New(ctx) | ||||||||||||||||||||||||||||
| ctxArgs.MetaInput = value | ||||||||||||||||||||||||||||
| ctx := scan.NewScanContext(ctx, ctxArgs) | ||||||||||||||||||||||||||||
| switch template.Type() { | ||||||||||||||||||||||||||||
| case types.WorkflowProtocol: | ||||||||||||||||||||||||||||
| match = e.executeWorkflow(ctx, template.CompiledWorkflow) | ||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||
| if e.Callback != nil { | ||||||||||||||||||||||||||||
| if results, err := template.Executer.ExecuteWithResults(ctx); err == nil { | ||||||||||||||||||||||||||||
| for _, result := range results { | ||||||||||||||||||||||||||||
| e.Callback(result) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| match = true | ||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||
| match, err = template.Executer.Execute(ctx) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||
| e.options.Logger.Warning().Msgf("[%s] Could not execute step on %s: %s\n", e.executerOpts.Colorizer.BrightBlue(template.ID), value.Input, err) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| results.CompareAndSwap(false, match) | ||||||||||||||||||||||||||||
| }(index, skip, scannedValue) | ||||||||||||||||||||||||||||
| tasks <- task{index: index, skip: skip, value: scannedValue} | ||||||||||||||||||||||||||||
| index++ | ||||||||||||||||||||||||||||
| return true | ||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||
| wg.Wait() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| close(tasks) | ||||||||||||||||||||||||||||
| workersWg.Wait() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
Comment on lines
+173
to
180
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix MetaInput aliasing: send a copy to workers Copy the value before enqueue so workers don’t observe reused/mutated structs. Apply: - tasks <- task{index: index, skip: skip, value: scannedValue}
+ vcopy := *scannedValue
+ tasks <- task{index: index, skip: skip, value: &vcopy}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||
| // on completion marks the template as completed | ||||||||||||||||||||||||||||
| currentInfo.Lock() | ||||||||||||||||||||||||||||
|
|
@@ -202,30 +214,35 @@ func (e *Engine) executeTemplatesOnTarget(ctx context.Context, alltemplates []*t | |||||||||||||||||||||||||||
| go func(template *templates.Template, value *contextargs.MetaInput, wg *syncutil.AdaptiveWaitGroup) { | ||||||||||||||||||||||||||||
| defer wg.Done() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| var match bool | ||||||||||||||||||||||||||||
| var err error | ||||||||||||||||||||||||||||
| ctxArgs := contextargs.New(ctx) | ||||||||||||||||||||||||||||
| ctxArgs.MetaInput = value | ||||||||||||||||||||||||||||
| ctx := scan.NewScanContext(ctx, ctxArgs) | ||||||||||||||||||||||||||||
| switch template.Type() { | ||||||||||||||||||||||||||||
| case types.WorkflowProtocol: | ||||||||||||||||||||||||||||
| match = e.executeWorkflow(ctx, template.CompiledWorkflow) | ||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||
| if e.Callback != nil { | ||||||||||||||||||||||||||||
| if results, err := template.Executer.ExecuteWithResults(ctx); err == nil { | ||||||||||||||||||||||||||||
| for _, result := range results { | ||||||||||||||||||||||||||||
| e.Callback(result) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| match = true | ||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||
| match, err = template.Executer.Execute(ctx) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| match, err := e.executeTemplateOnInput(ctx, template, value) | ||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||
| e.options.Logger.Warning().Msgf("[%s] Could not execute step on %s: %s\n", e.executerOpts.Colorizer.BrightBlue(template.ID), value.Input, err) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| results.CompareAndSwap(false, match) | ||||||||||||||||||||||||||||
| }(tpl, target, sg) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // executeTemplateOnInput performs template execution for a single input and returns match status and error | ||||||||||||||||||||||||||||
| func (e *Engine) executeTemplateOnInput(ctx context.Context, template *templates.Template, value *contextargs.MetaInput) (bool, error) { | ||||||||||||||||||||||||||||
| ctxArgs := contextargs.New(ctx) | ||||||||||||||||||||||||||||
| ctxArgs.MetaInput = value | ||||||||||||||||||||||||||||
| scanCtx := scan.NewScanContext(ctx, ctxArgs) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| switch template.Type() { | ||||||||||||||||||||||||||||
| case types.WorkflowProtocol: | ||||||||||||||||||||||||||||
| return e.executeWorkflow(scanCtx, template.CompiledWorkflow), nil | ||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||
| if e.Callback != nil { | ||||||||||||||||||||||||||||
| results, err := template.Executer.ExecuteWithResults(scanCtx) | ||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||
| return false, err | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| for _, result := range results { | ||||||||||||||||||||||||||||
| e.Callback(result) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| return len(results) > 0, nil | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| return template.Executer.Execute(scanCtx) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,148 @@ | ||
| package core | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "sync/atomic" | ||
| "testing" | ||
| "time" | ||
|
|
||
| inputtypes "github.com/projectdiscovery/nuclei/v3/pkg/input/types" | ||
| "github.com/projectdiscovery/nuclei/v3/pkg/output" | ||
| "github.com/projectdiscovery/nuclei/v3/pkg/protocols" | ||
| "github.com/projectdiscovery/nuclei/v3/pkg/protocols/common/contextargs" | ||
| "github.com/projectdiscovery/nuclei/v3/pkg/scan" | ||
| "github.com/projectdiscovery/nuclei/v3/pkg/templates" | ||
| tmpltypes "github.com/projectdiscovery/nuclei/v3/pkg/templates/types" | ||
| "github.com/projectdiscovery/nuclei/v3/pkg/types" | ||
| ) | ||
|
|
||
| // fakeExecuter is a simple stub for protocols.Executer used to test executeTemplateOnInput | ||
| type fakeExecuter struct { | ||
| withResults bool | ||
| } | ||
|
|
||
| func (f *fakeExecuter) Compile() error { return nil } | ||
| func (f *fakeExecuter) Requests() int { return 1 } | ||
| func (f *fakeExecuter) Execute(ctx *scan.ScanContext) (bool, error) { return !f.withResults, nil } | ||
| func (f *fakeExecuter) ExecuteWithResults(ctx *scan.ScanContext) ([]*output.ResultEvent, error) { | ||
| if !f.withResults { | ||
| return nil, nil | ||
| } | ||
| return []*output.ResultEvent{{Host: "h"}}, nil | ||
| } | ||
|
|
||
| // newTestEngine creates a minimal Engine for tests | ||
| func newTestEngine() *Engine { | ||
| return New(&types.Options{}) | ||
| } | ||
|
|
||
| func Test_executeTemplateOnInput_CallbackPath(t *testing.T) { | ||
| e := newTestEngine() | ||
| called := 0 | ||
| e.Callback = func(*output.ResultEvent) { called++ } | ||
|
|
||
| tpl := &templates.Template{} | ||
| tpl.Executer = &fakeExecuter{withResults: true} | ||
|
|
||
| ok, err := e.executeTemplateOnInput(context.Background(), tpl, &contextargs.MetaInput{Input: "x"}) | ||
| if err != nil { | ||
| t.Fatalf("unexpected error: %v", err) | ||
| } | ||
| if !ok { | ||
| t.Fatalf("expected match true") | ||
| } | ||
| if called == 0 { | ||
| t.Fatalf("expected callback to be called") | ||
| } | ||
| } | ||
|
|
||
| func Test_executeTemplateOnInput_ExecutePath(t *testing.T) { | ||
| e := newTestEngine() | ||
| tpl := &templates.Template{} | ||
| tpl.Executer = &fakeExecuter{withResults: false} | ||
|
|
||
| ok, err := e.executeTemplateOnInput(context.Background(), tpl, &contextargs.MetaInput{Input: "x"}) | ||
| if err != nil { | ||
| t.Fatalf("unexpected error: %v", err) | ||
| } | ||
| if !ok { | ||
| t.Fatalf("expected match true from Execute path") | ||
| } | ||
| } | ||
|
|
||
| type fakeExecuterErr struct{} | ||
|
|
||
| func (f *fakeExecuterErr) Compile() error { return nil } | ||
| func (f *fakeExecuterErr) Requests() int { return 1 } | ||
| func (f *fakeExecuterErr) Execute(ctx *scan.ScanContext) (bool, error) { return false, nil } | ||
| func (f *fakeExecuterErr) ExecuteWithResults(ctx *scan.ScanContext) ([]*output.ResultEvent, error) { | ||
| return nil, fmt.Errorf("boom") | ||
| } | ||
|
|
||
| func Test_executeTemplateOnInput_CallbackErrorPropagates(t *testing.T) { | ||
| e := newTestEngine() | ||
| e.Callback = func(*output.ResultEvent) {} | ||
| tpl := &templates.Template{} | ||
| tpl.Executer = &fakeExecuterErr{} | ||
|
|
||
| ok, err := e.executeTemplateOnInput(context.Background(), tpl, &contextargs.MetaInput{Input: "x"}) | ||
| if err == nil { | ||
| t.Fatalf("expected error to propagate") | ||
| } | ||
| if ok { | ||
| t.Fatalf("expected match to be false on error") | ||
| } | ||
| } | ||
|
|
||
| type fakeTargetProvider struct { | ||
| values []*contextargs.MetaInput | ||
| } | ||
|
|
||
| func (f *fakeTargetProvider) Count() int64 { return int64(len(f.values)) } | ||
| func (f *fakeTargetProvider) Iterate(cb func(value *contextargs.MetaInput) bool) { | ||
| for _, v := range f.values { | ||
| if !cb(v) { | ||
| return | ||
| } | ||
| } | ||
| } | ||
| func (f *fakeTargetProvider) Set(string, string) {} | ||
| func (f *fakeTargetProvider) SetWithProbe(string, string, inputtypes.InputLivenessProbe) error { | ||
| return nil | ||
| } | ||
| func (f *fakeTargetProvider) SetWithExclusions(string, string) error { return nil } | ||
| func (f *fakeTargetProvider) InputType() string { return "test" } | ||
| func (f *fakeTargetProvider) Close() {} | ||
|
|
||
| type slowExecuter struct{} | ||
|
|
||
| func (s *slowExecuter) Compile() error { return nil } | ||
| func (s *slowExecuter) Requests() int { return 1 } | ||
| func (s *slowExecuter) Execute(ctx *scan.ScanContext) (bool, error) { | ||
| select { | ||
| case <-ctx.Context().Done(): | ||
| return false, ctx.Context().Err() | ||
| case <-time.After(200 * time.Millisecond): | ||
| return true, nil | ||
| } | ||
| } | ||
| func (s *slowExecuter) ExecuteWithResults(ctx *scan.ScanContext) ([]*output.ResultEvent, error) { | ||
| return nil, nil | ||
| } | ||
|
|
||
| func Test_executeTemplateWithTargets_RespectsCancellation(t *testing.T) { | ||
| e := newTestEngine() | ||
| e.SetExecuterOptions(&protocols.ExecutorOptions{Logger: e.Logger, ResumeCfg: types.NewResumeCfg(), ProtocolType: tmpltypes.HTTPProtocol}) | ||
|
|
||
| tpl := &templates.Template{} | ||
| tpl.Executer = &slowExecuter{} | ||
|
|
||
| targets := &fakeTargetProvider{values: []*contextargs.MetaInput{{Input: "a"}, {Input: "b"}, {Input: "c"}}} | ||
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| cancel() | ||
|
|
||
| var matched atomic.Bool | ||
| e.executeTemplateWithTargets(ctx, tpl, targets, &matched) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| package cache | ||
|
|
||
| import ( | ||
| "regexp" | ||
| "sync" | ||
|
|
||
| "github.com/Knetic/govaluate" | ||
| "github.com/projectdiscovery/gcache" | ||
| ) | ||
|
|
||
| var ( | ||
| initOnce sync.Once | ||
| mu sync.RWMutex | ||
|
|
||
| regexCap = 4096 | ||
| dslCap = 4096 | ||
|
|
||
| regexCache gcache.Cache[string, *regexp.Regexp] | ||
| dslCache gcache.Cache[string, *govaluate.EvaluableExpression] | ||
| ) | ||
|
|
||
| func initCaches() { | ||
| initOnce.Do(func() { | ||
| regexCache = gcache.New[string, *regexp.Regexp](regexCap).LRU().Build() | ||
| dslCache = gcache.New[string, *govaluate.EvaluableExpression](dslCap).LRU().Build() | ||
| }) | ||
| } | ||
|
|
||
| func SetCapacities(regexCapacity, dslCapacity int) { | ||
| // ensure caches are initialized under initOnce, so later Regex()/DSL() won't re-init | ||
| initCaches() | ||
dwisiswant0 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| mu.Lock() | ||
| defer mu.Unlock() | ||
|
|
||
| if regexCapacity > 0 { | ||
| regexCap = regexCapacity | ||
| } | ||
| if dslCapacity > 0 { | ||
| dslCap = dslCapacity | ||
| } | ||
| if regexCapacity <= 0 && dslCapacity <= 0 { | ||
| return | ||
| } | ||
| // rebuild caches with new capacities | ||
| regexCache = gcache.New[string, *regexp.Regexp](regexCap).LRU().Build() | ||
| dslCache = gcache.New[string, *govaluate.EvaluableExpression](dslCap).LRU().Build() | ||
| } | ||
|
|
||
| func Regex() gcache.Cache[string, *regexp.Regexp] { | ||
| initCaches() | ||
| mu.RLock() | ||
| defer mu.RUnlock() | ||
| return regexCache | ||
| } | ||
|
|
||
| func DSL() gcache.Cache[string, *govaluate.EvaluableExpression] { | ||
| initCaches() | ||
| mu.RLock() | ||
| defer mu.RUnlock() | ||
| return dslCache | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.