Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: flyteadmin doesn't shutdown servers gracefully #6289

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: flyteadmin doesn't shutdown servers gracefully
Signed-off-by: Ryan Lo <wenchih@apache.org>
lowc1012 committed Feb 28, 2025
commit 490e2899739d51408c0f5526c68ad22f70900593
47 changes: 34 additions & 13 deletions flyteadmin/pkg/server/service.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,10 @@ import (
"fmt"
"net"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/gorilla/handlers"
@@ -81,8 +84,8 @@ func SetMetricKeys(appConfig *runtimeIfaces.ApplicationConfig) {

// Creates a new gRPC Server with all the configuration
func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig,
storageCfg *storage.Config, authCtx interfaces.AuthenticationContext,
scope promutils.Scope, sm core.SecretManager, opts ...grpc.ServerOption) (*grpc.Server, error) {
storageCfg *storage.Config, authCtx interfaces.AuthenticationContext,
scope promutils.Scope, sm core.SecretManager, opts ...grpc.ServerOption) (*grpc.Server, error) {

logger.Infof(ctx, "Registering default middleware with blanket auth validation")
pluginRegistry.RegisterDefault(plugins.PluginIDUnaryServiceMiddleware, grpcmiddleware.ChainUnaryServer(
@@ -200,8 +203,8 @@ func healthCheckFunc(w http.ResponseWriter, _ *http.Request) {
}

func newHTTPServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, _ *authConfig.Config, authCtx interfaces.AuthenticationContext,
additionalHandlers map[string]func(http.ResponseWriter, *http.Request),
grpcAddress string, grpcConnectionOpts ...grpc.DialOption) (*http.ServeMux, error) {
additionalHandlers map[string]func(http.ResponseWriter, *http.Request),
grpcAddress string, grpcConnectionOpts ...grpc.DialOption) (*http.ServeMux, error) {

// Register the server that will serve HTTP/REST Traffic
mux := http.NewServeMux()
@@ -329,9 +332,9 @@ func generateRequestID() string {
}

func serveGatewayInsecure(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig,
authCfg *authConfig.Config, storageConfig *storage.Config,
additionalHandlers map[string]func(http.ResponseWriter, *http.Request),
scope promutils.Scope) error {
authCfg *authConfig.Config, storageConfig *storage.Config,
additionalHandlers map[string]func(http.ResponseWriter, *http.Request),
scope promutils.Scope) error {
logger.Infof(ctx, "Serving Flyte Admin Insecure")

// This will parse configuration and create the necessary objects for dealing with auth
@@ -422,11 +425,29 @@ func serveGatewayInsecure(ctx context.Context, pluginRegistry *plugins.Registry,
ReadHeaderTimeout: time.Duration(cfg.ReadHeaderTimeoutSeconds) * time.Second,
}

err = server.ListenAndServe()
if err != nil {
return errors.Wrapf(err, "failed to Start HTTP Server")
go func() {
err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logger.Fatalf(ctx, "Failed to Start HTTP Server: %v", err)
}
}()

// Gracefully shutdown the servers
sigCh := make(chan os.Signal, 1)
Copy link
Contributor

@Sovietaced Sovietaced Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to have two signal listeners? Should just be able to use one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean just SIGINT or SIGTERM?

Copy link
Contributor

@Sovietaced Sovietaced Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that in this diff you have two signal channels (one for grpc and one for http). You only need one to know whether or not the app is shutting down.

signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh

// Create a context with timeout for the shutdown process
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := server.Shutdown(shutdownCtx); err != nil {
logger.Errorf(ctx, "Failed to shut down HTTP server: %v", err)
}

grpcServer.GracefulStop()

logger.Infof(ctx, "Servers gracefully stopped")
return nil
}

@@ -445,9 +466,9 @@ func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha
}

func serveGatewaySecure(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, authCfg *authConfig.Config,
storageCfg *storage.Config,
additionalHandlers map[string]func(http.ResponseWriter, *http.Request),
scope promutils.Scope) error {
storageCfg *storage.Config,
additionalHandlers map[string]func(http.ResponseWriter, *http.Request),
scope promutils.Scope) error {
certPool, cert, err := GetSslCredentials(ctx, cfg.Security.Ssl.CertificateFile, cfg.Security.Ssl.KeyFile)
sm := secretmanager.NewFileEnvSecretManager(secretmanager.GetConfig())