Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
Expand All @@ -43,7 +44,6 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/hashicorp/golang-lru"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
triegc: prque.New(nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
Expand Down Expand Up @@ -915,7 +915,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -float32(block.NumberU64()))
bc.triegc.Push(root, -int64(block.NumberU64()))

if current := block.NumberU64(); current > triesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
Expand Down
6 changes: 3 additions & 3 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

const (
Expand Down Expand Up @@ -987,11 +987,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
spammers := prque.New(nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
Expand Down
34 changes: 17 additions & 17 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
Expand Down Expand Up @@ -105,11 +105,11 @@ func newQueue() *queue {
headerPendPool: make(map[string]*fetchRequest),
headerContCh: make(chan bool),
blockTaskPool: make(map[common.Hash]*types.Header),
blockTaskQueue: prque.New(),
blockTaskQueue: prque.New(nil),
blockPendPool: make(map[string]*fetchRequest),
blockDonePool: make(map[common.Hash]struct{}),
receiptTaskPool: make(map[common.Hash]*types.Header),
receiptTaskQueue: prque.New(),
receiptTaskQueue: prque.New(nil),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
resultCache: make([]*fetchResult, blockCacheItems),
Expand Down Expand Up @@ -277,7 +277,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
}
// Schedule all the header retrieval tasks for the skeleton assembly
q.headerTaskPool = make(map[uint64]*types.Header)
q.headerTaskQueue = prque.New()
q.headerTaskQueue = prque.New(nil)
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
Expand All @@ -288,7 +288,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
index := from + uint64(i*MaxHeaderFetch)

q.headerTaskPool[index] = header
q.headerTaskQueue.Push(index, -float32(index))
q.headerTaskQueue.Push(index, -int64(index))
}
}

Expand Down Expand Up @@ -334,11 +334,11 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
}
// Queue the header for content retrieval
q.blockTaskPool[hash] = header
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))

if q.mode == FastSync {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
inserts = append(inserts, header)
q.headerHead = hash
Expand Down Expand Up @@ -436,7 +436,7 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
}
// Merge all the skipped batches back
for _, from := range skip {
q.headerTaskQueue.Push(from, -float32(from))
q.headerTaskQueue.Push(from, -int64(from))
}
// Assemble and return the block download request
if send == 0 {
Expand Down Expand Up @@ -542,7 +542,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
}
// Merge all the skipped headers back
for _, header := range skip {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
if progress {
// Wake WaitResults, resultCache was modified
Expand Down Expand Up @@ -585,10 +585,10 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m
defer q.lock.Unlock()

if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(pendPool, request.Peer.id)
}
Expand All @@ -602,13 +602,13 @@ func (q *queue) Revoke(peerID string) {

if request, ok := q.blockPendPool[peerID]; ok {
for _, header := range request.Headers {
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.blockPendPool, peerID)
}
if request, ok := q.receiptPendPool[peerID]; ok {
for _, header := range request.Headers {
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.receiptPendPool, peerID)
}
Expand Down Expand Up @@ -657,10 +657,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,

// Return any non satisfied requests to the pool
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
// Add the peer to the expiry report along the the number of failed requests
expiries[id] = len(request.Headers)
Expand Down Expand Up @@ -731,7 +731,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
}
miss[request.From] = struct{}{}

q.headerTaskQueue.Push(request.From, -float32(request.From))
q.headerTaskQueue.Push(request.From, -int64(request.From))
return 0, errors.New("delivery not accepted")
}
// Clean up a successful fetch and try to deliver any sub-results
Expand Down Expand Up @@ -854,7 +854,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
// Return all failed or missing fetches to the queue
for _, header := range request.Headers {
if header != nil {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
// Wake up WaitResults
Expand Down
8 changes: 4 additions & 4 deletions eth/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

const (
Expand Down Expand Up @@ -160,7 +160,7 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBloc
fetching: make(map[common.Hash]*announce),
fetched: make(map[common.Hash][]*announce),
completing: make(map[common.Hash]*announce),
queue: prque.New(),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*inject),
getBlock: getBlock,
Expand Down Expand Up @@ -299,7 +299,7 @@ func (f *Fetcher) loop() {
// If too high up the chain or phase, continue later
number := op.block.NumberU64()
if number > height+1 {
f.queue.Push(op, -float32(number))
f.queue.Push(op, -int64(number))
if f.queueChangeHook != nil {
f.queueChangeHook(hash, true)
}
Expand Down Expand Up @@ -624,7 +624,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
}
f.queues[peer] = count
f.queued[hash] = op
f.queue.Push(op, -float32(block.NumberU64()))
f.queue.Push(op, -int64(block.NumberU64()))
if f.queueChangeHook != nil {
f.queueChangeHook(op.block.Hash(), true)
}
Expand Down
6 changes: 3 additions & 3 deletions trie/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/ethdb"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

// ErrNotRequested is returned by the trie sync when it's requested to process a
Expand Down Expand Up @@ -84,7 +84,7 @@ func NewSync(root common.Hash, database DatabaseReader, callback LeafCallback) *
database: database,
membatch: newSyncMemBatch(),
requests: make(map[common.Hash]*request),
queue: prque.New(),
queue: prque.New(nil),
}
ts.AddSubTrie(root, 0, common.Hash{}, callback)
return ts
Expand Down Expand Up @@ -242,7 +242,7 @@ func (s *Sync) schedule(req *request) {
return
}
// Schedule the request for future retrieval
s.queue.Push(req.hash, float32(req.depth))
s.queue.Push(req.hash, int64(req.depth))
s.requests[req.hash] = req
}

Expand Down
25 changes: 0 additions & 25 deletions vendor/gopkg.in/karalabe/cookiejar.v2/LICENSE

This file was deleted.

Loading