diff --git a/agents/scripts/install_deps_and_build.sh b/agents/scripts/install_deps_and_build.sh index a4b510ea..9023e35d 100755 --- a/agents/scripts/install_deps_and_build.sh +++ b/agents/scripts/install_deps_and_build.sh @@ -57,6 +57,10 @@ install_python_requirements() { fi done fi + + # pre-import llama-index as it cloud download additional resources during the first import + echo "pre-import python modules..." + python3.10 -c "import llama_index.core;" } build_go_app() { diff --git a/server/internal/http_server.go b/server/internal/http_server.go index b30fdb4c..827a362b 100644 --- a/server/internal/http_server.go +++ b/server/internal/http_server.go @@ -461,6 +461,6 @@ func (s *HttpServer) Start() { slog.Info("server start", "port", s.config.Port, logTag) - go cleanWorker() + go timeoutWorkers() r.Run(fmt.Sprintf(":%s", s.config.Port)) } diff --git a/server/internal/worker.go b/server/internal/worker.go index 51789d33..2a1a2a2c 100644 --- a/server/internal/worker.go +++ b/server/internal/worker.go @@ -228,7 +228,7 @@ func (w *Worker) update(req *WorkerUpdateReq) (err error) { return } -func cleanWorker() { +func timeoutWorkers() { for { for _, channelName := range workers.Keys() { worker := workers.Get(channelName).(*Worker) @@ -248,3 +248,15 @@ func cleanWorker() { time.Sleep(workerCleanSleepSeconds * time.Second) } } + +func CleanWorkers() { + for _, channelName := range workers.Keys() { + worker := workers.Get(channelName).(*Worker) + if err := worker.stop(uuid.New().String(), channelName.(string)); err != nil { + slog.Error("Worker cleanWorker failed", "err", err, "channelName", channelName, logTag) + continue + } + + slog.Info("Worker cleanWorker success", "channelName", channelName, "worker", worker, logTag) + } +} diff --git a/server/main.go b/server/main.go index 80dcd95d..c61bf59f 100644 --- a/server/main.go +++ b/server/main.go @@ -1,9 +1,12 @@ package main import ( + "fmt" "log/slog" "os" + "os/signal" "strconv" + "syscall" "github.com/joho/godotenv" @@ -51,6 +54,17 @@ func main() { os.Exit(1) } + // Set up signal handler to clean up all workers on Ctrl+C + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigs + fmt.Println("Received interrupt signal, cleaning up workers...") + internal.CleanWorkers() + os.Exit(0) + }() + // Start server httpServerConfig := &internal.HttpServerConfig{ AppId: agoraAppId,