Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
{ //to which the peer responds with offered hashes
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: nil,
Hashes: nil,
From: 0,
To: 0,
Hashes: nil,
From: 0,
To: 0,
},
Peer: node.ID(),
},
Expand Down
8 changes: 2 additions & 6 deletions network/stream/intervals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,6 @@ func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(con
return wait
}

func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
return nil
}

func (c *testExternalClient) Close() {}

type testExternalServer struct {
Expand All @@ -343,15 +339,15 @@ func (s *testExternalServer) SessionIndex() (uint64, error) {
return s.sessionAt, nil
}

func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) {
if to > s.maxKeys {
to = s.maxKeys
}
b := make([]byte, HashSize*(to-from+1))
for i := from; i <= to; i++ {
s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
}
return b, from, to, nil, nil
return b, from, to, nil
}

func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {
Expand Down
15 changes: 4 additions & 11 deletions network/stream/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,9 @@ func (p *Peer) handleQuitMsg(req *QuitMsg) error {
// OfferedHashesMsg is the protocol msg for offering to hand over a
// stream section
type OfferedHashesMsg struct {
Stream Stream // name of Stream
From, To uint64 // peer and db-specific entry count
Hashes []byte // stream of hashes (128)
*HandoverProof // HandoverProof
Stream Stream // name of Stream
From, To uint64 // peer and db-specific entry count
Hashes []byte // stream of hashes (128)
}

// String pretty prints OfferedHashesMsg
Expand Down Expand Up @@ -265,7 +264,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
}
}
select {
case c.next <- c.batchDone(p, req, hashes):
case c.next <- c.AddInterval(req.From, req.To):
case <-c.quit:
log.Debug("client.handleOfferedHashesMsg() quit")
case <-ctx.Done():
Expand Down Expand Up @@ -385,12 +384,6 @@ type Handover struct {
Root []byte // Root hash for indexed segment inclusion proofs
}

// HandoverProof represents a signed statement that the upstream peer handed over the stream section
type HandoverProof struct {
Sig []byte // Sign(Hash(Serialisation(Handover)))
*Handover
}

// Takeover represents a statement that downstream peer took over (stored all data)
// handed over
type Takeover Handover
Expand Down
16 changes: 5 additions & 11 deletions network/stream/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,26 +183,20 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {

defer metrics.GetOrRegisterResettingTimer("send.offered.hashes", nil).UpdateSince(time.Now())

hashes, from, to, proof, err := s.setNextBatch(f, t)
hashes, from, to, err := s.setNextBatch(f, t)
if err != nil {
return err
}
// true only when quitting
if len(hashes) == 0 {
return nil
}
if proof == nil {
proof = &HandoverProof{
Handover: &Handover{},
}
}
s.currentBatch = hashes
msg := &OfferedHashesMsg{
HandoverProof: proof,
Hashes: hashes,
From: from,
To: to,
Stream: s.stream,
Hashes: hashes,
From: from,
To: to,
Stream: s.stream,
}
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes")
Expand Down
25 changes: 3 additions & 22 deletions network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ type server struct {
// setNextBatch adjusts passed interval based on session index and whether
// stream is live or history. It calls Server SetNextBatch with adjusted
// interval and returns batch hashes and their interval.
func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, error) {
if s.stream.Live {
if from == 0 {
from = s.sessionIndex
Expand All @@ -484,7 +484,7 @@ func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *Handove
}
} else {
if (to < from && to != 0) || from > s.sessionIndex {
return nil, 0, 0, nil, nil
return nil, 0, 0, nil
}
if to == 0 || to > s.sessionIndex {
to = s.sessionIndex
Expand All @@ -500,7 +500,7 @@ type Server interface {
// Based on this index, live and history stream intervals
// will be adjusted before calling SetNextBatch.
SessionIndex() (uint64, error)
SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, err error)
GetData(context.Context, []byte) ([]byte, error)
Close()
}
Expand Down Expand Up @@ -544,7 +544,6 @@ func (c *client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
NeedData(context.Context, []byte) func(context.Context) error
BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
Close()
}

Expand Down Expand Up @@ -574,24 +573,6 @@ func (c *client) nextBatch(from uint64) (nextFrom uint64, nextTo uint64) {
return
}

func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error {
if tf := c.BatchDone(req.Stream, req.From, hashes, req.Root); tf != nil {
tp, err := tf()
if err != nil {
return err
}

if err := p.Send(context.TODO(), tp); err != nil {
return err
}
if c.to > 0 && tp.Takeover.End >= c.to {
return p.streamer.Unsubscribe(p.Peer.ID(), req.Stream)
}
return nil
}
return c.AddInterval(req.From, req.To)
}

func (c *client) close() {
select {
case <-c.quit:
Expand Down
63 changes: 3 additions & 60 deletions network/stream/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ type testClient struct {
t string
wait0 chan bool
wait2 chan bool
batchDone chan bool
receivedHashes map[string][]byte
}

Expand All @@ -90,7 +89,6 @@ func newTestClient(t string) *testClient {
t: t,
wait0: make(chan bool),
wait2: make(chan bool),
batchDone: make(chan bool),
receivedHashes: make(map[string][]byte),
}
}
Expand All @@ -111,11 +109,6 @@ func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.
return nil
}

func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
close(self.batchDone)
return nil
}

func (self *testClient) Close() {}

type testServer struct {
Expand All @@ -134,8 +127,8 @@ func (s *testServer) SessionIndex() (uint64, error) {
return s.sessionIndex, nil
}

func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
return make([]byte, HashSize), from + 1, to + 1, nil, nil
func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) {
return make([]byte, HashSize), from + 1, to + 1, nil
}

func (self *testServer) GetData(context.Context, []byte) ([]byte, error) {
Expand Down Expand Up @@ -186,9 +179,6 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
{
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: hashes,
From: 5,
To: 8,
Expand Down Expand Up @@ -271,9 +261,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 6,
To: 9,
Expand Down Expand Up @@ -337,9 +324,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 0,
Expand Down Expand Up @@ -448,9 +432,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: NewStream("foo", "", false),
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 6,
To: 9,
Expand All @@ -461,9 +442,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
From: 11,
To: 0,
Hashes: make([]byte, HashSize),
Expand Down Expand Up @@ -521,9 +499,6 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
{
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: corruptHashes,
From: 5,
To: 8,
Expand Down Expand Up @@ -586,9 +561,6 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
{
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: hashes,
From: 5,
To: 8,
Expand Down Expand Up @@ -620,26 +592,9 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {

close(tc.wait0)

timeout := time.NewTimer(100 * time.Millisecond)
defer timeout.Stop()

select {
case <-tc.batchDone:
t.Fatal("batch done early")
case <-timeout.C:
}
time.Sleep(100 * time.Millisecond)

close(tc.wait2)

timeout2 := time.NewTimer(10000 * time.Millisecond)
defer timeout2.Stop()

select {
case <-tc.batchDone:
case <-timeout2.C:
t.Fatal("timeout waiting batchdone call")
}

}

func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
Expand Down Expand Up @@ -694,9 +649,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: NewStream("foo", "", false),
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 6,
To: 9,
Expand All @@ -707,9 +659,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
From: 11,
To: 0,
Hashes: make([]byte, HashSize),
Expand Down Expand Up @@ -811,9 +760,6 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 0,
Expand Down Expand Up @@ -915,9 +861,6 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 0,
Expand Down
15 changes: 3 additions & 12 deletions network/stream/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
// chunk addresses. If at least one chunk is added to the batch and no new chunks
// are added in batchTimeout period, the batch will be returned. This function
// will block until new chunks are received from localstore pull subscription.
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, error) {
batchStart := time.Now()
descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
defer stop()
Expand Down Expand Up @@ -131,7 +131,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
if err != nil {
metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1)
log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err)
return nil, 0, 0, nil, err
return nil, 0, 0, err
}
batchSize++
if batchStartID == nil {
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
// if batch start id is not set, return 0
batchStartID = new(uint64)
}
return batch, *batchStartID, batchEndID, nil, nil
return batch, *batchStartID, batchEndID, nil
}

// SwarmSyncerClient
Expand Down Expand Up @@ -203,15 +203,6 @@ func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func
return s.netStore.FetchFunc(ctx, key)
}

// BatchDone
func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) {
// TODO: reenable this with putter/getter refactored code
// if s.chunker != nil {
// return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) }
// }
return nil
}

func (s *SwarmSyncerClient) Close() {}

// base for parsing and formating sync bin key
Expand Down