Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stf,server/v2/cometbft): fix default events + improve codec handling #22837

Merged
merged 6 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"google.golang.org/protobuf/reflect/protoregistry"

"cosmossdk.io/collections"
addresscodec "cosmossdk.io/core/address"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/comet"
corecontext "cosmossdk.io/core/context"
Expand All @@ -35,8 +34,6 @@ import (
"cosmossdk.io/server/v2/streaming"
"cosmossdk.io/store/v2/snapshots"
consensustypes "cosmossdk.io/x/consensus/types"

"github.com/cosmos/cosmos-sdk/codec"
)

const (
Expand All @@ -52,13 +49,12 @@ type consensus[T transaction.Tx] struct {
logger log.Logger
appName, version string
app appmanager.AppManager[T]
appCodec codec.Codec
txCodec transaction.Codec[T]
store types.Store
listener *appdata.Listener
snapshotManager *snapshots.Manager
streamingManager streaming.Manager
mempool mempool.Mempool[T]
appCodecs AppCodecs[T]

cfg Config
chainID string
Expand All @@ -84,16 +80,15 @@ type consensus[T transaction.Tx] struct {
addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID

queryHandlersMap map[string]appmodulev2.Handler
getProtoRegistry func() (*protoregistry.Files, error)
consensusAddressCodec addresscodec.Codec
cfgMap server.ConfigMap
queryHandlersMap map[string]appmodulev2.Handler
getProtoRegistry func() (*protoregistry.Files, error)
cfgMap server.ConfigMap
}

// CheckTx implements types.Application.
// It is called by cometbft to verify transaction validity
func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
decodedTx, err := c.txCodec.Decode(req.Tx)
decodedTx, err := c.appCodecs.TxCodec.Decode(req.Tx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -325,7 +320,7 @@ func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe
ctx,
br,
req.AppStateBytes,
c.txCodec)
c.appCodecs.TxCodec)
if err != nil {
return nil, fmt.Errorf("genesis state init failure: %w", err)
}
Expand Down Expand Up @@ -392,7 +387,7 @@ func (c *consensus[T]) PrepareProposal(
LastCommit: toCoreExtendedCommitInfo(req.LocalLastCommit),
})

txs, err := c.prepareProposalHandler(ciCtx, c.app, c.txCodec, req)
txs, err := c.prepareProposalHandler(ciCtx, c.app, c.appCodecs.TxCodec, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -438,7 +433,7 @@ func (c *consensus[T]) ProcessProposal(
LastCommit: toCoreCommitInfo(req.ProposedLastCommit),
})

err := c.processProposalHandler(ciCtx, c.app, c.txCodec, req)
err := c.processProposalHandler(ciCtx, c.app, c.appCodecs.TxCodec, req)
if err != nil {
c.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abciproto.ProcessProposalResponse{
Expand Down Expand Up @@ -567,7 +562,7 @@ func (c *consensus[T]) internalFinalizeBlock(
// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(c.logger, req.Txs, c.txCodec)
decodedTxs, err := decodeTxs(c.logger, req.Txs, c.appCodecs.TxCodec)
if err != nil {
return nil, nil, nil, err
}
Expand Down
16 changes: 9 additions & 7 deletions server/v2/cometbft/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,13 +886,15 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.
}

return &consensus[mock.Tx]{
logger: log.NewNopLogger(),
appName: "testing-app",
app: am,
mempool: mempool,
store: mockStore,
cfg: Config{AppTomlConfig: DefaultAppTomlConfig()},
txCodec: mock.TxCodec{},
logger: log.NewNopLogger(),
appName: "testing-app",
app: am,
mempool: mempool,
store: mockStore,
cfg: Config{AppTomlConfig: DefaultAppTomlConfig()},
appCodecs: AppCodecs[mock.Tx]{
TxCodec: mock.TxCodec{},
},
chainID: "test",
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
queryHandlersMap: queryHandler,
Expand Down
41 changes: 20 additions & 21 deletions server/v2/cometbft/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/std"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/query"
Expand Down Expand Up @@ -117,12 +116,13 @@ func (t txServer[T]) GetBlockWithTxs(ctx context.Context, req *txtypes.GetBlockW
}
decodeTxAt := func(i uint64) error {
tx := blockTxs[i]
txb, err := t.clientCtx.TxConfig.TxDecoder()(tx)
fmt.Println("TxDecoder", txb, err)
txb, err := t.txCodec.Decode(tx)
if err != nil {
return err
}
p, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx()

// txServer works only with sdk.Tx
p, err := any(txb).(interface{ AsTx() (*txtypes.Tx, error) }).AsTx()
if err != nil {
return err
}
Expand Down Expand Up @@ -256,13 +256,19 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest)
msgResponses = append(msgResponses, anyMsg)
}

event, err := intoABCIEvents(txResult.Events, map[string]struct{}{}, false)
if err != nil {
return nil, status.Errorf(codes.Unknown, "failed to convert events: %v", err)
}

return &txtypes.SimulateResponse{
GasInfo: &sdk.GasInfo{
GasUsed: txResult.GasUsed,
GasWanted: txResult.GasWanted,
},
Result: &sdk.Result{
MsgResponses: msgResponses,
Events: event,
},
}, nil
}
Expand All @@ -273,15 +279,17 @@ func (t txServer[T]) TxDecode(ctx context.Context, req *txtypes.TxDecodeRequest)
return nil, status.Error(codes.InvalidArgument, "invalid empty tx bytes")
}

txb, err := t.clientCtx.TxConfig.TxDecoder()(req.TxBytes)
txb, err := t.txCodec.Decode(req.TxBytes)
if err != nil {
return nil, err
}

tx, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() // TODO: maybe we can break the Tx interface to add this also
// txServer works only with sdk.Tx
tx, err := any(txb).(interface{ AsTx() (*txtypes.Tx, error) }).AsTx()
if err != nil {
return nil, err
}

return &txtypes.TxDecodeResponse{
Tx: tx,
}, nil
Expand Down Expand Up @@ -350,7 +358,7 @@ func (t txServer[T]) TxEncodeAmino(_ context.Context, req *txtypes.TxEncodeAmino
var stdTx legacytx.StdTx
err := t.clientCtx.LegacyAmino.UnmarshalJSON([]byte(req.AminoJson), &stdTx)
if err != nil {
return nil, err
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("invalid request %s", err))
}

encodedBytes, err := t.clientCtx.LegacyAmino.Marshal(stdTx)
Expand Down Expand Up @@ -466,7 +474,7 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc
if strings.HasPrefix(req.Path, "/cosmos.base.tendermint.v1beta1.Service") {
rpcClient, _ := rpchttp.New(c.cfg.ConfigTomlConfig.RPC.ListenAddress)

cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec)
cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.appCodecs.ConsensusAddressCodec)
paths := strings.Split(req.Path, "/")
if len(paths) <= 2 {
return nil, fmt.Errorf("invalid request path: %s", req.Path)
Expand Down Expand Up @@ -516,27 +524,18 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc

// Handle tx service
if strings.HasPrefix(req.Path, "/cosmos.tx.v1beta1.Service") {
// init simple client context
amino := codec.NewLegacyAmino()
std.RegisterLegacyAminoCodec(amino)
txConfig := authtx.NewTxConfig(
c.appCodec,
c.appCodec.InterfaceRegistry().SigningContext().AddressCodec(),
c.appCodec.InterfaceRegistry().SigningContext().ValidatorAddressCodec(),
authtx.DefaultSignModes,
)
rpcClient, _ := client.NewClientFromNode(c.cfg.AppTomlConfig.Address)

// init simple client context
clientCtx := client.Context{}.
WithLegacyAmino(amino).
WithCodec(c.appCodec).
WithTxConfig(txConfig).
WithLegacyAmino(c.appCodecs.LegacyAmino.(*codec.LegacyAmino)).
WithCodec(c.appCodecs.AppCodec).
WithNodeURI(c.cfg.AppTomlConfig.Address).
WithClient(rpcClient)

txService := txServer[T]{
clientCtx: clientCtx,
txCodec: c.txCodec,
txCodec: c.appCodecs.TxCodec,
app: c.app,
consensus: c,
}
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a

switch path[1] {
case "simulate":
tx, err := c.txCodec.Decode(req.Data)
tx, err := c.appCodecs.TxCodec.Decode(req.Data)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to decode tx")
}
Expand Down
23 changes: 16 additions & 7 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

addresscodec "cosmossdk.io/core/address"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/registry"
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
Expand Down Expand Up @@ -66,14 +67,24 @@ type CometBFTServer[T transaction.Tx] struct {
store types.Store
}

// AppCodecs contains all codecs that the CometBFT server requires
// provided by the application. They are extracted in struct to not be API
// breaking once amino is completely deprecated or new codecs should be added.
type AppCodecs[T transaction.Tx] struct {
TxCodec transaction.Codec[T]

// The following codecs are only required for the gRPC services
AppCodec codec.Codec
LegacyAmino registry.AminoRegistrar
ConsensusAddressCodec addresscodec.Codec
}

func New[T transaction.Tx](
logger log.Logger,
appName string,
store types.Store,
app appmanager.AppManager[T],
appCodec codec.Codec,
txCodec transaction.Codec[T],
consensusAddressCodec addresscodec.Codec,
appCodecs AppCodecs[T],
queryHandlers map[string]appmodulev2.Handler,
decoderResolver decoding.DecoderResolver,
serverOptions ServerOptions[T],
Expand All @@ -84,7 +95,7 @@ func New[T transaction.Tx](
serverOptions: serverOptions,
cfgOptions: cfgOptions,
app: app,
txCodec: txCodec,
txCodec: appCodecs.TxCodec,
store: store,
}
srv.logger = logger.With(log.ModuleKey, srv.Name())
Expand Down Expand Up @@ -172,8 +183,7 @@ func New[T transaction.Tx](
cfg: srv.config,
store: store,
logger: logger,
txCodec: txCodec,
appCodec: appCodec,
appCodecs: appCodecs,
listener: listener,
snapshotManager: snapshotManager,
streamingManager: srv.serverOptions.StreamingManager,
Expand All @@ -192,7 +202,6 @@ func New[T transaction.Tx](
addrPeerFilter: srv.serverOptions.AddrPeerFilter,
idPeerFilter: srv.serverOptions.IdPeerFilter,
cfgMap: cfg,
consensusAddressCodec: consensusAddressCodec,
}

c.optimisticExec = oe.NewOptimisticExecution(
Expand Down
43 changes: 43 additions & 0 deletions server/v2/stf/stf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package stf

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
corecontext "cosmossdk.io/core/context"
Expand Down Expand Up @@ -358,12 +360,53 @@ func (s STF[T]) runTxMsgs(
e.EventIndex = int32(j + 1)
events = append(events, e)
}

// add message event
events = append(events, createMessageEvent(msg, int32(i+1), int32(len(execCtx.events)+1)))
}

consumed := execCtx.meter.Limit() - execCtx.meter.Remaining()
return msgResps, consumed, events, nil
}

// Create a message event, with two kv: action, the type url of the message
// and module, the module of the message.
func createMessageEvent(msg transaction.Msg, msgIndex, eventIndex int32) event.Event {
// Assumes that module name is the second element of the msg type URL
// e.g. "cosmos.bank.v1beta1.MsgSend" => "bank"
// It returns an empty string if the input is not a valid type URL
getModuleNameFromTypeURL := func(input string) string {
moduleName := strings.Split(input, ".")
if len(moduleName) > 1 {
return moduleName[1]
}

return ""
}
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved

return event.Event{
MsgIndex: msgIndex,
EventIndex: eventIndex,
Type: "message",
Attributes: func() ([]appdata.EventAttribute, error) {
typeURL := msgTypeURL(msg)
return []appdata.EventAttribute{
{Key: "action", Value: "/" + typeURL},
{Key: "module", Value: getModuleNameFromTypeURL(typeURL)},
}, nil
},
Comment on lines +392 to +397
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for msgTypeURL.

The function should handle potential errors from msgTypeURL.

Add error handling:

 Attributes: func() ([]appdata.EventAttribute, error) {
   typeURL := msgTypeURL(msg)
+  if typeURL == "" {
+    return nil, fmt.Errorf("failed to get type URL for message")
+  }
   return []appdata.EventAttribute{
     {Key: "action", Value: "/" + typeURL},
     {Key: "module", Value: getModuleNameFromTypeURL(typeURL)},
   }, nil
 },

Committable suggestion skipped: line range outside the PR's diff.

Data: func() (json.RawMessage, error) {
typeURL := msgTypeURL(msg)
attrs := []appdata.EventAttribute{
{Key: "action", Value: "/" + typeURL},
{Key: "module", Value: getModuleNameFromTypeURL(typeURL)},
}

return json.Marshal(attrs)
},
}
}

// preBlock executes the pre block logic.
func (s STF[T]) preBlock(
ctx *executionContext,
Expand Down
Loading
Loading