From 3480d3c2c3e7e7bbc77f5e4dc1926415cbfdf966 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 8 Apr 2024 16:45:14 -0400 Subject: [PATCH 01/10] split processIncomingTxn to smaller methods --- data/txHandler.go | 76 +++++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 25 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 3d20e95acd..fc7f3167f0 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -530,7 +530,7 @@ func (handler *TxHandler) deleteFromCaches(msgKey *crypto.Digest, canonicalKey * // dedupCanonical checks if the transaction group has been seen before after reencoding to canonical representation. // returns a key used for insertion if the group was not found. -func (handler *TxHandler) dedupCanonical(ntx int, unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, isDup bool) { +func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, isDup bool) { // consider situations where someone want to censor transactions A // 1. Txn A is not part of a group => txn A with a valid signature is OK // Censorship attempts are: @@ -547,6 +547,7 @@ func (handler *TxHandler) dedupCanonical(ntx int, unverifiedTxGroup []transactio // - using individual txn from a group: {A, Z} could be poisoned by {A, B}, where B is invalid var d crypto.Digest + ntx := len(unverifiedTxGroup) if ntx == 1 { // a single transaction => cache/dedup canonical txn with its signature enc := unverifiedTxGroup[0].MarshalMsg(nil) @@ -574,49 +575,46 @@ func (handler *TxHandler) dedupCanonical(ntx int, unverifiedTxGroup []transactio return &d, false } -// processIncomingTxn decodes a transaction group from incoming message and enqueues into the back log for processing. -// The function also performs some input data pre-validation; -// - txn groups are cut to MaxTxGroupSize size -// - message are checked for duplicates -// - transactions are checked for duplicates - -func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage { +func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.DisconnectablePeer) (*crypto.Digest, *util.ErlCapacityGuard, bool) { var msgKey *crypto.Digest + var capguard *util.ErlCapacityGuard var isDup bool if handler.msgCache != nil { // check for duplicate messages // this helps against relaying duplicates - if msgKey, isDup = handler.msgCache.CheckAndPut(rawmsg.Data); isDup { + if msgKey, isDup = handler.msgCache.CheckAndPut(data); isDup { transactionMessagesDupRawMsg.Inc(nil) - return network.OutgoingMessage{Action: network.Ignore} + return msgKey, capguard, true } } - unverifiedTxGroup := make([]transactions.SignedTxn, 1) - dec := protocol.NewMsgpDecoderBytes(rawmsg.Data) - ntx := 0 - consumed := 0 - var err error - var capguard *util.ErlCapacityGuard if handler.erl != nil { congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue)) // consume a capacity unit // if the elastic rate limiter cannot vend a capacity, the error it returns // is sufficient to indicate that we should enable Congestion Control, because // an issue in vending capacity indicates the underlying resource (TXBacklog) is full - capguard, err = handler.erl.ConsumeCapacity(rawmsg.Sender.(util.ErlClient)) + capguard, err = handler.erl.ConsumeCapacity(sender.(util.ErlClient)) if err != nil { handler.erl.EnableCongestionControl() // if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such transactionMessagesDroppedFromBacklog.Inc(nil) - return network.OutgoingMessage{Action: network.Ignore} + return msgKey, capguard, true } // if the backlog Queue has 50% of its buffer back, turn congestion control off if !congestedERL { handler.erl.DisableCongestionControl() } } + return msgKey, capguard, false +} + +func decodeMsg(data []byte) ([]transactions.SignedTxn, int, bool) { + unverifiedTxGroup := make([]transactions.SignedTxn, 1) + dec := protocol.NewMsgpDecoderBytes(data) + ntx := 0 + consumed := 0 for { if len(unverifiedTxGroup) == ntx { @@ -630,7 +628,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net break } logging.Base().Warnf("Received a non-decodable txn: %v", err) - return network.OutgoingMessage{Action: network.Disconnect} + return nil, 0, true } consumed = dec.Consumed() ntx++ @@ -639,13 +637,13 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net if dec.Remaining() > 0 { // if something else left in the buffer - this is an error, drop transactionMessageTxGroupExcessive.Inc(nil) - return network.OutgoingMessage{Action: network.Disconnect} + return nil, 0, true } } } if ntx == 0 { logging.Base().Warnf("Received empty tx group") - return network.OutgoingMessage{Action: network.Disconnect} + return nil, 0, true } unverifiedTxGroup = unverifiedTxGroup[:ntx] @@ -654,22 +652,50 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net transactionMessageTxGroupFull.Inc(nil) } + return unverifiedTxGroup, consumed, false +} + +func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int, sender network.DisconnectablePeer) (*crypto.Digest, bool) { var canonicalKey *crypto.Digest if handler.txCanonicalCache != nil { - if canonicalKey, isDup = handler.dedupCanonical(ntx, unverifiedTxGroup, consumed); isDup { + var isDup bool + if canonicalKey, isDup = handler.dedupCanonical(unverifiedTxGroup, encodedExpectedSize); isDup { transactionMessagesDupCanonical.Inc(nil) - return network.OutgoingMessage{Action: network.Ignore} + return canonicalKey, true } } // rate limit per application in a group. Limiting any app in a group drops the entire message. if handler.appLimiter != nil { congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold - if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) { + if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.(network.IPAddressable).RoutingAddr()) { transactionMessagesAppLimiterDrop.Inc(nil) - return network.OutgoingMessage{Action: network.Ignore} + return canonicalKey, true } } + return canonicalKey, false +} + +// processIncomingTxn decodes a transaction group from incoming message and enqueues into the back log for processing. +// The function also performs some input data pre-validation; +// - txn groups are cut to MaxTxGroupSize size +// - message are checked for duplicates +// - transactions are checked for duplicates +func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage { + msgKey, capguard, isDup := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender) + if isDup { + return network.OutgoingMessage{Action: network.Ignore} + } + + unverifiedTxGroup, consumed, drop := decodeMsg(rawmsg.Data) + if drop { + return network.OutgoingMessage{Action: network.Disconnect} + } + + canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender) + if drop { + return network.OutgoingMessage{Action: network.Ignore} + } select { case handler.backlogQueue <- &txBacklogMsg{ From 1539c06720a31d4e676cae5975f7cc3fa2a0c283 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 8 Apr 2024 16:57:55 -0400 Subject: [PATCH 02/10] add validateIncomingTxMessage and processIncomingTxMessage --- data/txHandler.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/data/txHandler.go b/data/txHandler.go index fc7f3167f0..5713f970fc 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -243,6 +243,7 @@ func (handler *TxHandler) Start() { handler.net.RegisterHandlers([]network.TaggedMessageHandler{ {Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)}, }) + handler.backlogWg.Add(2) go handler.backlogWorker() go handler.backlogGaugeThread() @@ -722,6 +723,65 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net return network.OutgoingMessage{Action: network.Ignore} } +type validatedIncomingTxMessage struct { + rawmsg network.IncomingMessage + unverifiedTxGroup []transactions.SignedTxn + msgKey *crypto.Digest + canonicalKey *crypto.Digest + capguard *util.ErlCapacityGuard +} + +func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) (network.ForwardingPolicy, interface{}) { + msgKey, capguard, isDup := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender) + if isDup { + return network.Ignore, nil + } + + unverifiedTxGroup, consumed, drop := decodeMsg(rawmsg.Data) + if drop { + return network.Disconnect, nil + } + + canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender) + if drop { + return network.Ignore, nil + } + + return network.Ignore, &validatedIncomingTxMessage{ + rawmsg: rawmsg, + unverifiedTxGroup: unverifiedTxGroup, + msgKey: msgKey, + canonicalKey: canonicalKey, + capguard: capguard, + } +} + +func (handler *TxHandler) processIncomingTxMessage(validatedTxMessage interface{}) network.OutgoingMessage { + msg := validatedTxMessage.(*validatedIncomingTxMessage) + select { + case handler.backlogQueue <- &txBacklogMsg{ + rawmsg: &msg.rawmsg, + unverifiedTxGroup: msg.unverifiedTxGroup, + rawmsgDataHash: msg.msgKey, + unverifiedTxGroupHash: msg.canonicalKey, + capguard: msg.capguard, + }: + default: + // if we failed here we want to increase the corresponding metric. It might suggest that we + // want to increase the queue size. + transactionMessagesDroppedFromBacklog.Inc(nil) + + // additionally, remove the txn from duplicate caches to ensure it can be re-submitted + if handler.txCanonicalCache != nil && msg.canonicalKey != nil { + handler.txCanonicalCache.Delete(msg.canonicalKey) + } + if handler.msgCache != nil && msg.msgKey != nil { + handler.msgCache.DeleteByKey(msg.msgKey) + } + } + return network.OutgoingMessage{Action: network.Ignore} +} + var errBackLogFullLocal = errors.New("backlog full") // LocalTransaction is a special shortcut handler for local transactions and intended to be used From 1bf6c2ef070ad4010e4049637ef6036676c5bec3 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 8 Apr 2024 18:33:10 -0400 Subject: [PATCH 03/10] implement TaggetMessageProcessors --- components/mocks/mockNetwork.go | 6 +++ data/txHandler.go | 42 +++++++++++----- network/gossipNode.go | 50 ++++++++++++++++++- network/hybridNetwork.go | 12 +++++ network/multiplexer.go | 86 ++++++++++++++++++++++++++++++++- network/p2pNetwork.go | 26 ++++++++-- network/wsNetwork.go | 8 +++ 7 files changed, 209 insertions(+), 21 deletions(-) diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index 7e1ab29126..4c74176a0d 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -91,6 +91,12 @@ func (network *MockNetwork) RegisterHandlers(dispatch []network.TaggedMessageHan func (network *MockNetwork) ClearHandlers() { } +func (network *MockNetwork) RegisterProcessors(dispatch []network.TaggedMessageProcessor) { +} + +func (network *MockNetwork) ClearProcessors() { +} + // RegisterHTTPHandler - empty implementation func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handler) { } diff --git a/data/txHandler.go b/data/txHandler.go index 5713f970fc..4cf55aa635 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -244,6 +244,20 @@ func (handler *TxHandler) Start() { {Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)}, }) + handler.net.RegisterProcessors([]network.TaggedMessageProcessor{ + { + Tag: protocol.TxnTag, + // create anonymous struct to hold the two functions and satisfy the network.MessageProcessor interface + MessageProcessor: struct { + network.ProcessorValidateFunc + network.ProcessorHandleFunc + }{ + network.ProcessorValidateFunc(handler.validateIncomingTxMessage), + network.ProcessorHandleFunc(handler.processIncomingTxMessage), + }, + }, + }) + handler.backlogWg.Add(2) go handler.backlogWorker() go handler.backlogGaugeThread() @@ -731,33 +745,37 @@ type validatedIncomingTxMessage struct { capguard *util.ErlCapacityGuard } -func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) (network.ForwardingPolicy, interface{}) { +func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.ValidatedMessage { msgKey, capguard, isDup := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender) if isDup { - return network.Ignore, nil + return network.ValidatedMessage{Action: network.Ignore, ValidatorData: nil} } unverifiedTxGroup, consumed, drop := decodeMsg(rawmsg.Data) if drop { - return network.Disconnect, nil + return network.ValidatedMessage{Action: network.Disconnect, ValidatorData: nil} } canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender) if drop { - return network.Ignore, nil + return network.ValidatedMessage{Action: network.Ignore, ValidatorData: nil} } - return network.Ignore, &validatedIncomingTxMessage{ - rawmsg: rawmsg, - unverifiedTxGroup: unverifiedTxGroup, - msgKey: msgKey, - canonicalKey: canonicalKey, - capguard: capguard, + return network.ValidatedMessage{ + Action: network.Ignore, + Tag: rawmsg.Tag, + ValidatorData: &validatedIncomingTxMessage{ + rawmsg: rawmsg, + unverifiedTxGroup: unverifiedTxGroup, + msgKey: msgKey, + canonicalKey: canonicalKey, + capguard: capguard, + }, } } -func (handler *TxHandler) processIncomingTxMessage(validatedTxMessage interface{}) network.OutgoingMessage { - msg := validatedTxMessage.(*validatedIncomingTxMessage) +func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage { + msg := validatedMessage.ValidatorData.(*validatedIncomingTxMessage) select { case handler.backlogQueue <- &txBacklogMsg{ rawmsg: &msg.rawmsg, diff --git a/network/gossipNode.go b/network/gossipNode.go index fb0a415876..280fa44786 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -88,6 +88,12 @@ type GossipNode interface { // ClearHandlers deregisters all the existing message handlers. ClearHandlers() + // RegisterProcessors adds to the set of given message processors. + RegisterProcessors(dispatch []TaggedMessageProcessor) + + // ClearProcessors deregisters all the existing message processors. + ClearProcessors() + // GetHTTPClient returns a http.Client with a suitable for the network Transport // that would also limit the number of outgoing connections. GetHTTPClient(address string) (*http.Client, error) @@ -162,6 +168,14 @@ type OutgoingMessage struct { OnRelease func() } +// ValidatedMessage is a message that has been validated and is ready to be processed. +// Think as an intermediate one between IncomingMessage and OutgoingMessage +type ValidatedMessage struct { + Action ForwardingPolicy + Tag Tag + ValidatorData interface{} +} + // ForwardingPolicy is an enum indicating to whom we should send a message // //msgp:ignore ForwardingPolicy @@ -189,20 +203,52 @@ type MessageHandler interface { Handle(message IncomingMessage) OutgoingMessage } -// HandlerFunc represents an implemenation of the MessageHandler interface +// HandlerFunc represents an implementation of the MessageHandler interface type HandlerFunc func(message IncomingMessage) OutgoingMessage -// Handle implements MessageHandler.Handle, calling the handler with the IncomingKessage and returning the OutgoingMessage +// Handle implements MessageHandler.Handle, calling the handler with the IncomingMessage and returning the OutgoingMessage func (f HandlerFunc) Handle(message IncomingMessage) OutgoingMessage { return f(message) } +// MessageProcessor takes a IncomingMessage (e.g., vote, transaction), processes it, and returns what (if anything) +// to send to the network in response. +// This is an extension of the MessageHandler that works in two stages: validate ->[result]-> handle. +type MessageProcessor interface { + Validate(message IncomingMessage) ValidatedMessage + Handle(message ValidatedMessage) OutgoingMessage +} + +// ProcessorValidateFunc represents an implementation of the MessageProcessor interface +type ProcessorValidateFunc func(message IncomingMessage) ValidatedMessage + +// ProcessorHandleFunc represents an implementation of the MessageProcessor interface +type ProcessorHandleFunc func(message ValidatedMessage) OutgoingMessage + +// Validate implements MessageProcessor.Validate, calling the validator with the IncomingMessage and returning the action +// and validation extra data that can be use as the handler input. +func (f ProcessorValidateFunc) Validate(message IncomingMessage) ValidatedMessage { + return f(message) +} + +// Handle implements MessageProcessor.Handle calling the handler with the ValidatedMessage and returning the OutgoingMessage +func (f ProcessorHandleFunc) Handle(message ValidatedMessage) OutgoingMessage { + return f(message) +} + // TaggedMessageHandler receives one type of broadcast messages type TaggedMessageHandler struct { Tag MessageHandler } +// TaggedMessageProcessor receives one type of broadcast messages +// and performs two stage processing: validating and handling +type TaggedMessageProcessor struct { + Tag + MessageProcessor +} + // Propagate is a convenience function to save typing in the common case of a message handler telling us to propagate an incoming message // "return network.Propagate(msg)" instead of "return network.OutgoingMsg{network.Broadcast, msg.Tag, msg.Data}" func Propagate(msg IncomingMessage) OutgoingMessage { diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 7abb2ab569..0084a13f75 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -180,6 +180,18 @@ func (n *HybridP2PNetwork) ClearHandlers() { n.wsNetwork.ClearHandlers() } +// RegisterHandlers adds to the set of given message handlers. +func (n *HybridP2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) { + n.p2pNetwork.RegisterProcessors(dispatch) + n.wsNetwork.RegisterProcessors(dispatch) +} + +// ClearHandlers deregisters all the existing message handlers. +func (n *HybridP2PNetwork) ClearProcessors() { + n.p2pNetwork.ClearProcessors() + n.wsNetwork.ClearProcessors() +} + // GetHTTPClient returns a http.Client with a suitable for the network Transport // that would also limit the number of outgoing connections. func (n *HybridP2PNetwork) GetHTTPClient(address string) (*http.Client, error) { diff --git a/network/multiplexer.go b/network/multiplexer.go index 0e97d63f28..3764f08fe0 100644 --- a/network/multiplexer.go +++ b/network/multiplexer.go @@ -24,7 +24,8 @@ import ( // Multiplexer is a message handler that sorts incoming messages by Tag and passes // them along to the relevant message handler for that type of message. type Multiplexer struct { - msgHandlers atomic.Value // stores map[Tag]MessageHandler, an immutable map. + msgHandlers atomic.Value // stores map[Tag]MessageHandler, an immutable map. + msgProcessors atomic.Value // stores map[Tag]MessageProcessor, an immutable map. } // MakeMultiplexer creates an empty Multiplexer @@ -43,7 +44,16 @@ func (m *Multiplexer) getHandlersMap() map[Tag]MessageHandler { return nil } -// Retrives the handler for the given message Tag from the handlers array while taking a read lock. +// getProcessorsMap retrieves the handlers map. +func (m *Multiplexer) getProcessorsMap() map[Tag]MessageProcessor { + val := m.msgProcessors.Load() + if processor, valid := val.(map[Tag]MessageProcessor); valid { + return processor + } + return nil +} + +// Retrieves the handler for the given message Tag from the handlers array while taking a read lock. func (m *Multiplexer) getHandler(tag Tag) (MessageHandler, bool) { if handlers := m.getHandlersMap(); handlers != nil { handler, ok := handlers[tag] @@ -52,6 +62,15 @@ func (m *Multiplexer) getHandler(tag Tag) (MessageHandler, bool) { return nil, false } +// Retrieves the handler for the given message Tag from the handlers array while taking a read lock. +func (m *Multiplexer) getProcessor(tag Tag) (MessageProcessor, bool) { + if mp := m.getProcessorsMap(); mp != nil { + processor, ok := mp[tag] + return processor, ok + } + return nil, false +} + // Handle is the "input" side of the multiplexer. It dispatches the message to the previously defined handler. func (m *Multiplexer) Handle(msg IncomingMessage) OutgoingMessage { handler, ok := m.getHandler(msg.Tag) @@ -63,6 +82,28 @@ func (m *Multiplexer) Handle(msg IncomingMessage) OutgoingMessage { return OutgoingMessage{} } +// Validate is the "input" side of the multiplexer. It dispatches the message to the previously defined handler. +func (m *Multiplexer) Validate(msg IncomingMessage) ValidatedMessage { + handler, ok := m.getProcessor(msg.Tag) + + if ok { + outmsg := handler.Validate(msg) + return outmsg + } + return ValidatedMessage{} +} + +// Handle is the "input" side of the multiplexer. It dispatches the message to the previously defined handler. +func (m *Multiplexer) Process(msg ValidatedMessage) OutgoingMessage { + handler, ok := m.getProcessor(msg.Tag) + + if ok { + outmsg := handler.Handle(msg) + return outmsg + } + return OutgoingMessage{} +} + // RegisterHandlers registers the set of given message handlers. func (m *Multiplexer) RegisterHandlers(dispatch []TaggedMessageHandler) { mp := make(map[Tag]MessageHandler) @@ -103,3 +144,44 @@ func (m *Multiplexer) ClearHandlers(excludeTags []Tag) { m.msgHandlers.Store(newMap) } + +// RegisterHandlers registers the set of given message handlers. +func (m *Multiplexer) RegisterProcessors(dispatch []TaggedMessageProcessor) { + mp := make(map[Tag]MessageProcessor) + if existingMap := m.getProcessorsMap(); existingMap != nil { + for k, v := range existingMap { + mp[k] = v + } + } + for _, v := range dispatch { + if _, has := mp[v.Tag]; has { + panic(fmt.Sprintf("Already registered a handler for tag %v", v.Tag)) + } + mp[v.Tag] = v.MessageProcessor + } + m.msgProcessors.Store(mp) +} + +// ClearHandlers deregisters all the existing message handlers other than the one provided in the excludeTags list +func (m *Multiplexer) ClearProcessors(excludeTags []Tag) { + if len(excludeTags) == 0 { + m.msgProcessors.Store(make(map[Tag]MessageProcessor)) + return + } + + // convert into map, so that we can exclude duplicates. + excludeTagsMap := make(map[Tag]bool) + for _, tag := range excludeTags { + excludeTagsMap[tag] = true + } + + currentProcessorsMap := m.getProcessorsMap() + newMap := make(map[Tag]MessageProcessor, len(excludeTagsMap)) + for tag, handler := range currentProcessorsMap { + if excludeTagsMap[tag] { + newMap[tag] = handler + } + } + + m.msgProcessors.Store(newMap) +} diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index dd830ac5d7..90f4d696ec 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -659,6 +659,16 @@ func (n *P2PNetwork) ClearHandlers() { n.handler.ClearHandlers([]Tag{}) } +// RegisterHandlers adds to the set of given message handlers. +func (n *P2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) { + n.handler.RegisterProcessors(dispatch) +} + +// ClearHandlers deregisters all the existing message handlers. +func (n *P2PNetwork) ClearProcessors() { + n.handler.ClearProcessors([]Tag{}) +} + // GetHTTPClient returns a http.Client with a suitable for the network Transport // that would also limit the number of outgoing connections. func (n *P2PNetwork) GetHTTPClient(address string) (*http.Client, error) { @@ -884,11 +894,12 @@ func (n *P2PNetwork) txTopicHandleLoop() { sub.Cancel() return } + // if we sent the message no need to process it. + if msg.ReceivedFrom == n.service.ID() { + return + } - // discard TX message. - // from gossipsub's point of view, it's just waiting to hear back from the validator, - // and txHandler does all its work in the validator, so we don't need to do anything here - _ = msg + _ = n.handler.Process(msg.ValidatorData.(ValidatedMessage)) // participation or configuration change, cancel subscription and quit if !n.wantTXGossip.Load() { @@ -923,10 +934,15 @@ func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg * peerStats.txReceived.Add(1) n.peerStatsMu.Unlock() - outmsg := n.handler.Handle(inmsg) + outmsg := n.handler.Validate(inmsg) // there was a decision made in the handler about this message switch outmsg.Action { case Ignore: + // TODO: add a new ForwardingPolicy action? + if outmsg.ValidatorData != nil { + msg.ValidatorData = outmsg.ValidatorData + return pubsub.ValidationAccept + } return pubsub.ValidationIgnore case Disconnect: return pubsub.ValidationReject diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 50307f2738..6ef812094b 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -853,6 +853,14 @@ func (wn *WebsocketNetwork) ClearHandlers() { wn.handler.ClearHandlers([]Tag{protocol.PingTag, protocol.PingReplyTag, protocol.NetPrioResponseTag}) } +// RegisterHandlers registers the set of given message handlers. +func (wn *WebsocketNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) { +} + +// ClearHandlers deregisters all the existing message handlers. +func (wn *WebsocketNetwork) ClearProcessors() { +} + func (wn *WebsocketNetwork) setHeaders(header http.Header) { localTelemetryGUID := wn.log.GetTelemetryGUID() localInstanceName := wn.log.GetInstanceName() From 2ad1742cc7f9aa6811486e7ce23748804b7eb83e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 8 Apr 2024 19:21:26 -0400 Subject: [PATCH 04/10] fix linter --- components/mocks/mockNetwork.go | 2 ++ network/hybridNetwork.go | 4 ++-- network/multiplexer.go | 8 ++++---- network/p2pNetwork.go | 4 ++-- network/wsNetwork.go | 4 ++-- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index 4c74176a0d..c0b7724e07 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -91,9 +91,11 @@ func (network *MockNetwork) RegisterHandlers(dispatch []network.TaggedMessageHan func (network *MockNetwork) ClearHandlers() { } +// RegisterProcessors - empty implementation. func (network *MockNetwork) RegisterProcessors(dispatch []network.TaggedMessageProcessor) { } +// ClearProcessors - empty implementation func (network *MockNetwork) ClearProcessors() { } diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 0084a13f75..6041d95f9a 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -180,13 +180,13 @@ func (n *HybridP2PNetwork) ClearHandlers() { n.wsNetwork.ClearHandlers() } -// RegisterHandlers adds to the set of given message handlers. +// RegisterProcessors adds to the set of given message handlers. func (n *HybridP2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) { n.p2pNetwork.RegisterProcessors(dispatch) n.wsNetwork.RegisterProcessors(dispatch) } -// ClearHandlers deregisters all the existing message handlers. +// ClearProcessors deregisters all the existing message handlers. func (n *HybridP2PNetwork) ClearProcessors() { n.p2pNetwork.ClearProcessors() n.wsNetwork.ClearProcessors() diff --git a/network/multiplexer.go b/network/multiplexer.go index 3764f08fe0..a95779b91c 100644 --- a/network/multiplexer.go +++ b/network/multiplexer.go @@ -82,7 +82,7 @@ func (m *Multiplexer) Handle(msg IncomingMessage) OutgoingMessage { return OutgoingMessage{} } -// Validate is the "input" side of the multiplexer. It dispatches the message to the previously defined handler. +// Validate is an alternative "input" side of the multiplexer. It dispatches the message to the previously defined validator. func (m *Multiplexer) Validate(msg IncomingMessage) ValidatedMessage { handler, ok := m.getProcessor(msg.Tag) @@ -93,7 +93,7 @@ func (m *Multiplexer) Validate(msg IncomingMessage) ValidatedMessage { return ValidatedMessage{} } -// Handle is the "input" side of the multiplexer. It dispatches the message to the previously defined handler. +// Process is the second step of message handling after validation. It dispatches the message to the previously defined processor. func (m *Multiplexer) Process(msg ValidatedMessage) OutgoingMessage { handler, ok := m.getProcessor(msg.Tag) @@ -145,7 +145,7 @@ func (m *Multiplexer) ClearHandlers(excludeTags []Tag) { m.msgHandlers.Store(newMap) } -// RegisterHandlers registers the set of given message handlers. +// RegisterProcessors registers the set of given message handlers. func (m *Multiplexer) RegisterProcessors(dispatch []TaggedMessageProcessor) { mp := make(map[Tag]MessageProcessor) if existingMap := m.getProcessorsMap(); existingMap != nil { @@ -162,7 +162,7 @@ func (m *Multiplexer) RegisterProcessors(dispatch []TaggedMessageProcessor) { m.msgProcessors.Store(mp) } -// ClearHandlers deregisters all the existing message handlers other than the one provided in the excludeTags list +// ClearProcessors deregisters all the existing message handlers other than the one provided in the excludeTags list func (m *Multiplexer) ClearProcessors(excludeTags []Tag) { if len(excludeTags) == 0 { m.msgProcessors.Store(make(map[Tag]MessageProcessor)) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 90f4d696ec..ab489f13b9 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -659,12 +659,12 @@ func (n *P2PNetwork) ClearHandlers() { n.handler.ClearHandlers([]Tag{}) } -// RegisterHandlers adds to the set of given message handlers. +// RegisterProcessors adds to the set of given message handlers. func (n *P2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) { n.handler.RegisterProcessors(dispatch) } -// ClearHandlers deregisters all the existing message handlers. +// ClearProcessors deregisters all the existing message handlers. func (n *P2PNetwork) ClearProcessors() { n.handler.ClearProcessors([]Tag{}) } diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 6ef812094b..13e0fe71f5 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -853,11 +853,11 @@ func (wn *WebsocketNetwork) ClearHandlers() { wn.handler.ClearHandlers([]Tag{protocol.PingTag, protocol.PingReplyTag, protocol.NetPrioResponseTag}) } -// RegisterHandlers registers the set of given message handlers. +// RegisterProcessors registers the set of given message handlers. func (wn *WebsocketNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) { } -// ClearHandlers deregisters all the existing message handlers. +// ClearProcessors deregisters all the existing message handlers. func (wn *WebsocketNetwork) ClearProcessors() { } From ba4942d2844d26bdfc15011908ed05f6c8753620 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 8 Apr 2024 20:22:28 -0400 Subject: [PATCH 05/10] fix tests --- data/txHandler.go | 2 +- network/gossipNode.go | 3 ++ network/p2pNetwork.go | 8 +-- network/p2pNetwork_test.go | 108 +++++++++++++++++++++++++++---------- 4 files changed, 85 insertions(+), 36 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 4cf55aa635..cc4c2526a9 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -762,7 +762,7 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa } return network.ValidatedMessage{ - Action: network.Ignore, + Action: network.Accept, Tag: rawmsg.Tag, ValidatorData: &validatedIncomingTxMessage{ rawmsg: rawmsg, diff --git a/network/gossipNode.go b/network/gossipNode.go index 280fa44786..eeeca95167 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -193,6 +193,9 @@ const ( // Respond - reply to the sender Respond + + // Accept - accept for further processing after successful validation + Accept ) // MessageHandler takes a IncomingMessage (e.g., vote, transaction), processes it, and returns what (if anything) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index ab489f13b9..9f5123765c 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -938,15 +938,11 @@ func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg * // there was a decision made in the handler about this message switch outmsg.Action { case Ignore: - // TODO: add a new ForwardingPolicy action? - if outmsg.ValidatorData != nil { - msg.ValidatorData = outmsg.ValidatorData - return pubsub.ValidationAccept - } return pubsub.ValidationIgnore case Disconnect: return pubsub.ValidationReject - case Broadcast: // TxHandler.processIncomingTxn does not currently return this Action + case Accept: + msg.ValidatorData = outmsg return pubsub.ValidationAccept default: n.log.Warnf("handler returned invalid action %d", outmsg.Action) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 4e929723f6..d91402b6fa 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -98,15 +98,26 @@ func TestP2PSubmitTX(t *testing.T) { // now we should be connected in a line: B <-> A <-> C where both B and C are connected to A but not each other // Since we aren't using the transaction handler in this test, we need to register a pass-through handler - passThroughHandler := []TaggedMessageHandler{ - {Tag: protocol.TxnTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage { - return OutgoingMessage{Action: Broadcast} - })}, + passThroughHandler := []TaggedMessageProcessor{ + { + Tag: protocol.TxnTag, + MessageProcessor: struct { + ProcessorValidateFunc + ProcessorHandleFunc + }{ + ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage { + return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatorData: nil} + }), + ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage { + return OutgoingMessage{Action: Ignore} + }), + }, + }, } - netA.RegisterHandlers(passThroughHandler) - netB.RegisterHandlers(passThroughHandler) - netC.RegisterHandlers(passThroughHandler) + netA.RegisterProcessors(passThroughHandler) + netB.RegisterProcessors(passThroughHandler) + netC.RegisterProcessors(passThroughHandler) // send messages from B and confirm that they get received by C (via A) for i := 0; i < 10; i++ { @@ -178,14 +189,26 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { time.Sleep(time.Second) // give time for peers to connect. // ensure netC cannot receive messages - passThroughHandler := []TaggedMessageHandler{ - {Tag: protocol.TxnTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage { - return OutgoingMessage{Action: Broadcast} - })}, + + passThroughHandler := []TaggedMessageProcessor{ + { + Tag: protocol.TxnTag, + MessageProcessor: struct { + ProcessorValidateFunc + ProcessorHandleFunc + }{ + ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage { + return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatorData: nil} + }), + ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage { + return OutgoingMessage{Action: Ignore} + }), + }, + }, } - netB.RegisterHandlers(passThroughHandler) - netC.RegisterHandlers(passThroughHandler) + netB.RegisterProcessors(passThroughHandler) + netC.RegisterProcessors(passThroughHandler) for i := 0; i < 10; i++ { err = netA.Broadcast(context.Background(), protocol.TxnTag, []byte(fmt.Sprintf("test %d", i)), false, nil) require.NoError(t, err) @@ -207,7 +230,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { 50*time.Millisecond, ) - // check netB did not receive the messages + // check netC did not receive the messages netC.peerStatsMu.Lock() _, ok := netC.peerStats[netA.service.ID()] netC.peerStatsMu.Unlock() @@ -804,9 +827,33 @@ func TestP2PRelay(t *testing.T) { return netA.hasPeers() && netB.hasPeers() }, 2*time.Second, 50*time.Millisecond) - counter := newMessageCounter(t, 1) - counterDone := counter.done - netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) + makeCounterHandler := func(numExpected int) ([]TaggedMessageProcessor, *int, chan struct{}) { + numActual := 0 + counterDone := make(chan struct{}) + counterHandler := []TaggedMessageProcessor{ + { + Tag: protocol.TxnTag, + MessageProcessor: struct { + ProcessorValidateFunc + ProcessorHandleFunc + }{ + ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage { + return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatorData: nil} + }), + ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage { + numActual++ + if numActual >= numExpected { + close(counterDone) + } + return OutgoingMessage{Action: Ignore} + }), + }, + }, + } + return counterHandler, &numActual, counterDone + } + counterHandler, _, counterDone := makeCounterHandler(1) + netA.RegisterProcessors(counterHandler) // send 5 messages from both netB to netA // since there is no node with listening address set => no messages should be received @@ -848,10 +895,9 @@ func TestP2PRelay(t *testing.T) { }, 2*time.Second, 50*time.Millisecond) const expectedMsgs = 10 - counter = newMessageCounter(t, expectedMsgs) - counterDone = counter.done - netA.ClearHandlers() - netA.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}}) + counterHandler, count, counterDone := makeCounterHandler(expectedMsgs) + netA.ClearProcessors() + netA.RegisterProcessors(counterHandler) for i := 0; i < expectedMsgs/2; i++ { err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil) @@ -868,10 +914,10 @@ func TestP2PRelay(t *testing.T) { select { case <-counterDone: case <-time.After(2 * time.Second): - if counter.count < expectedMsgs { - require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", expectedMsgs, counter.count) - } else if counter.count > expectedMsgs { - require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", expectedMsgs, counter.count) + if *count < expectedMsgs { + require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", expectedMsgs, *count) + } else if *count > expectedMsgs { + require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", expectedMsgs, *count) } } } @@ -882,14 +928,17 @@ type mockSubPService struct { } type mockSubscription struct { + peerID peer.ID } -func (m *mockSubscription) Next(ctx context.Context) (*pubsub.Message, error) { return nil, nil } -func (m *mockSubscription) Cancel() {} +func (m *mockSubscription) Next(ctx context.Context) (*pubsub.Message, error) { + return &pubsub.Message{ReceivedFrom: m.peerID}, nil +} +func (m *mockSubscription) Cancel() {} func (m *mockSubPService) Subscribe(topic string, val pubsub.ValidatorEx) (p2p.SubNextCancellable, error) { m.count.Add(1) - return &mockSubscription{}, nil + return &mockSubscription{peerID: m.id}, nil } // TestP2PWantTXGossip checks txTopicHandleLoop runs as expected on wantTXGossip changes @@ -900,7 +949,8 @@ func TestP2PWantTXGossip(t *testing.T) { // cancelled context to trigger subscription.Next to return ctx, cancel := context.WithCancel(context.Background()) cancel() - mockService := &mockSubPService{} + peerID := peer.ID("myPeerID") + mockService := &mockSubPService{mockService: mockService{id: peerID}} net := &P2PNetwork{ service: mockService, log: logging.TestingLog(t), From f75c5240528fe1c4eef2777081a95a2cd6086123 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 9 Apr 2024 14:35:48 -0400 Subject: [PATCH 06/10] generalize multiplexer methods --- network/multiplexer.go | 92 +++++++++++++++++++----------------------- 1 file changed, 41 insertions(+), 51 deletions(-) diff --git a/network/multiplexer.go b/network/multiplexer.go index a95779b91c..2d69259c9d 100644 --- a/network/multiplexer.go +++ b/network/multiplexer.go @@ -31,44 +31,48 @@ type Multiplexer struct { // MakeMultiplexer creates an empty Multiplexer func MakeMultiplexer() *Multiplexer { m := &Multiplexer{} - m.ClearHandlers([]Tag{}) // allocate the map + m.ClearHandlers(nil) // allocate the map + m.ClearProcessors(nil) // allocate the map return m } -// getHandlersMap retrieves the handlers map. -func (m *Multiplexer) getHandlersMap() map[Tag]MessageHandler { - handlersVal := m.msgHandlers.Load() - if handlers, valid := handlersVal.(map[Tag]MessageHandler); valid { +// getMap retrieves a typed map from an atomic.Value. +func getMap[T any](source *atomic.Value) map[Tag]T { + mp := source.Load() + if handlers, valid := mp.(map[Tag]T); valid { return handlers } return nil } -// getProcessorsMap retrieves the handlers map. +// getHandlersMap retrieves the handlers map. +func (m *Multiplexer) getHandlersMap() map[Tag]MessageHandler { + return getMap[MessageHandler](&m.msgHandlers) +} + +// getProcessorsMap retrieves the processors map. func (m *Multiplexer) getProcessorsMap() map[Tag]MessageProcessor { - val := m.msgProcessors.Load() - if processor, valid := val.(map[Tag]MessageProcessor); valid { - return processor - } - return nil + return getMap[MessageProcessor](&m.msgHandlers) } -// Retrieves the handler for the given message Tag from the handlers array while taking a read lock. -func (m *Multiplexer) getHandler(tag Tag) (MessageHandler, bool) { - if handlers := m.getHandlersMap(); handlers != nil { +// Retrieves the handler for the given message Tag from the given value while. +func getHandler[T any](source *atomic.Value, tag Tag) (T, bool) { + if handlers := getMap[T](source); handlers != nil { handler, ok := handlers[tag] return handler, ok } - return nil, false + var empty T + return empty, false } -// Retrieves the handler for the given message Tag from the handlers array while taking a read lock. +// Retrieves the handler for the given message Tag from the handlers array. +func (m *Multiplexer) getHandler(tag Tag) (MessageHandler, bool) { + return getHandler[MessageHandler](&m.msgHandlers, tag) +} + +// Retrieves the processor for the given message Tag from the processors array. func (m *Multiplexer) getProcessor(tag Tag) (MessageProcessor, bool) { - if mp := m.getProcessorsMap(); mp != nil { - processor, ok := mp[tag] - return processor, ok - } - return nil, false + return getHandler[MessageProcessor](&m.msgProcessors, tag) } // Handle is the "input" side of the multiplexer. It dispatches the message to the previously defined handler. @@ -121,30 +125,6 @@ func (m *Multiplexer) RegisterHandlers(dispatch []TaggedMessageHandler) { m.msgHandlers.Store(mp) } -// ClearHandlers deregisters all the existing message handlers other than the one provided in the excludeTags list -func (m *Multiplexer) ClearHandlers(excludeTags []Tag) { - if len(excludeTags) == 0 { - m.msgHandlers.Store(make(map[Tag]MessageHandler)) - return - } - - // convert into map, so that we can exclude duplicates. - excludeTagsMap := make(map[Tag]bool) - for _, tag := range excludeTags { - excludeTagsMap[tag] = true - } - - currentHandlersMap := m.getHandlersMap() - newMap := make(map[Tag]MessageHandler, len(excludeTagsMap)) - for tag, handler := range currentHandlersMap { - if excludeTagsMap[tag] { - newMap[tag] = handler - } - } - - m.msgHandlers.Store(newMap) -} - // RegisterProcessors registers the set of given message handlers. func (m *Multiplexer) RegisterProcessors(dispatch []TaggedMessageProcessor) { mp := make(map[Tag]MessageProcessor) @@ -163,9 +143,9 @@ func (m *Multiplexer) RegisterProcessors(dispatch []TaggedMessageProcessor) { } // ClearProcessors deregisters all the existing message handlers other than the one provided in the excludeTags list -func (m *Multiplexer) ClearProcessors(excludeTags []Tag) { +func clear[T any](target *atomic.Value, excludeTags []Tag) { if len(excludeTags) == 0 { - m.msgProcessors.Store(make(map[Tag]MessageProcessor)) + target.Store(make(map[Tag]T)) return } @@ -175,13 +155,23 @@ func (m *Multiplexer) ClearProcessors(excludeTags []Tag) { excludeTagsMap[tag] = true } - currentProcessorsMap := m.getProcessorsMap() - newMap := make(map[Tag]MessageProcessor, len(excludeTagsMap)) - for tag, handler := range currentProcessorsMap { + currentMap := getMap[T](target) + newMap := make(map[Tag]T, len(excludeTagsMap)) + for tag, handler := range currentMap { if excludeTagsMap[tag] { newMap[tag] = handler } } - m.msgProcessors.Store(newMap) + target.Store(newMap) +} + +// ClearHandlers deregisters all the existing message handlers other than the one provided in the excludeTags list +func (m *Multiplexer) ClearHandlers(excludeTags []Tag) { + clear[MessageHandler](&m.msgHandlers, excludeTags) +} + +// ClearProcessors deregisters all the existing message handlers other than the one provided in the excludeTags list +func (m *Multiplexer) ClearProcessors(excludeTags []Tag) { + clear[MessageProcessor](&m.msgProcessors, excludeTags) } From 54d6e7c6a845b1ce4a0e1995a93ff2c4f59900d7 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 9 Apr 2024 14:53:16 -0400 Subject: [PATCH 07/10] add txTopicHandleLoop workers --- network/p2pNetwork.go | 50 +++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 9f5123765c..f8b627710e 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -884,29 +884,37 @@ func (n *P2PNetwork) txTopicHandleLoop() { } n.log.Debugf("Subscribed to topic %s", p2p.TXTopicName) - for { - msg, err := sub.Next(n.ctx) - if err != nil { - if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled { - n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID()) - } - n.log.Debugf("Cancelling subscription to topic %s due Subscription.Next error: %v", p2p.TXTopicName, err) - sub.Cancel() - return - } - // if we sent the message no need to process it. - if msg.ReceivedFrom == n.service.ID() { - return - } + var wg sync.WaitGroup + wg.Add(incomingThreads) + defer wg.Wait() + for i := 0; i < incomingThreads; i++ { + go func() { + defer wg.Done() + for { + msg, err := sub.Next(n.ctx) + if err != nil { + if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled { + n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID()) + } + n.log.Debugf("Cancelling subscription to topic %s due Subscription.Next error: %v", p2p.TXTopicName, err) + sub.Cancel() + return + } + // if we sent the message no need to process it. + if msg.ReceivedFrom == n.service.ID() { + return + } - _ = n.handler.Process(msg.ValidatorData.(ValidatedMessage)) + _ = n.handler.Process(msg.ValidatorData.(ValidatedMessage)) - // participation or configuration change, cancel subscription and quit - if !n.wantTXGossip.Load() { - n.log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName) - sub.Cancel() - return - } + // participation or configuration change, cancel subscription and quit + if !n.wantTXGossip.Load() { + n.log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName) + sub.Cancel() + return + } + } + }() } } From 2b4e304d77062b6328f6d5f9ab49a3a2af216270 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 9 Apr 2024 18:41:58 -0400 Subject: [PATCH 08/10] Revert "add txTopicHandleLoop workers" This reverts commit 439e89ecbdcecd1cb6f92f32d17933694c723328. --- network/p2pNetwork.go | 50 ++++++++++++++++++------------------------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index f8b627710e..9f5123765c 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -884,37 +884,29 @@ func (n *P2PNetwork) txTopicHandleLoop() { } n.log.Debugf("Subscribed to topic %s", p2p.TXTopicName) - var wg sync.WaitGroup - wg.Add(incomingThreads) - defer wg.Wait() - for i := 0; i < incomingThreads; i++ { - go func() { - defer wg.Done() - for { - msg, err := sub.Next(n.ctx) - if err != nil { - if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled { - n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID()) - } - n.log.Debugf("Cancelling subscription to topic %s due Subscription.Next error: %v", p2p.TXTopicName, err) - sub.Cancel() - return - } - // if we sent the message no need to process it. - if msg.ReceivedFrom == n.service.ID() { - return - } + for { + msg, err := sub.Next(n.ctx) + if err != nil { + if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled { + n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID()) + } + n.log.Debugf("Cancelling subscription to topic %s due Subscription.Next error: %v", p2p.TXTopicName, err) + sub.Cancel() + return + } + // if we sent the message no need to process it. + if msg.ReceivedFrom == n.service.ID() { + return + } - _ = n.handler.Process(msg.ValidatorData.(ValidatedMessage)) + _ = n.handler.Process(msg.ValidatorData.(ValidatedMessage)) - // participation or configuration change, cancel subscription and quit - if !n.wantTXGossip.Load() { - n.log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName) - sub.Cancel() - return - } - } - }() + // participation or configuration change, cancel subscription and quit + if !n.wantTXGossip.Load() { + n.log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName) + sub.Cancel() + return + } } } From e9d286ff43dcc1aae5300a9477078f36943e4d09 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 9 May 2024 11:04:28 -0400 Subject: [PATCH 09/10] CR fixes --- network/p2pNetwork.go | 4 ++-- network/p2pNetwork_test.go | 19 +++++++++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 9f5123765c..5eaf6ec36f 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -894,9 +894,9 @@ func (n *P2PNetwork) txTopicHandleLoop() { sub.Cancel() return } - // if we sent the message no need to process it. + // if there is a self-sent the message no need to process it. if msg.ReceivedFrom == n.service.ID() { - return + continue } _ = n.handler.Process(msg.ValidatorData.(ValidatedMessage)) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index d91402b6fa..7cc590ff02 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -18,6 +18,7 @@ package network import ( "context" + "errors" "fmt" "io" "net/http" @@ -924,21 +925,31 @@ func TestP2PRelay(t *testing.T) { type mockSubPService struct { mockService - count atomic.Int64 + count atomic.Int64 + otherPeerID peer.ID + shouldNextFail bool } type mockSubscription struct { - peerID peer.ID + peerID peer.ID + shouldNextFail bool } func (m *mockSubscription) Next(ctx context.Context) (*pubsub.Message, error) { + if m.shouldNextFail { + return nil, errors.New("mockSubscription error") + } return &pubsub.Message{ReceivedFrom: m.peerID}, nil } func (m *mockSubscription) Cancel() {} func (m *mockSubPService) Subscribe(topic string, val pubsub.ValidatorEx) (p2p.SubNextCancellable, error) { m.count.Add(1) - return &mockSubscription{peerID: m.id}, nil + otherPeerID := m.otherPeerID + if otherPeerID == "" { + otherPeerID = "mockSubPServicePeerID" + } + return &mockSubscription{peerID: otherPeerID, shouldNextFail: m.shouldNextFail}, nil } // TestP2PWantTXGossip checks txTopicHandleLoop runs as expected on wantTXGossip changes @@ -950,7 +961,7 @@ func TestP2PWantTXGossip(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() peerID := peer.ID("myPeerID") - mockService := &mockSubPService{mockService: mockService{id: peerID}} + mockService := &mockSubPService{mockService: mockService{id: peerID}, shouldNextFail: true} net := &P2PNetwork{ service: mockService, log: logging.TestingLog(t), From 6f9e914f19a00f56f1ff4c16111e0f820acf7c57 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 10 May 2024 15:45:12 -0400 Subject: [PATCH 10/10] CR fixes --- data/txHandler.go | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index cc4c2526a9..895b07269d 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -590,6 +590,11 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed return &d, false } +// incomingMsgDupErlCheck runs the duplicate and rate limiting checks on a raw incoming messages. +// Returns: +// - the key used for insertion if the message was not found in the cache +// - the capacity guard returned by the elastic rate limiter +// - a boolean indicating if the message was a duplicate or the sender is rate limited func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.DisconnectablePeer) (*crypto.Digest, *util.ErlCapacityGuard, bool) { var msgKey *crypto.Digest var capguard *util.ErlCapacityGuard @@ -625,11 +630,12 @@ func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.Dis return msgKey, capguard, false } -func decodeMsg(data []byte) ([]transactions.SignedTxn, int, bool) { - unverifiedTxGroup := make([]transactions.SignedTxn, 1) +// decodeMsg decodes TX message buffer into transactions.SignedTxn, +// and returns number of bytes consumed from the buffer and a boolean indicating if the message was invalid. +func decodeMsg(data []byte) (unverifiedTxGroup []transactions.SignedTxn, consumed int, invalid bool) { + unverifiedTxGroup = make([]transactions.SignedTxn, 1) dec := protocol.NewMsgpDecoderBytes(data) ntx := 0 - consumed := 0 for { if len(unverifiedTxGroup) == ntx { @@ -670,6 +676,9 @@ func decodeMsg(data []byte) ([]transactions.SignedTxn, int, bool) { return unverifiedTxGroup, consumed, false } +// incomingTxGroupDupRateLimit checks +// - if the incoming transaction group has been seen before after reencoding to canonical representation, and +// - if the sender is rate limited by the per-application rate limiter. func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int, sender network.DisconnectablePeer) (*crypto.Digest, bool) { var canonicalKey *crypto.Digest if handler.txCanonicalCache != nil { @@ -697,18 +706,22 @@ func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transa // - message are checked for duplicates // - transactions are checked for duplicates func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage { - msgKey, capguard, isDup := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender) - if isDup { + msgKey, capguard, shouldDrop := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender) + if shouldDrop { + // this TX message was found in the duplicate cache, or ERL rate-limited it return network.OutgoingMessage{Action: network.Ignore} } - unverifiedTxGroup, consumed, drop := decodeMsg(rawmsg.Data) - if drop { + unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data) + if invalid { + // invalid encoding or exceeding txgroup, disconnect from this peer return network.OutgoingMessage{Action: network.Disconnect} } canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender) if drop { + // this re-serialized txgroup was detected as a duplicate by the canonical message cache, + // or it was rate-limited by the per-app rate limiter return network.OutgoingMessage{Action: network.Ignore} } @@ -745,19 +758,24 @@ type validatedIncomingTxMessage struct { capguard *util.ErlCapacityGuard } +// validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork. func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.ValidatedMessage { - msgKey, capguard, isDup := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender) - if isDup { + msgKey, capguard, shouldDrop := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender) + if shouldDrop { + // this TX message was found in the duplicate cache, or ERL rate-limited it return network.ValidatedMessage{Action: network.Ignore, ValidatorData: nil} } - unverifiedTxGroup, consumed, drop := decodeMsg(rawmsg.Data) - if drop { + unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data) + if invalid { + // invalid encoding or exceeding txgroup, disconnect from this peer return network.ValidatedMessage{Action: network.Disconnect, ValidatorData: nil} } canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender) if drop { + // this re-serialized txgroup was detected as a duplicate by the canonical message cache, + // or it was rate-limited by the per-app rate limiter return network.ValidatedMessage{Action: network.Ignore, ValidatorData: nil} } @@ -774,6 +792,7 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa } } +// processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork. func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage { msg := validatedMessage.ValidatorData.(*validatedIncomingTxMessage) select {