Skip to content

Commit

Permalink
Revert "Make gRPC requests go through tendermint Query (cosmos#8549)"
Browse files Browse the repository at this point in the history
This reverts commit e306e5b.
  • Loading branch information
tac0turtle authored and JeancarloBarrios committed Sep 28, 2024
1 parent 2f20f1a commit 836d352
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 102 deletions.
57 changes: 37 additions & 20 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,9 @@ type QueryRouter interface {

// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
// routes maps query handlers used in ABCIQuery.
routes map[string]GRPCQueryHandler
// hybridHandlers maps the request name to the handler. It is a hybrid handler which seamlessly
// handles both gogo and protov2 messages.
hybridHandlers map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error
// responseByRequestName maps the request name to the response name.
responseByRequestName map[string]string
// binaryCodec is used to encode/decode binary protobuf messages.
binaryCodec codec.BinaryCodec
// cdc is the gRPC codec used by the router to correctly unmarshal messages.
cdc encoding.Codec
// serviceData contains the gRPC services and their handlers.
serviceData []serviceData
routes map[string]GRPCQueryHandler
interfaceRegistry codectypes.InterfaceRegistry
serviceData []serviceData
}

// serviceData represents a gRPC service, along with its handler.
Expand All @@ -56,9 +46,7 @@ var (
// NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCQueryRouter{
routes: map[string]GRPCQueryHandler{},
hybridHandlers: map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error{},
responseByRequestName: map[string]string{},
routes: map[string]GRPCQueryHandler{},
}
}

Expand Down Expand Up @@ -88,9 +76,35 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
if err != nil {
panic(err)
}
err = qrt.registerHybridHandler(sd, method, handler)
if err != nil {
panic(err)

qrt.routes[fqName] = func(ctx sdk.Context, req abci.RequestQuery) (abci.ResponseQuery, error) {
// call the method handler from the service description with the handler object,
// a wrapped sdk.Context with proto-unmarshaled data from the ABCI request data
res, err := methodHandler(handler, sdk.WrapSDKContext(ctx), func(i interface{}) error {
err := protoCodec.Unmarshal(req.Data, i)
if err != nil {
return err
}
if qrt.interfaceRegistry != nil {
return codectypes.UnpackInterfaces(i, qrt.interfaceRegistry)
}
return nil
}, nil)
if err != nil {
return abci.ResponseQuery{}, err
}

// proto marshal the result bytes
resBytes, err := protoCodec.Marshal(res)
if err != nil {
return abci.ResponseQuery{}, err
}

// return the result bytes as the response value
return abci.ResponseQuery{
Height: req.Height,
Value: resBytes,
}, nil
}
}

Expand Down Expand Up @@ -179,5 +193,8 @@ func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.In
qrt.cdc = codec.NewProtoCodec(interfaceRegistry).GRPCCodec()
// Once we have an interface registry, we can register the interface
// registry reflection gRPC service.
reflection.RegisterReflectionServiceServer(qrt, reflection.NewReflectionServiceServer(interfaceRegistry))
reflection.RegisterReflectionServiceServer(
qrt,
reflection.NewReflectionServiceServer(interfaceRegistry),
)
}
35 changes: 7 additions & 28 deletions baseapp/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package baseapp

import (
"context"
"fmt"
"strconv"

gogogrpc "github.com/cosmos/gogoproto/grpc"
Expand All @@ -13,9 +12,6 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

errorsmod "cosmossdk.io/errors"
storetypes "cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
Expand All @@ -34,10 +30,10 @@ func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {

// Get height header from the request context, if present.
var height int64
if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) == 1 {
if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) > 0 {
height, err = strconv.ParseInt(heightHeaders[0], 10, 64)
if err != nil {
return nil, errorsmod.Wrapf(
return nil, sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err)
}
Expand All @@ -48,37 +44,20 @@ func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {

// Create the sdk.Context. Passing false as 2nd arg, as we can't
// actually support proofs with gRPC right now.
sdkCtx, err := app.CreateQueryContext(height, false)
sdkCtx, err := app.createQueryContext(height, false)
if err != nil {
return nil, err
}

// Attach the sdk.Context into the gRPC's context.Context.
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)

// Add relevant gRPC headers
if height == 0 {
height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest
}

// Attach the sdk.Context into the gRPC's context.Context.
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)

md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
if err = grpc.SetHeader(grpcCtx, md); err != nil {
app.logger.Error("failed to set gRPC header", "err", err)
}

app.logger.Debug("gRPC query received of type: " + fmt.Sprintf("%#v", req))

// Catch an OutOfGasPanic caused in the query handlers
defer func() {
if r := recover(); r != nil {
switch rType := r.(type) {
case storetypes.ErrorOutOfGas:
err = errorsmod.Wrapf(sdkerrors.ErrOutOfGas, "Query gas limit exceeded: %v, out of gas in location: %v", sdkCtx.GasMeter().Limit(), rType.Descriptor)
default:
panic(r)
}
}
}()
grpc.SetHeader(grpcCtx, md)

return handler(grpcCtx, req)
}
Expand Down
50 changes: 19 additions & 31 deletions client/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,29 @@ var _ gogogrpc.ClientConn = Context{}
var fallBackCodec = codec.NewProtoCodec(types.NewInterfaceRegistry())

// Invoke implements the grpc ClientConn.Invoke method
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which call we call CometBFT's broadcast endpoint directly,
// 2-1. or we are querying for state, in which case we call grpc if grpc client set.
// 2-2. or we are querying for state, in which case we call ABCI's Query if grpc client not set.

// In both cases, we don't allow empty request args (it will panic unexpectedly).
if reflect.ValueOf(req).IsNil() {
return errorsmod.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
if reflect.ValueOf(args).IsNil() {
return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
}

// Case 1. Broadcasting a Tx.
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
if isBroadcast(method) {
req, ok := args.(*tx.BroadcastTxRequest)
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args)
}
res, ok := reply.(*tx.BroadcastTxResponse)
if !ok {
return errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req)
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args)
}

broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, reqProto)
broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req)
if err != nil {
return err
}
Expand All @@ -56,13 +60,8 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
return err
}

if ctx.GRPCClient != nil {
// Case 2-1. Invoke grpc.
return ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...)
}

// Case 2-2. Querying state via abci query.
reqBz, err := ctx.gRPCCodec().Marshal(req)
// Case 2. Querying state.
reqBz, err := protoCodec.Marshal(args)
if err != nil {
return err
}
Expand All @@ -75,26 +74,26 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
return err
}
if height < 0 {
return errorsmod.Wrapf(
return sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}

ctx = ctx.WithHeight(height)
}

abciReq := abci.QueryRequest{
req := abci.RequestQuery{
Path: method,
Data: reqBz,
Height: ctx.Height,
}

res, err := ctx.QueryABCI(abciReq)
res, err := ctx.QueryABCI(req)
if err != nil {
return err
}

err = ctx.gRPCCodec().Unmarshal(res.Value, reply)
err = protoCodec.Unmarshal(res.Value, reply)
if err != nil {
return err
}
Expand Down Expand Up @@ -123,20 +122,9 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i

// NewStream implements the grpc ClientConn.NewStream method
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, errors.New("streaming rpc not supported")
return nil, fmt.Errorf("streaming rpc not supported")
}

// gRPCCodec checks if Context's Codec is codec.GRPCCodecProvider
// otherwise it returns fallBackCodec.
func (ctx Context) gRPCCodec() encoding.Codec {
if ctx.Codec == nil {
return fallBackCodec.GRPCCodec()
}

pc, ok := ctx.Codec.(codec.GRPCCodecProvider)
if !ok {
return fallBackCodec.GRPCCodec()
}

return pc.GRPCCodec()
func isBroadcast(method string) bool {
return method == "/cosmos.tx.v1beta1.Service/BroadcastTx"
}
28 changes: 5 additions & 23 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,12 @@ import (
_ "github.com/cosmos/cosmos-sdk/types/tx/amino" // Import amino.proto file for reflection
)

// NewGRPCServer returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server. See StartGRPCServer.
func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
maxSendMsgSize := cfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
}

maxRecvMsgSize := cfg.MaxRecvMsgSize
if maxRecvMsgSize == 0 {
maxRecvMsgSize = config.DefaultGRPCMaxRecvMsgSize
}

grpcSrv := grpc.NewServer(
grpc.ForceServerCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxSendMsgSize(maxSendMsgSize),
grpc.MaxRecvMsgSize(maxRecvMsgSize),
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) {
grpcSrv := grpc.NewServer()
app.RegisterGRPCServer(grpcSrv)

// Reflection allows consumers to build dynamic clients that can write to any
// Cosmos SDK application without relying on application packages at compile
// time.
// reflection allows consumers to build dynamic clients that can write
// to any cosmos-sdk application without relying on application packages at compile time
err := reflection.Register(grpcSrv, reflection.Config{
SigningModes: func() map[string]int32 {
supportedModes := clientCtx.TxConfig.SignModeHandler().SupportedModes()
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/server/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ func (s *IntegrationTestSuite) TestGRPCServer_GetTxsEvent() {
Query: "message.action='send'",
},
)
// TODO Once https://github.com/cosmos/cosmos-sdk/pull/8029 is merged, this
// should not error anymore.
s.Require().NoError(err)
}

Expand Down

0 comments on commit 836d352

Please sign in to comment.