Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issues/231 #232

Merged
merged 3 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

# Log path
LOG_PATH=/tmp/astra
LOG_STDOUT=true
# Graph designer server port
GRAPH_DESIGNER_SERVER_PORT=49483
# The corresponding graph name based on the language
Expand Down
3 changes: 2 additions & 1 deletion server/internal/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type HttpServerConfig struct {
AppId string
AppCertificate string
LogPath string
Log2Stdout bool
PropertyJsonFile string
Port string
TTSVendorChinese string
Expand Down Expand Up @@ -161,7 +162,7 @@ func (s *HttpServer) handlerStart(c *gin.Context) {
return
}

worker := newWorker(req.ChannelName, logFile, propertyJsonFile)
worker := newWorker(req.ChannelName, logFile, s.config.Log2Stdout, propertyJsonFile)
worker.HttpServerPort = req.WorkerHttpServerPort
worker.QuitTimeoutSeconds = s.config.WorkerQuitTimeoutSeconds
if err := worker.start(&req); err != nil {
Expand Down
113 changes: 94 additions & 19 deletions server/internal/worker.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package internal

import (
"bufio"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"os/exec"
"strconv"
"strings"
"sync/atomic"
"syscall"
Expand All @@ -20,6 +22,7 @@ type Worker struct {
ChannelName string
HttpServerPort int32
LogFile string
Log2Stdout bool
PropertyJsonFile string
Pid int
QuitTimeoutSeconds int
Expand Down Expand Up @@ -54,10 +57,11 @@ var (
httpServerPortMax = int32(30000)
)

func newWorker(channelName string, logFile string, propertyJsonFile string) *Worker {
func newWorker(channelName string, logFile string, log2Stdout bool, propertyJsonFile string) *Worker {
return &Worker{
ChannelName: channelName,
LogFile: logFile,
Log2Stdout: log2Stdout,
PropertyJsonFile: propertyJsonFile,
QuitTimeoutSeconds: 60,
CreateTs: time.Now().Unix(),
Expand All @@ -74,36 +78,107 @@ func getHttpServerPort() int32 {
return httpServerPort
}

// PrefixWriter is a custom writer that prefixes each line with a PID.
type PrefixWriter struct {
prefix string
writer io.Writer
}

// Write implements the io.Writer interface.
func (pw *PrefixWriter) Write(p []byte) (n int, err error) {
// Create a scanner to split input into lines
scanner := bufio.NewScanner(strings.NewReader(string(p)))
var totalWritten int

for scanner.Scan() {
// Prefix each line with the provided prefix
line := fmt.Sprintf("[%s] %s", pw.prefix, scanner.Text())
// Write the prefixed line to the underlying writer
n, err := pw.writer.Write([]byte(line + "\n"))
totalWritten += n

if err != nil {
return totalWritten, err
}
}

// Check if the scanner encountered any error
if err := scanner.Err(); err != nil {
return totalWritten, err
}

return len(p), nil
}

func (w *Worker) start(req *StartReq) (err error) {
shell := fmt.Sprintf("cd /app/agents && nohup %s --property %s > %s 2>&1 &", workerExec, w.PropertyJsonFile, w.LogFile)
shell := fmt.Sprintf("cd /app/agents && %s --property %s", workerExec, w.PropertyJsonFile)
slog.Info("Worker start", "requestId", req.RequestId, "shell", shell, logTag)
if _, err = exec.Command("sh", "-c", shell).CombinedOutput(); err != nil {
slog.Error("Worker start failed", "err", err, "requestId", req.RequestId, logTag)
return
cmd := exec.Command("sh", "-c", shell)

var stdoutWriter, stderrWriter io.Writer
var logFile *os.File

if w.Log2Stdout {
// Write logs to stdout and stderr
stdoutWriter = os.Stdout
stderrWriter = os.Stderr
} else {
// Open the log file for writing
logFile, err := os.OpenFile(w.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
slog.Error("Failed to open log file", "err", err, "requestId", req.RequestId, logTag)
// return err
}

// Write logs to the log file
stdoutWriter = logFile
stderrWriter = logFile
}

shell = fmt.Sprintf("ps aux | grep %s | grep -v grep | awk '{print $2}'", w.PropertyJsonFile)
slog.Info("Worker get pid", "requestId", req.RequestId, "shell", shell, logTag)
// Create PrefixWriter instances with appropriate writers
stdoutPrefixWriter := &PrefixWriter{
prefix: "-", // Initial prefix, will update after process starts
writer: stdoutWriter,
}
stderrPrefixWriter := &PrefixWriter{
prefix: "-", // Initial prefix, will update after process starts
writer: stderrWriter,
}

var pid int
for i := 0; i < 3; i++ { // retry for 3 times
output, err := exec.Command("sh", "-c", shell).CombinedOutput()
if err == nil {
pid, err = strconv.Atoi(strings.TrimSpace(string(output)))
if err == nil && pid > 0 {
break // if pid is successfully obtained, exit loop
}
}
slog.Warn("Worker get pid failed, retrying...", "attempt", i+1, "requestId", req.RequestId, logTag)
time.Sleep(500 * time.Millisecond) // wait for 500ms
cmd.Stdout = stdoutPrefixWriter
cmd.Stderr = stderrPrefixWriter

if err = cmd.Start(); err != nil {
slog.Error("Worker start failed", "err", err, "requestId", req.RequestId, logTag)
return
}

pid := cmd.Process.Pid

if pid <= 0 {
slog.Error("Worker failed to obtain valid PID after 3 attempts", "requestId", req.RequestId, logTag)
return fmt.Errorf("failed to obtain valid PID")
}

// Update the prefix with the actual PID
stdoutPrefixWriter.prefix = w.ChannelName
stderrPrefixWriter.prefix = w.ChannelName
w.Pid = pid

// Monitor the background process in a separate goroutine
go func() {
err := cmd.Wait() // Wait for the command to exit
if err != nil {
slog.Error("Worker process failed", "err", err, "requestId", req.RequestId, logTag)
} else {
slog.Info("Worker process completed successfully", "requestId", req.RequestId, logTag)
}
// Close the log file when the command finishes
if logFile != nil {
logFile.Close()
}
}()

return
}

Expand Down
7 changes: 7 additions & 0 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func main() {
}
}

log2Stdout, err := strconv.ParseBool(os.Getenv("LOG_STDOUT"))
if err != nil {
slog.Error("environment LOG_STDOUT invalid")
log2Stdout = false
}

// Check environment
agoraAppId := os.Getenv("AGORA_APP_ID")
if len(agoraAppId) != 32 {
Expand Down Expand Up @@ -65,6 +71,7 @@ func main() {
Port: os.Getenv("SERVER_PORT"),
WorkersMax: workersMax,
WorkerQuitTimeoutSeconds: workerQuitTimeoutSeconds,
Log2Stdout: log2Stdout,
}
httpServer := internal.NewHttpServer(httpServerConfig)
httpServer.Start()
Expand Down