Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions swarm/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,6 @@ type Store interface {
Close() (err error)
}

// FetchStore is a Store which supports syncing
type FetchStore interface {
Store
FetchFunc(ctx context.Context, addr Address) func(context.Context) error
}

// Validator validates a chunk.
type Validator interface {
Validate(ch Chunk) bool
Expand Down
20 changes: 10 additions & 10 deletions swarm/network/stream/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ var (
)

type Delivery struct {
chunkStore chunk.FetchStore
kad *network.Kademlia
getPeer func(enode.ID) *Peer
quit chan struct{}
netStore *storage.NetStore
kad *network.Kademlia
getPeer func(enode.ID) *Peer
quit chan struct{}
}

func NewDelivery(kad *network.Kademlia, chunkStore chunk.FetchStore) *Delivery {
func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery {
return &Delivery{
chunkStore: chunkStore,
kad: kad,
quit: make(chan struct{}),
netStore: netStore,
kad: kad,
quit: make(chan struct{}),
}
}

Expand Down Expand Up @@ -94,7 +94,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *

go func() {
defer osp.Finish()
ch, err := d.chunkStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
if err != nil {
retrieveChunkFail.Inc(1)
log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
Expand Down Expand Up @@ -171,7 +171,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int

msg.peer = sp
log.Trace("handle.chunk.delivery", "put", msg.Addr)
_, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
_, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
if err != nil {
if err == storage.ErrChunkInvalid {
// we removed this log because it spams the logs
Expand Down
9 changes: 4 additions & 5 deletions swarm/network/stream/intervals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
Expand Down Expand Up @@ -287,20 +286,20 @@ func enableNotifications(r *Registry, peerID enode.ID, s Stream) error {

type testExternalClient struct {
hashes chan []byte
store chunk.FetchStore
netStore *storage.NetStore
enableNotificationsC chan struct{}
}

func newTestExternalClient(store chunk.FetchStore) *testExternalClient {
func newTestExternalClient(netStore *storage.NetStore) *testExternalClient {
return &testExternalClient{
hashes: make(chan []byte),
store: store,
netStore: netStore,
enableNotificationsC: make(chan struct{}),
}
}

func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
wait := c.store.FetchFunc(ctx, storage.Address(hash))
wait := c.netStore.FetchFunc(ctx, storage.Address(hash))
if wait == nil {
return nil
}
Expand Down
32 changes: 16 additions & 16 deletions swarm/network/stream/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,27 @@ const (
// * live request delivery with or without checkback
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
po uint8
store chunk.FetchStore
quit chan struct{}
po uint8
netStore *storage.NetStore
quit chan struct{}
}

// NewSwarmSyncerServer is constructor for SwarmSyncerServer
func NewSwarmSyncerServer(po uint8, syncChunkStore chunk.FetchStore) (*SwarmSyncerServer, error) {
func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore) (*SwarmSyncerServer, error) {
return &SwarmSyncerServer{
po: po,
store: syncChunkStore,
quit: make(chan struct{}),
po: po,
netStore: netStore,
quit: make(chan struct{}),
}, nil
}

func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore chunk.FetchStore) {
func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) {
streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) {
po, err := ParseSyncBinKey(t)
if err != nil {
return nil, err
}
return NewSwarmSyncerServer(po, syncChunkStore)
return NewSwarmSyncerServer(po, netStore)
})
// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
// return NewOutgoingProvableSwarmSyncer(po, db)
Expand All @@ -68,7 +68,7 @@ func (s *SwarmSyncerServer) Close() {

// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
ch, err := s.store.Get(ctx, chunk.ModeGetSync, storage.Address(key))
ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key))
if err != nil {
return nil, err
}
Expand All @@ -77,7 +77,7 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er

// SessionIndex returns current storage bin (po) index.
func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
return s.store.LastPullSubscriptionBinID(s.po)
return s.netStore.LastPullSubscriptionBinID(s.po)
}

// SetNextBatch retrieves the next batch of hashes from the localstore.
Expand All @@ -88,7 +88,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
// are added in batchTimeout period, the batch will be returned. This function
// will block until new chunks are received from localstore pull subscription.
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
descriptors, stop := s.store.SubscribePull(context.Background(), s.po, from, to)
descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
defer stop()

const batchTimeout = 2 * time.Second
Expand Down Expand Up @@ -118,7 +118,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
// This is the most naive approach to label the chunk as synced
// allowing it to be garbage collected. A proper way requires
// validating that the chunk is successfully stored by the peer.
err := s.store.Set(context.Background(), chunk.ModeSetSync, d.Address)
err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address)
if err != nil {
return nil, 0, 0, nil, err
}
Expand Down Expand Up @@ -158,13 +158,13 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6

// SwarmSyncerClient
type SwarmSyncerClient struct {
store chunk.FetchStore
store *storage.NetStore
peer *Peer
stream Stream
}

// NewSwarmSyncerClient is a contructor for provable data exchange syncer
func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*SwarmSyncerClient, error) {
func NewSwarmSyncerClient(p *Peer, store *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
store: store,
peer: p,
Expand Down Expand Up @@ -210,7 +210,7 @@ func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*Swar

// RegisterSwarmSyncerClient registers the client constructor function for
// to handle incoming sync streams
func RegisterSwarmSyncerClient(streamer *Registry, store chunk.FetchStore) {
func RegisterSwarmSyncerClient(streamer *Registry, store *storage.NetStore) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
})
Expand Down