diff --git a/pipeline/backend/local/clone.go b/pipeline/backend/local/clone.go index 53fa1503485..11b4a8ad2fe 100644 --- a/pipeline/backend/local/clone.go +++ b/pipeline/backend/local/clone.go @@ -100,12 +100,20 @@ func (e *local) execClone(ctx context.Context, step *types.Step, state *workflow cmd.Env = env cmd.Dir = state.workspaceDir + reader, err := cmd.StdoutPipe() + if err != nil { + return err + } + + // Save state + state.stepState.Store(step.UUID, &stepState{ + cmd: cmd, + output: reader, + }) + // Get output and redirect Stderr to Stdout - e.output, _ = cmd.StdoutPipe() cmd.Stderr = cmd.Stdout - state.stepCMDs[step.UUID] = cmd - return cmd.Start() } diff --git a/pipeline/backend/local/command.go b/pipeline/backend/local/command.go index 225eca53889..099b9d3c4f2 100644 --- a/pipeline/backend/local/command.go +++ b/pipeline/backend/local/command.go @@ -17,14 +17,66 @@ package local import ( + "context" "fmt" + "io" "os" "os/exec" "strings" "al.essio.dev/pkg/shellescape" + "golang.org/x/text/encoding/unicode" + "golang.org/x/text/transform" + + "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" ) +// execCommands use step.Image as shell and run the commands in it. +func (e *local) execCommands(ctx context.Context, step *types.Step, state *workflowState, env []string) error { + if err := checkShellExistence(step.Image); err != nil { + return err + } + + // Prepare commands + // TODO: support `entrypoint` from pipeline config + args, err := e.genCmdByShell(step.Image, step.Commands) + if err != nil { + return fmt.Errorf("could not convert commands into args: %w", err) + } + + // Use "image name" as run command (indicate shell) + cmd := exec.CommandContext(ctx, step.Image, args...) + cmd.Env = env + cmd.Dir = state.workspaceDir + + reader, err := cmd.StdoutPipe() + if err != nil { + return err + } + + if e.os == "windows" { + // we get non utf8 output from windows so just sanitize it + // TODO: remove hack + reader = io.NopCloser(transform.NewReader(reader, unicode.UTF8.NewDecoder().Transformer)) + } + + // Get output and redirect Stderr to Stdout + cmd.Stderr = cmd.Stdout + + // Save state + state.stepState.Store(step.UUID, &stepState{ + cmd: cmd, + output: reader, + }) + + return cmd.Start() +} + +func checkShellExistence(shell string) error { + _, err := exec.LookPath(shell) + return err +} + func (e *local) genCmdByShell(shell string, cmdList []string) (args []string, err error) { if len(cmdList) == 0 { return nil, ErrNoCmdSet diff --git a/pipeline/backend/local/const.go b/pipeline/backend/local/const.go index 4c53de89ff2..604fec0a64f 100644 --- a/pipeline/backend/local/const.go +++ b/pipeline/backend/local/const.go @@ -15,7 +15,6 @@ package local import ( - "errors" "fmt" ) @@ -30,11 +29,6 @@ var notAllowedEnvVarOverwrites = []string{ "CI_WORKSPACE", } -var ( - ErrUnsupportedStepType = errors.New("unsupported step type") - ErrWorkflowStateNotFound = errors.New("workflow state not found") -) - const netrcFile = ` machine %s login %s diff --git a/pipeline/backend/local/errors.go b/pipeline/backend/local/errors.go index f9c11c71a74..0a7c0aeb68c 100644 --- a/pipeline/backend/local/errors.go +++ b/pipeline/backend/local/errors.go @@ -22,8 +22,12 @@ import ( ) var ( - ErrNoShellSet = errors.New("no shell was set") - ErrNoCmdSet = errors.New("no commands where set") + ErrUnsupportedStepType = errors.New("unsupported step type") + ErrStepReaderNotFound = errors.New("could not found pipe reader for step") + ErrWorkflowStateNotFound = errors.New("workflow state not found") + ErrStepStateNotFound = errors.New("step state not found") + ErrNoShellSet = errors.New("no shell was set") + ErrNoCmdSet = errors.New("no commands where set") ) // ErrNoPosixShell indicates that a shell was assumed to be POSIX-compatible but failed the test. diff --git a/pipeline/backend/local/local.go b/pipeline/backend/local/local.go index ab5eff802d8..5514889e83c 100644 --- a/pipeline/backend/local/local.go +++ b/pipeline/backend/local/local.go @@ -28,24 +28,26 @@ import ( "github.com/rs/zerolog/log" "github.com/urfave/cli/v3" - "golang.org/x/text/encoding/unicode" - "golang.org/x/text/transform" "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" ) type workflowState struct { - stepCMDs map[string]*exec.Cmd + stepState sync.Map // map of *stepState baseDir string homeDir string workspaceDir string pluginGitBinary string } +type stepState struct { + cmd *exec.Cmd + output io.ReadCloser +} + type local struct { tempDir string workflows sync.Map - output io.ReadCloser pluginGitBinary string os, arch string } @@ -89,7 +91,6 @@ func (e *local) Load(ctx context.Context) (*types.BackendInfo, error) { }, nil } -// SetupWorkflow the pipeline environment. func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID string) error { log.Trace().Str("taskUUID", taskUUID).Msg("create workflow environment") @@ -99,7 +100,6 @@ func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID strin } state := &workflowState{ - stepCMDs: make(map[string]*exec.Cmd), baseDir: baseDir, workspaceDir: filepath.Join(baseDir, "workspace"), homeDir: filepath.Join(baseDir, "home"), @@ -113,16 +113,15 @@ func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID strin return err } - e.saveState(taskUUID, state) + e.workflows.Store(taskUUID, state) return nil } -// StartStep the pipeline step. func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string) error { log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name) - state, err := e.getState(taskUUID) + state, err := e.getWorkflowState(taskUUID) if err != nil { return err } @@ -153,117 +152,93 @@ func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string } } -// execCommands use step.Image as shell and run the commands in it. -func (e *local) execCommands(ctx context.Context, step *types.Step, state *workflowState, env []string) error { - // Prepare commands - // TODO: support `entrypoint` from pipeline config - args, err := e.genCmdByShell(step.Image, step.Commands) - if err != nil { - return fmt.Errorf("could not convert commands into args: %w", err) - } - - // Use "image name" as run command (indicate shell) - cmd := exec.CommandContext(ctx, step.Image, args...) - cmd.Env = env - cmd.Dir = state.workspaceDir - - // Get output and redirect Stderr to Stdout - e.output, _ = cmd.StdoutPipe() - cmd.Stderr = cmd.Stdout - - if e.os == "windows" { - // we get non utf8 output from windows so just sanitize it - // TODO: remove hack - e.output = io.NopCloser(transform.NewReader(e.output, unicode.UTF8.NewDecoder().Transformer)) - } - - state.stepCMDs[step.UUID] = cmd - - return cmd.Start() -} - -// execPlugin use step.Image as exec binary. -func (e *local) execPlugin(ctx context.Context, step *types.Step, state *workflowState, env []string) error { - binary, err := exec.LookPath(step.Image) - if err != nil { - return fmt.Errorf("lookup plugin binary: %w", err) - } - - cmd := exec.CommandContext(ctx, binary) - cmd.Env = env - cmd.Dir = state.workspaceDir - - // Get output and redirect Stderr to Stdout - e.output, _ = cmd.StdoutPipe() - cmd.Stderr = cmd.Stdout - - state.stepCMDs[step.UUID] = cmd - - return cmd.Start() -} - -// WaitStep for the pipeline step to complete and returns -// the completion results. func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (*types.State, error) { log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name) - state, err := e.getState(taskUUID) + state, err := e.getStepState(taskUUID, step.UUID) if err != nil { return nil, err } - cmd, ok := state.stepCMDs[step.UUID] - if !ok { - return nil, fmt.Errorf("step cmd for %s not found", step.UUID) + // normally we use cmd.Wait() to wait for *exec.Cmd, but cmd.StdoutPipe() tells us not + // as Wait() would close the io pipe even if not all logs where read and send back + // so we have to do use the underlying functions + if state.cmd.Process == nil { + return nil, errors.New("exec: not started") } - - err = cmd.Wait() - ExitCode := 0 - - var execExitError *exec.ExitError - if errors.As(err, &execExitError) { - ExitCode = execExitError.ExitCode() - // Non-zero exit code is a pipeline failure, but not an agent error. - err = nil + if state.cmd.ProcessState == nil { + cmdState, err := state.cmd.Process.Wait() + if err != nil { + return nil, err + } + state.cmd.ProcessState = cmdState } return &types.State{ Exited: true, - ExitCode: ExitCode, + ExitCode: state.cmd.ProcessState.ExitCode(), }, err } -// TailStep the pipeline step logs. func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) { - log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name) - return e.output, nil + state, err := e.getStepState(taskUUID, step.UUID) + if err != nil { + return nil, err + } else if state.output == nil { + return nil, ErrStepReaderNotFound + } + return state.output, nil } -func (e *local) DestroyStep(_ context.Context, _ *types.Step, _ string) error { - // WaitStep already waits for the command to finish, so there is nothing to do here. +func (e *local) DestroyStep(_ context.Context, step *types.Step, taskUUID string) error { + state, err := e.getStepState(taskUUID, step.UUID) + if err != nil { + return err + } + + // As WaitStep can not use cmd.Wait() witch ensures the process already finished and + // the io pipe is closed on process end, we make sure it is done. + _ = state.output.Close() + state.output = nil + _ = state.cmd.Cancel() + state.cmd = nil + workflowState, _ := e.getWorkflowState(taskUUID) + workflowState.stepState.Delete(step.UUID) + return nil } -// DestroyWorkflow the pipeline environment. func (e *local) DestroyWorkflow(_ context.Context, _ *types.Config, taskUUID string) error { log.Trace().Str("taskUUID", taskUUID).Msg("delete workflow environment") - state, err := e.getState(taskUUID) + state, err := e.getWorkflowState(taskUUID) if err != nil { return err } + // clean up steps not cleaned up because of context cancel or detached function + state.stepState.Range(func(_, value any) bool { + state, _ := value.(*stepState) + _ = state.output.Close() + state.output = nil + _ = state.cmd.Cancel() + state.cmd = nil + return true + }) + err = os.RemoveAll(state.baseDir) if err != nil { return err } - e.deleteState(taskUUID) + // hint for the gc to clean stuff + state.stepState.Clear() + e.workflows.Delete(taskUUID) return err } -func (e *local) getState(taskUUID string) (*workflowState, error) { +func (e *local) getWorkflowState(taskUUID string) (*workflowState, error) { state, ok := e.workflows.Load(taskUUID) if !ok { return nil, ErrWorkflowStateNotFound @@ -277,10 +252,21 @@ func (e *local) getState(taskUUID string) (*workflowState, error) { return s, nil } -func (e *local) saveState(taskUUID string, state *workflowState) { - e.workflows.Store(taskUUID, state) -} +func (e *local) getStepState(taskUUID, stepUUID string) (*stepState, error) { + wState, err := e.getWorkflowState(taskUUID) + if err != nil { + return nil, err + } -func (e *local) deleteState(taskUUID string) { - e.workflows.Delete(taskUUID) + state, ok := wState.stepState.Load(stepUUID) + if !ok { + return nil, ErrStepStateNotFound + } + + s, ok := state.(*stepState) + if !ok { + return nil, fmt.Errorf("could not parse state: %v", state) + } + + return s, nil } diff --git a/pipeline/backend/local/local_test.go b/pipeline/backend/local/local_test.go new file mode 100644 index 00000000000..20c84dd4ac2 --- /dev/null +++ b/pipeline/backend/local/local_test.go @@ -0,0 +1,484 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux +// +build linux + +package local + +import ( + "context" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "runtime" + "slices" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/urfave/cli/v3" + + "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" +) + +func TestIsAvailable(t *testing.T) { + t.Run("not available in container", func(t *testing.T) { + backend := New() + + t.Setenv("WOODPECKER_IN_CONTAINER", "true") + + available := backend.IsAvailable(context.Background()) + assert.False(t, available) + }) + + t.Run("available without container env and no cli context", func(t *testing.T) { + backend := New() + + os.Unsetenv("WOODPECKER_IN_CONTAINER") + available := backend.IsAvailable(context.Background()) + assert.True(t, available) + }) +} + +func TestLoad(t *testing.T) { + backend, _ := New().(*local) + + t.Run("load without cli context", func(t *testing.T) { + ctx := context.Background() + info, err := backend.Load(ctx) + + require.NoError(t, err) + assert.NotNil(t, info) + assert.Equal(t, runtime.GOOS+"/"+runtime.GOARCH, info.Platform) + }) + + t.Run("load with cli context and temp dir", func(t *testing.T) { + tmpDir := t.TempDir() + cmd := &cli.Command{} + cmd.Flags = []cli.Flag{ + &cli.StringFlag{ + Name: "backend-local-temp-dir", + Value: tmpDir, + }, + } + ctx := context.WithValue(context.Background(), types.CliCommand, cmd) + + info, err := backend.Load(ctx) + + require.NoError(t, err) + assert.NotNil(t, info) + assert.Equal(t, tmpDir, backend.tempDir) + assert.Equal(t, runtime.GOOS+"/"+runtime.GOARCH, info.Platform) + }) +} + +func TestSetupWorkflow(t *testing.T) { + backend, _ := New().(*local) + backend.tempDir = t.TempDir() + + ctx := context.Background() + taskUUID := "test-task-uuid-123" + config := &types.Config{} + + err := backend.SetupWorkflow(ctx, config, taskUUID) + require.NoError(t, err) + + // Verify state was saved + state, err := backend.getWorkflowState(taskUUID) + require.NoError(t, err) + assert.NotNil(t, state) + assert.NotEmpty(t, state.baseDir) + assert.NotEmpty(t, state.workspaceDir) + assert.NotEmpty(t, state.homeDir) + + // Verify directories were created + assert.DirExists(t, state.baseDir) + assert.DirExists(t, state.workspaceDir) + assert.DirExists(t, state.homeDir) + + // Verify directory structure + assert.Equal(t, filepath.Join(state.baseDir, "workspace"), state.workspaceDir) + assert.Equal(t, filepath.Join(state.baseDir, "home"), state.homeDir) + + // Cleanup + assert.NoError(t, os.RemoveAll(state.baseDir)) +} + +func TestDestroyWorkflow(t *testing.T) { + backend, _ := New().(*local) + backend.tempDir = t.TempDir() + + ctx := context.Background() + taskUUID := "test-destroy-task" + config := &types.Config{} + + // Setup workflow first + err := backend.SetupWorkflow(ctx, config, taskUUID) + require.NoError(t, err) + + state, err := backend.getWorkflowState(taskUUID) + require.NoError(t, err) + baseDir := state.baseDir + + // Verify directory exists + assert.DirExists(t, baseDir) + + // Destroy workflow + err = backend.DestroyWorkflow(ctx, config, taskUUID) + require.NoError(t, err) + + // Verify directory was removed + assert.NoDirExists(t, baseDir) + + // Verify state was deleted + _, err = backend.getWorkflowState(taskUUID) + assert.ErrorIs(t, err, ErrWorkflowStateNotFound) +} + +func prepairEnv(t *testing.T) { + prevEnv := os.Environ() + os.Clearenv() + t.Cleanup(func() { + for i := range prevEnv { + env := strings.SplitN(prevEnv[i], "=", 2) + //nolint:usetesting // reason: the suggested t.Setenv will be undone on t.Run() end witch we explizite dont want here + _ = os.Setenv(env[0], env[1]) + } + }) +} + +func TestRunStep(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("skipping on non linux due to shell availability and symlink capability") + } + + // we lookup shell tools we use first and create the PATH var based on that + shBinary, err := exec.LookPath("sh") + require.NoError(t, err) + path := []string{filepath.Dir(shBinary)} + echoBinary, err := exec.LookPath("echo") + require.NoError(t, err) + if echoPath := filepath.Dir(echoBinary); !slices.Contains(path, echoPath) { + path = append(path, echoPath) + } + // we make a symlinc to have a posix but non default shell + altShellDir := t.TempDir() + altShellPath := filepath.Join(altShellDir, "altsh") + require.NoError(t, os.Symlink(shBinary, altShellPath)) + path = append(path, altShellDir) + + prepairEnv(t) + //nolint:usetesting // reason: we use prepairEnv() + os.Setenv("PATH", strings.Join(path, ":")) + + backend, _ := New().(*local) + backend.tempDir = t.TempDir() + ctx := t.Context() + taskUUID := "test-run-tasks" + + // Setup workflow + require.NoError(t, backend.SetupWorkflow(ctx, &types.Config{}, taskUUID)) + + t.Run("type commands", func(t *testing.T) { + step := &types.Step{ + UUID: "step-1", + Name: "test-step", + Type: types.StepTypeCommands, + Image: "sh", + Commands: []string{"echo hello", "env"}, + Environment: map[string]string{ + "TEST_VAR": "test_value", + }, + } + + t.Run("start successful", func(t *testing.T) { + err = backend.StartStep(ctx, step, taskUUID) + require.NoError(t, err) + + // Verify command was started + state, err := backend.getWorkflowState(taskUUID) + require.NoError(t, err) + stepStateWraped, contains := state.stepState.Load(step.UUID) + assert.True(t, contains) + stepState, _ := stepStateWraped.(*stepState) + assert.NotNil(t, stepState.cmd) + + var outputData []byte + outputDataMutex := sync.Mutex{} + go t.Run("TailStep", func(t *testing.T) { + outputDataMutex.Lock() + go outputDataMutex.Unlock() + output, err := backend.TailStep(ctx, step, taskUUID) + require.NoError(t, err) + assert.NotNil(t, output) + + // Read output + outputData, err = io.ReadAll(output) + require.NoError(t, err) + }) + + // Wait for step to finish + t.Run("TestWaitStep", func(t *testing.T) { + state, err := backend.WaitStep(ctx, step, taskUUID) + require.NoError(t, err) + assert.True(t, state.Exited) + assert.Equal(t, 0, state.ExitCode) + }) + + // Verify output + outputDataMutex.Lock() + go outputDataMutex.Unlock() + outputLines := strings.Split(strings.TrimSpace(string(outputData)), "\n") + // we first test output without environments + wantBeforeEnvs := []string{ + "+ echo hello", + "hello", + "+ env", + } + gotBeforeEnvs := outputLines[:len(wantBeforeEnvs)] + assert.Equal(t, wantBeforeEnvs, gotBeforeEnvs) + // we filter out nixos specific stuff catched up in env output + gotEnvs := slices.DeleteFunc(outputLines[len(wantBeforeEnvs):], func(s string) bool { + return strings.HasPrefix(s, "_=") || strings.HasPrefix(s, "SHLVL=") + }) + assert.ElementsMatch(t, []string{ + "PWD=" + state.baseDir + "/workspace", + "USERPROFILE=" + state.baseDir + "/home", + "TEST_VAR=test_value", + "HOME=" + state.baseDir + "/home", + "CI_WORKSPACE=" + state.baseDir + "/workspace", + "PATH=" + strings.Join(path, ":"), + }, gotEnvs) + }) + }) + + t.Run("run command in alternate unix shell", func(t *testing.T) { + step := &types.Step{ + UUID: "step-altshell", + Name: "altshell", + Type: types.StepTypeCommands, + Image: "altsh", + Commands: []string{"echo success"}, + } + + err = backend.StartStep(ctx, step, taskUUID) + require.NoError(t, err) + + state, err := backend.WaitStep(ctx, step, taskUUID) + require.NoError(t, err) + assert.True(t, state.Exited) + assert.Equal(t, 0, state.ExitCode) + }) + + t.Run("command should fail", func(t *testing.T) { + step := &types.Step{ + UUID: "step-fail", + Name: "fail-step", + Type: types.StepTypeCommands, + Image: "sh", + Commands: []string{"exit 1"}, + } + + err = backend.StartStep(ctx, step, taskUUID) + require.NoError(t, err) + + state, err := backend.WaitStep(ctx, step, taskUUID) + require.NoError(t, err) + assert.True(t, state.Exited) + assert.Equal(t, 1, state.ExitCode) + }) + + t.Run("WaitStep", func(t *testing.T) { + t.Run("step not found", func(t *testing.T) { + step := &types.Step{ + UUID: "nonexistent-step", + Name: "missing", + } + + _, err = backend.WaitStep(ctx, step, taskUUID) + assert.Error(t, err) + assert.Contains(t, err.Error(), "not found") + }) + }) + + t.Run("type plugin", func(t *testing.T) { + step := &types.Step{ + UUID: "step-plugin-1", + Name: "test-plugin", + Type: types.StepTypePlugin, + Image: "echo", // Use a binary that exists + Environment: map[string]string{}, + } + + t.Run("start", func(t *testing.T) { + err = backend.StartStep(ctx, step, taskUUID) + require.NoError(t, err) + + // Verify command was started + state, err := backend.getStepState(taskUUID, step.UUID) + require.NoError(t, err) + assert.NotEqualf(t, 0, state.cmd.Process.Pid, "expect an pid of the process") + }) + }) + + t.Run("type unsupported", func(t *testing.T) { + step := &types.Step{ + UUID: "step-unsupported", + Name: "test-unsupported", + Type: "unsupported-type", + } + + t.Run("start", func(t *testing.T) { + err = backend.StartStep(ctx, step, taskUUID) + assert.ErrorIs(t, err, ErrUnsupportedStepType) + }) + }) + + // Cleanup + assert.NoError(t, backend.DestroyWorkflow(ctx, &types.Config{}, taskUUID)) +} + +func TestStateManagement(t *testing.T) { + backend, _ := New().(*local) + + t.Run("save and get state", func(t *testing.T) { + taskUUID := "test-state-uuid" + state := &workflowState{ + baseDir: "/tmp/test", + homeDir: "/tmp/test/2home", + workspaceDir: "/tmp/test/2workspace", + } + + backend.workflows.Store(taskUUID, state) + + retrieved, err := backend.getWorkflowState(taskUUID) + require.NoError(t, err) + assert.Equal(t, state.baseDir, retrieved.baseDir) + assert.Equal(t, state.homeDir, retrieved.homeDir) + assert.Equal(t, state.workspaceDir, retrieved.workspaceDir) + }) + + t.Run("get nonexistent state", func(t *testing.T) { + _, err := backend.getWorkflowState("nonexistent-uuid") + assert.ErrorIs(t, err, ErrWorkflowStateNotFound) + }) + + t.Run("delete state", func(t *testing.T) { + taskUUID := "test-delete-uuid" + state := &workflowState{} + + backend.workflows.Store(taskUUID, state) + + // Verify state exists + _, err := backend.getWorkflowState(taskUUID) + require.NoError(t, err) + + // Delete state + backend.workflows.Delete(taskUUID) + + // Verify state is gone + _, err = backend.getWorkflowState(taskUUID) + assert.ErrorIs(t, err, ErrWorkflowStateNotFound) + }) +} + +func TestConcurrentWorkflows(t *testing.T) { + backend, _ := New().(*local) + backend.tempDir = t.TempDir() + + ctx := context.Background() + + // Create multiple workflows concurrently + taskUUIDs := []string{"task-1", "task-2", "task-3"} + + for _, uuid := range taskUUIDs { + err := backend.SetupWorkflow(ctx, &types.Config{}, uuid) + require.NoError(t, err) + } + + counter := atomic.Int32{} + counter.Store(0) + for _, uuid := range taskUUIDs { + go t.Run("start step in "+uuid, func(t *testing.T) { + for i := 0; i < 3; i++ { + counter.Store(counter.Load() + 1) + step := &types.Step{ + UUID: fmt.Sprintf("step-%s-%d", uuid, i), + Name: fmt.Sprintf("step-name-%s-%d", uuid, i), + Type: types.StepTypePlugin, + Image: "sh", + Commands: []string{fmt.Sprintf("echo %s %d", uuid, i)}, + Environment: map[string]string{}, + } + require.NoError(t, backend.StartStep(ctx, step, uuid)) + _, err := backend.WaitStep(ctx, step, uuid) + require.NoError(t, err) + counter.Store(counter.Load() - 1) + } + }) + } + + // Verify all states exist + for _, uuid := range taskUUIDs { + state, err := backend.getWorkflowState(uuid) + require.NoError(t, err) + assert.NotNil(t, state) + } + + failSave := 0 +loop: + for { + if failSave == 10000 { // wait max 10s + t.Log("failSave was hit") + t.FailNow() + } + failSave++ + select { + case <-time.After(time.Millisecond): + if count := counter.Load(); count == 0 { + break loop + } else { + t.Logf("count at: %d", count) + } + case <-ctx.Done(): + return + } + } + + // Cleanup all workflows + for _, uuid := range taskUUIDs { + // Cleanup all steps + for i := 0; i < 3; i++ { + stepUUID := fmt.Sprintf("step-%s-%d", uuid, i) + assert.NoError(t, backend.DestroyStep(ctx, &types.Step{UUID: stepUUID}, uuid)) + } + + // finish with workflow cleanup + err := backend.DestroyWorkflow(ctx, &types.Config{}, uuid) + require.NoError(t, err) + } + + // Verify all states are deleted + for _, uuid := range taskUUIDs { + _, err := backend.getWorkflowState(uuid) + assert.ErrorIs(t, err, ErrWorkflowStateNotFound) + } +} diff --git a/pipeline/backend/local/plugin.go b/pipeline/backend/local/plugin.go new file mode 100644 index 00000000000..fa8cec58aca --- /dev/null +++ b/pipeline/backend/local/plugin.go @@ -0,0 +1,51 @@ +// Copyright 2025 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import ( + "context" + "fmt" + "os/exec" + + "go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types" +) + +// execPlugin use step.Image as exec binary. +func (e *local) execPlugin(ctx context.Context, step *types.Step, state *workflowState, env []string) error { + binary, err := exec.LookPath(step.Image) + if err != nil { + return fmt.Errorf("lookup plugin binary: %w", err) + } + + cmd := exec.CommandContext(ctx, binary) + cmd.Env = env + cmd.Dir = state.workspaceDir + + reader, err := cmd.StdoutPipe() + if err != nil { + return err + } + + // Get output and redirect Stderr to Stdout + cmd.Stderr = cmd.Stdout + + // Save state + state.stepState.Store(step.UUID, &stepState{ + cmd: cmd, + output: reader, + }) + + return cmd.Start() +}