Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ clean:
rm -rf $(BUILD_DIR)

test:
go test -short -v ./proxy
go test -short -v -count=1 ./proxy

test-all:
go test -v ./proxy
go test -v -count=1 ./proxy

# Build OSX binary
mac:
Expand Down
37 changes: 34 additions & 3 deletions misc/simple-responder/simple-responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func main() {

silent := flag.Bool("silent", false, "disable all logging")

ignoreSigTerm := flag.Bool("ignore-sig-term", false, "ignore SIGTERM signal")

flag.Parse() // Parse the command-line flags

// Create a new Gin router
Expand Down Expand Up @@ -190,6 +192,10 @@ func main() {
log.SetOutput(io.Discard)
}

if !*silent {
fmt.Printf("My PID: %d\n", os.Getpid())
}

go func() {
log.Printf("simple-responder listening on %s\n", address)
// service connections
Expand All @@ -200,11 +206,36 @@ func main() {

// Wait for interrupt signal to gracefully shutdown the server with
// a timeout of 5 seconds.
quit := make(chan os.Signal, 1)
sigChan := make(chan os.Signal, 1)
// kill (no param) default send syscall.SIGTERM
// kill -2 is syscall.SIGINT
// kill -9 is syscall.SIGKILL but can't be catch, so don't need add it
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

countSigInt := 0

runloop:
for {
signal := <-sigChan
switch signal {
case syscall.SIGINT:
countSigInt++
if countSigInt > 1 {
break runloop
} else {
log.Println("Recieved SIGINT, send another SIGINT to shutdown")
}
case syscall.SIGTERM:
if *ignoreSigTerm {
log.Println("Ignoring SIGTERM")
} else {
log.Println("Recieved SIGTERM, shutting down")
break runloop
}
default:
break runloop
}
}
Comment on lines +215 to +238
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Graceful HTTP shutdown missing

Breaking the loop logs “shutting down” but never closes the http.Server, so existing connections may hang until the process is killed. A small addition ensures a tidy exit:

@@
 	default:
 		break runloop
 	}
 }
 
 log.Println("simple-responder shutting down")
+
+// attempt graceful shutdown of the HTTP server
+ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+defer cancel()
+if err := srv.Shutdown(ctx); err != nil {
+	log.Printf("server shutdown error: %v\n", err)
+}

Remember to add import "context" at the top.


log.Println("simple-responder shutting down")
}
8 changes: 6 additions & 2 deletions proxy/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@ func getSimpleResponderPath() string {
return filepath.Join("..", "build", fmt.Sprintf("simple-responder_%s_%s", goos, goarch))
}

func getTestSimpleResponderConfig(expectedMessage string) ModelConfig {
func getTestPort() int {
portMutex.Lock()
defer portMutex.Unlock()

port := nextTestPort
nextTestPort++

return getTestSimpleResponderConfigPort(expectedMessage, port)
return port
}

func getTestSimpleResponderConfig(expectedMessage string) ModelConfig {
return getTestSimpleResponderConfigPort(expectedMessage, getTestPort())
}

func getTestSimpleResponderConfigPort(expectedMessage string, port int) ModelConfig {
Expand Down
10 changes: 8 additions & 2 deletions proxy/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Process struct {

// for managing concurrency limits
concurrencyLimitSemaphore chan struct{}

// stop timeout waiting for graceful shutdown
gracefulStopTimeout time.Duration
}

func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor) *Process {
Expand All @@ -92,6 +95,9 @@ func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLo

// concurrency limit
concurrencyLimitSemaphore: make(chan struct{}, concurrentLimit),

// stop timeout
gracefulStopTimeout: 5 * time.Second,
}
}

Expand Down Expand Up @@ -348,7 +354,7 @@ func (p *Process) StopImmediately() {
}

// stop the process with a graceful exit timeout
p.stopCommand(5 * time.Second)
p.stopCommand(p.gracefulStopTimeout)

if curState, err := p.swapState(StateStopping, StateStopped); err != nil {
p.proxyLogger.Infof("<%s> Stop() StateStopping -> StateStopped err: %v, current state: %v", p.ID, err, curState)
Expand All @@ -361,7 +367,7 @@ func (p *Process) StopImmediately() {
// the StateShutdown state, it can not be started again.
func (p *Process) Shutdown() {
p.shutdownCancel()
p.stopCommand(5 * time.Second)
p.stopCommand(p.gracefulStopTimeout)
p.state = StateShutdown
}

Expand Down
50 changes: 50 additions & 0 deletions proxy/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,53 @@ func TestProcess_StopImmediately(t *testing.T) {
process.StopImmediately()
assert.Equal(t, process.CurrentState(), StateStopped)
}

// Test that SIGKILL is sent when gracefulStopTimeout is reached and properly terminates
// the upstream command
func TestProcess_ForceStopWithKill(t *testing.T) {

expectedMessage := "test_sigkill"
binaryPath := getSimpleResponderPath()
port := getTestPort()

config := ModelConfig{
// note --ignore-sig-term which ignores the SIGTERM signal so a SIGKILL must be sent
// to force the process to exit
Cmd: fmt.Sprintf("%s --port %d --respond %s --silent --ignore-sig-term", binaryPath, port, expectedMessage),
Proxy: fmt.Sprintf("http://127.0.0.1:%d", port),
CheckEndpoint: "/health",
}

process := NewProcess("stop_immediate", 2, config, debugLogger, debugLogger)
defer process.Stop()

// reduce to make testing go faster
process.gracefulStopTimeout = time.Second

err := process.start()
assert.Nil(t, err)
assert.Equal(t, process.CurrentState(), StateReady)

waitChan := make(chan struct{})
go func() {
// slow, but will get killed by StopImmediate
req := httptest.NewRequest("GET", "/slow-respond?echo=12345&delay=2s", nil)
w := httptest.NewRecorder()
process.ProxyRequest(w, req)

// StatusOK because that was already sent before the kill
assert.Equal(t, http.StatusOK, w.Code)

// unexpected EOF because the kill happened, the "1" is sent before the kill
// then the unexpected EOF is sent after the kill
assert.Equal(t, "1unexpected EOF\n", w.Body.String())
close(waitChan)
}()

<-time.After(time.Millisecond)
process.StopImmediately()
assert.Equal(t, process.CurrentState(), StateStopped)

// the request should have been interrupted by SIGKILL
<-waitChan
}
2 changes: 1 addition & 1 deletion proxy/proxymanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func TestProxyManager_RunningEndpoint(t *testing.T) {
"model1": getTestSimpleResponderConfig("model1"),
"model2": getTestSimpleResponderConfig("model2"),
},
LogLevel: "debug",
LogLevel: "warn",
})

// Define a helper struct to parse the JSON response.
Expand Down