fix: withhold gravity batches until valset update (#1195)
# Related GitHub tickets

- #1464

# Background

This change adds a new constraint when handing out gravity batches to be
relayed: they are now withheld while valset updates are pending on the
target chain, just like with SLC. I had to cut some corners to get this
out sooner rather than later, but I also added a more decentralised
event bus system for decoupling systems. It's currently very bare-bones
and, as a singleton, hard to test, but if it grows we can refactor it
into something more injectable.
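
For a quick feel for the API, here's a minimal, self-contained sketch of subscribing and publishing (the subscriber ID and handler body are made up for illustration; the bus itself is the `util/eventbus` package added below):

```go
package main

import (
	"context"
	"fmt"

	"cosmossdk.io/log"
	sdk "github.com/cosmos/cosmos-sdk/types"
	"github.com/palomachain/paloma/util/eventbus"
)

func main() {
	// Publish unwraps an sdk.Context to get a logger, so wrap one around
	// a plain context, just like bus_test.go does.
	ctx := sdk.Context{}.
		WithLogger(log.NewNopLogger()).
		WithContext(context.Background())

	// Subscribe under a stable ID; subscribing again with the same ID
	// replaces the previous handler.
	eventbus.GravityBatchBuilt().Subscribe(
		"example-subscriber", // hypothetical ID
		func(_ context.Context, e eventbus.GravityBatchBuiltEvent) error {
			fmt.Println("gravity batch built for", e.ChainReferenceID)
			return nil
		},
	)

	// Subscribers run synchronously in sorted-ID order; handler errors
	// are logged by the bus rather than returned to the publisher.
	eventbus.GravityBatchBuilt().Publish(ctx, eventbus.GravityBatchBuiltEvent{
		ChainReferenceID: "test-chain",
	})
}
```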

# Testing completed

- [x] test coverage exists or has been added/updated
- [ ] tested in a private testnet

# Breaking changes

- [x] I have checked my code for breaking changes
- [x] If there are breaking changes, there is a supporting migration.
byte-bandit authored Jun 17, 2024
1 parent 3ce9062 commit 91cd199
Showing 15 changed files with 212 additions and 40 deletions.
1 change: 1 addition & 0 deletions app/app.go
@@ -595,6 +595,7 @@ func New(
app.DistrKeeper,
app.TransferKeeper,
app.EvmKeeper,
app.ConsensusKeeper,
gravitymodulekeeper.NewGravityStoreGetter(keys[gravitymoduletypes.StoreKey]),
authorityAddress,
authcodec.NewBech32Codec(chainparams.ValidatorAddressPrefix),
10 changes: 5 additions & 5 deletions tests/integration/evm/keeper/keeper_integration_test.go
@@ -127,7 +127,7 @@ func TestEndToEndForEvmArbitraryCall(t *testing.T) {
})

require.NoError(t, err)
queue := consensustypes.Queue(keeper.ConsensusTurnstoneMessage, chainType, chainReferenceID)
queue := consensustypes.Queue(types.ConsensusTurnstoneMessage, chainType, chainReferenceID)
msgs, err := f.consensusKeeper.GetMessagesForSigning(ctx, queue, operator)
require.NoError(t, err)

@@ -208,7 +208,7 @@ func TestFirstSnapshot_OnSnapshotBuilt(t *testing.T) {
require.NoError(t, err)
}

queue := fmt.Sprintf("evm/%s/%s", newChain.GetChainReferenceID(), keeper.ConsensusTurnstoneMessage)
queue := fmt.Sprintf("evm/%s/%s", newChain.GetChainReferenceID(), types.ConsensusTurnstoneMessage)
msgs, err := f.consensusKeeper.GetMessagesFromQueue(ctx, queue, 100)
require.NoError(t, err)
require.Empty(t, msgs)
@@ -279,7 +279,7 @@ func TestRecentPublishedSnapshot_OnSnapshotBuilt(t *testing.T) {
require.NoError(t, err)
}

queue := fmt.Sprintf("evm/%s/%s", newChain.GetChainReferenceID(), keeper.ConsensusTurnstoneMessage)
queue := fmt.Sprintf("evm/%s/%s", newChain.GetChainReferenceID(), types.ConsensusTurnstoneMessage)

msgs, err := f.consensusKeeper.GetMessagesFromQueue(ctx, queue, 1)
require.NoError(t, err)
@@ -386,7 +386,7 @@ func TestOldPublishedSnapshot_OnSnapshotBuilt(t *testing.T) {
require.NoError(t, err)
}

queue := fmt.Sprintf("evm/%s/%s", newChain.GetChainReferenceID(), keeper.ConsensusTurnstoneMessage)
queue := fmt.Sprintf("evm/%s/%s", newChain.GetChainReferenceID(), types.ConsensusTurnstoneMessage)

msgs, err := f.consensusKeeper.GetMessagesFromQueue(ctx, queue, 1)
require.NoError(t, err)
@@ -463,7 +463,7 @@ func TestInactiveChain_OnSnapshotBuilt(t *testing.T) {
f.stakingKeeper.SetValidator(ctx, val)
}

queue := fmt.Sprintf("evm/%s/%s", "bob", keeper.ConsensusTurnstoneMessage)
queue := fmt.Sprintf("evm/%s/%s", "bob", types.ConsensusTurnstoneMessage)

_, err := f.valsetKeeper.TriggerSnapshotBuild(ctx)
require.NoError(t, err)
1 change: 1 addition & 0 deletions tests/integration/evm/keeper/test_helpers_test.go
@@ -311,6 +311,7 @@ func initFixture(t ginkgo.FullGinkgoTInterface) *fixture {
distrKeeper,
transferKeeper,
evmKeeper,
consensusKeeper,
gravitymodulekeeper.NewGravityStoreGetter(keys[gravitymoduletypes.StoreKey]),
authtypes.NewModuleAddress(govtypes.ModuleName).String(),
authcodec.NewBech32Codec(params2.ValidatorAddressPrefix),
60 changes: 60 additions & 0 deletions util/eventbus/bus.go
@@ -0,0 +1,60 @@
package eventbus

import (
"context"
"sort"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/palomachain/paloma/util/liblog"
)

var gravityBatchBuilt = newEvent[GravityBatchBuiltEvent]()

type (
EventHandler[E any] func(context.Context, E) error
Event[E any] struct {
subscribers map[string]EventHandler[E]
}
)

func newEvent[E any]() Event[E] {
return Event[E]{
subscribers: make(map[string]EventHandler[E]),
}
}

func (e Event[E]) Publish(ctx context.Context, event E) {
keys := make([]string, 0, len(e.subscribers))
for k := range e.subscribers {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
if e.subscribers[k] != nil {
logger := liblog.FromSDKLogger(sdk.UnwrapSDKContext(ctx).Logger()).
WithComponent("eventbus").
WithFields("event", event).
WithFields("subscriber", k)
logger.Debug("Handling event")
if err := e.subscribers[k](ctx, event); err != nil {
logger.WithError(err).Error("Failed to handle event")
}
}
}
}

func (e Event[E]) Subscribe(id string, fn EventHandler[E]) {
e.subscribers[id] = fn
}

func (e Event[E]) Unsubscribe(id string) {
e.subscribers[id] = nil
}

type GravityBatchBuiltEvent struct {
ChainReferenceID string
}

func GravityBatchBuilt() *Event[GravityBatchBuiltEvent] {
return &gravityBatchBuilt
}
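
A note on the semantics above: `Unsubscribe` nils out the handler rather than deleting the map key, and `Publish` skips nil handlers, so an unsubscribed ID stays registered but inert. `Publish` also sorts subscriber IDs before dispatch, which keeps handler order deterministic, and handler errors are logged and swallowed rather than propagated to the publisher.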
48 changes: 48 additions & 0 deletions util/eventbus/bus_test.go
@@ -0,0 +1,48 @@
package eventbus_test

import (
"context"
"testing"

"cosmossdk.io/log"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/palomachain/paloma/util/eventbus"
"github.com/stretchr/testify/require"
)

func TestEventBus(t *testing.T) {
ctx := sdk.Context{}.
WithLogger(log.NewNopLogger()).
WithContext(context.Background())
eventbus.GravityBatchBuilt().Publish(ctx, eventbus.GravityBatchBuiltEvent{
ChainReferenceID: "test-chain",
})

calls := make(map[string]int)
fn := func(_ context.Context, e eventbus.GravityBatchBuiltEvent) error {
calls[e.ChainReferenceID]++
return nil
}

eventbus.GravityBatchBuilt().Subscribe("test-1", fn)
require.Len(t, calls, 0, "should be empty")

eventbus.GravityBatchBuilt().Publish(ctx, eventbus.GravityBatchBuiltEvent{
ChainReferenceID: "test-chain",
})

require.NotNil(t, calls["test-chain"], "should have executed one.")
require.Equal(t, 1, calls["test-chain"], "should have executed one.")

eventbus.GravityBatchBuilt().Subscribe("test-2", fn)
eventbus.GravityBatchBuilt().Publish(ctx, eventbus.GravityBatchBuiltEvent{
ChainReferenceID: "test-chain",
})
require.Equal(t, 3, calls["test-chain"], "should execute both subscribers.")

eventbus.GravityBatchBuilt().Unsubscribe("test-1")
eventbus.GravityBatchBuilt().Publish(ctx, eventbus.GravityBatchBuiltEvent{
ChainReferenceID: "test-chain",
})
require.Equal(t, 4, calls["test-chain"], "should have removed one subscriber.")
}
61 changes: 34 additions & 27 deletions x/consensus/keeper/concensus_keeper.go
@@ -115,39 +115,47 @@ func (k Keeper) GetMessagesForSigning(ctx context.Context, queueTypeName string,
return msgs, nil
}

// GetMessagesForRelaying returns messages for a single validator to relay.
func (k Keeper) GetMessagesForRelaying(ctx context.Context, queueTypeName string, valAddress sdk.ValAddress) (msgs []types.QueuedSignedMessageI, err error) {
sdkCtx := sdk.UnwrapSDKContext(ctx)
msgs, err = k.GetMessagesFromQueue(sdkCtx, queueTypeName, 0)
// TODO: The infusion of EVM types into the consensus module is a bit of a code smell.
// We should consider moving the entire logic of message assignment and retrieval
// to the EVM module to keep the consensus module content-agnostic.
func (k Keeper) GetPendingValsetUpdates(ctx context.Context, queueTypeName string) ([]types.QueuedSignedMessageI, error) {
msgs, err := k.GetMessagesFromQueue(ctx, queueTypeName, 0)
if err != nil {
return nil, err
}

// Check for existing valset update messages on any target chains
valsetUpdatesOnChainLkUp := make(map[string]uint64)
for _, v := range msgs {
cm, err := v.ConsensusMsg(k.cdc)
msgs = slice.Filter(msgs, func(msg types.QueuedSignedMessageI) bool {
cm, err := msg.ConsensusMsg(k.cdc)
if err != nil {
liblog.FromSDKLogger(k.Logger(sdkCtx)).WithError(err).Error("Failed to get consensus msg")
continue
liblog.FromKeeper(ctx, k).WithError(err).Error("Failed to get consensus msg")
return false
}

m, ok := cm.(*evmtypes.Message)
if !ok {
continue
return false
}

action := m.GetAction()
_, ok = action.(*evmtypes.Message_UpdateValset)
if ok {
if _, found := valsetUpdatesOnChainLkUp[m.GetChainReferenceID()]; found {
// Looks like we already have a pending valset update for this chain,
// we want to keep the earlierst message ID for a valset update we found,
// so we can skip here.
continue
}
valsetUpdatesOnChainLkUp[m.GetChainReferenceID()] = v.GetId()
if _, ok = m.GetAction().(*evmtypes.Message_UpdateValset); !ok {
return false
}

return true
})

return msgs, nil
}

// GetMessagesForRelaying returns messages for a single validator to relay.
func (k Keeper) GetMessagesForRelaying(ctx context.Context, queueTypeName string, valAddress sdk.ValAddress) (msgs []types.QueuedSignedMessageI, err error) {
sdkCtx := sdk.UnwrapSDKContext(ctx)
msgs, err = k.GetMessagesFromQueue(sdkCtx, queueTypeName, 0)
if err != nil {
return nil, err
}

// Check for existing valset update messages on any target chains
valsetUpdatesOnChain, err := k.GetPendingValsetUpdates(ctx, queueTypeName)
if err != nil {
return nil, err
}

// Filter down to just messages for target chains without pending valset updates on them
@@ -159,21 +167,20 @@ func (k Keeper) GetMessagesForRelaying(ctx context.Context, queueTypeName string
return true
}

m, ok := cm.(*evmtypes.Message)
_, ok := cm.(*evmtypes.Message)
if !ok {
// NO cross chain message, just return true
return true
}

// Cross chain message for relaying, return only if no pending valset update on target chain
vuMid, found := valsetUpdatesOnChainLkUp[m.GetChainReferenceID()]
if !found {
if len(valsetUpdatesOnChain) < 1 {
return true
}

// Looks like there is a valset update for the target chain,
// only return true if this message is younger than the valset update
return msg.GetId() <= vuMid
return msg.GetId() <= valsetUpdatesOnChain[0].GetId()
})

// Filter down to just messages assigned to this validator
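To make the new withholding rule concrete, here's a standalone sketch of the same predicate over simplified types (the real code filters `types.QueuedSignedMessageI` values with `slice.Filter` and only applies the rule to EVM messages):

```go
package main

import "fmt"

// queuedMsg is a minimal stand-in for the real types.QueuedSignedMessageI;
// it carries only the fields the rule needs.
type queuedMsg struct {
	id       uint64
	isValset bool
}

// relayable mirrors the filter in GetMessagesForRelaying: if a valset
// update is pending in the queue, only messages with an ID at or below
// the earliest pending update's ID are handed out; anything newer is
// withheld until the update lands on the target chain.
func relayable(msgs []queuedMsg) []queuedMsg {
	var pending []queuedMsg
	for _, m := range msgs {
		if m.isValset {
			pending = append(pending, m)
		}
	}

	var out []queuedMsg
	for _, m := range msgs {
		// No pending valset update: everything may be relayed.
		if len(pending) < 1 || m.id <= pending[0].id {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	msgs := []queuedMsg{{id: 1}, {id: 2, isValset: true}, {id: 3}}
	// Prints IDs 1 and 2; message 3 is withheld behind the valset update.
	fmt.Println(relayable(msgs))
}
```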
20 changes: 16 additions & 4 deletions x/evm/keeper/keeper.go
@@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
xchain "github.com/palomachain/paloma/internal/x-chain"
"github.com/palomachain/paloma/util/eventbus"
keeperutil "github.com/palomachain/paloma/util/keeper"
"github.com/palomachain/paloma/util/libcons"
"github.com/palomachain/paloma/util/liblog"
@@ -43,7 +44,6 @@ const (
)

const (
ConsensusTurnstoneMessage = "evm-turnstone-message"
ConsensusGetValidatorBalances = "validators-balances"
ConsensusGetReferenceBlock = "reference-block"
ConsensusCollectFundEvents = "collect-fund-events"
@@ -61,7 +61,7 @@ type supportedChainInfo struct {

var SupportedConsensusQueues = []supportedChainInfo{
{
subqueue: ConsensusTurnstoneMessage,
subqueue: types.ConsensusTurnstoneMessage,
batch: false,
msgType: &types.Message{},
processAttesationFunc: func(k Keeper) func(ctx context.Context, q consensus.Queuer, msg consensustypes.QueuedSignedMessageI) error {
@@ -149,6 +149,18 @@ func NewKeeper(
k.deploymentCache = deployment.NewCache(provideDeploymentCacheBootstrapper(k))
k.ider = keeperutil.NewIDGenerator(keeperutil.StoreGetterFn(k.provideSmartContractStore), []byte("id-key"))
k.consensusChecker = libcons.New(k.Valset.GetCurrentSnapshot, k.cdc)

eventbus.GravityBatchBuilt().Subscribe(
"gravity-keeper",
func(ctx context.Context, e eventbus.GravityBatchBuiltEvent) error {
ci, err := k.GetChainInfo(ctx, e.ChainReferenceID)
if err != nil {
return err
}

return k.justInTimeValsetUpdate(ctx, ci)
})

return k
}

@@ -608,7 +620,7 @@ func (m msgSender) SendValsetMsgForChain(ctx context.Context, chainInfo *types.C

// clear all other instances of the update valset from the queue
m.Logger(sdkCtx).Info("clearing previous instances of the update valset from the queue")
queueName := consensustypes.Queue(ConsensusTurnstoneMessage, xchainType, xchain.ReferenceID(chainInfo.GetChainReferenceID()))
queueName := consensustypes.Queue(types.ConsensusTurnstoneMessage, xchainType, xchain.ReferenceID(chainInfo.GetChainReferenceID()))
messages, err := m.ConsensusKeeper.GetMessagesFromQueue(ctx, queueName, 0)
if err != nil {
m.Logger(sdkCtx).Error("unable to get messages from queue", "err", err)
@@ -639,7 +651,7 @@ func (m msgSender) SendValsetMsgForChain(ctx context.Context, chainInfo *types.C
// put update valset message into the queue
msgID, err := m.ConsensusKeeper.PutMessageInQueue(
ctx,
consensustypes.Queue(ConsensusTurnstoneMessage, xchainType, xchain.ReferenceID(chainInfo.GetChainReferenceID())),
consensustypes.Queue(types.ConsensusTurnstoneMessage, xchainType, xchain.ReferenceID(chainInfo.GetChainReferenceID())),
&types.Message{
TurnstoneID: string(chainInfo.GetSmartContractUniqueID()),
ChainReferenceID: chainInfo.GetChainReferenceID(),
4 changes: 2 additions & 2 deletions x/evm/keeper/smart_contract_deployment.go
@@ -164,7 +164,7 @@ func (k Keeper) AddSmartContractExecutionToConsensus(
return k.ConsensusKeeper.PutMessageInQueue(
ctx,
consensustypes.Queue(
ConsensusTurnstoneMessage,
types.ConsensusTurnstoneMessage,
xchainType,
chainReferenceID,
),
@@ -311,7 +311,7 @@ func (k Keeper) AddUploadSmartContractToConsensus(
return k.ConsensusKeeper.PutMessageInQueue(
ctx,
consensustypes.Queue(
ConsensusTurnstoneMessage,
types.ConsensusTurnstoneMessage,
xchainType,
chainReferenceID,
),
2 changes: 2 additions & 0 deletions x/evm/types/types.go
@@ -1 +1,3 @@
package types

const ConsensusTurnstoneMessage = "evm-turnstone-message"
5 changes: 5 additions & 0 deletions x/gravity/keeper/batch.go
@@ -11,6 +11,7 @@ import (
"cosmossdk.io/store/prefix"
"github.com/VolumeFi/whoops"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/palomachain/paloma/util/eventbus"
"github.com/palomachain/paloma/x/gravity/types"
)

@@ -77,6 +78,10 @@ func (k Keeper) BuildOutgoingTXBatch(
return nil, err
}

eventbus.GravityBatchBuilt().Publish(ctx, eventbus.GravityBatchBuiltEvent{
ChainReferenceID: chainReferenceID,
})

return batch, sdkCtx.EventManager().EmitTypedEvent(
&types.EventOutgoingBatch{
BridgeContract: bridgeContract.GetAddress().Hex(),
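With this publish in place, the "gravity-keeper" subscriber registered in `x/evm/keeper/keeper.go` above receives the event, resolves the chain info for the batch's `ChainReferenceID`, and calls `justInTimeValsetUpdate`, so a valset update can be queued ahead of the batch whenever one is needed. Combined with the new relaying filter in the consensus keeper, this is the mechanism the commit title refers to: the batch waits until the valset update has gone out.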