Skip to content

Commit

Permalink
fix: dedup when listened jobs are duplicated (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
DNK90 authored Aug 22, 2022
1 parent 9d6ea6e commit be16481
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
25 changes: 22 additions & 3 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,23 @@ func (c *Controller) prepareJob(job JobHandler) error {
if job == nil {
return nil
}
// deduplication: get hash from data and type and check if it exists in `processedJobs` or not.
hash := c.utilWrapper.RlpHash(struct {
Data []byte
Type int
}{
Data: job.GetData(),
Type: job.GetType(),
})
if _, ok := c.processedJobs.Load(hash); ok {
return nil
}
// save job to db if id = 0
if job.GetID() == 0 {
return job.Save()
}
// cache above hash to `processedJobs`
c.processedJobs.Store(hash, struct{}{})
return nil
}

Expand Down Expand Up @@ -408,7 +422,7 @@ func (c *Controller) startListening(listener Listener, tryCount int) {
log.Error("[Controller][startListener] error while get latest block", "err", err, "listener", listener.GetName())
// otherwise retry startListener
time.Sleep(time.Duration(tryCount+1) * time.Second)
go c.startListening(listener, tryCount+1)
c.startListening(listener, tryCount+1)
return
}
// reset fromHeight if it is out of allowed blocks range
Expand All @@ -422,7 +436,8 @@ func (c *Controller) startListening(listener Listener, tryCount int) {
if err := c.processBehindBlock(listener, currentBlock.GetHeight(), latestBlockHeight); err != nil {
log.Error("[Controller][startListener] error while processing behind block", "err", err, "height", currentBlock.GetHeight(), "latestBlockHeight", latestBlockHeight)
time.Sleep(time.Duration(tryCount+1) * time.Second)
go c.startListening(listener, tryCount+1)
c.startListening(listener, tryCount+1)
return
}
}
// start listening to block's events
Expand Down Expand Up @@ -496,7 +511,11 @@ func (c *Controller) processBatchLogs(listener Listener, fromHeight, toHeight ui
var (
contractAddresses []common.Address
)
chainId, _ := listener.GetChainID()
chainId, err := listener.GetChainID()
if err != nil {
log.Error("[Controller][processBatchLogs] error while getting chainID", "err", err, "listener", listener.GetName())
return fromHeight
}
addedContract := make(map[common.Address]struct{})
filteredMethods := make(map[*abi.ABI]map[string]struct{})
eventIds := make(map[common.Hash]string)
Expand Down
19 changes: 19 additions & 0 deletions utils/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"golang.org/x/crypto/sha3"
"math/big"
"os"
"reflect"
"sync"

kmsUtils "github.com/axieinfinity/ronin-kms-client/utils"
"github.com/ethereum/go-ethereum"
Expand All @@ -21,6 +25,11 @@ import (
"golang.org/x/text/language"
)

// hasherPool holds LegacyKeccak256 hashers for rlpHash.
var hasherPool = sync.Pool{
New: func() interface{} { return sha3.NewLegacyKeccak256() },
}

type EthClient interface {
ethereum.ChainReader
ethereum.TransactionReader
Expand All @@ -43,6 +52,7 @@ type Utils interface {
SendContractTransaction(signMethod ISign, chainId *big.Int, fn func(opts *bind.TransactOpts) (*types.Transaction, error)) (*types.Transaction, error)
SignTypedData(typedData core.TypedData, signMethod ISign) (hexutil.Bytes, error)
FilterLogs(client EthClient, opts *bind.FilterOpts, contractAddresses []common.Address, filteredMethods map[*abi.ABI]map[string]struct{}) ([]types.Log, error)
RlpHash(x interface{}) (h common.Hash)
}

type utils struct{}
Expand Down Expand Up @@ -228,3 +238,12 @@ func (u *utils) FilterLogs(client EthClient, opts *bind.FilterOpts, contractAddr
}
return client.FilterLogs(opts.Context, config)
}

func (u *utils) RlpHash(x interface{}) (h common.Hash) {
sha := hasherPool.Get().(crypto.KeccakState)
defer hasherPool.Put(sha)
sha.Reset()
rlp.Encode(sha, x)
sha.Read(h[:])
return h
}

0 comments on commit be16481

Please sign in to comment.