Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into yuri/grpc-cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
duricanikolic committed Jan 31, 2025
2 parents 959241b + 72eb578 commit ad74ad7
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@
* [ENHANCEMENT] Memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storms in large clusters. #592
* [ENHANCEMENT] Memberlist: Implemented the `Delete` operation in the memberlist backed KV store. How frequently deleted entries are cleaned up is specified by the `-memberlist.obsolete-entries-timeout` flag. #612
* [ENHANCEMENT] KV: Add `MockCountingClient`, which wraps the `kv.client` and can be used in order to count calls at specific functions of the interface. #618
* [ENHANCEMENT] Server: Add interceptor support to `GrpcInflightMethodLimiter`. #643
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
32 changes: 32 additions & 0 deletions server/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strings"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
Expand All @@ -19,6 +20,12 @@ type GrpcInflightMethodLimiter interface {
// otherwise gRPC-server implementation-specific error will be returned to the client (codes.PermissionDenied in [email protected]).
RPCCallStarting(ctx context.Context, methodName string, md metadata.MD) (context.Context, error)

// RPCCallProcessing is called by a server interceptor, allowing request pre-processing or request blocking to be
// performed. The returned function will be applied after the request is handled, providing any error that occurred while
// handling the request.
RPCCallProcessing(ctx context.Context, methodName string) (func(error), error)

// RPCCallFinished is called when an RPC call is finished being handled.
RPCCallFinished(ctx context.Context)
}

Expand Down Expand Up @@ -47,6 +54,31 @@ func (g *grpcInflightLimitCheck) TapHandle(ctx context.Context, info *tap.Info)
return g.methodLimiter.RPCCallStarting(ctx, info.FullMethodName, info.Header)
}

func (g *grpcInflightLimitCheck) UnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
finish, err := g.methodLimiter.RPCCallProcessing(ctx, info.FullMethod)
if err != nil {
return nil, err
}
result, err := handler(ctx, req)
if finish != nil {
finish(err)
}
return result, err

}

func (g *grpcInflightLimitCheck) StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
finish, err := g.methodLimiter.RPCCallProcessing(ss.Context(), info.FullMethod)
if err != nil {
return err
}
err = handler(srv, ss)
if finish != nil {
finish(err)
}
return err
}

func (g *grpcInflightLimitCheck) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
return ctx
}
Expand Down
4 changes: 4 additions & 0 deletions server/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ func (m *methodLimiter) RPCCallStarting(ctx context.Context, methodName string,
return context.WithValue(ctx, ctxMethodName, methodName), nil
}

func (m *methodLimiter) RPCCallProcessing(_ context.Context, _ string) (func(error), error) {
return nil, nil
}

func (m *methodLimiter) RPCCallFinished(ctx context.Context) {
m.allInflight.Dec()

Expand Down
15 changes: 12 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,13 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) {
PermitWithoutStream: cfg.GRPCServerPingWithoutStreamAllowed,
}

var grpcServerLimit *grpcInflightLimitCheck
if cfg.GrpcMethodLimiter != nil {
grpcServerLimit = newGrpcInflightLimitCheck(cfg.GrpcMethodLimiter)
grpcMiddleware = append(grpcMiddleware, grpcServerLimit.UnaryServerInterceptor)
grpcStreamMiddleware = append(grpcStreamMiddleware, grpcServerLimit.StreamServerInterceptor)
}

grpcOptions := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(grpcMiddleware...),
grpc.ChainStreamInterceptor(grpcStreamMiddleware...),
Expand All @@ -436,9 +443,11 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) {
grpc.NumStreamWorkers(uint32(cfg.GRPCServerNumWorkers)),
}

if cfg.GrpcMethodLimiter != nil {
grpcServerLimit := newGrpcInflightLimitCheck(cfg.GrpcMethodLimiter)
grpcOptions = append(grpcOptions, grpc.InTapHandle(grpcServerLimit.TapHandle), grpc.StatsHandler(grpcServerLimit))
if grpcServerLimit != nil {
grpcOptions = append(grpcOptions,
grpc.StatsHandler(grpcServerLimit),
grpc.InTapHandle(grpcServerLimit.TapHandle),
)
}

if cfg.GRPCServerStatsTrackingEnabled {
Expand Down

0 comments on commit ad74ad7

Please sign in to comment.