Skip to content

Commit

Permalink
feat: Modify grpc gateway to be concurrent (#11234)
Browse files Browse the repository at this point in the history
Current grpc happens to be concurrent, while the grpc gateway itself is not, since it always uses abci query. Therefore, as the current queries are not concurrent, throughput has the room for improvement. This PR changes the grpc gateway so that when server is ran by a node daemon, it directly calls grpc to make queries concurrent. Any services that uses grpc gateway could improve throughput by fundamental amount, which has been tested and ensured in the process of running an Osmosis node using the current chagnes.

The code base has the following changes:
- GRPCClient field has been added to Client Context.
- The `Invoke` method in Client Context would use ABCI query when GRPCClient field is set to nil, otherwise use the GRPC Client to return results that have used grpc.
- If GRPC is set to enable in `startInProcess`, it sets the GRPC Client field in Client Context.
  • Loading branch information
Thunnini authored Mar 9, 2022
1 parent dc09270 commit 5356a86
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [\#11240](https://github.com/cosmos/cosmos-sdk/pull/11240) Replace various modules `ModuleCdc` with the global `legacy.Cdc`
* [#11179](https://github.com/cosmos/cosmos-sdk/pull/11179) Add state rollback command.
* [\#10794](https://github.com/cosmos/cosmos-sdk/pull/10794) ADR-040: Add State Sync to V2 Store
* [\#11234](https://github.com/cosmos/cosmos-sdk/pull/11234) Add `GRPCClient` field to Client Context. If `GRPCClient` field is set to nil, the `Invoke` method would use ABCI query, otherwise use gprc.

### API Breaking Changes

Expand Down
10 changes: 10 additions & 0 deletions client/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"sigs.k8s.io/yaml"

"google.golang.org/grpc"

"github.com/gogo/protobuf/proto"
rpcclient "github.com/tendermint/tendermint/rpc/client"

Expand All @@ -23,6 +25,7 @@ import (
type Context struct {
FromAddress sdk.AccAddress
Client rpcclient.Client
GRPCClient *grpc.ClientConn
ChainID string
Codec codec.Codec
InterfaceRegistry codectypes.InterfaceRegistry
Expand Down Expand Up @@ -128,6 +131,13 @@ func (ctx Context) WithClient(client rpcclient.Client) Context {
return ctx
}

// WithGRPCClient returns a copy of the context with an updated GRPC client
// instance.
func (ctx Context) WithGRPCClient(grpcClient *grpc.ClientConn) Context {
ctx.GRPCClient = grpcClient
return ctx
}

// WithUseLedger returns a copy of the context with an updated UseLedger flag.
func (ctx Context) WithUseLedger(useLedger bool) Context {
ctx.UseLedger = useLedger
Expand Down
9 changes: 8 additions & 1 deletion client/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ var fallBackCodec = codec.NewProtoCodec(failingInterfaceRegistry{})
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.
// 2-1. or we are querying for state, in which case we call grpc if grpc client set.
// 2-2. or we are querying for state, in which case we call ABCI's Query if grpc client not set.

// In both cases, we don't allow empty request args (it will panic unexpectedly).
if reflect.ValueOf(req).IsNil() {
Expand All @@ -55,6 +56,12 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
return err
}

if ctx.GRPCClient != nil {
// Case 2-1. Invoke grpc.
return ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...)
}

// Case 2-2. Querying state via abci query.
reqBz, err := ctx.gRPCCodec().Marshal(req)
if err != nil {
return err
Expand Down
21 changes: 21 additions & 0 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package server

import (
"fmt"
"net"
"net/http"
"os"
"runtime/pprof"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/tendermint/tendermint/rpc/client/local"
tmtypes "github.com/tendermint/tendermint/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
Expand Down Expand Up @@ -296,6 +298,25 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
WithHomeDir(home).
WithChainID(genDoc.ChainID)

if config.GRPC.Enable {
_, port, err := net.SplitHostPort(config.GRPC.Address)
if err != nil {
return err
}
grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)
// If grpc is enabled, configure grpc client for grpc gateway.
grpcClient, err := grpc.Dial(
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec())),
)
if err != nil {
return err
}
clientCtx = clientCtx.WithGRPCClient(grpcClient)
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress)
}

apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server"))
app.RegisterAPIRoutes(apiSrv, config.API)
errCh := make(chan error)
Expand Down

0 comments on commit 5356a86

Please sign in to comment.