Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (n *NetworkFacade) ReceiveMessage(sourceNode int, tag protocol.Tag, data []
n.pushPendingReceivedMessage()
}

func (n *NetworkFacade) Disconnect(sender network.DisconnectablePeer) {
func (n *NetworkFacade) Disconnect(sender network.DeadlineSettableConn) {
sourceNode := n.peerToNode[sender.(*facadePeer)]
n.fuzzer.Disconnect(n.nodeID, sourceNode)
}
Expand Down
2 changes: 1 addition & 1 deletion agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (w *whiteholeNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b
func (w *whiteholeNetwork) BroadcastSimple(tag protocol.Tag, data []byte) error {
return w.Broadcast(context.Background(), tag, data, true, nil)
}
func (w *whiteholeNetwork) Disconnect(badnode network.DisconnectablePeer) {
func (w *whiteholeNetwork) Disconnect(badnode network.DeadlineSettableConn) {
return
}
func (w *whiteholeNetwork) DisconnectPeers() {
Expand Down
2 changes: 1 addition & 1 deletion components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (network *MockNetwork) RequestConnectOutgoing(replace bool, quit <-chan str
}

// Disconnect - unused function
func (network *MockNetwork) Disconnect(badpeer network.DisconnectablePeer) {
func (network *MockNetwork) Disconnect(badpeer network.DeadlineSettableConn) {
}

// DisconnectPeers - unused function
Expand Down
4 changes: 2 additions & 2 deletions config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,8 @@ type Local struct {
// EnableP2PHybridMode turns on both websockets and P2P networking.
EnableP2PHybridMode bool `version[34]:"false"`

// P2PListenAddress sets the listen address used for P2P networking, if hybrid mode is set.
P2PListenAddress string `version[34]:""`
// P2PNetAddress sets the listen address used for P2P networking, if hybrid mode is set.
P2PNetAddress string `version[34]:""`

// EnableDHT will turn on the hash table for use with capabilities advertisement
EnableDHTProviders bool `version[34]:"false"`
Expand Down
2 changes: 1 addition & 1 deletion config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ var defaultLocal = Local{
OptimizeAccountsDatabaseOnStartup: false,
OutgoingMessageFilterBucketCount: 3,
OutgoingMessageFilterBucketSize: 128,
P2PListenAddress: "",
P2PNetAddress: "",
P2PPersistPeerID: false,
P2PPrivateKeyLocation: "",
ParticipationKeysRefreshInterval: 60000000000,
Expand Down
2 changes: 1 addition & 1 deletion daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (s *Server) Start() {
fmt.Print("Initializing the Algorand node... ")
err := s.node.Start()
if err != nil {
msg := fmt.Sprintf("Failed to start alg Algorand node: %v", err)
msg := fmt.Sprintf("Failed to start an Algorand node: %v", err)
s.log.Error(msg)
fmt.Println(msg)
os.Exit(1)
Expand Down
32 changes: 15 additions & 17 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,12 @@ func (handler *TxHandler) Start() {
if handler.msgCache != nil {
handler.msgCache.Start(handler.ctx, 60*time.Second)
}
// wsNetwork handler
handler.net.RegisterHandlers([]network.TaggedMessageHandler{
{Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)},
})

// libp2p pubsub validator and handler abstracted as TaggedMessageProcessor
handler.net.RegisterProcessors([]network.TaggedMessageProcessor{
{
Tag: protocol.TxnTag,
Expand Down Expand Up @@ -595,7 +597,7 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed
// - the key used for insertion if the message was not found in the cache
// - the capacity guard returned by the elastic rate limiter
// - a boolean indicating if the message was a duplicate or the sender is rate limited
func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.DisconnectablePeer) (*crypto.Digest, *util.ErlCapacityGuard, bool) {
func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.DeadlineSettableConn) (*crypto.Digest, *util.ErlCapacityGuard, bool) {
var msgKey *crypto.Digest
var capguard *util.ErlCapacityGuard
var isDup bool
Expand Down Expand Up @@ -679,7 +681,7 @@ func decodeMsg(data []byte) (unverifiedTxGroup []transactions.SignedTxn, consume
// incomingTxGroupDupRateLimit checks
// - if the incoming transaction group has been seen before after reencoding to canonical representation, and
// - if the sender is rate limited by the per-application rate limiter.
func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int, sender network.DisconnectablePeer) (*crypto.Digest, bool) {
func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int, sender network.DeadlineSettableConn) (*crypto.Digest, bool) {
var canonicalKey *crypto.Digest
if handler.txCanonicalCache != nil {
var isDup bool
Expand Down Expand Up @@ -711,11 +713,9 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
accepted := false
defer func() {
// if we failed to put the item onto the backlog, we should release the capacity if any
if !accepted {
if capguard != nil {
if capErr := capguard.Release(); capErr != nil {
logging.Base().Warnf("processIncomingTxn: failed to release capacity to ElasticRateLimiter: %v", capErr)
}
if !accepted && capguard != nil {
if capErr := capguard.Release(); capErr != nil {
logging.Base().Warnf("processIncomingTxn: failed to release capacity to ElasticRateLimiter: %v", capErr)
}
}
}()
Expand Down Expand Up @@ -779,38 +779,36 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
accepted := false
defer func() {
// if we failed to put the item onto the backlog, we should release the capacity if any
if !accepted {
if capguard != nil {
if capErr := capguard.Release(); capErr != nil {
logging.Base().Warnf("validateIncomingTxMessage: failed to release capacity to ElasticRateLimiter: %v", capErr)
}
if !accepted && capguard != nil {
if capErr := capguard.Release(); capErr != nil {
logging.Base().Warnf("validateIncomingTxMessage: failed to release capacity to ElasticRateLimiter: %v", capErr)
}
}
}()

if shouldDrop {
// this TX message was found in the duplicate cache, or ERL rate-limited it
return network.ValidatedMessage{Action: network.Ignore, ValidatorData: nil}
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}
}

unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data)
if invalid {
// invalid encoding or exceeding txgroup, disconnect from this peer
return network.ValidatedMessage{Action: network.Disconnect, ValidatorData: nil}
return network.ValidatedMessage{Action: network.Disconnect, ValidatedMessage: nil}
}

canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender)
if drop {
// this re-serialized txgroup was detected as a duplicate by the canonical message cache,
// or it was rate-limited by the per-app rate limiter
return network.ValidatedMessage{Action: network.Ignore, ValidatorData: nil}
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}
}

accepted = true
return network.ValidatedMessage{
Action: network.Accept,
Tag: rawmsg.Tag,
ValidatorData: &validatedIncomingTxMessage{
ValidatedMessage: &validatedIncomingTxMessage{
rawmsg: rawmsg,
unverifiedTxGroup: unverifiedTxGroup,
msgKey: msgKey,
Expand All @@ -822,7 +820,7 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa

// processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage {
msg := validatedMessage.ValidatorData.(*validatedIncomingTxMessage)
msg := validatedMessage.ValidatedMessage.(*validatedIncomingTxMessage)
select {
case handler.backlogQueue <- &txBacklogMsg{
rawmsg: &msg.rawmsg,
Expand Down
2 changes: 1 addition & 1 deletion installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
"OptimizeAccountsDatabaseOnStartup": false,
"OutgoingMessageFilterBucketCount": 3,
"OutgoingMessageFilterBucketSize": 128,
"P2PListenAddress": "",
"P2PNetAddress": "",
"P2PPersistPeerID": false,
"P2PPrivateKeyLocation": "",
"ParticipationKeysRefreshInterval": 60000000000,
Expand Down
2 changes: 1 addition & 1 deletion logging/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (l logger) getOutput() io.Writer {
}

func (l logger) SetJSONFormatter() {
l.entry.Logger.Formatter = &logrus.JSONFormatter{TimestampFormat: "2006-01-02T15:04:05.000000Z07:00"}
l.entry.Logger.SetFormatter(&logrus.JSONFormatter{TimestampFormat: "2006-01-02T15:04:05.000000Z07:00"})
}

func (l logger) Entry() *logrus.Entry {
Expand Down
10 changes: 5 additions & 5 deletions netdeploy/remote/nodecfg/nodeDir.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,21 +174,21 @@ func (nd *nodeDir) configureP2PDNSBootstrap(p2pBootstrap bool) error {
}
// ensure p2p config params set are what is expected:
// - EnableP2P or EnableP2PHybridMode
// - NetAddress or P2PListenAddress is set
// - NetAddress or P2PNetAddress is set
// - EnableGossipService
if !nd.config.EnableP2P && !nd.config.EnableP2PHybridMode {
return errors.New("p2p bootstrap requires EnableP2P or EnableP2PHybridMode to be set")
}
if nd.NetAddress == "" && nd.config.P2PListenAddress == "" {
return errors.New("p2p bootstrap requires NetAddress or P2PListenAddress to be set")
if nd.NetAddress == "" && nd.config.P2PNetAddress == "" {
return errors.New("p2p bootstrap requires NetAddress or P2PNetAddress to be set")
}
if !nd.config.EnableGossipService {
return errors.New("p2p bootstrap requires EnableGossipService to be set")
}

netAddress := nd.NetAddress
if nd.config.P2PListenAddress != "" {
netAddress = nd.config.P2PListenAddress
if nd.config.P2PNetAddress != "" {
netAddress = nd.config.P2PNetAddress
}

key, err := p2p.GetPrivKey(config.Local{P2PPersistPeerID: true}, nd.dataDir)
Expand Down
2 changes: 1 addition & 1 deletion network/connPerfMon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func makeMsgPool(N int, peers []Peer) (out []IncomingMessage) {

addMsg := func(msgCount int) {
for i := 0; i < msgCount; i++ {
msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectablePeer)
msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DeadlineSettableConn)
timer += int64(7 * time.Nanosecond)
msg.Received = timer
out = append(out, msg)
Expand Down
14 changes: 7 additions & 7 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
// Peer opaque interface for referring to a neighbor in the network
type Peer interface{}

// DisconnectablePeer is a Peer with a long-living connection to a network that can be disconnected
type DisconnectablePeer interface {
// DeadlineSettableConn is a Peer with a long-living connection to a network that can be disconnected
type DeadlineSettableConn interface {
GetNetwork() GossipNode
}

Expand Down Expand Up @@ -62,7 +62,7 @@ type GossipNode interface {
Address() (string, bool)
Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error
Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error
Disconnect(badnode DisconnectablePeer)
Disconnect(badnode DeadlineSettableConn)
DisconnectPeers() // only used by testing

// RegisterHTTPHandler path accepts gorilla/mux path annotations
Expand Down Expand Up @@ -127,7 +127,7 @@ var outgoingMessagesBufferSize = int(

// IncomingMessage represents a message arriving from some peer in our p2p network
type IncomingMessage struct {
Sender DisconnectablePeer
Sender DeadlineSettableConn
Tag Tag
Data []byte
Err error
Expand Down Expand Up @@ -171,9 +171,9 @@ type OutgoingMessage struct {
// ValidatedMessage is a message that has been validated and is ready to be processed.
// Think as an intermediate one between IncomingMessage and OutgoingMessage
type ValidatedMessage struct {
Action ForwardingPolicy
Tag Tag
ValidatorData interface{}
Action ForwardingPolicy
Tag Tag
ValidatedMessage interface{}
}

// ForwardingPolicy is an enum indicating to whom we should send a message
Expand Down
8 changes: 4 additions & 4 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type HybridP2PNetwork struct {
func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, nodeInfo NodeInfo) (*HybridP2PNetwork, error) {
// supply alternate NetAddress for P2P network
p2pcfg := cfg
p2pcfg.NetAddress = cfg.P2PListenAddress
p2pcfg.NetAddress = cfg.P2PNetAddress
p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo)
if err != nil {
return nil, err
Expand Down Expand Up @@ -115,7 +115,7 @@ func (n *HybridP2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b
}

// Disconnect implements GossipNode
func (n *HybridP2PNetwork) Disconnect(badnode DisconnectablePeer) {
func (n *HybridP2PNetwork) Disconnect(badnode DeadlineSettableConn) {
net := badnode.GetNetwork()
if net == n.p2pNetwork {
n.p2pNetwork.Disconnect(badnode)
Expand Down Expand Up @@ -180,13 +180,13 @@ func (n *HybridP2PNetwork) ClearHandlers() {
n.wsNetwork.ClearHandlers()
}

// RegisterProcessors adds to the set of given message handlers.
// RegisterProcessors adds to the set of given message processors.
func (n *HybridP2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) {
n.p2pNetwork.RegisterProcessors(dispatch)
n.wsNetwork.RegisterProcessors(dispatch)
}

// ClearProcessors deregisters all the existing message handlers.
// ClearProcessors deregisters all the existing message processors.
func (n *HybridP2PNetwork) ClearProcessors() {
n.p2pNetwork.ClearProcessors()
n.wsNetwork.ClearProcessors()
Expand Down
23 changes: 14 additions & 9 deletions network/limitcaller/rateLimitingTransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"time"

"github.com/algorand/go-algorand/util"
"github.com/libp2p/go-libp2p/core/peer"
)

// ConnectionTimeStore is a subset of the phonebook that is used to store the connection times.
type ConnectionTimeStore interface {
GetConnectionWaitTime(addrOrInfo interface{}) (bool, time.Duration, time.Time)
UpdateConnectionTime(addrOrInfo interface{}, provisionalTime time.Time) bool
GetConnectionWaitTime(addrOrPeerID string) (bool, time.Duration, time.Time)
UpdateConnectionTime(addrOrPeerID string, provisionalTime time.Time) bool
}

// RateLimitingTransport is the transport for execute a single HTTP transaction, obtaining the Response for a given Request.
Expand Down Expand Up @@ -64,9 +65,9 @@ func MakeRateLimitingTransport(phonebook ConnectionTimeStore, queueingTimeout ti
}
}

// MakeRateLimitingTransportWithTransport creates a rate limiting http transport that would limit the requests rate
// MakeRateLimitingTransportWithRoundTripper creates a rate limiting http transport that would limit the requests rate
// according to the entries in the phonebook.
func MakeRateLimitingTransportWithTransport(phonebook ConnectionTimeStore, queueingTimeout time.Duration, rt http.RoundTripper, target interface{}, maxIdleConnsPerHost int) RateLimitingTransport {
func MakeRateLimitingTransportWithRoundTripper(phonebook ConnectionTimeStore, queueingTimeout time.Duration, rt http.RoundTripper, target interface{}, maxIdleConnsPerHost int) RateLimitingTransport {
return RateLimitingTransport{
phonebook: phonebook,
innerTransport: rt,
Expand All @@ -81,13 +82,17 @@ func (r *RateLimitingTransport) RoundTrip(req *http.Request) (res *http.Response
var waitTime time.Duration
var provisionalTime time.Time
queueingDeadline := time.Now().Add(r.queueingTimeout)
var host interface{} = req.Host
addrOrPeerID := req.Host
// p2p/http clients have per-connection transport and address info so use that
if len(req.Host) == 0 && req.URL != nil && len(req.URL.Host) == 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud — there's no way a weird HTTP request that didn't come through libp2p HTTP mode could force an invalid cast to (*peer.AddrInfo) below for a URL by requesting an empty host or something?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, added a type check and return an error. That's why we practice CR :)

// p2p/http clients have per-connection transport and address info so use that
host = r.targetAddr
addrInfo, ok := r.targetAddr.(*peer.AddrInfo)
if !ok {
return nil, errors.New("rateLimitingTransport: request without Host/URL and targetAddr is not a peer.AddrInfo")
}
addrOrPeerID = string(addrInfo.ID)
}
for {
_, waitTime, provisionalTime = r.phonebook.GetConnectionWaitTime(host)
_, waitTime, provisionalTime = r.phonebook.GetConnectionWaitTime(addrOrPeerID)
if waitTime == 0 {
break // break out of the loop and proceed to the connection
}
Expand All @@ -99,6 +104,6 @@ func (r *RateLimitingTransport) RoundTrip(req *http.Request) (res *http.Response
return nil, ErrConnectionQueueingTimeout
}
res, err = r.innerTransport.RoundTrip(req)
r.phonebook.UpdateConnectionTime(host, provisionalTime)
r.phonebook.UpdateConnectionTime(addrOrPeerID, provisionalTime)
return
}
2 changes: 1 addition & 1 deletion network/p2p/dnsaddr/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Iterate(initial multiaddr.Multiaddr, controller ResolveController, f func(d
if resolver == nil {
return errors.New("passed controller has no resolvers Iterate")
}
const maxHops = 100
const maxHops = 25 // any reasonable number to prevent infinite loop in case of circular dnsaddr
hops := 0
var toResolve = []multiaddr.Multiaddr{initial}
for resolver != nil && len(toResolve) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func MakeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, pstore limitcaller.Con
if err != nil {
return nil, err
}
rlrt := limitcaller.MakeRateLimitingTransportWithTransport(pstore, queueingTimeout, cl.Transport, addrInfo, maxIdleConnsPerHost)
rlrt := limitcaller.MakeRateLimitingTransportWithRoundTripper(pstore, queueingTimeout, cl.Transport, addrInfo, maxIdleConnsPerHost)
cl.Transport = &rlrt
return cl, nil

Expand Down
3 changes: 2 additions & 1 deletion network/p2p/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func EnableP2PLogging(log logging.Logger, l logging.Level) {
}

func (c *loggingCore) Enabled(l zapcore.Level) bool {
return c.log.IsLevelEnabled(c.level)
level := levelsMap[l]
return c.log.IsLevelEnabled(level)
}

func (c *loggingCore) With(fields []zapcore.Field) zapcore.Core {
Expand Down
Loading