diff --git a/eth/handler_test.go b/eth/handler_test.go index 9c91a1a9cf..03cbe7803d 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -300,7 +300,9 @@ func newTestParliaHandlerAfterCancun(t *testing.T, config *params.ChainConfig, m Alloc: types.GenesisAlloc{testAddr: {Balance: new(big.Int).SetUint64(10 * params.Ether)}}, } engine := &mockParlia{} - chain, _ := core.NewBlockChain(db, gspec, engine, nil) + cfg := core.DefaultConfig() + cfg.StateScheme = rawdb.PathScheme + chain, _ := core.NewBlockChain(db, gspec, engine, cfg) signer := types.LatestSigner(config) _, bs, _ := core.GenerateChainWithGenesis(gspec, engine, int(preCancunBlks+postCancunBlks), func(i int, gen *core.BlockGen) { diff --git a/eth/sync_test.go b/eth/sync_test.go index cd6461c102..7bc5667437 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -138,7 +138,7 @@ func testChainSyncWithBlobs(t *testing.T, mode downloader.SyncMode, preCancunBlk // Sync up the two handlers via both `eth` and `snap` caps := []p2p.Cap{{Name: "eth", Version: ethVer}, {Name: "snap", Version: snapVer}} - emptyPipeEth, fullPipeEth := p2p.MsgPipe() + emptyPipeEth, fullPipeEth := p2p.MsgPipe(true) defer emptyPipeEth.Close() defer fullPipeEth.Close() @@ -154,7 +154,7 @@ func testChainSyncWithBlobs(t *testing.T, mode downloader.SyncMode, preCancunBlk return eth.Handle((*ethHandler)(full.handler), peer) }) - emptyPipeSnap, fullPipeSnap := p2p.MsgPipe() + emptyPipeSnap, fullPipeSnap := p2p.MsgPipe(true) defer emptyPipeSnap.Close() defer fullPipeSnap.Close() diff --git a/p2p/message.go b/p2p/message.go index 3ab56ee350..c61409cd8b 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -152,13 +152,21 @@ func (r *eofSignal) Read(buf []byte) (int, error) { // MsgPipe creates a message pipe. Reads on one end are matched // with writes on the other. The pipe is full-duplex, both ends // implement MsgReadWriter. -func MsgPipe() (*MsgPipeRW, *MsgPipeRW) { +func MsgPipe(args ...any) (*MsgPipeRW, *MsgPipeRW) { + noBlock := false + if len(args) > 0 { + noBlock = args[0].(bool) + } + c1, c2 := make(chan Msg), make(chan Msg) + if noBlock { + c1 = make(chan Msg, 1) + c2 = make(chan Msg, 1) + } var ( - c1, c2 = make(chan Msg), make(chan Msg) closing = make(chan struct{}) closed = new(atomic.Bool) - rw1 = &MsgPipeRW{c1, c2, closing, closed} - rw2 = &MsgPipeRW{c2, c1, closing, closed} + rw1 = &MsgPipeRW{c1, c2, closing, closed, noBlock} + rw2 = &MsgPipeRW{c2, c1, closing, closed, noBlock} ) return rw1, rw2 } @@ -173,6 +181,7 @@ type MsgPipeRW struct { r <-chan Msg closing chan struct{} closed *atomic.Bool + noBlock bool } // WriteMsg sends a message on the pipe. @@ -183,6 +192,9 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error { msg.Payload = &eofSignal{msg.Payload, msg.Size, consumed} select { case p.w <- msg: + if p.noBlock { + return nil + } if msg.Size > 0 { // wait for payload read or discard select {