Skip to content

Commit

Permalink
issues/231 (#232)
Browse files Browse the repository at this point in the history
* issues/231
there should be a flag to control if we would like to print the logs to file or STDOUT
because we may have multiple works running, we should add a prefix process id for STDOUT logs

* fix: use channel name instead of pid to distinguish processes

* fix: remove sprintf as channel name is already string
  • Loading branch information
plutoless authored Aug 22, 2024
1 parent 37b03ec commit e9c0d77
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 20 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

# Log path
LOG_PATH=/tmp/astra
LOG_STDOUT=true
# Graph designer server port
GRAPH_DESIGNER_SERVER_PORT=49483
# The corresponding graph name based on the language
Expand Down
3 changes: 2 additions & 1 deletion server/internal/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type HttpServerConfig struct {
AppId string
AppCertificate string
LogPath string
Log2Stdout bool
PropertyJsonFile string
Port string
TTSVendorChinese string
Expand Down Expand Up @@ -161,7 +162,7 @@ func (s *HttpServer) handlerStart(c *gin.Context) {
return
}

worker := newWorker(req.ChannelName, logFile, propertyJsonFile)
worker := newWorker(req.ChannelName, logFile, s.config.Log2Stdout, propertyJsonFile)
worker.HttpServerPort = req.WorkerHttpServerPort
worker.QuitTimeoutSeconds = s.config.WorkerQuitTimeoutSeconds
if err := worker.start(&req); err != nil {
Expand Down
113 changes: 94 additions & 19 deletions server/internal/worker.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package internal

import (
"bufio"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"os/exec"
"strconv"
"strings"
"sync/atomic"
"syscall"
Expand All @@ -20,6 +22,7 @@ type Worker struct {
ChannelName string
HttpServerPort int32
LogFile string
Log2Stdout bool
PropertyJsonFile string
Pid int
QuitTimeoutSeconds int
Expand Down Expand Up @@ -54,10 +57,11 @@ var (
httpServerPortMax = int32(30000)
)

func newWorker(channelName string, logFile string, propertyJsonFile string) *Worker {
func newWorker(channelName string, logFile string, log2Stdout bool, propertyJsonFile string) *Worker {
return &Worker{
ChannelName: channelName,
LogFile: logFile,
Log2Stdout: log2Stdout,
PropertyJsonFile: propertyJsonFile,
QuitTimeoutSeconds: 60,
CreateTs: time.Now().Unix(),
Expand All @@ -74,36 +78,107 @@ func getHttpServerPort() int32 {
return httpServerPort
}

// PrefixWriter is a custom writer that prefixes each line with a PID.
type PrefixWriter struct {
prefix string
writer io.Writer
}

// Write implements the io.Writer interface.
func (pw *PrefixWriter) Write(p []byte) (n int, err error) {
// Create a scanner to split input into lines
scanner := bufio.NewScanner(strings.NewReader(string(p)))
var totalWritten int

for scanner.Scan() {
// Prefix each line with the provided prefix
line := fmt.Sprintf("[%s] %s", pw.prefix, scanner.Text())
// Write the prefixed line to the underlying writer
n, err := pw.writer.Write([]byte(line + "\n"))
totalWritten += n

if err != nil {
return totalWritten, err
}
}

// Check if the scanner encountered any error
if err := scanner.Err(); err != nil {
return totalWritten, err
}

return len(p), nil
}

func (w *Worker) start(req *StartReq) (err error) {
shell := fmt.Sprintf("cd /app/agents && nohup %s --property %s > %s 2>&1 &", workerExec, w.PropertyJsonFile, w.LogFile)
shell := fmt.Sprintf("cd /app/agents && %s --property %s", workerExec, w.PropertyJsonFile)
slog.Info("Worker start", "requestId", req.RequestId, "shell", shell, logTag)
if _, err = exec.Command("sh", "-c", shell).CombinedOutput(); err != nil {
slog.Error("Worker start failed", "err", err, "requestId", req.RequestId, logTag)
return
cmd := exec.Command("sh", "-c", shell)

var stdoutWriter, stderrWriter io.Writer
var logFile *os.File

if w.Log2Stdout {
// Write logs to stdout and stderr
stdoutWriter = os.Stdout
stderrWriter = os.Stderr
} else {
// Open the log file for writing
logFile, err := os.OpenFile(w.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
slog.Error("Failed to open log file", "err", err, "requestId", req.RequestId, logTag)
// return err
}

// Write logs to the log file
stdoutWriter = logFile
stderrWriter = logFile
}

shell = fmt.Sprintf("ps aux | grep %s | grep -v grep | awk '{print $2}'", w.PropertyJsonFile)
slog.Info("Worker get pid", "requestId", req.RequestId, "shell", shell, logTag)
// Create PrefixWriter instances with appropriate writers
stdoutPrefixWriter := &PrefixWriter{
prefix: "-", // Initial prefix, will update after process starts
writer: stdoutWriter,
}
stderrPrefixWriter := &PrefixWriter{
prefix: "-", // Initial prefix, will update after process starts
writer: stderrWriter,
}

var pid int
for i := 0; i < 3; i++ { // retry for 3 times
output, err := exec.Command("sh", "-c", shell).CombinedOutput()
if err == nil {
pid, err = strconv.Atoi(strings.TrimSpace(string(output)))
if err == nil && pid > 0 {
break // if pid is successfully obtained, exit loop
}
}
slog.Warn("Worker get pid failed, retrying...", "attempt", i+1, "requestId", req.RequestId, logTag)
time.Sleep(500 * time.Millisecond) // wait for 500ms
cmd.Stdout = stdoutPrefixWriter
cmd.Stderr = stderrPrefixWriter

if err = cmd.Start(); err != nil {
slog.Error("Worker start failed", "err", err, "requestId", req.RequestId, logTag)
return
}

pid := cmd.Process.Pid

if pid <= 0 {
slog.Error("Worker failed to obtain valid PID after 3 attempts", "requestId", req.RequestId, logTag)
return fmt.Errorf("failed to obtain valid PID")
}

// Update the prefix with the actual PID
stdoutPrefixWriter.prefix = w.ChannelName
stderrPrefixWriter.prefix = w.ChannelName
w.Pid = pid

// Monitor the background process in a separate goroutine
go func() {
err := cmd.Wait() // Wait for the command to exit
if err != nil {
slog.Error("Worker process failed", "err", err, "requestId", req.RequestId, logTag)
} else {
slog.Info("Worker process completed successfully", "requestId", req.RequestId, logTag)
}
// Close the log file when the command finishes
if logFile != nil {
logFile.Close()
}
}()

return
}

Expand Down
7 changes: 7 additions & 0 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func main() {
}
}

log2Stdout, err := strconv.ParseBool(os.Getenv("LOG_STDOUT"))
if err != nil {
slog.Error("environment LOG_STDOUT invalid")
log2Stdout = false
}

// Check environment
agoraAppId := os.Getenv("AGORA_APP_ID")
if len(agoraAppId) != 32 {
Expand Down Expand Up @@ -65,6 +71,7 @@ func main() {
Port: os.Getenv("SERVER_PORT"),
WorkersMax: workersMax,
WorkerQuitTimeoutSeconds: workerQuitTimeoutSeconds,
Log2Stdout: log2Stdout,
}
httpServer := internal.NewHttpServer(httpServerConfig)
httpServer.Start()
Expand Down

0 comments on commit e9c0d77

Please sign in to comment.