Skip to content

Commit

Permalink
fix(stf,server/v2/cometbft): fix default events + improve codec handl…
Browse files Browse the repository at this point in the history
…ing (#22837)
  • Loading branch information
julienrbrt authored Dec 12, 2024
1 parent a068405 commit e6948ee
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 59 deletions.
23 changes: 9 additions & 14 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"google.golang.org/protobuf/reflect/protoregistry"

"cosmossdk.io/collections"
addresscodec "cosmossdk.io/core/address"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/comet"
corecontext "cosmossdk.io/core/context"
Expand All @@ -35,8 +34,6 @@ import (
"cosmossdk.io/server/v2/streaming"
"cosmossdk.io/store/v2/snapshots"
consensustypes "cosmossdk.io/x/consensus/types"

"github.com/cosmos/cosmos-sdk/codec"
)

const (
Expand All @@ -52,13 +49,12 @@ type consensus[T transaction.Tx] struct {
logger log.Logger
appName, version string
app appmanager.AppManager[T]
appCodec codec.Codec
txCodec transaction.Codec[T]
store types.Store
listener *appdata.Listener
snapshotManager *snapshots.Manager
streamingManager streaming.Manager
mempool mempool.Mempool[T]
appCodecs AppCodecs[T]

cfg Config
chainID string
Expand All @@ -84,16 +80,15 @@ type consensus[T transaction.Tx] struct {
addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID

queryHandlersMap map[string]appmodulev2.Handler
getProtoRegistry func() (*protoregistry.Files, error)
consensusAddressCodec addresscodec.Codec
cfgMap server.ConfigMap
queryHandlersMap map[string]appmodulev2.Handler
getProtoRegistry func() (*protoregistry.Files, error)
cfgMap server.ConfigMap
}

// CheckTx implements types.Application.
// It is called by cometbft to verify transaction validity
func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
decodedTx, err := c.txCodec.Decode(req.Tx)
decodedTx, err := c.appCodecs.TxCodec.Decode(req.Tx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -325,7 +320,7 @@ func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe
ctx,
br,
req.AppStateBytes,
c.txCodec)
c.appCodecs.TxCodec)
if err != nil {
return nil, fmt.Errorf("genesis state init failure: %w", err)
}
Expand Down Expand Up @@ -392,7 +387,7 @@ func (c *consensus[T]) PrepareProposal(
LastCommit: toCoreExtendedCommitInfo(req.LocalLastCommit),
})

txs, err := c.prepareProposalHandler(ciCtx, c.app, c.txCodec, req)
txs, err := c.prepareProposalHandler(ciCtx, c.app, c.appCodecs.TxCodec, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -438,7 +433,7 @@ func (c *consensus[T]) ProcessProposal(
LastCommit: toCoreCommitInfo(req.ProposedLastCommit),
})

err := c.processProposalHandler(ciCtx, c.app, c.txCodec, req)
err := c.processProposalHandler(ciCtx, c.app, c.appCodecs.TxCodec, req)
if err != nil {
c.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abciproto.ProcessProposalResponse{
Expand Down Expand Up @@ -567,7 +562,7 @@ func (c *consensus[T]) internalFinalizeBlock(
// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(c.logger, req.Txs, c.txCodec)
decodedTxs, err := decodeTxs(c.logger, req.Txs, c.appCodecs.TxCodec)
if err != nil {
return nil, nil, nil, err
}
Expand Down
16 changes: 9 additions & 7 deletions server/v2/cometbft/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,13 +886,15 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.
}

return &consensus[mock.Tx]{
logger: log.NewNopLogger(),
appName: "testing-app",
app: am,
mempool: mempool,
store: mockStore,
cfg: Config{AppTomlConfig: DefaultAppTomlConfig()},
txCodec: mock.TxCodec{},
logger: log.NewNopLogger(),
appName: "testing-app",
app: am,
mempool: mempool,
store: mockStore,
cfg: Config{AppTomlConfig: DefaultAppTomlConfig()},
appCodecs: AppCodecs[mock.Tx]{
TxCodec: mock.TxCodec{},
},
chainID: "test",
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
queryHandlersMap: queryHandler,
Expand Down
41 changes: 20 additions & 21 deletions server/v2/cometbft/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/std"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/query"
Expand Down Expand Up @@ -117,12 +116,13 @@ func (t txServer[T]) GetBlockWithTxs(ctx context.Context, req *txtypes.GetBlockW
}
decodeTxAt := func(i uint64) error {
tx := blockTxs[i]
txb, err := t.clientCtx.TxConfig.TxDecoder()(tx)
fmt.Println("TxDecoder", txb, err)
txb, err := t.txCodec.Decode(tx)
if err != nil {
return err
}
p, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx()

// txServer works only with sdk.Tx
p, err := any(txb).(interface{ AsTx() (*txtypes.Tx, error) }).AsTx()
if err != nil {
return err
}
Expand Down Expand Up @@ -256,13 +256,19 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest)
msgResponses = append(msgResponses, anyMsg)
}

event, err := intoABCIEvents(txResult.Events, map[string]struct{}{}, false)
if err != nil {
return nil, status.Errorf(codes.Unknown, "failed to convert events: %v", err)
}

return &txtypes.SimulateResponse{
GasInfo: &sdk.GasInfo{
GasUsed: txResult.GasUsed,
GasWanted: txResult.GasWanted,
},
Result: &sdk.Result{
MsgResponses: msgResponses,
Events: event,
},
}, nil
}
Expand All @@ -273,15 +279,17 @@ func (t txServer[T]) TxDecode(ctx context.Context, req *txtypes.TxDecodeRequest)
return nil, status.Error(codes.InvalidArgument, "invalid empty tx bytes")
}

txb, err := t.clientCtx.TxConfig.TxDecoder()(req.TxBytes)
txb, err := t.txCodec.Decode(req.TxBytes)
if err != nil {
return nil, err
}

tx, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() // TODO: maybe we can break the Tx interface to add this also
// txServer works only with sdk.Tx
tx, err := any(txb).(interface{ AsTx() (*txtypes.Tx, error) }).AsTx()
if err != nil {
return nil, err
}

return &txtypes.TxDecodeResponse{
Tx: tx,
}, nil
Expand Down Expand Up @@ -350,7 +358,7 @@ func (t txServer[T]) TxEncodeAmino(_ context.Context, req *txtypes.TxEncodeAmino
var stdTx legacytx.StdTx
err := t.clientCtx.LegacyAmino.UnmarshalJSON([]byte(req.AminoJson), &stdTx)
if err != nil {
return nil, err
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("invalid request %s", err))
}

encodedBytes, err := t.clientCtx.LegacyAmino.Marshal(stdTx)
Expand Down Expand Up @@ -466,7 +474,7 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc
if strings.HasPrefix(req.Path, "/cosmos.base.tendermint.v1beta1.Service") {
rpcClient, _ := rpchttp.New(c.cfg.ConfigTomlConfig.RPC.ListenAddress)

cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec)
cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.appCodecs.ConsensusAddressCodec)
paths := strings.Split(req.Path, "/")
if len(paths) <= 2 {
return nil, fmt.Errorf("invalid request path: %s", req.Path)
Expand Down Expand Up @@ -516,27 +524,18 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc

// Handle tx service
if strings.HasPrefix(req.Path, "/cosmos.tx.v1beta1.Service") {
// init simple client context
amino := codec.NewLegacyAmino()
std.RegisterLegacyAminoCodec(amino)
txConfig := authtx.NewTxConfig(
c.appCodec,
c.appCodec.InterfaceRegistry().SigningContext().AddressCodec(),
c.appCodec.InterfaceRegistry().SigningContext().ValidatorAddressCodec(),
authtx.DefaultSignModes,
)
rpcClient, _ := client.NewClientFromNode(c.cfg.AppTomlConfig.Address)

// init simple client context
clientCtx := client.Context{}.
WithLegacyAmino(amino).
WithCodec(c.appCodec).
WithTxConfig(txConfig).
WithLegacyAmino(c.appCodecs.LegacyAmino.(*codec.LegacyAmino)).
WithCodec(c.appCodecs.AppCodec).
WithNodeURI(c.cfg.AppTomlConfig.Address).
WithClient(rpcClient)

txService := txServer[T]{
clientCtx: clientCtx,
txCodec: c.txCodec,
txCodec: c.appCodecs.TxCodec,
app: c.app,
consensus: c,
}
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a

switch path[1] {
case "simulate":
tx, err := c.txCodec.Decode(req.Data)
tx, err := c.appCodecs.TxCodec.Decode(req.Data)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to decode tx")
}
Expand Down
23 changes: 16 additions & 7 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

addresscodec "cosmossdk.io/core/address"
appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/registry"
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
Expand Down Expand Up @@ -66,14 +67,24 @@ type CometBFTServer[T transaction.Tx] struct {
store types.Store
}

// AppCodecs contains all codecs that the CometBFT server requires
// provided by the application. They are extracted in struct to not be API
// breaking once amino is completely deprecated or new codecs should be added.
type AppCodecs[T transaction.Tx] struct {
TxCodec transaction.Codec[T]

// The following codecs are only required for the gRPC services
AppCodec codec.Codec
LegacyAmino registry.AminoRegistrar
ConsensusAddressCodec addresscodec.Codec
}

func New[T transaction.Tx](
logger log.Logger,
appName string,
store types.Store,
app appmanager.AppManager[T],
appCodec codec.Codec,
txCodec transaction.Codec[T],
consensusAddressCodec addresscodec.Codec,
appCodecs AppCodecs[T],
queryHandlers map[string]appmodulev2.Handler,
decoderResolver decoding.DecoderResolver,
serverOptions ServerOptions[T],
Expand All @@ -84,7 +95,7 @@ func New[T transaction.Tx](
serverOptions: serverOptions,
cfgOptions: cfgOptions,
app: app,
txCodec: txCodec,
txCodec: appCodecs.TxCodec,
store: store,
}
srv.logger = logger.With(log.ModuleKey, srv.Name())
Expand Down Expand Up @@ -172,8 +183,7 @@ func New[T transaction.Tx](
cfg: srv.config,
store: store,
logger: logger,
txCodec: txCodec,
appCodec: appCodec,
appCodecs: appCodecs,
listener: listener,
snapshotManager: snapshotManager,
streamingManager: srv.serverOptions.StreamingManager,
Expand All @@ -192,7 +202,6 @@ func New[T transaction.Tx](
addrPeerFilter: srv.serverOptions.AddrPeerFilter,
idPeerFilter: srv.serverOptions.IdPeerFilter,
cfgMap: cfg,
consensusAddressCodec: consensusAddressCodec,
}

c.optimisticExec = oe.NewOptimisticExecution(
Expand Down
43 changes: 43 additions & 0 deletions server/v2/stf/stf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package stf

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
corecontext "cosmossdk.io/core/context"
Expand Down Expand Up @@ -358,12 +360,53 @@ func (s STF[T]) runTxMsgs(
e.EventIndex = int32(j + 1)
events = append(events, e)
}

// add message event
events = append(events, createMessageEvent(msg, int32(i+1), int32(len(execCtx.events)+1)))
}

consumed := execCtx.meter.Limit() - execCtx.meter.Remaining()
return msgResps, consumed, events, nil
}

// Create a message event, with two kv: action, the type url of the message
// and module, the module of the message.
func createMessageEvent(msg transaction.Msg, msgIndex, eventIndex int32) event.Event {
// Assumes that module name is the second element of the msg type URL
// e.g. "cosmos.bank.v1beta1.MsgSend" => "bank"
// It returns an empty string if the input is not a valid type URL
getModuleNameFromTypeURL := func(input string) string {
moduleName := strings.Split(input, ".")
if len(moduleName) > 1 {
return moduleName[1]
}

return ""
}

return event.Event{
MsgIndex: msgIndex,
EventIndex: eventIndex,
Type: "message",
Attributes: func() ([]appdata.EventAttribute, error) {
typeURL := msgTypeURL(msg)
return []appdata.EventAttribute{
{Key: "action", Value: "/" + typeURL},
{Key: "module", Value: getModuleNameFromTypeURL(typeURL)},
}, nil
},
Data: func() (json.RawMessage, error) {
typeURL := msgTypeURL(msg)
attrs := []appdata.EventAttribute{
{Key: "action", Value: "/" + typeURL},
{Key: "module", Value: getModuleNameFromTypeURL(typeURL)},
}

return json.Marshal(attrs)
},
}
}

// preBlock executes the pre block logic.
func (s STF[T]) preBlock(
ctx *executionContext,
Expand Down
Loading

0 comments on commit e6948ee

Please sign in to comment.