diff --git a/.env.example b/.env.example index 3696a90a..0f4db19a 100644 --- a/.env.example +++ b/.env.example @@ -8,6 +8,7 @@ # Log path LOG_PATH=/tmp/astra +LOG_STDOUT=true # Graph designer server port GRAPH_DESIGNER_SERVER_PORT=49483 # The corresponding graph name based on the language diff --git a/server/internal/http_server.go b/server/internal/http_server.go index e41dfd29..34e6c080 100644 --- a/server/internal/http_server.go +++ b/server/internal/http_server.go @@ -33,6 +33,7 @@ type HttpServerConfig struct { AppId string AppCertificate string LogPath string + Log2Stdout bool PropertyJsonFile string Port string TTSVendorChinese string @@ -161,7 +162,7 @@ func (s *HttpServer) handlerStart(c *gin.Context) { return } - worker := newWorker(req.ChannelName, logFile, propertyJsonFile) + worker := newWorker(req.ChannelName, logFile, s.config.Log2Stdout, propertyJsonFile) worker.HttpServerPort = req.WorkerHttpServerPort worker.QuitTimeoutSeconds = s.config.WorkerQuitTimeoutSeconds if err := worker.start(&req); err != nil { diff --git a/server/internal/worker.go b/server/internal/worker.go index bab2ffe4..d90452e1 100644 --- a/server/internal/worker.go +++ b/server/internal/worker.go @@ -1,11 +1,13 @@ package internal import ( + "bufio" "fmt" + "io" "log/slog" "net/http" + "os" "os/exec" - "strconv" "strings" "sync/atomic" "syscall" @@ -20,6 +22,7 @@ type Worker struct { ChannelName string HttpServerPort int32 LogFile string + Log2Stdout bool PropertyJsonFile string Pid int QuitTimeoutSeconds int @@ -54,10 +57,11 @@ var ( httpServerPortMax = int32(30000) ) -func newWorker(channelName string, logFile string, propertyJsonFile string) *Worker { +func newWorker(channelName string, logFile string, log2Stdout bool, propertyJsonFile string) *Worker { return &Worker{ ChannelName: channelName, LogFile: logFile, + Log2Stdout: log2Stdout, PropertyJsonFile: propertyJsonFile, QuitTimeoutSeconds: 60, CreateTs: time.Now().Unix(), @@ -74,36 +78,107 @@ func getHttpServerPort() int32 { return httpServerPort } +// PrefixWriter is a custom writer that prefixes each line with a PID. +type PrefixWriter struct { + prefix string + writer io.Writer +} + +// Write implements the io.Writer interface. +func (pw *PrefixWriter) Write(p []byte) (n int, err error) { + // Create a scanner to split input into lines + scanner := bufio.NewScanner(strings.NewReader(string(p))) + var totalWritten int + + for scanner.Scan() { + // Prefix each line with the provided prefix + line := fmt.Sprintf("[%s] %s", pw.prefix, scanner.Text()) + // Write the prefixed line to the underlying writer + n, err := pw.writer.Write([]byte(line + "\n")) + totalWritten += n + + if err != nil { + return totalWritten, err + } + } + + // Check if the scanner encountered any error + if err := scanner.Err(); err != nil { + return totalWritten, err + } + + return len(p), nil +} + func (w *Worker) start(req *StartReq) (err error) { - shell := fmt.Sprintf("cd /app/agents && nohup %s --property %s > %s 2>&1 &", workerExec, w.PropertyJsonFile, w.LogFile) + shell := fmt.Sprintf("cd /app/agents && %s --property %s", workerExec, w.PropertyJsonFile) slog.Info("Worker start", "requestId", req.RequestId, "shell", shell, logTag) - if _, err = exec.Command("sh", "-c", shell).CombinedOutput(); err != nil { - slog.Error("Worker start failed", "err", err, "requestId", req.RequestId, logTag) - return + cmd := exec.Command("sh", "-c", shell) + + var stdoutWriter, stderrWriter io.Writer + var logFile *os.File + + if w.Log2Stdout { + // Write logs to stdout and stderr + stdoutWriter = os.Stdout + stderrWriter = os.Stderr + } else { + // Open the log file for writing + logFile, err := os.OpenFile(w.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + slog.Error("Failed to open log file", "err", err, "requestId", req.RequestId, logTag) + // return err + } + + // Write logs to the log file + stdoutWriter = logFile + stderrWriter = logFile } - shell = fmt.Sprintf("ps aux | grep %s | grep -v grep | awk '{print $2}'", w.PropertyJsonFile) - slog.Info("Worker get pid", "requestId", req.RequestId, "shell", shell, logTag) + // Create PrefixWriter instances with appropriate writers + stdoutPrefixWriter := &PrefixWriter{ + prefix: "-", // Initial prefix, will update after process starts + writer: stdoutWriter, + } + stderrPrefixWriter := &PrefixWriter{ + prefix: "-", // Initial prefix, will update after process starts + writer: stderrWriter, + } - var pid int - for i := 0; i < 3; i++ { // retry for 3 times - output, err := exec.Command("sh", "-c", shell).CombinedOutput() - if err == nil { - pid, err = strconv.Atoi(strings.TrimSpace(string(output))) - if err == nil && pid > 0 { - break // if pid is successfully obtained, exit loop - } - } - slog.Warn("Worker get pid failed, retrying...", "attempt", i+1, "requestId", req.RequestId, logTag) - time.Sleep(500 * time.Millisecond) // wait for 500ms + cmd.Stdout = stdoutPrefixWriter + cmd.Stderr = stderrPrefixWriter + + if err = cmd.Start(); err != nil { + slog.Error("Worker start failed", "err", err, "requestId", req.RequestId, logTag) + return } + pid := cmd.Process.Pid + if pid <= 0 { slog.Error("Worker failed to obtain valid PID after 3 attempts", "requestId", req.RequestId, logTag) return fmt.Errorf("failed to obtain valid PID") } + // Update the prefix with the actual PID + stdoutPrefixWriter.prefix = w.ChannelName + stderrPrefixWriter.prefix = w.ChannelName w.Pid = pid + + // Monitor the background process in a separate goroutine + go func() { + err := cmd.Wait() // Wait for the command to exit + if err != nil { + slog.Error("Worker process failed", "err", err, "requestId", req.RequestId, logTag) + } else { + slog.Info("Worker process completed successfully", "requestId", req.RequestId, logTag) + } + // Close the log file when the command finishes + if logFile != nil { + logFile.Close() + } + }() + return } diff --git a/server/main.go b/server/main.go index dcd3880d..188ce196 100644 --- a/server/main.go +++ b/server/main.go @@ -29,6 +29,12 @@ func main() { } } + log2Stdout, err := strconv.ParseBool(os.Getenv("LOG_STDOUT")) + if err != nil { + slog.Error("environment LOG_STDOUT invalid") + log2Stdout = false + } + // Check environment agoraAppId := os.Getenv("AGORA_APP_ID") if len(agoraAppId) != 32 { @@ -65,6 +71,7 @@ func main() { Port: os.Getenv("SERVER_PORT"), WorkersMax: workersMax, WorkerQuitTimeoutSeconds: workerQuitTimeoutSeconds, + Log2Stdout: log2Stdout, } httpServer := internal.NewHttpServer(httpServerConfig) httpServer.Start()