diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index c8551814c2..2455904f3c 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -172,12 +172,6 @@ type Store interface { Close() (err error) } -// FetchStore is a Store which supports syncing -type FetchStore interface { - Store - FetchFunc(ctx context.Context, addr Address) func(context.Context) error -} - // Validator validates a chunk. type Validator interface { Validate(ch Chunk) bool diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 344612369b..aa2c817ea6 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -46,17 +46,17 @@ var ( ) type Delivery struct { - chunkStore chunk.FetchStore - kad *network.Kademlia - getPeer func(enode.ID) *Peer - quit chan struct{} + netStore *storage.NetStore + kad *network.Kademlia + getPeer func(enode.ID) *Peer + quit chan struct{} } -func NewDelivery(kad *network.Kademlia, chunkStore chunk.FetchStore) *Delivery { +func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery { return &Delivery{ - chunkStore: chunkStore, - kad: kad, - quit: make(chan struct{}), + netStore: netStore, + kad: kad, + quit: make(chan struct{}), } } @@ -94,7 +94,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * go func() { defer osp.Finish() - ch, err := d.chunkStore.Get(ctx, chunk.ModeGetRequest, req.Addr) + ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr) if err != nil { retrieveChunkFail.Inc(1) log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err) @@ -171,7 +171,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int msg.peer = sp log.Trace("handle.chunk.delivery", "put", msg.Addr) - _, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) + _, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) if err != nil { if err == storage.ErrChunkInvalid { // we removed this log because it spams the logs diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index f7b8349938..660954857e 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" @@ -287,20 +286,20 @@ func enableNotifications(r *Registry, peerID enode.ID, s Stream) error { type testExternalClient struct { hashes chan []byte - store chunk.FetchStore + netStore *storage.NetStore enableNotificationsC chan struct{} } -func newTestExternalClient(store chunk.FetchStore) *testExternalClient { +func newTestExternalClient(netStore *storage.NetStore) *testExternalClient { return &testExternalClient{ hashes: make(chan []byte), - store: store, + netStore: netStore, enableNotificationsC: make(chan struct{}), } } func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { - wait := c.store.FetchFunc(ctx, storage.Address(hash)) + wait := c.netStore.FetchFunc(ctx, storage.Address(hash)) if wait == nil { return nil } diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index c573da5d25..79b04a3078 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -34,27 +34,27 @@ const ( // * live request delivery with or without checkback // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { - po uint8 - store chunk.FetchStore - quit chan struct{} + po uint8 + netStore *storage.NetStore + quit chan struct{} } // NewSwarmSyncerServer is constructor for SwarmSyncerServer -func NewSwarmSyncerServer(po uint8, syncChunkStore chunk.FetchStore) (*SwarmSyncerServer, error) { +func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore) (*SwarmSyncerServer, error) { return &SwarmSyncerServer{ - po: po, - store: syncChunkStore, - quit: make(chan struct{}), + po: po, + netStore: netStore, + quit: make(chan struct{}), }, nil } -func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore chunk.FetchStore) { +func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) { streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(po, syncChunkStore) + return NewSwarmSyncerServer(po, netStore) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -68,7 +68,7 @@ func (s *SwarmSyncerServer) Close() { // GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - ch, err := s.store.Get(ctx, chunk.ModeGetSync, storage.Address(key)) + ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key)) if err != nil { return nil, err } @@ -77,7 +77,7 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er // SessionIndex returns current storage bin (po) index. func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { - return s.store.LastPullSubscriptionBinID(s.po) + return s.netStore.LastPullSubscriptionBinID(s.po) } // SetNextBatch retrieves the next batch of hashes from the localstore. @@ -88,7 +88,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { // are added in batchTimeout period, the batch will be returned. This function // will block until new chunks are received from localstore pull subscription. func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - descriptors, stop := s.store.SubscribePull(context.Background(), s.po, from, to) + descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) defer stop() const batchTimeout = 2 * time.Second @@ -118,7 +118,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // This is the most naive approach to label the chunk as synced // allowing it to be garbage collected. A proper way requires // validating that the chunk is successfully stored by the peer. - err := s.store.Set(context.Background(), chunk.ModeSetSync, d.Address) + err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address) if err != nil { return nil, 0, 0, nil, err } @@ -158,67 +158,31 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // SwarmSyncerClient type SwarmSyncerClient struct { - store chunk.FetchStore - peer *Peer - stream Stream + netStore *storage.NetStore + peer *Peer + stream Stream } // NewSwarmSyncerClient is a contructor for provable data exchange syncer -func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*SwarmSyncerClient, error) { +func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ - store: store, - peer: p, - stream: stream, + netStore: netStore, + peer: p, + stream: stream, }, nil } -// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer -// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { -// retrieveC := make(storage.Chunk, chunksCap) -// RunChunkRequestor(p, retrieveC) -// storeC := make(storage.Chunk, chunksCap) -// RunChunkStorer(store, storeC) -// s := &SwarmSyncerClient{ -// po: po, -// priority: priority, -// sessionAt: sessionAt, -// start: index, -// end: index, -// nextC: make(chan struct{}, 1), -// intervals: intervals, -// sessionRoot: sessionRoot, -// sessionReader: chunker.Join(sessionRoot, retrieveC), -// retrieveC: retrieveC, -// storeC: storeC, -// } -// return s -// } - -// // StartSyncing is called on the Peer to start the syncing process -// // the idea is that it is called only after kademlia is close to healthy -// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) { -// lastPO := po -// if nn { -// lastPO = maxPO -// } -// -// for i := po; i <= lastPO; i++ { -// s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true) -// s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false) -// } -// } - // RegisterSwarmSyncerClient registers the client constructor function for // to handle incoming sync streams -func RegisterSwarmSyncerClient(streamer *Registry, store chunk.FetchStore) { +func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) { streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live)) + return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live)) }) } // NeedData func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) { - return s.store.FetchFunc(ctx, key) + return s.netStore.FetchFunc(ctx, key) } // BatchDone