Skip to content
116 changes: 84 additions & 32 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,14 +770,60 @@ func TestGetFutureBlock(t *testing.T) {

// implement network.UnicastPeer
type testUnicastPeer struct {
gn network.GossipNode
t *testing.T
gn network.GossipNode
version string
responseChannels map[uint64]chan *network.Response
t *testing.T
}

func (p *testUnicastPeer) GetAddress() string {
return "test"
}

func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics network.Topics) (resp *network.Response, e error) {

responseChannel := make(chan *network.Response, 1)
p.responseChannels[0] = responseChannel

ps := p.gn.(*httpTestPeerSource)
var dispather network.MessageHandler
for _, v := range ps.dispatchHandlers {
if v.Tag == tag {
dispather = v.MessageHandler
break
}
}
require.NotNil(p.t, dispather)
dispather.Handle(network.IncomingMessage{Tag: tag, Data: topics.MarshallTopics(), Sender: p, Net: p.gn})

// wait for the channel.
select {
case resp = <-responseChannel:
return resp, nil
case <-ctx.Done():
return resp, ctx.Err()
}
}

func (p *testUnicastPeer) Respond(ctx context.Context, reqMsg network.IncomingMessage, responseTopics network.Topics) (e error) {

hashKey := uint64(0)
channel, found := p.responseChannels[hashKey]
if !found {
}

select {
case channel <- &network.Response{Topics: responseTopics}:
default:
}

return nil
}

func (p *testUnicastPeer) Version() string {
return p.version
}

func (p *testUnicastPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag) error {
ps := p.gn.(*httpTestPeerSource)
var dispather network.MessageHandler
Expand All @@ -792,10 +838,12 @@ func (p *testUnicastPeer) Unicast(ctx context.Context, msg []byte, tag protocol.
return nil
}

func makeTestUnicastPeer(gn network.GossipNode, t *testing.T) network.UnicastPeer {
func makeTestUnicastPeer(gn network.GossipNode, version string, t *testing.T) network.UnicastPeer {
wsp := testUnicastPeer{}
wsp.gn = gn
wsp.t = t
wsp.version = version
wsp.responseChannels = make(map[uint64]chan *network.Response)
return &wsp
}

Expand All @@ -816,40 +864,44 @@ func TestGetBlockWS(t *testing.T) {
return
}

net := buildTestHTTPPeerSource()
ledgerServiceConfig := config.GetDefaultLocal()
ledgerServiceConfig.CatchupParallelBlocks = 5
ls := rpcs.RegisterLedgerService(ledgerServiceConfig, ledger, net, "test genesisID")
versions := []string{"1", "2.1"}
for _, version := range versions { // range network.SupportedProtocolVersions {

ls.Start()
net := buildTestHTTPPeerSource()
ledgerServiceConfig := config.GetDefaultLocal()
ledgerServiceConfig.CatchupParallelBlocks = 5
ls := rpcs.RegisterLedgerService(ledgerServiceConfig, ledger, net, "test genesisID")

up := makeTestUnicastPeer(net, t)
net.peers = append(net.peers, up)
ls.Start()

fs := rpcs.RegisterWsFetcherService(logging.TestingLog(t), net)
up := makeTestUnicastPeer(net, version, t)
net.peers = append(net.peers, up)

_, ok := net.GetPeers(network.PeersConnectedIn)[0].(network.UnicastPeer)
require.True(t, ok)
factory := MakeNetworkFetcherFactory(net, numberOfPeers, fs)
factory.log = logging.TestingLog(t)
fetcher := factory.NewOverGossip(protocol.UniCatchupReqTag)
// we have one peer, the Ws block server
require.Equal(t, fetcher.NumPeers(), 1)
fs := rpcs.RegisterWsFetcherService(logging.TestingLog(t), net)

var block *bookkeeping.Block
var cert *agreement.Certificate
var client FetcherClient
_, ok := net.GetPeers(network.PeersConnectedIn)[0].(network.UnicastPeer)
require.True(t, ok)
factory := MakeNetworkFetcherFactory(net, numberOfPeers, fs)
factory.log = logging.TestingLog(t)
fetcher := factory.NewOverGossip(protocol.UniCatchupReqTag)
// we have one peer, the Ws block server
require.Equal(t, fetcher.NumPeers(), 1)

start := time.Now()
block, cert, client, err = fetcher.FetchBlock(context.Background(), next)
require.NotNil(t, client)
require.NoError(t, err)
end := time.Now()
require.True(t, end.Sub(start) < 10*time.Second)
require.Equal(t, &b, block)
if err == nil {
require.NotEqual(t, nil, block)
require.NotEqual(t, nil, cert)
var block *bookkeeping.Block
var cert *agreement.Certificate
var client FetcherClient

// start := time.Now()
block, cert, client, err = fetcher.FetchBlock(context.Background(), next)
require.NotNil(t, client)
require.NoError(t, err)
// end := time.Now()
// require.True(t, end.Sub(start) < 10*time.Second)
require.Equal(t, &b, block)
if err == nil {
require.NotEqual(t, nil, block)
require.NotEqual(t, nil, cert)
}
fetcher.Close()
}
fetcher.Close()
}
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ type Local struct {
// DisableOutgoingConnectionThrottling disables the connection throttling of the network library, which
// allow the network library to continuesly disconnect relays based on their relative ( and absolute ) performance.
DisableOutgoingConnectionThrottling bool

// NetworkProtocolVersion overrides network protocol version ( if present )
NetworkProtocolVersion string
}

// Filenames of config files within the configdir (e.g. ~/.algorand)
Expand Down
1 change: 1 addition & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var defaultLocalV6 = Local{
LogSizeLimit: 1073741824,
MaxConnectionsPerIP: 30,
NetAddress: "",
NetworkProtocolVersion: "",
NodeExporterListenAddress: ":9100",
NodeExporterPath: "./node_exporter",
OutgoingMessageFilterBucketCount: 3,
Expand Down
11 changes: 11 additions & 0 deletions network/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,23 @@ import (
"github.com/algorand/go-algorand/crypto"
)

// Constant strings used as keys for topics
const (
requestHashKey = "RequestHash"
ErrorKey = "Error" // used for passing an error message
)

// Topic is a key-value pair
type Topic struct {
key string
data []byte
}

// MakeTopic Creates a Topic
func MakeTopic(key string, data []byte) Topic {
return Topic{key: key, data: data}
}

// Topics is an array of type Topic
// The maximum number of topics allowed is 32
// Each topic key can be 64 characters long and cannot be size 0
Expand Down
8 changes: 6 additions & 2 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,10 @@ func (wn *WebsocketNetwork) setup() {
wn.connPerfMonitor = makeConnectionPerformanceMonitor([]Tag{protocol.AgreementVoteTag, protocol.TxnTag})
wn.lastNetworkAdvance = time.Now().UTC()
wn.handlers.log = wn.log

if wn.config.NetworkProtocolVersion != "" {
SupportedProtocolVersions = []string{wn.config.NetworkProtocolVersion}
}
}

func (wn *WebsocketNetwork) rlimitIncomingConnections() error {
Expand Down Expand Up @@ -1056,7 +1060,7 @@ func (wn *WebsocketNetwork) messageHandlerThread() {
case Broadcast:
wn.Broadcast(wn.ctx, msg.Tag, msg.Data, false, msg.Sender)
case Respond:
msg.Sender.(*wsPeer).Respond(wn.ctx, msg, outmsg)
msg.Sender.(*wsPeer).Respond(wn.ctx, msg, outmsg.Topics)
default:
}
case <-inactivityCheckTicker.C:
Expand Down Expand Up @@ -1618,7 +1622,7 @@ const ProtocolVersionHeader = "X-Algorand-Version"
const ProtocolAcceptVersionHeader = "X-Algorand-Accept-Version"

// SupportedProtocolVersions contains the list of supported protocol versions by this node ( in order of preference ).
var SupportedProtocolVersions = []string{ /*"2",*/ "1"}
var SupportedProtocolVersions = []string{"2.1", "1"}

// ProtocolVersion is the current version attached to the ProtocolVersionHeader header
const ProtocolVersion = "1"
Expand Down
20 changes: 14 additions & 6 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ type UnicastPeer interface {
GetAddress() string
// Unicast sends the given bytes to this specific peer. Does not wait for message to be sent.
Unicast(ctx context.Context, data []byte, tag protocol.Tag) error
// Version returns the matching version from network.SupportedProtocolVersions
Version() string
Request(ctx context.Context, tag Tag, topics Topics) (resp *Response, e error)
Respond(ctx context.Context, reqMsg IncomingMessage, topics Topics) (e error)
}

// Create a wsPeerCore object
Expand Down Expand Up @@ -252,6 +256,11 @@ func (wp *wsPeerCore) PrepareURL(rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", wp.net.GenesisID, -1)
}

// Version returns the matching version from network.SupportedProtocolVersions
func (wp *wsPeer) Version() string {
return wp.version
}

// Unicast sends the given bytes to this specific peer. Does not wait for message to be sent.
// (Implements UnicastPeer)
func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag) error {
Expand All @@ -276,19 +285,18 @@ func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag) err
}

// Respond sends the response of a request message
func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, respMsg OutgoingMessage) (e error) {
func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, responseTopics Topics) (e error) {

// Get the hash/key of the request message
requestHash := hashTopics(reqMsg.Data)

topics := respMsg.Topics
// Add the request hash
requestHashData := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(requestHashData, requestHash)
topics = append(topics, Topic{key: "RequestHash", data: requestHashData})
responseTopics = append(responseTopics, Topic{key: requestHashKey, data: requestHashData})

// Serialize the topics
serializedMsg := topics.MarshallTopics()
serializedMsg := responseTopics.MarshallTopics()

// Send serializedMsg
select {
Expand Down Expand Up @@ -418,9 +426,9 @@ func (wp *wsPeer) readLoop() {
wp.net.log.Warnf("wsPeer readLoop: could not read the message from: %s %s", wp.conn.RemoteAddr().String(), err)
continue
}
requestHash, found := topics.GetValue("RequestHash")
requestHash, found := topics.GetValue(requestHashKey)
if !found {
wp.net.log.Warnf("wsPeer readLoop: message from %s is missing the RequestHash", wp.conn.RemoteAddr().String())
wp.net.log.Warnf("wsPeer readLoop: message from %s is missing the %s", wp.conn.RemoteAddr().String(), requestHashKey)
continue
}
hashKey, _ := binary.Uvarint(requestHash)
Expand Down
Loading