Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion op-devstack/sysgo/l1_nodes_subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (n *ExternalL1Geth) Start() {
func (n *ExternalL1Geth) Stop() {
n.mu.Lock()
defer n.mu.Unlock()
err := n.sub.Stop()
err := n.sub.Stop(true)
n.p.Require().NoError(err, "Must stop")
n.sub = nil
}
Expand Down
2 changes: 1 addition & 1 deletion op-devstack/sysgo/l2_cl_kona.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (k *KonaNode) Stop() {
k.p.Logger().Warn("kona-node already stopped")
return
}
err := k.sub.Stop()
err := k.sub.Stop(true)
k.p.Require().NoError(err, "Must stop")
k.sub = nil
}
Expand Down
2 changes: 1 addition & 1 deletion op-devstack/sysgo/l2_el_opreth.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (n *OpReth) Start() {
func (n *OpReth) Stop() {
n.mu.Lock()
defer n.mu.Unlock()
err := n.sub.Stop()
err := n.sub.Stop(true)
n.p.Require().NoError(err, "Must stop")
n.sub = nil
}
Expand Down
113 changes: 15 additions & 98 deletions op-devstack/sysgo/subproc.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,23 @@
package sysgo

import (
"context"
"fmt"
"os"
"os/exec"
"sync"
"time"

"github.com/ethereum-optimism/optimism/op-devstack/devtest"
"github.com/ethereum-optimism/optimism/op-service/logpipe"
)

// SubProcess is a process that can be started, and stopped, and restarted.
//
// If at any point the process fails to start or exit successfully,
// the failure is reported to the devtest.P.
//
// If the sub-process exits by itself, the exit is detected,
// and if not successful (non-zero exit code on unix) it also reports failure to the devtest.P.
//
// Sub-process logs are assumed to be structured JSON logs, and are piped to the logger.
// SubProcess is a process that can be started and stopped.
type SubProcess struct {
p devtest.P
cmd *exec.Cmd

stdOutLogs logpipe.LogProcessor
stdErrLogs logpipe.LogProcessor

waitCtx context.Context // closed when process-Wait completes

mu sync.Mutex
}

Expand All @@ -49,115 +37,44 @@ func (sp *SubProcess) Start(cmdPath string, args []string, env []string) error {
}
cmd := exec.Command(cmdPath, args...)
cmd.Env = append(os.Environ(), env...)
stdout, err := cmd.StdoutPipe()
sp.p.Require().NoError(err, "stdout err")
stderr, err := cmd.StderrPipe()
sp.p.Require().NoError(err, "stderr err")
go func() {
err := logpipe.PipeLogs(stdout, sp.stdOutLogs)
sp.p.Require().NoError(err, "stdout logging error")
}()
go func() {
err := logpipe.PipeLogs(stderr, sp.stdErrLogs)
sp.p.Require().NoError(err, "stderr logging error")
}()
cmd.Stdout = sp.stdOutLogs
cmd.Stderr = sp.stdErrLogs
if err := cmd.Start(); err != nil {
return err
}
sp.cmd = cmd

subCtx, subCancel := context.WithCancelCause(context.Background())
go func() {
state, err := cmd.Process.Wait()
subCancel(err)
sp.p.Require().NoError(err, "Sub-process failed to be closed")
sp.p.Logger().Info("Sub-process stopped", "exitCode", state.ExitCode(), "pid", state.Pid())
// if it exited on its own, then we care about the error. If not, we (or the user) signaled it.
if state.Exited() {
sp.p.Require().True(state.Success(), "Sub-process closed with error status: %s", state.String())
}
}()
sp.waitCtx = subCtx

sp.p.Cleanup(func() {
err := sp.Stop()
err := sp.Stop(true)
if err != nil {
sp.p.Logger().Error("Shutdown error", "err", err)
}
})
return nil
}

// Kill stops the process, and does not wait for it to complete.
func (sp *SubProcess) Kill() error {
ctx, cancel := context.WithCancel(context.Background())
cancel() // don't wait, just force it to stop immediately
return sp.GracefulStop(ctx)
}

// Stop implements the default control-panel interface,
// and gracefully stops with a 10-second timeout.
func (sp *SubProcess) Stop() error {
// by default, for control-panel, use an interrupt and a 10-second grace
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
return sp.GracefulStop(ctx)
}

// GracefulStop sends an interrupt and waits for the process to stop.
// If the given ctx is closed, a forced shutdown (process kill) is pursued.
func (sp *SubProcess) GracefulStop(ctx context.Context) error {
// Stop waits for the process to stop, interrupting the process if it has not completed and
// interrupt is true.
func (sp *SubProcess) Stop(interrupt bool) error {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.cmd == nil {
return nil // already stopped gracefully
}

if ctx.Err() == nil && sp.waitCtx.Err() == nil {
// if not force-closing, and not already done, then try an interrupt first.
// If not already done, then try an interrupt first as requested.
if sp.cmd.ProcessState == nil && interrupt {
sp.p.Logger().Info("Sending interrupt")
if err := sp.cmd.Process.Signal(os.Interrupt); err != nil {
return err
}
}
select {
case <-ctx.Done():
sp.p.Logger().Warn("Sub-process did not respond to interrupt, force-closing now")
err := sp.cmd.Process.Kill()
if err != nil {
return fmt.Errorf("failed to force-close sub-process: %w", err)
}
sp.p.Logger().Info("Successfully force-closed sub-process")
// resources of cmd.Process will be cleaned up by the Process.Wait
case <-sp.waitCtx.Done():
if err := context.Cause(sp.waitCtx); err != nil && err != context.Canceled {
sp.p.Logger().Warn("Sub-process exited with error", "err", err)
} else {
sp.p.Logger().Info("Sub-process gracefully exited")
}

if _, err := sp.cmd.Process.Wait(); err != nil {
sp.p.Logger().Warn("Sub-process exited with error", "err", err)
} else {
sp.p.Logger().Info("Sub-process gracefully exited")
}

sp.cmd = nil
sp.waitCtx = nil
return nil
}

// Wait waits for the process to complete.
func (sp *SubProcess) Wait(ctx context.Context) error {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.waitCtx == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-sp.waitCtx.Done():
if err := context.Cause(sp.waitCtx); err != nil && err != context.Canceled {
sp.p.Logger().Warn("Sub-process exited with error", "err", err)
return err
} else {
sp.p.Logger().Info("Sub-process gracefully exited")
return nil
}
}
}
9 changes: 4 additions & 5 deletions op-devstack/sysgo/subproc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func TestSubProcess(gt *testing.T) {
func testEcho(gt *testing.T, capt *testlog.CapturingHandler, sp *SubProcess) {
require.NoError(gt, sp.Start("/bin/echo", []string{"hello world"}, []string{}))
gt.Log("Started sub-process")
require.NoError(gt, sp.Wait(context.Background()), "echo must complete")
require.NoError(gt, sp.Stop())
require.NoError(gt, sp.Stop(false))
gt.Log("Stopped sub-process")

require.NotNil(gt, capt.FindLog(
Expand All @@ -66,12 +65,12 @@ func testSleep(gt *testing.T, capt *testlog.CapturingHandler, sp *SubProcess) {
require.NoError(gt, sp.Start("/bin/sleep", []string{"10000000000"}, []string{}))
gt.Log("Started sub-process")
// Shut down the process before the sleep completes
require.NoError(gt, sp.Kill())
require.NoError(gt, sp.Stop(true))
gt.Log("Killed sub-process")

require.NotNil(gt, capt.FindLog(
testlog.NewMessageFilter("Sub-process did not respond to interrupt, force-closing now")))
testlog.NewMessageFilter("Sending interrupt")))

require.NotNil(gt, capt.FindLog(
testlog.NewMessageFilter("Successfully force-closed sub-process")))
testlog.NewMessageFilter("Sub-process gracefully exited")))
}
2 changes: 1 addition & 1 deletion op-devstack/sysgo/supervisor_kona.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *KonaSupervisor) Stop() {
s.p.Logger().Warn("kona-supervisor already stopped")
return
}
err := s.sub.Stop()
err := s.sub.Stop(true)
s.p.Require().NoError(err, "Must stop")
s.sub = nil
}
Expand Down
47 changes: 14 additions & 33 deletions op-service/logpipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"bufio"
"bytes"
"encoding/json"
"errors"
"io"
"log/slog"

oplog "github.com/ethereum-optimism/optimism/op-service/log"
Expand Down Expand Up @@ -82,6 +80,20 @@ type LogEntry interface {

type LogProcessor func(line []byte)

func (lo LogProcessor) Write(data []byte) (int, error) {
startingLength := len(data)
buf := bytes.NewBuffer(data)
scanner := bufio.NewScanner(buf)
for scanner.Scan() {
lineBytes := scanner.Bytes()
if len(lineBytes) == 0 {
continue // Skip empty lines
}
lo(lineBytes)
}
return startingLength - buf.Len(), scanner.Err()
}

type LogParser func(line []byte) LogEntry

func ToLogger(logger log.Logger) func(e LogEntry) {
Expand All @@ -99,34 +111,3 @@ func ToLogger(logger log.Logger) func(e LogEntry) {
logger.Log(lvl, msg, attrs...)
}
}

// PipeLogs reads logs from the provided io.ReadCloser (e.g., subprocess stdout),
// and outputs them to the provider logger.
//
// This:
// 1. assumes each line is a JSON object
// 2. parses it
// 3. extracts the "level" and optional "msg"
// 4. treats remaining fields as structured attributes
// 5. logs the entries using the provided log.Logger
//
// Non-JSON lines are logged as warnings.
// Crit level is mapped to error-level, to prevent untrusted crit logs from stopping the process.
// This function processes until the stream ends, and closes the reader.
// This returns the first read error (If we run into EOF, nil returned is returned instead).
func PipeLogs(r io.ReadCloser, onLog LogProcessor) (outErr error) {
defer func() {
outErr = errors.Join(outErr, r.Close())
}()

scanner := bufio.NewScanner(r)
for scanner.Scan() {
lineBytes := scanner.Bytes()
if len(lineBytes) == 0 {
continue // Skip empty lines
}
onLog(lineBytes)
}

return scanner.Err()
}
40 changes: 11 additions & 29 deletions op-service/logpipe/pipe_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package logpipe

import (
"bytes"
"io"
"sync"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -13,35 +12,18 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testlog"
)

func TestPipeLogs(t *testing.T) {
func TestWriteToLogProcessor(t *testing.T) {
logger, capt := testlog.CaptureLogger(t, log.LevelTrace)

wg := new(sync.WaitGroup)
wg.Add(2)

r, w := io.Pipe()
// Write the log output to the pipe
go func() {
defer wg.Done()
_, err := io.Copy(w, bytes.NewReader([]byte(`{"level": "DEBUG", "fields": {"message": "hello", "foo": 1}}`+"\n")))
require.NoError(t, err)
_, err = io.Copy(w, bytes.NewReader([]byte(`test invalid JSON`+"\n")))
require.NoError(t, err)
_, err = io.Copy(w, bytes.NewReader([]byte(`{"fields": {"message": "world", "bar": "sunny"}, "level": "INFO"}`+"\n")))
require.NoError(t, err)
require.NoError(t, w.Close())
}()
// Read the log output from the pipe
go func() {
defer wg.Done()
toLogger := ToLogger(logger)
logProc := func(line []byte) {
toLogger(ParseRustStructuredLogs(line))
}
err := PipeLogs(r, logProc)
require.NoError(t, err)
}()
wg.Wait()
logProc := LogProcessor(func(line []byte) {
ToLogger(logger)(ParseRustStructuredLogs(line))
})
_, err := io.Copy(logProc, strings.NewReader(`{"level": "DEBUG", "fields": {"message": "hello", "foo": 1}}`+"\n"))
require.NoError(t, err)
_, err = io.Copy(logProc, strings.NewReader(`test invalid JSON`+"\n"))
require.NoError(t, err)
_, err = io.Copy(logProc, strings.NewReader(`{"fields": {"message": "world", "bar": "sunny"}, "level": "INFO"}`+"\n"))
require.NoError(t, err)

entry1 := capt.FindLog(
testlog.NewLevelFilter(log.LevelDebug),
Expand Down