Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math"
"math/big"
"runtime"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -305,6 +306,8 @@ type TxPool struct {
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop

cacheAccountNeedPromoted *accountSet

addTxCh chan []*types.Transaction
}

type txpoolResetRequest struct {
Expand Down Expand Up @@ -338,6 +341,8 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain txPoo
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
//Maintain a redundant transaction cache in the channel
addTxCh: make(chan []*types.Transaction, runtime.NumCPU()+3),
}

pool.cacheAccountNeedPromoted = newAccountSet(pool.signer)
Expand Down Expand Up @@ -370,6 +375,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain txPoo
pool.wg.Add(1)
go pool.loop()

for i := 0; i < runtime.NumCPU(); i++ {
go pool.parallelAddTx()
}

return pool
}

Expand Down Expand Up @@ -933,7 +942,8 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error {
// This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
return pool.addTxs(txs, false, false)
pool.addTxCh <- txs
return nil
}

// This is like AddRemotes, but waits for pool reorganization. Tests use this method.
Expand All @@ -952,10 +962,16 @@ func (pool *TxPool) addRemoteSync(tx *types.Transaction) error {
//
// Deprecated: use AddRemotes
func (pool *TxPool) AddRemote(tx *types.Transaction) error {
errs := pool.AddRemotes([]*types.Transaction{tx})
errs := pool.addTxs([]*types.Transaction{tx}, false, true)
return errs[0]
}

func (pool *TxPool) parallelAddTx() {
for task := range pool.addTxCh {
pool.addTxs(task, false, false)
}
}

// addTxs attempts to queue a batch of transactions if they are valid.
func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// Filter out known ones without obtaining the pool lock or recovering signatures
Expand Down
17 changes: 13 additions & 4 deletions crypto/secp256k1/secp256.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@ package secp256k1
/*
#cgo CFLAGS: -I./libsecp256k1
#cgo CFLAGS: -I./libsecp256k1/src/

#ifdef __SIZEOF_INT128__
# define HAVE___INT128
# define USE_FIELD_5X52
# define USE_SCALAR_4X64
#else
# define USE_FIELD_10X26
# define USE_SCALAR_8X32
#endif

#define USE_ENDOMORPHISM
#define USE_NUM_NONE
#define USE_FIELD_10X26
#define USE_FIELD_INV_BUILTIN
#define USE_SCALAR_8X32
#define USE_SCALAR_INV_BUILTIN
#define NDEBUG
#include "./libsecp256k1/src/secp256k1.c"
Expand All @@ -28,6 +37,7 @@ import (
"errors"
"math/big"
"unsafe"

"github.com/PlatONnetwork/PlatON-Go/common/math"
)

Expand Down Expand Up @@ -173,6 +183,5 @@ func PubkeyNotInfinity(x, y *big.Int) bool {
math.ReadBits(y, point[32:])
pointPtr := (*C.uchar)(unsafe.Pointer(&point[0]))
res := C.secp256k1_pubkey_is_infinity(context, pointPtr)
return res ==0
return res == 0
}

4 changes: 2 additions & 2 deletions eth/api_tps_cal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ func AnalyzeStressTest(configPaths []string, output string, t int) error {
latencyCell := row.AddCell()
latencyCell.Value = strconv.FormatInt(d[1], 10)
tpsCell := row.AddCell()
tpsCell.Value = strconv.FormatInt(d[2], 10)
tpsCell.SetInt64(d[2])
ttfCell := row.AddCell()
ttfCell.Value = strconv.FormatInt(d[3], 10)
ttfCell.SetInt64(d[3])
if i == 0 {
totalReceive := row.AddCell()
totalReceive.Value = strconv.FormatInt(int64(total), 10)
Expand Down
3 changes: 1 addition & 2 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkTransaction(tx.Hash())
}

go pm.txpool.AddRemotes(txs)
pm.txpool.AddRemotes(txs)

default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
Expand Down Expand Up @@ -871,7 +871,6 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
txset[peer] = append(txset[peer], tx)
}
} else {
rand.Seed(time.Now().UnixNano())
indexes := rand.Perm(len(peers))
for i := 0; i < numBroadcastTxPeers; i++ {
peer := peers[indexes[i]]
Expand Down