Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ func (network *MockNetwork) RegisterHandlers(dispatch []network.TaggedMessageHan
func (network *MockNetwork) ClearHandlers() {
}

// RegisterProcessors - empty implementation.
func (network *MockNetwork) RegisterProcessors(dispatch []network.TaggedMessageProcessor) {
}

// ClearProcessors - empty implementation
func (network *MockNetwork) ClearProcessors() {
}

// RegisterHTTPHandler - empty implementation
func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
}
Expand Down
173 changes: 148 additions & 25 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,21 @@ func (handler *TxHandler) Start() {
handler.net.RegisterHandlers([]network.TaggedMessageHandler{
{Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)},
})

handler.net.RegisterProcessors([]network.TaggedMessageProcessor{
{
Tag: protocol.TxnTag,
// create anonymous struct to hold the two functions and satisfy the network.MessageProcessor interface
MessageProcessor: struct {
network.ProcessorValidateFunc
network.ProcessorHandleFunc
}{
network.ProcessorValidateFunc(handler.validateIncomingTxMessage),
network.ProcessorHandleFunc(handler.processIncomingTxMessage),
},
},
})

handler.backlogWg.Add(2)
go handler.backlogWorker()
go handler.backlogGaugeThread()
Expand Down Expand Up @@ -530,7 +545,7 @@ func (handler *TxHandler) deleteFromCaches(msgKey *crypto.Digest, canonicalKey *

// dedupCanonical checks if the transaction group has been seen before after reencoding to canonical representation.
// returns a key used for insertion if the group was not found.
func (handler *TxHandler) dedupCanonical(ntx int, unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, isDup bool) {
func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, isDup bool) {
// consider situations where someone want to censor transactions A
// 1. Txn A is not part of a group => txn A with a valid signature is OK
// Censorship attempts are:
Expand All @@ -547,6 +562,7 @@ func (handler *TxHandler) dedupCanonical(ntx int, unverifiedTxGroup []transactio
// - using individual txn from a group: {A, Z} could be poisoned by {A, B}, where B is invalid

var d crypto.Digest
ntx := len(unverifiedTxGroup)
if ntx == 1 {
// a single transaction => cache/dedup canonical txn with its signature
enc := unverifiedTxGroup[0].MarshalMsg(nil)
Expand Down Expand Up @@ -574,49 +590,52 @@ func (handler *TxHandler) dedupCanonical(ntx int, unverifiedTxGroup []transactio
return &d, false
}

// processIncomingTxn decodes a transaction group from incoming message and enqueues into the back log for processing.
// The function also performs some input data pre-validation;
// - txn groups are cut to MaxTxGroupSize size
// - message are checked for duplicates
// - transactions are checked for duplicates

func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage {
// incomingMsgDupErlCheck runs the duplicate and rate limiting checks on a raw incoming messages.
// Returns:
// - the key used for insertion if the message was not found in the cache
// - the capacity guard returned by the elastic rate limiter
// - a boolean indicating if the message was a duplicate or the sender is rate limited
func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.DisconnectablePeer) (*crypto.Digest, *util.ErlCapacityGuard, bool) {
Comment thread
algorandskiy marked this conversation as resolved.
var msgKey *crypto.Digest
var capguard *util.ErlCapacityGuard
var isDup bool
if handler.msgCache != nil {
// check for duplicate messages
// this helps against relaying duplicates
if msgKey, isDup = handler.msgCache.CheckAndPut(rawmsg.Data); isDup {
if msgKey, isDup = handler.msgCache.CheckAndPut(data); isDup {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gossipsub has its own duplicate checker too, depending on the message hash function implementation? I wonder how often you will hit this when gossipsub is in front of it

transactionMessagesDupRawMsg.Inc(nil)
return network.OutgoingMessage{Action: network.Ignore}
return msgKey, capguard, true
}
}

unverifiedTxGroup := make([]transactions.SignedTxn, 1)
dec := protocol.NewMsgpDecoderBytes(rawmsg.Data)
ntx := 0
consumed := 0

var err error
var capguard *util.ErlCapacityGuard
if handler.erl != nil {
congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue))
// consume a capacity unit
// if the elastic rate limiter cannot vend a capacity, the error it returns
// is sufficient to indicate that we should enable Congestion Control, because
// an issue in vending capacity indicates the underlying resource (TXBacklog) is full
capguard, err = handler.erl.ConsumeCapacity(rawmsg.Sender.(util.ErlClient))
capguard, err = handler.erl.ConsumeCapacity(sender.(util.ErlClient))
if err != nil {
handler.erl.EnableCongestionControl()
// if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such
transactionMessagesDroppedFromBacklog.Inc(nil)
return network.OutgoingMessage{Action: network.Ignore}
return msgKey, capguard, true
}
// if the backlog Queue has 50% of its buffer back, turn congestion control off
if !congestedERL {
handler.erl.DisableCongestionControl()
}
}
return msgKey, capguard, false
}

// decodeMsg decodes TX message buffer into transactions.SignedTxn,
// and returns number of bytes consumed from the buffer and a boolean indicating if the message was invalid.
func decodeMsg(data []byte) (unverifiedTxGroup []transactions.SignedTxn, consumed int, invalid bool) {
unverifiedTxGroup = make([]transactions.SignedTxn, 1)
dec := protocol.NewMsgpDecoderBytes(data)
ntx := 0

for {
if len(unverifiedTxGroup) == ntx {
Expand All @@ -630,7 +649,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
break
}
logging.Base().Warnf("Received a non-decodable txn: %v", err)
return network.OutgoingMessage{Action: network.Disconnect}
return nil, 0, true
}
consumed = dec.Consumed()
ntx++
Expand All @@ -639,13 +658,13 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
if dec.Remaining() > 0 {
// if something else left in the buffer - this is an error, drop
transactionMessageTxGroupExcessive.Inc(nil)
return network.OutgoingMessage{Action: network.Disconnect}
return nil, 0, true
}
}
}
if ntx == 0 {
logging.Base().Warnf("Received empty tx group")
return network.OutgoingMessage{Action: network.Disconnect}
return nil, 0, true
}

unverifiedTxGroup = unverifiedTxGroup[:ntx]
Expand All @@ -654,22 +673,57 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
transactionMessageTxGroupFull.Inc(nil)
}

return unverifiedTxGroup, consumed, false
}

// incomingTxGroupDupRateLimit checks
// - if the incoming transaction group has been seen before after reencoding to canonical representation, and
// - if the sender is rate limited by the per-application rate limiter.
func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int, sender network.DisconnectablePeer) (*crypto.Digest, bool) {
var canonicalKey *crypto.Digest
if handler.txCanonicalCache != nil {
if canonicalKey, isDup = handler.dedupCanonical(ntx, unverifiedTxGroup, consumed); isDup {
var isDup bool
if canonicalKey, isDup = handler.dedupCanonical(unverifiedTxGroup, encodedExpectedSize); isDup {
transactionMessagesDupCanonical.Inc(nil)
return network.OutgoingMessage{Action: network.Ignore}
return canonicalKey, true
}
}

// rate limit per application in a group. Limiting any app in a group drops the entire message.
if handler.appLimiter != nil {
congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) {
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.(network.IPAddressable).RoutingAddr()) {
transactionMessagesAppLimiterDrop.Inc(nil)
return network.OutgoingMessage{Action: network.Ignore}
return canonicalKey, true
}
}
return canonicalKey, false
}

// processIncomingTxn decodes a transaction group from incoming message and enqueues into the back log for processing.
// The function also performs some input data pre-validation;
// - txn groups are cut to MaxTxGroupSize size
// - message are checked for duplicates
// - transactions are checked for duplicates
func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage {
msgKey, capguard, shouldDrop := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender)
if shouldDrop {
// this TX message was found in the duplicate cache, or ERL rate-limited it
return network.OutgoingMessage{Action: network.Ignore}
}

unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data)
if invalid {
// invalid encoding or exceeding txgroup, disconnect from this peer
return network.OutgoingMessage{Action: network.Disconnect}
}

canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender)
if drop {
Comment thread
algorandskiy marked this conversation as resolved.
// this re-serialized txgroup was detected as a duplicate by the canonical message cache,
// or it was rate-limited by the per-app rate limiter
return network.OutgoingMessage{Action: network.Ignore}
}

select {
case handler.backlogQueue <- &txBacklogMsg{
Expand All @@ -696,6 +750,75 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
return network.OutgoingMessage{Action: network.Ignore}
}

type validatedIncomingTxMessage struct {
rawmsg network.IncomingMessage
unverifiedTxGroup []transactions.SignedTxn
msgKey *crypto.Digest
canonicalKey *crypto.Digest
capguard *util.ErlCapacityGuard
}

// validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.ValidatedMessage {
Comment thread
algorandskiy marked this conversation as resolved.
msgKey, capguard, shouldDrop := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender)
if shouldDrop {
// this TX message was found in the duplicate cache, or ERL rate-limited it
return network.ValidatedMessage{Action: network.Ignore, ValidatorData: nil}
}

unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data)
if invalid {
// invalid encoding or exceeding txgroup, disconnect from this peer
return network.ValidatedMessage{Action: network.Disconnect, ValidatorData: nil}
}

canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender)
if drop {
// this re-serialized txgroup was detected as a duplicate by the canonical message cache,
// or it was rate-limited by the per-app rate limiter
return network.ValidatedMessage{Action: network.Ignore, ValidatorData: nil}
}

return network.ValidatedMessage{
Action: network.Accept,
Tag: rawmsg.Tag,
ValidatorData: &validatedIncomingTxMessage{
rawmsg: rawmsg,
unverifiedTxGroup: unverifiedTxGroup,
msgKey: msgKey,
canonicalKey: canonicalKey,
capguard: capguard,
},
}
}

// processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage {
Comment thread
algorandskiy marked this conversation as resolved.
msg := validatedMessage.ValidatorData.(*validatedIncomingTxMessage)
select {
case handler.backlogQueue <- &txBacklogMsg{
rawmsg: &msg.rawmsg,
unverifiedTxGroup: msg.unverifiedTxGroup,
rawmsgDataHash: msg.msgKey,
unverifiedTxGroupHash: msg.canonicalKey,
capguard: msg.capguard,
}:
default:
// if we failed here we want to increase the corresponding metric. It might suggest that we
// want to increase the queue size.
transactionMessagesDroppedFromBacklog.Inc(nil)

// additionally, remove the txn from duplicate caches to ensure it can be re-submitted
if handler.txCanonicalCache != nil && msg.canonicalKey != nil {
handler.txCanonicalCache.Delete(msg.canonicalKey)
}
if handler.msgCache != nil && msg.msgKey != nil {
handler.msgCache.DeleteByKey(msg.msgKey)
}
}
return network.OutgoingMessage{Action: network.Ignore}
}

var errBackLogFullLocal = errors.New("backlog full")

// LocalTransaction is a special shortcut handler for local transactions and intended to be used
Expand Down
53 changes: 51 additions & 2 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ type GossipNode interface {
// ClearHandlers deregisters all the existing message handlers.
ClearHandlers()

// RegisterProcessors adds to the set of given message processors.
RegisterProcessors(dispatch []TaggedMessageProcessor)

// ClearProcessors deregisters all the existing message processors.
ClearProcessors()

// GetHTTPClient returns a http.Client with a suitable for the network Transport
// that would also limit the number of outgoing connections.
GetHTTPClient(address string) (*http.Client, error)
Expand Down Expand Up @@ -162,6 +168,14 @@ type OutgoingMessage struct {
OnRelease func()
}

// ValidatedMessage is a message that has been validated and is ready to be processed.
// Think as an intermediate one between IncomingMessage and OutgoingMessage
type ValidatedMessage struct {
Action ForwardingPolicy
Tag Tag
ValidatorData interface{}
}

// ForwardingPolicy is an enum indicating to whom we should send a message
//
//msgp:ignore ForwardingPolicy
Expand All @@ -179,6 +193,9 @@ const (

// Respond - reply to the sender
Respond

// Accept - accept for further processing after successful validation
Accept
)

// MessageHandler takes a IncomingMessage (e.g., vote, transaction), processes it, and returns what (if anything)
Expand All @@ -189,20 +206,52 @@ type MessageHandler interface {
Handle(message IncomingMessage) OutgoingMessage
}

// HandlerFunc represents an implemenation of the MessageHandler interface
// HandlerFunc represents an implementation of the MessageHandler interface
type HandlerFunc func(message IncomingMessage) OutgoingMessage

// Handle implements MessageHandler.Handle, calling the handler with the IncomingKessage and returning the OutgoingMessage
// Handle implements MessageHandler.Handle, calling the handler with the IncomingMessage and returning the OutgoingMessage
func (f HandlerFunc) Handle(message IncomingMessage) OutgoingMessage {
return f(message)
}

// MessageProcessor takes a IncomingMessage (e.g., vote, transaction), processes it, and returns what (if anything)
// to send to the network in response.
// This is an extension of the MessageHandler that works in two stages: validate ->[result]-> handle.
type MessageProcessor interface {
Validate(message IncomingMessage) ValidatedMessage
Handle(message ValidatedMessage) OutgoingMessage
}

// ProcessorValidateFunc represents an implementation of the MessageProcessor interface
type ProcessorValidateFunc func(message IncomingMessage) ValidatedMessage

// ProcessorHandleFunc represents an implementation of the MessageProcessor interface
type ProcessorHandleFunc func(message ValidatedMessage) OutgoingMessage

// Validate implements MessageProcessor.Validate, calling the validator with the IncomingMessage and returning the action
// and validation extra data that can be use as the handler input.
func (f ProcessorValidateFunc) Validate(message IncomingMessage) ValidatedMessage {
return f(message)
}

// Handle implements MessageProcessor.Handle calling the handler with the ValidatedMessage and returning the OutgoingMessage
func (f ProcessorHandleFunc) Handle(message ValidatedMessage) OutgoingMessage {
return f(message)
}

// TaggedMessageHandler receives one type of broadcast messages
type TaggedMessageHandler struct {
Tag
MessageHandler
}

// TaggedMessageProcessor receives one type of broadcast messages
// and performs two stage processing: validating and handling
type TaggedMessageProcessor struct {
Tag
MessageProcessor
}

// Propagate is a convenience function to save typing in the common case of a message handler telling us to propagate an incoming message
// "return network.Propagate(msg)" instead of "return network.OutgoingMsg{network.Broadcast, msg.Tag, msg.Data}"
func Propagate(msg IncomingMessage) OutgoingMessage {
Expand Down
12 changes: 12 additions & 0 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ func (n *HybridP2PNetwork) ClearHandlers() {
n.wsNetwork.ClearHandlers()
}

// RegisterProcessors adds to the set of given message handlers.
func (n *HybridP2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) {
n.p2pNetwork.RegisterProcessors(dispatch)
n.wsNetwork.RegisterProcessors(dispatch)
}

// ClearProcessors deregisters all the existing message handlers.
func (n *HybridP2PNetwork) ClearProcessors() {
n.p2pNetwork.ClearProcessors()
n.wsNetwork.ClearProcessors()
}

// GetHTTPClient returns a http.Client with a suitable for the network Transport
// that would also limit the number of outgoing connections.
func (n *HybridP2PNetwork) GetHTTPClient(address string) (*http.Client, error) {
Expand Down
Loading