diff --git a/cmd/integration-test/integration-test.go b/cmd/integration-test/integration-test.go index 10af587c04..0f16192e37 100644 --- a/cmd/integration-test/integration-test.go +++ b/cmd/integration-test/integration-test.go @@ -6,6 +6,7 @@ import ( "os" "regexp" "runtime" + "slices" "strings" "github.com/kitabisa/go-ci" @@ -24,7 +25,7 @@ type TestCaseInfo struct { } var ( - debug = os.Getenv("DEBUG") == "true" + debug = isDebugMode() customTests = os.Getenv("TESTS") protocol = os.Getenv("PROTO") @@ -59,6 +60,7 @@ var ( "matcher-status": matcherStatusTestcases, "exporters": exportersTestCases, } + // flakyTests are run with a retry count of 3 flakyTests = map[string]bool{ "protocols/http/self-contained-file-input.yaml": true, @@ -89,11 +91,12 @@ func main() { } // start fuzz playground server - defer fuzzplayground.Cleanup() server := fuzzplayground.GetPlaygroundServer() defer func() { + fuzzplayground.Cleanup() _ = server.Close() }() + go func() { if err := server.Start("localhost:8082"); err != nil { if !strings.Contains(err.Error(), "Server closed") { @@ -103,7 +106,6 @@ func main() { }() customTestsList := normalizeSplit(customTests) - failedTestTemplatePaths := runTests(customTestsList) if len(failedTestTemplatePaths) > 0 { @@ -130,6 +132,27 @@ func main() { } } +// isDebugMode checks if debug mode is enabled via any of the supported debug +// environment variables. +func isDebugMode() bool { + debugEnvVars := []string{ + "DEBUG", + "ACTIONS_RUNNER_DEBUG", // GitHub Actions runner debug + // Add more debug environment variables here as needed + } + + truthyValues := []string{"true", "1", "yes", "on", "enabled"} + + for _, envVar := range debugEnvVars { + envValue := strings.ToLower(strings.TrimSpace(os.Getenv(envVar))) + if slices.Contains(truthyValues, envValue) { + return true + } + } + + return false +} + // execute a testcase with retry and consider best of N // intended for flaky tests like interactsh func executeWithRetry(testCase testutils.TestCase, templatePath string, retryCount int) (string, error) { diff --git a/pkg/core/workflow_execute.go b/pkg/core/workflow_execute.go index 55d19dd677..697312aa47 100644 --- a/pkg/core/workflow_execute.go +++ b/pkg/core/workflow_execute.go @@ -34,17 +34,14 @@ func (e *Engine) executeWorkflow(ctx *scan.ScanContext, w *workflows.Workflow) b swg, _ := syncutil.New(syncutil.WithSize(templateThreads)) for _, template := range w.Workflows { - swg.Add() - - func(template *workflows.WorkflowTemplate) { - defer swg.Done() - - if err := e.runWorkflowStep(template, ctx, results, swg, w); err != nil { - gologger.Warning().Msgf(workflowStepExecutionError, template.Template, err) - } - }(template) + newCtx := scan.NewScanContext(ctx.Context(), ctx.Input.Clone()) + if err := e.runWorkflowStep(template, newCtx, results, swg, w); err != nil { + gologger.Warning().Msgf(workflowStepExecutionError, template.Template, err) + } } + swg.Wait() + return results.Load() } diff --git a/pkg/protocols/file/request.go b/pkg/protocols/file/request.go index 853cbb602f..b715a4bfc9 100644 --- a/pkg/protocols/file/request.go +++ b/pkg/protocols/file/request.go @@ -59,7 +59,7 @@ func (request *Request) ExecuteWithResults(input *contextargs.Context, metadata, } err = request.getInputPaths(input.MetaInput.Input, func(filePath string) { wg.Add() - func(filePath string) { + go func(filePath string) { defer wg.Done() fi, err := os.Open(filePath) if err != nil { diff --git a/pkg/protocols/file/request_test.go b/pkg/protocols/file/request_test.go index 118d1885c4..3f20de2ed5 100644 --- a/pkg/protocols/file/request_test.go +++ b/pkg/protocols/file/request_test.go @@ -7,7 +7,10 @@ import ( "context" "os" "path/filepath" + "sync" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/require" @@ -132,3 +135,80 @@ func TestFileExecuteWithResults(t *testing.T) { finalEvent = nil } } + +func TestFileProtocolConcurrentExecution(t *testing.T) { + tempDir, err := os.MkdirTemp("", "nuclei-test-*") + require.NoError(t, err) + + defer func() { + _ = os.RemoveAll(tempDir) + }() + + numFiles := 5 + for i := range numFiles { + content := "TEST_CONTENT_MATCH_DATA" + filePath := filepath.Join(tempDir, "test_"+string(rune('0'+i))+".txt") + err := os.WriteFile(filePath, []byte(content), permissionutil.TempFilePermission) + require.NoError(t, err) + } + + options := testutils.DefaultOptions + testutils.Init(options) + templateID := "testing-file-concurrent" + executerOpts := testutils.NewMockExecuterOptions(options, &testutils.TemplateInfo{ + ID: templateID, + Info: model.Info{SeverityHolder: severity.Holder{Severity: severity.Low}, Name: "test"}, + }) + + var timesMutex sync.Mutex + var processedFiles int64 + + request := &Request{ + ID: templateID, + MaxSize: "1Gb", + NoRecursive: false, + Extensions: []string{"txt"}, + Archive: false, + Operators: operators.Operators{ + Matchers: []*matchers.Matcher{{ + Name: "test", + Part: "raw", + Type: matchers.MatcherTypeHolder{MatcherType: matchers.WordsMatcher}, + Words: []string{"TEST_CONTENT_MATCH_DATA"}, + }}, + }, + options: executerOpts, + } + + err = request.Compile(executerOpts) + require.NoError(t, err) + + input := contextargs.NewWithInput(context.Background(), tempDir) + var results []*output.InternalWrappedEvent + var resultMutex sync.Mutex + + startTime := time.Now() + err = request.ExecuteWithResults(input, make(output.InternalEvent), make(output.InternalEvent), func(event *output.InternalWrappedEvent) { + atomic.AddInt64(&processedFiles, 1) + resultMutex.Lock() + results = append(results, event) + resultMutex.Unlock() + + // small delay to make timing differences more observable + time.Sleep(10 * time.Millisecond) + }) + totalTime := time.Since(startTime) + require.NoError(t, err) + + finalProcessedFiles := atomic.LoadInt64(&processedFiles) + t.Logf("Total execution time: %v", totalTime) + t.Logf("Files processed: %d", finalProcessedFiles) + t.Logf("Results returned: %d", len(results)) + + // test 1: all files should be processed + require.Equal(t, int64(numFiles), finalProcessedFiles, "Not all files were processed") + + // test 2: verify callback invocation timing shows concurrency + timesMutex.Lock() + defer timesMutex.Unlock() +}