Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,9 @@ func newTestParliaHandlerAfterCancun(t *testing.T, config *params.ChainConfig, m
Alloc: types.GenesisAlloc{testAddr: {Balance: new(big.Int).SetUint64(10 * params.Ether)}},
}
engine := &mockParlia{}
chain, _ := core.NewBlockChain(db, gspec, engine, nil)
cfg := core.DefaultConfig()
cfg.StateScheme = rawdb.PathScheme
chain, _ := core.NewBlockChain(db, gspec, engine, cfg)
signer := types.LatestSigner(config)

_, bs, _ := core.GenerateChainWithGenesis(gspec, engine, int(preCancunBlks+postCancunBlks), func(i int, gen *core.BlockGen) {
Expand Down
4 changes: 2 additions & 2 deletions eth/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func testChainSyncWithBlobs(t *testing.T, mode downloader.SyncMode, preCancunBlk
// Sync up the two handlers via both `eth` and `snap`
caps := []p2p.Cap{{Name: "eth", Version: ethVer}, {Name: "snap", Version: snapVer}}

emptyPipeEth, fullPipeEth := p2p.MsgPipe()
emptyPipeEth, fullPipeEth := p2p.MsgPipe(true)
defer emptyPipeEth.Close()
defer fullPipeEth.Close()

Expand All @@ -154,7 +154,7 @@ func testChainSyncWithBlobs(t *testing.T, mode downloader.SyncMode, preCancunBlk
return eth.Handle((*ethHandler)(full.handler), peer)
})

emptyPipeSnap, fullPipeSnap := p2p.MsgPipe()
emptyPipeSnap, fullPipeSnap := p2p.MsgPipe(true)
defer emptyPipeSnap.Close()
defer fullPipeSnap.Close()

Expand Down
20 changes: 16 additions & 4 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,21 @@ func (r *eofSignal) Read(buf []byte) (int, error) {
// MsgPipe creates a message pipe. Reads on one end are matched
// with writes on the other. The pipe is full-duplex, both ends
// implement MsgReadWriter.
func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
func MsgPipe(args ...any) (*MsgPipeRW, *MsgPipeRW) {
noBlock := false
if len(args) > 0 {
noBlock = args[0].(bool)
}
c1, c2 := make(chan Msg), make(chan Msg)
if noBlock {
c1 = make(chan Msg, 1)
c2 = make(chan Msg, 1)
}
var (
c1, c2 = make(chan Msg), make(chan Msg)
closing = make(chan struct{})
closed = new(atomic.Bool)
rw1 = &MsgPipeRW{c1, c2, closing, closed}
rw2 = &MsgPipeRW{c2, c1, closing, closed}
rw1 = &MsgPipeRW{c1, c2, closing, closed, noBlock}
rw2 = &MsgPipeRW{c2, c1, closing, closed, noBlock}
)
return rw1, rw2
}
Expand All @@ -173,6 +181,7 @@ type MsgPipeRW struct {
r <-chan Msg
closing chan struct{}
closed *atomic.Bool
noBlock bool
}

// WriteMsg sends a message on the pipe.
Expand All @@ -183,6 +192,9 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error {
msg.Payload = &eofSignal{msg.Payload, msg.Size, consumed}
select {
case p.w <- msg:
if p.noBlock {
return nil
}
if msg.Size > 0 {
// wait for payload read or discard
select {
Expand Down