diff --git a/op-devstack/sysgo/l1_nodes_subprocess.go b/op-devstack/sysgo/l1_nodes_subprocess.go index 56183becbbeab..d54e9e555baf2 100644 --- a/op-devstack/sysgo/l1_nodes_subprocess.go +++ b/op-devstack/sysgo/l1_nodes_subprocess.go @@ -135,7 +135,7 @@ func (n *ExternalL1Geth) Start() { func (n *ExternalL1Geth) Stop() { n.mu.Lock() defer n.mu.Unlock() - err := n.sub.Stop() + err := n.sub.Stop(true) n.p.Require().NoError(err, "Must stop") n.sub = nil } diff --git a/op-devstack/sysgo/l2_cl_kona.go b/op-devstack/sysgo/l2_cl_kona.go index 52e5655dc928f..11add6eafe5e4 100644 --- a/op-devstack/sysgo/l2_cl_kona.go +++ b/op-devstack/sysgo/l2_cl_kona.go @@ -122,7 +122,7 @@ func (k *KonaNode) Stop() { k.p.Logger().Warn("kona-node already stopped") return } - err := k.sub.Stop() + err := k.sub.Stop(true) k.p.Require().NoError(err, "Must stop") k.sub = nil } diff --git a/op-devstack/sysgo/l2_el_opreth.go b/op-devstack/sysgo/l2_el_opreth.go index 55f4682fa4d6a..184c1bab9d0a9 100644 --- a/op-devstack/sysgo/l2_el_opreth.go +++ b/op-devstack/sysgo/l2_el_opreth.go @@ -128,7 +128,7 @@ func (n *OpReth) Start() { func (n *OpReth) Stop() { n.mu.Lock() defer n.mu.Unlock() - err := n.sub.Stop() + err := n.sub.Stop(true) n.p.Require().NoError(err, "Must stop") n.sub = nil } diff --git a/op-devstack/sysgo/subproc.go b/op-devstack/sysgo/subproc.go index 799357a049733..d2f5cb202ae77 100644 --- a/op-devstack/sysgo/subproc.go +++ b/op-devstack/sysgo/subproc.go @@ -1,26 +1,16 @@ package sysgo import ( - "context" "fmt" "os" "os/exec" "sync" - "time" "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-service/logpipe" ) -// SubProcess is a process that can be started, and stopped, and restarted. -// -// If at any point the process fails to start or exit successfully, -// the failure is reported to the devtest.P. -// -// If the sub-process exits by itself, the exit is detected, -// and if not successful (non-zero exit code on unix) it also reports failure to the devtest.P. -// -// Sub-process logs are assumed to be structured JSON logs, and are piped to the logger. +// SubProcess is a process that can be started and stopped. type SubProcess struct { p devtest.P cmd *exec.Cmd @@ -28,8 +18,6 @@ type SubProcess struct { stdOutLogs logpipe.LogProcessor stdErrLogs logpipe.LogProcessor - waitCtx context.Context // closed when process-Wait completes - mu sync.Mutex } @@ -49,38 +37,14 @@ func (sp *SubProcess) Start(cmdPath string, args []string, env []string) error { } cmd := exec.Command(cmdPath, args...) cmd.Env = append(os.Environ(), env...) - stdout, err := cmd.StdoutPipe() - sp.p.Require().NoError(err, "stdout err") - stderr, err := cmd.StderrPipe() - sp.p.Require().NoError(err, "stderr err") - go func() { - err := logpipe.PipeLogs(stdout, sp.stdOutLogs) - sp.p.Require().NoError(err, "stdout logging error") - }() - go func() { - err := logpipe.PipeLogs(stderr, sp.stdErrLogs) - sp.p.Require().NoError(err, "stderr logging error") - }() + cmd.Stdout = sp.stdOutLogs + cmd.Stderr = sp.stdErrLogs if err := cmd.Start(); err != nil { return err } sp.cmd = cmd - - subCtx, subCancel := context.WithCancelCause(context.Background()) - go func() { - state, err := cmd.Process.Wait() - subCancel(err) - sp.p.Require().NoError(err, "Sub-process failed to be closed") - sp.p.Logger().Info("Sub-process stopped", "exitCode", state.ExitCode(), "pid", state.Pid()) - // if it exited on its own, then we care about the error. If not, we (or the user) signaled it. - if state.Exited() { - sp.p.Require().True(state.Success(), "Sub-process closed with error status: %s", state.String()) - } - }() - sp.waitCtx = subCtx - sp.p.Cleanup(func() { - err := sp.Stop() + err := sp.Stop(true) if err != nil { sp.p.Logger().Error("Shutdown error", "err", err) } @@ -88,76 +52,29 @@ func (sp *SubProcess) Start(cmdPath string, args []string, env []string) error { return nil } -// Kill stops the process, and does not wait for it to complete. -func (sp *SubProcess) Kill() error { - ctx, cancel := context.WithCancel(context.Background()) - cancel() // don't wait, just force it to stop immediately - return sp.GracefulStop(ctx) -} - -// Stop implements the default control-panel interface, -// and gracefully stops with a 10-second timeout. -func (sp *SubProcess) Stop() error { - // by default, for control-panel, use an interrupt and a 10-second grace - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - return sp.GracefulStop(ctx) -} - -// GracefulStop sends an interrupt and waits for the process to stop. -// If the given ctx is closed, a forced shutdown (process kill) is pursued. -func (sp *SubProcess) GracefulStop(ctx context.Context) error { +// Stop waits for the process to stop, interrupting the process if it has not completed and +// interrupt is true. +func (sp *SubProcess) Stop(interrupt bool) error { sp.mu.Lock() defer sp.mu.Unlock() if sp.cmd == nil { return nil // already stopped gracefully } - if ctx.Err() == nil && sp.waitCtx.Err() == nil { - // if not force-closing, and not already done, then try an interrupt first. + // If not already done, then try an interrupt first as requested. + if sp.cmd.ProcessState == nil && interrupt { sp.p.Logger().Info("Sending interrupt") if err := sp.cmd.Process.Signal(os.Interrupt); err != nil { return err } } - select { - case <-ctx.Done(): - sp.p.Logger().Warn("Sub-process did not respond to interrupt, force-closing now") - err := sp.cmd.Process.Kill() - if err != nil { - return fmt.Errorf("failed to force-close sub-process: %w", err) - } - sp.p.Logger().Info("Successfully force-closed sub-process") - // resources of cmd.Process will be cleaned up by the Process.Wait - case <-sp.waitCtx.Done(): - if err := context.Cause(sp.waitCtx); err != nil && err != context.Canceled { - sp.p.Logger().Warn("Sub-process exited with error", "err", err) - } else { - sp.p.Logger().Info("Sub-process gracefully exited") - } + + if _, err := sp.cmd.Process.Wait(); err != nil { + sp.p.Logger().Warn("Sub-process exited with error", "err", err) + } else { + sp.p.Logger().Info("Sub-process gracefully exited") } + sp.cmd = nil - sp.waitCtx = nil return nil } - -// Wait waits for the process to complete. -func (sp *SubProcess) Wait(ctx context.Context) error { - sp.mu.Lock() - defer sp.mu.Unlock() - if sp.waitCtx == nil { - return nil - } - select { - case <-ctx.Done(): - return ctx.Err() - case <-sp.waitCtx.Done(): - if err := context.Cause(sp.waitCtx); err != nil && err != context.Canceled { - sp.p.Logger().Warn("Sub-process exited with error", "err", err) - return err - } else { - sp.p.Logger().Info("Sub-process gracefully exited") - return nil - } - } -} diff --git a/op-devstack/sysgo/subproc_test.go b/op-devstack/sysgo/subproc_test.go index 59ab24133aa0b..e44e039aa9cd4 100644 --- a/op-devstack/sysgo/subproc_test.go +++ b/op-devstack/sysgo/subproc_test.go @@ -49,8 +49,7 @@ func TestSubProcess(gt *testing.T) { func testEcho(gt *testing.T, capt *testlog.CapturingHandler, sp *SubProcess) { require.NoError(gt, sp.Start("/bin/echo", []string{"hello world"}, []string{})) gt.Log("Started sub-process") - require.NoError(gt, sp.Wait(context.Background()), "echo must complete") - require.NoError(gt, sp.Stop()) + require.NoError(gt, sp.Stop(false)) gt.Log("Stopped sub-process") require.NotNil(gt, capt.FindLog( @@ -66,12 +65,12 @@ func testSleep(gt *testing.T, capt *testlog.CapturingHandler, sp *SubProcess) { require.NoError(gt, sp.Start("/bin/sleep", []string{"10000000000"}, []string{})) gt.Log("Started sub-process") // Shut down the process before the sleep completes - require.NoError(gt, sp.Kill()) + require.NoError(gt, sp.Stop(true)) gt.Log("Killed sub-process") require.NotNil(gt, capt.FindLog( - testlog.NewMessageFilter("Sub-process did not respond to interrupt, force-closing now"))) + testlog.NewMessageFilter("Sending interrupt"))) require.NotNil(gt, capt.FindLog( - testlog.NewMessageFilter("Successfully force-closed sub-process"))) + testlog.NewMessageFilter("Sub-process gracefully exited"))) } diff --git a/op-devstack/sysgo/supervisor_kona.go b/op-devstack/sysgo/supervisor_kona.go index a42079b50c0ff..b4473d9e73d83 100644 --- a/op-devstack/sysgo/supervisor_kona.go +++ b/op-devstack/sysgo/supervisor_kona.go @@ -109,7 +109,7 @@ func (s *KonaSupervisor) Stop() { s.p.Logger().Warn("kona-supervisor already stopped") return } - err := s.sub.Stop() + err := s.sub.Stop(true) s.p.Require().NoError(err, "Must stop") s.sub = nil } diff --git a/op-service/logpipe/pipe.go b/op-service/logpipe/pipe.go index 64e7d1b89d37e..1ee6eeb2520ce 100644 --- a/op-service/logpipe/pipe.go +++ b/op-service/logpipe/pipe.go @@ -4,8 +4,6 @@ import ( "bufio" "bytes" "encoding/json" - "errors" - "io" "log/slog" oplog "github.com/ethereum-optimism/optimism/op-service/log" @@ -82,6 +80,20 @@ type LogEntry interface { type LogProcessor func(line []byte) +func (lo LogProcessor) Write(data []byte) (int, error) { + startingLength := len(data) + buf := bytes.NewBuffer(data) + scanner := bufio.NewScanner(buf) + for scanner.Scan() { + lineBytes := scanner.Bytes() + if len(lineBytes) == 0 { + continue // Skip empty lines + } + lo(lineBytes) + } + return startingLength - buf.Len(), scanner.Err() +} + type LogParser func(line []byte) LogEntry func ToLogger(logger log.Logger) func(e LogEntry) { @@ -99,34 +111,3 @@ func ToLogger(logger log.Logger) func(e LogEntry) { logger.Log(lvl, msg, attrs...) } } - -// PipeLogs reads logs from the provided io.ReadCloser (e.g., subprocess stdout), -// and outputs them to the provider logger. -// -// This: -// 1. assumes each line is a JSON object -// 2. parses it -// 3. extracts the "level" and optional "msg" -// 4. treats remaining fields as structured attributes -// 5. logs the entries using the provided log.Logger -// -// Non-JSON lines are logged as warnings. -// Crit level is mapped to error-level, to prevent untrusted crit logs from stopping the process. -// This function processes until the stream ends, and closes the reader. -// This returns the first read error (If we run into EOF, nil returned is returned instead). -func PipeLogs(r io.ReadCloser, onLog LogProcessor) (outErr error) { - defer func() { - outErr = errors.Join(outErr, r.Close()) - }() - - scanner := bufio.NewScanner(r) - for scanner.Scan() { - lineBytes := scanner.Bytes() - if len(lineBytes) == 0 { - continue // Skip empty lines - } - onLog(lineBytes) - } - - return scanner.Err() -} diff --git a/op-service/logpipe/pipe_test.go b/op-service/logpipe/pipe_test.go index 19b4a1baee405..df6b7ed9c5a2f 100644 --- a/op-service/logpipe/pipe_test.go +++ b/op-service/logpipe/pipe_test.go @@ -1,9 +1,8 @@ package logpipe import ( - "bytes" "io" - "sync" + "strings" "testing" "github.com/stretchr/testify/require" @@ -13,35 +12,18 @@ import ( "github.com/ethereum-optimism/optimism/op-service/testlog" ) -func TestPipeLogs(t *testing.T) { +func TestWriteToLogProcessor(t *testing.T) { logger, capt := testlog.CaptureLogger(t, log.LevelTrace) - wg := new(sync.WaitGroup) - wg.Add(2) - - r, w := io.Pipe() - // Write the log output to the pipe - go func() { - defer wg.Done() - _, err := io.Copy(w, bytes.NewReader([]byte(`{"level": "DEBUG", "fields": {"message": "hello", "foo": 1}}`+"\n"))) - require.NoError(t, err) - _, err = io.Copy(w, bytes.NewReader([]byte(`test invalid JSON`+"\n"))) - require.NoError(t, err) - _, err = io.Copy(w, bytes.NewReader([]byte(`{"fields": {"message": "world", "bar": "sunny"}, "level": "INFO"}`+"\n"))) - require.NoError(t, err) - require.NoError(t, w.Close()) - }() - // Read the log output from the pipe - go func() { - defer wg.Done() - toLogger := ToLogger(logger) - logProc := func(line []byte) { - toLogger(ParseRustStructuredLogs(line)) - } - err := PipeLogs(r, logProc) - require.NoError(t, err) - }() - wg.Wait() + logProc := LogProcessor(func(line []byte) { + ToLogger(logger)(ParseRustStructuredLogs(line)) + }) + _, err := io.Copy(logProc, strings.NewReader(`{"level": "DEBUG", "fields": {"message": "hello", "foo": 1}}`+"\n")) + require.NoError(t, err) + _, err = io.Copy(logProc, strings.NewReader(`test invalid JSON`+"\n")) + require.NoError(t, err) + _, err = io.Copy(logProc, strings.NewReader(`{"fields": {"message": "world", "bar": "sunny"}, "level": "INFO"}`+"\n")) + require.NoError(t, err) entry1 := capt.FindLog( testlog.NewLevelFilter(log.LevelDebug),