diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go index 1600a11f99..164e3fa4b0 100644 --- a/p2p/protocols/protocol.go +++ b/p2p/protocols/protocol.go @@ -243,7 +243,7 @@ func (p *Peer) Run(handler func(ctx context.Context, msg interface{}) error) err // Drop disconnects a peer. // TODO: may need to implement protocol drop only? don't want to kick off the peer // if they are useful for other protocols -func (p *Peer) Drop(err error) { +func (p *Peer) Drop() { p.Disconnect(p2p.DiscSubprotocolError) } @@ -291,7 +291,7 @@ func (p *Peer) Send(ctx context.Context, msg interface{}) error { if p.spec.Hook != nil { err := p.spec.Hook.Send(p, wmsg.Size, msg) if err != nil { - p.Drop(err) + p.Drop() return err } } diff --git a/p2p/protocols/protocol_test.go b/p2p/protocols/protocol_test.go index 9ac76ea2fd..6d5ea8b923 100644 --- a/p2p/protocols/protocol_test.go +++ b/p2p/protocols/protocol_test.go @@ -126,7 +126,7 @@ func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) er case *kill: // demonstrates use of peerPool, killing another peer connection as a response to a message id := msg.C - pp.Get(id).Drop(errors.New("killed")) + pp.Get(id).Drop() return nil case *drop: diff --git a/p2p/testing/peerpool.go b/p2p/testing/peerpool.go index 01ccce67eb..09db4b2466 100644 --- a/p2p/testing/peerpool.go +++ b/p2p/testing/peerpool.go @@ -26,7 +26,7 @@ import ( type TestPeer interface { ID() enode.ID - Drop(error) + Drop() } // TestPeerPool is an example peerPool to demonstrate registration of peer connections diff --git a/swarm/network/hive.go b/swarm/network/hive.go index a0b6b988ab..14cd8cf750 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -116,7 +116,7 @@ func (h *Hive) Stop() error { log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4])) h.EachConn(nil, 255, func(p *Peer, _ int) bool { log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4])) - p.Drop(nil) + p.Drop() return true }) diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 0596667239..a11e6a1e41 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -33,11 +33,6 @@ import ( olog "github.com/opentracing/opentracing-go/log" ) -const ( - swarmChunkServerStreamName = "RETRIEVE_REQUEST" - deliveryCap = 32 -) - var ( processReceivedChunksCount = metrics.NewRegisteredCounter("network.stream.received_chunks.count", nil) handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil) @@ -51,85 +46,15 @@ type Delivery struct { chunkStore chunk.FetchStore kad *network.Kademlia getPeer func(enode.ID) *Peer + quit chan struct{} } func NewDelivery(kad *network.Kademlia, chunkStore chunk.FetchStore) *Delivery { return &Delivery{ chunkStore: chunkStore, kad: kad, - } -} - -// SwarmChunkServer implements Server -type SwarmChunkServer struct { - deliveryC chan []byte - batchC chan []byte - chunkStore storage.ChunkStore - currentLen uint64 - quit chan struct{} -} - -// NewSwarmChunkServer is SwarmChunkServer constructor -func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer { - s := &SwarmChunkServer{ - deliveryC: make(chan []byte, deliveryCap), - batchC: make(chan []byte), - chunkStore: chunkStore, quit: make(chan struct{}), } - go s.processDeliveries() - return s -} - -// processDeliveries handles delivered chunk hashes -func (s *SwarmChunkServer) processDeliveries() { - var hashes []byte - var batchC chan []byte - for { - select { - case <-s.quit: - return - case hash := <-s.deliveryC: - hashes = append(hashes, hash...) - batchC = s.batchC - case batchC <- hashes: - hashes = nil - batchC = nil - } - } -} - -// SessionIndex returns zero in all cases for SwarmChunkServer. -func (s *SwarmChunkServer) SessionIndex() (uint64, error) { - return 0, nil -} - -// SetNextBatch -func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) { - select { - case hashes = <-s.batchC: - case <-s.quit: - return - } - - from = s.currentLen - s.currentLen += uint64(len(hashes)) - to = s.currentLen - return -} - -// Close needs to be called on a stream server -func (s *SwarmChunkServer) Close() { - close(s.quit) -} - -// GetData retrieves chunk data from db store -func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - ch, err := s.chunkStore.Get(ctx, chunk.ModeGetRequest, storage.Address(key)) - if err != nil { - return nil, err - } - return ch.Data(), nil } // RetrieveRequestMsg is the protocol msg for chunk retrieve requests @@ -150,12 +75,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * osp.LogFields(olog.String("ref", req.Addr.String())) - s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true)) - if err != nil { - return err - } - streamer := s.Server.(*SwarmChunkServer) - var cancel func() // TODO: do something with this hardcoded timeout, maybe use TTL in the future ctx = context.WithValue(ctx, "peer", sp.ID().String()) @@ -165,7 +84,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * go func() { select { case <-ctx.Done(): - case <-streamer.quit: + case <-d.quit: } cancel() }() @@ -178,23 +97,13 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err) return } - if req.SkipCheck { - syncing := false - osp.LogFields(olog.Bool("skipCheck", true)) + syncing := false - err = sp.Deliver(ctx, ch, s.priority, syncing) - if err != nil { - log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) - } - osp.LogFields(olog.Bool("delivered", true)) - return - } - osp.LogFields(olog.Bool("skipCheck", false)) - select { - case streamer.deliveryC <- ch.Address()[:]: - case <-streamer.quit: + err = sp.Deliver(ctx, ch, Top, syncing) + if err != nil { + log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } - + osp.LogFields(olog.Bool("delivered", true)) }() return nil @@ -244,22 +153,16 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int case *ChunkDeliveryMsgSyncing: msg = (*ChunkDeliveryMsg)(r) mode = chunk.ModePutSync + case *ChunkDeliveryMsg: + msg = r + mode = chunk.ModePutSync } - // retrieve the span for the originating retrieverequest - spanID := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), msg.Addr) - span := tracing.ShiftSpanByKey(spanID) - log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID()) go func() { defer osp.Finish() - if span != nil { - span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg")) - defer span.Finish() - } - msg.peer = sp log.Trace("handle.chunk.delivery", "put", msg.Addr) _, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) @@ -268,7 +171,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int // we removed this log because it spams the logs // TODO: Enable this log line // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, ) - msg.peer.Drop(err) + msg.peer.Drop() } } log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err) @@ -276,6 +179,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int return nil } +func (d *Delivery) Close() { + d.kad.CloseNeighbourhoodDepthC() + d.kad.CloseAddrCountC() + close(d.quit) +} + // RequestFromPeers sends a chunk retrieve request to a peer // The most eligible peer that hasn't already been sent to is chosen // TODO: define "eligible" diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 801e6d98ac..4037243c17 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -41,64 +41,11 @@ import ( "github.com/ethereum/go-ethereum/swarm/testutil" ) -//Tests initializing a retrieve request -func TestStreamerRetrieveRequest(t *testing.T) { - regOpts := &RegistryOptions{ - Retrieval: RetrievalClientOnly, - Syncing: SyncingDisabled, - } - tester, streamer, _, teardown, err := newStreamerTester(regOpts) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - ctx := context.Background() - req := network.NewRequest( - storage.Address(hash0[:]), - true, - &sync.Map{}, - ) - streamer.delivery.RequestFromPeers(ctx, req) - - stream := NewStream(swarmChunkServerStreamName, "", true) - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "RetrieveRequestMsg", - Expects: []p2ptest.Expect{ - { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly` - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: node.ID(), - }, - { //expect a retrieve request message for the given hash - Code: 5, - Msg: &RetrieveRequestMsg{ - Addr: hash0[:], - SkipCheck: true, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } -} - //Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) //Should time out as the peer does not have the chunk (no syncing happened previously) func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalEnabled, - Syncing: SyncingDisabled, //do no syncing + tester, _, _, teardown, err := newStreamerTester(&RegistryOptions{ + Syncing: SyncingDisabled, //do no syncing }) if err != nil { t.Fatal(err) @@ -109,30 +56,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { chunk := storage.NewChunk(storage.Address(hash0[:]), nil) - peer := streamer.getPeer(node.ID()) - - stream := NewStream(swarmChunkServerStreamName, "", true) - //simulate pre-subscription to RETRIEVE_REQUEST stream on peer - peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }) - //test the exchange err = tester.TestExchanges(p2ptest.Exchange{ - Expects: []p2ptest.Expect{ - { //first expect a subscription to the RETRIEVE_REQUEST stream - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ { //then the actual RETRIEVE_REQUEST.... @@ -159,7 +84,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { //should fail with a timeout as the peer we are requesting //the chunk from does not have the chunk - expectedError := `exchange #1 "RetrieveRequestMsg": timed out` + expectedError := `exchange #0 "RetrieveRequestMsg": timed out` if err == nil || err.Error() != expectedError { t.Fatalf("Expected error %v, got %v", expectedError, err) } @@ -168,9 +93,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // upstream request server receives a retrieve Request and responds with // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalEnabled, - Syncing: SyncingDisabled, + tester, _, localStore, teardown, err := newStreamerTester(&RegistryOptions{ + Syncing: SyncingDisabled, }) if err != nil { t.Fatal(err) @@ -179,36 +103,14 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { node := tester.Nodes[0] - peer := streamer.getPeer(node.ID()) - - stream := NewStream(swarmChunkServerStreamName, "", true) - - peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }) - - hash := storage.Address(hash0[:]) - ch := storage.NewChunk(hash, hash) + hash := storage.Address(hash1[:]) + ch := storage.NewChunk(hash, hash1[:]) _, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } err = tester.TestExchanges(p2ptest.Exchange{ - Expects: []p2ptest.Expect{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ { @@ -219,53 +121,12 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { Peer: node.ID(), }, }, - Expects: []p2ptest.Expect{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: hash, - From: 0, - // TODO: why is this 32??? - To: 32, - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } - - hash = storage.Address(hash1[:]) - ch = storage.NewChunk(hash, hash1[:]) - _, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch) - if err != nil { - t.Fatalf("Expected no err got %v", err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "RetrieveRequestMsg", - Triggers: []p2ptest.Trigger{ - { - Code: 5, - Msg: &RetrieveRequestMsg{ - Addr: hash, - SkipCheck: true, - }, - Peer: node.ID(), - }, - }, Expects: []p2ptest.Expect{ { Code: 6, Msg: &ChunkDeliveryMsg{ - Addr: hash, - SData: hash, + Addr: ch.Address(), + SData: ch.Data(), }, Peer: node.ID(), }, @@ -359,8 +220,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) { func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, + Syncing: SyncingDisabled, }) if err != nil { t.Fatal(err) @@ -472,7 +332,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, Syncing: SyncingDisabled, - Retrieval: RetrievalEnabled, }, nil) bucket.Store(bucketKeyRegistry, r) @@ -623,7 +482,6 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, Syncing: SyncingDisabled, - Retrieval: RetrievalDisabled, SyncUpdateDelay: 0, }, nil) bucket.Store(bucketKeyRegistry, r) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 1f2cdcada7..f7b8349938 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -67,7 +67,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingRegisterOnly, SkipCheck: skipCheck, }, nil) diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go index 501660fab4..eb4e73d475 100644 --- a/swarm/network/stream/lightnode_test.go +++ b/swarm/network/stream/lightnode_test.go @@ -21,95 +21,11 @@ import ( p2ptest "github.com/ethereum/go-ethereum/p2p/testing" ) -// This test checks the default behavior of the server, that is -// when it is serving Retrieve requests. -func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { - registryOptions := &RegistryOptions{ - Retrieval: RetrievalClientOnly, - Syncing: SyncingDisabled, - } - tester, _, _, teardown, err := newStreamerTester(registryOptions) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - stream := NewStream(swarmChunkServerStreamName, "", false) - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "SubscribeMsg", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - if err != nil { - t.Fatalf("Got %v", err) - } - - err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID()}) - if err == nil || err.Error() != "timed out waiting for peers to disconnect" { - t.Fatalf("Expected no disconnect, got %v", err) - } -} - -// This test checks the Lightnode behavior of server, when serving Retrieve -// requests are disabled -func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { - registryOptions := &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, - } - tester, _, _, teardown, err := newStreamerTester(registryOptions) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - stream := NewStream(swarmChunkServerStreamName, "", false) - - err = tester.TestExchanges( - p2ptest.Exchange{ - Label: "SubscribeMsg", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 7, - Msg: &SubscribeErrorMsg{ - Error: "stream RETRIEVE_REQUEST not registered", - }, - Peer: node.ID(), - }, - }, - }) - if err != nil { - t.Fatalf("Got %v", err) - } -} - // This test checks the default behavior of the server, that is // when syncing is enabled. func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { registryOptions := &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingRegisterOnly, + Syncing: SyncingRegisterOnly, } tester, _, _, teardown, err := newStreamerTester(registryOptions) if err != nil { @@ -153,8 +69,7 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { // when syncing is disabled. func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) { registryOptions := &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(registryOptions) if err != nil { diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b293724cc7..b60d2fcc9e 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -247,7 +247,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg case err := <-errC: if err != nil { log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err) - p.Drop(err) + p.Drop() return } case <-ctx.Done(): @@ -289,7 +289,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg case err := <-c.next: if err != nil { log.Warn("c.next error dropping peer", "err", err) - p.Drop(err) + p.Drop() return } case <-c.quit: diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 152814bd4f..98b237ce2c 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -90,7 +90,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { err := p.Send(wmsg.Context, wmsg.Msg) if err != nil { log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) - p.Drop(err) + p.Drop() } }) diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 2d5935276c..e34f87951b 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -119,7 +119,6 @@ var retrievalSimServiceMap = map[string]simulation.ServiceFunc{ } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalEnabled, Syncing: SyncingAutoSubscribe, SyncUpdateDelay: syncUpdateDelay, }, nil) diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 605c9dbeb7..fefdb7c9f8 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -118,7 +118,6 @@ var simServiceMap = map[string]simulation.ServiceFunc{ store := state.NewInmemoryStore() r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, }, nil) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 0d990da5cc..10a8f7ec54 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -18,7 +18,6 @@ package stream import ( "context" - "errors" "fmt" "math" "reflect" @@ -30,11 +29,11 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/state" + "github.com/ethereum/go-ethereum/swarm/storage" ) const ( @@ -49,7 +48,6 @@ const ( // Enumerate options for syncing and retrieval type SyncingOption int -type RetrievalOption int // Syncing options const ( @@ -61,17 +59,6 @@ const ( SyncingAutoSubscribe ) -const ( - // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) - RetrievalDisabled RetrievalOption = iota - // Only the client side of the retrieve request is registered. - // (light nodes do not serve retrieve requests) - // once the client is registered, subscription to retrieve request stream is always sent - RetrievalClientOnly - // Both client and server funcs are registered, subscribe sent automatically - RetrievalEnabled -) - // subscriptionFunc is used to determine what to do in order to perform subscriptions // usually we would start to really subscribe to nodes, but for tests other functionality may be needed // (see TestRequestPeerSubscriptions in streamer_test.go) @@ -90,7 +77,6 @@ type Registry struct { peers map[enode.ID]*Peer delivery *Delivery intervalsStore state.Store - autoRetrieval bool // automatically subscribe to retrieve request stream maxPeerServers int spec *protocols.Spec //this protocol's spec balance protocols.Balance //implements protocols.Balance, for accounting @@ -101,22 +87,19 @@ type Registry struct { // RegistryOptions holds optional values for NewRegistry constructor. type RegistryOptions struct { SkipCheck bool - Syncing SyncingOption // Defines syncing behavior - Retrieval RetrievalOption // Defines retrieval behavior + Syncing SyncingOption // Defines syncing behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry } // NewRegistry is Streamer constructor -func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.FetchStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry { +func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry { if options == nil { options = &RegistryOptions{} } if options.SyncUpdateDelay <= 0 { options.SyncUpdateDelay = 15 * time.Second } - // check if retrieval has been disabled - retrieval := options.Retrieval != RetrievalDisabled quit := make(chan struct{}) @@ -128,7 +111,6 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.Fetc peers: make(map[enode.ID]*Peer), delivery: delivery, intervalsStore: intervalsStore, - autoRetrieval: retrieval, maxPeerServers: options.MaxPeerServers, balance: balance, quit: quit, @@ -139,27 +121,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.Fetc streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer - // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) - if options.Retrieval == RetrievalEnabled { - streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { - if !live { - return nil, errors.New("only live retrieval requests supported") - } - return NewSwarmChunkServer(delivery.chunkStore), nil - }) - } - - // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) - if options.Retrieval != RetrievalDisabled { - streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) - }) - } - // If syncing is not disabled, the syncing functions are registered (both client and server) if options.Syncing != SyncingDisabled { - RegisterSwarmSyncerServer(streamer, syncChunkStore) - RegisterSwarmSyncerClient(streamer, syncChunkStore) + RegisterSwarmSyncerServer(streamer, netStore) + RegisterSwarmSyncerClient(streamer, netStore) } // if syncing is set to automatically subscribe to the syncing stream, start the subscription process @@ -381,7 +346,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8 } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(context.TODO(), msg, priority) + return peer.Send(context.TODO(), msg) } func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { @@ -422,8 +387,7 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error { func (r *Registry) Close() error { // Stop sending neighborhood depth change and address count // change from Kademlia that were initiated in NewRegistry constructor. - r.delivery.kad.CloseNeighbourhoodDepthC() - r.delivery.kad.CloseAddrCountC() + r.delivery.Close() close(r.quit) return r.intervalsStore.Close() } @@ -464,13 +428,6 @@ func (r *Registry) Run(p *network.BzzPeer) error { defer close(sp.quit) defer sp.close() - if r.autoRetrieval && !p.LightNode { - err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) - if err != nil { - return err - } - } - return sp.Run(sp.HandleMsg) } @@ -619,19 +576,66 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { return p.handleUnsubscribeMsg(msg) case *OfferedHashesMsg: - return p.handleOfferedHashesMsg(ctx, msg) + go func() { + err := p.handleOfferedHashesMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *TakeoverProofMsg: - return p.handleTakeoverProofMsg(ctx, msg) + go func() { + err := p.handleTakeoverProofMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *WantedHashesMsg: - return p.handleWantedHashesMsg(ctx, msg) + go func() { + err := p.handleWantedHashesMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil + + case *ChunkDeliveryMsgRetrieval: + // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg + go func() { + err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil - case *ChunkDeliveryMsgRetrieval, *ChunkDeliveryMsgSyncing: - return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg) + case *ChunkDeliveryMsgSyncing: + // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg + go func() { + err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *RetrieveRequestMsg: - return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) + go func() { + err := p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *RequestSubscriptionMsg: return p.handleRequestSubscription(ctx, msg) @@ -762,7 +766,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error return err } - if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { + if err := p.Send(context.TODO(), tp); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { @@ -964,15 +968,13 @@ func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error { } /* -GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. +GetPeerServerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. It can be called via RPC. It returns a map of node IDs with an array of string representations of Stream objects. */ -func (api *API) GetPeerSubscriptions() map[string][]string { - //create the empty map +func (api *API) GetPeerServerSubscriptions() map[string][]string { pstreams := make(map[string][]string) - //iterate all streamer peers api.streamer.peersMu.RLock() defer api.streamer.peersMu.RUnlock() diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index bdd3087bbc..c7da05014b 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -539,7 +539,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { t.Fatal(err) } - expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)") + expectedError := errors.New("subprotocol error") if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil { t.Fatal(err) } @@ -779,7 +779,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { func TestMaxPeerServersWithUnsubscribe(t *testing.T) { var maxPeerServers = 6 tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingDisabled, MaxPeerServers: maxPeerServers, }) @@ -940,8 +939,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { //`Price` interface implementation func TestHasPriceImplementation(t *testing.T) { _, r, _, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, + Syncing: SyncingDisabled, }) if err != nil { t.Fatal(err) @@ -1123,8 +1121,8 @@ func TestRequestPeerSubscriptions(t *testing.T) { } } -// TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function -func TestGetSubscriptions(t *testing.T) { +// TestGetServerSubscriptions is a unit test for the api.GetPeerServerSubscriptions() function +func TestGetServerSubscriptions(t *testing.T) { // create an amount of dummy peers testPeerCount := 8 // every peer will have this amount of dummy servers @@ -1135,7 +1133,7 @@ func TestGetSubscriptions(t *testing.T) { r := &Registry{} api := NewAPI(r) // call once, at this point should be empty - regs := api.GetPeerSubscriptions() + regs := api.GetPeerServerSubscriptions() if len(regs) != 0 { t.Fatal("Expected subscription count to be 0, but it is not") } @@ -1159,7 +1157,7 @@ func TestGetSubscriptions(t *testing.T) { r.peers = peerMap // call the subscriptions again - regs = api.GetPeerSubscriptions() + regs = api.GetPeerServerSubscriptions() // count how many (fake) subscriptions there are cnt := 0 for _, reg := range regs { @@ -1175,11 +1173,11 @@ func TestGetSubscriptions(t *testing.T) { } /* -TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes, +TestGetServerSubscriptionsRPC sets up a simulation network of `nodeCount` nodes, starts the simulation, waits for SyncUpdateDelay in order to kick off stream registration, then tests that there are subscriptions. */ -func TestGetSubscriptionsRPC(t *testing.T) { +func TestGetServerSubscriptionsRPC(t *testing.T) { if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" { t.Skip("flaky with -race on Travis") @@ -1226,7 +1224,6 @@ func TestGetSubscriptionsRPC(t *testing.T) { // configure so that sync registrations actually happen r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalEnabled, Syncing: SyncingAutoSubscribe, //enable sync registrations SyncUpdateDelay: syncUpdateDelay, }, nil) @@ -1321,7 +1318,7 @@ func TestGetSubscriptionsRPC(t *testing.T) { //ask it for subscriptions pstreams := make(map[string][]string) - err = client.Call(&pstreams, "stream_getPeerSubscriptions") + err = client.Call(&pstreams, "stream_getPeerServerSubscriptions") if err != nil { return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err) } diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index a8651f386d..b787c7bb81 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -83,7 +83,6 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p } r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, SkipCheck: skipCheck, }, nil) @@ -232,8 +231,7 @@ func TestSameVersionID(t *testing.T) { } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingAutoSubscribe, + Syncing: SyncingAutoSubscribe, }, nil) bucket.Store(bucketKeyRegistry, r) @@ -296,8 +294,7 @@ func TestDifferentVersionID(t *testing.T) { } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingAutoSubscribe, + Syncing: SyncingAutoSubscribe, }, nil) bucket.Store(bucketKeyRegistry, r) diff --git a/swarm/swarm.go b/swarm/swarm.go index 7f5ee83610..2f025d9cc9 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -204,15 +204,9 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e syncing = stream.SyncingDisabled } - retrieval := stream.RetrievalEnabled - if config.LightNodeEnabled { - retrieval = stream.RetrievalClientOnly - } - registryOptions := &stream.RegistryOptions{ SkipCheck: config.DeliverySkipCheck, Syncing: syncing, - Retrieval: retrieval, SyncUpdateDelay: config.SyncUpdateDelay, MaxPeerServers: config.MaxStreamPeerServers, }