Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 117 additions & 23 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/algorand/go-algorand/util"
"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-algorand/util/metrics"
"github.com/algorand/go-deadlock"
)

var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled)
Expand Down Expand Up @@ -128,6 +129,7 @@ type TxHandler struct {
streamVerifierChan chan execpool.InputJob
streamVerifierDropped chan *verify.UnverifiedTxnSigJob
erl *util.ElasticRateLimiter
erlClientMapper erlClientMapper
appLimiter *appRateLimiter
appLimiterBacklogThreshold int
appLimiterCountERLDrops bool
Expand Down Expand Up @@ -200,6 +202,10 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second,
txBacklogDroppedCongestionManagement,
)
handler.erlClientMapper = erlClientMapper{
mapping: make(map[string]*erlIPClient),
maxClients: opts.Config.MaxConnectionsPerIP,
}
}
if opts.Config.EnableTxBacklogAppRateLimiting {
handler.appLimiter = makeAppRateLimiter(
Expand Down Expand Up @@ -616,32 +622,116 @@ func (handler *TxHandler) incomingMsgDupCheck(data []byte) (crypto.Digest, bool)
return msgKey, false
}

// erlClientMapper handles erlIPClient creation from erlClient
// in order to map multiple clients to a single IP address.
// Then that meta erlIPClient is supposed to be supplied to ERL
// so that all connections from one IP share one rate-limit bucket.
type erlClientMapper struct {
	m          deadlock.RWMutex        // guards mapping
	mapping    map[string]*erlIPClient // routing address -> shared per-IP client
	maxClients int                     // sizing hint for each erlIPClient's client set
}

// getClient returns erlIPClient for a given sender
func (mp *erlClientMapper) getClient(sender network.DisconnectableAddressablePeer) util.ErlClient {
addr := string(sender.RoutingAddr())
Comment thread
algorandskiy marked this conversation as resolved.
ec := sender.(util.ErlClient)

// check if the client is already known
// typically one client sends lots of messages so more much more reads than writes.
// handle with a quick read lock, and if not found, create a new one with a write lock
mp.m.RLock()
ipClient, has := mp.mapping[addr]
mp.m.RUnlock()

if !has {
ipClient = mp.getClientByAddr(addr)
}

ipClient.register(ec)
return ipClient
}

// getClientByAddr is internal helper to get or create a new erlIPClient
// with write lock held
func (mp *erlClientMapper) getClientByAddr(addr string) *erlIPClient {
mp.m.Lock()
Comment thread
cce marked this conversation as resolved.
defer mp.m.Unlock()

ipClient, has := mp.mapping[addr]
if !has {
ipClient = &erlIPClient{
clients: make(map[util.ErlClient]struct{}, mp.maxClients),
Comment thread
cce marked this conversation as resolved.
}
mp.mapping[addr] = ipClient
}
return ipClient
}

// erlIPClient is a single ERL client standing in for all connections that
// share one routing (IP) address. It embeds util.ErlClient so it can be
// handed to the ElasticRateLimiter in place of individual peer clients.
type erlIPClient struct {
	util.ErlClient
	m       deadlock.RWMutex            // guards clients and closer
	clients map[util.ErlClient]struct{} // live per-connection clients for this IP
	closer  func()                      // callback set via OnClose; fired once all clients close
}

// OnClose stores the closer callback to be invoked once every underlying
// connection for this IP has closed (see connClosed).
func (eic *erlIPClient) OnClose(f func()) {
	eic.m.Lock()
	defer eic.m.Unlock()
	eic.closer = f
}

// register adds a connection-level client to this IP client and hooks the
// connection's close notification so the tracking set can be cleaned up.
// Registering an already-known client is a no-op.
func (eic *erlIPClient) register(ec util.ErlClient) {
	eic.m.Lock()
	defer eic.m.Unlock()
	if _, known := eic.clients[ec]; known {
		// this peer is already tracked => nothing to do
		return
	}
	eic.clients[ec] = struct{}{}

	// track the connection's closure so connClosed can drop it
	ec.OnClose(func() { eic.connClosed(ec) })
}

// connClosed is invoked when one underlying connection closes: it removes
// that client from the tracking set, and once the set is empty it fires the
// stored closer (if any) exactly once so ERL can release per-IP state.
func (eic *erlIPClient) connClosed(ec util.ErlClient) {
	eic.m.Lock()
	defer eic.m.Unlock()
	delete(eic.clients, ec)
	if len(eic.clients) != 0 {
		return
	}
	// last client for this IP is gone => notify and clear the closer
	if eic.closer != nil {
		eic.closer()
		eic.closer = nil
	}
}

// incomingMsgErlCheck runs the rate limiting check on a sender.
// Returns:
// - the capacity guard returned by the elastic rate limiter
// - a boolean indicating if the sender is rate limited
func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectableAddressablePeer) (*util.ErlCapacityGuard, bool) {
var capguard *util.ErlCapacityGuard
var isCMEnabled bool
var err error
if handler.erl != nil {
Comment thread
algorandskiy marked this conversation as resolved.
congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue))
// consume a capacity unit
// if the elastic rate limiter cannot vend a capacity, the error it returns
// is sufficient to indicate that we should enable Congestion Control, because
// an issue in vending capacity indicates the underlying resource (TXBacklog) is full
capguard, isCMEnabled, err = handler.erl.ConsumeCapacity(sender.(util.ErlClient))
if err != nil { // did ERL ask to enable congestion control?
handler.erl.EnableCongestionControl()
// if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such
transactionMessagesDroppedFromBacklog.Inc(nil)
return capguard, true
} else if !isCMEnabled && congestedERL { // is CM not currently enabled, but queue is congested?
handler.erl.EnableCongestionControl()
} else if !congestedERL {
// if the backlog Queue has 50% of its buffer back, turn congestion control off
handler.erl.DisableCongestionControl()
}
func (handler *TxHandler) incomingMsgErlCheck(sender util.ErlClient) (*util.ErlCapacityGuard, bool) {
congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue))
// consume a capacity unit
// if the elastic rate limiter cannot vend a capacity, the error it returns
// is sufficient to indicate that we should enable Congestion Control, because
// an issue in vending capacity indicates the underlying resource (TXBacklog) is full
capguard, isCMEnabled, err := handler.erl.ConsumeCapacity(sender)
if err != nil { // did ERL ask to enable congestion control?
handler.erl.EnableCongestionControl()
// if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such
transactionMessagesDroppedFromBacklog.Inc(nil)
return capguard, true
} else if !isCMEnabled && congestedERL { // is CM not currently enabled, but queue is congested?
handler.erl.EnableCongestionControl()
} else if !congestedERL {
// if the backlog Queue has 50% of its buffer back, turn congestion control off
handler.erl.DisableCongestionControl()
}
return capguard, false
}
Expand Down Expand Up @@ -731,7 +821,11 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
return network.OutgoingMessage{Action: network.Ignore}
}

capguard, shouldDrop := handler.incomingMsgErlCheck(rawmsg.Sender)
var capguard *util.ErlCapacityGuard
if handler.erl != nil {
client := handler.erlClientMapper.getClient(rawmsg.Sender)
capguard, shouldDrop = handler.incomingMsgErlCheck(client)
}
accepted := false
defer func() {
// if we failed to put the item onto the backlog, we should release the capacity if any
Expand Down
176 changes: 176 additions & 0 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util"
"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-algorand/util/metrics"
)
Expand Down Expand Up @@ -2822,3 +2823,178 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) {
require.Equal(t, outmsg.Action, network.Disconnect)
})
}

// erlMockPeer is a test double satisfying both the network peer and
// util.ErlClient interfaces consumed by the ERL client mapper.
type erlMockPeer struct {
	network.DisconnectableAddressablePeer
	util.ErlClient
	addr   string // routing address reported by RoutingAddr
	closer func() // captured by OnClose; invoked by tests to simulate a disconnect
}

// newErlMockPeer builds a mock peer reporting addr as its routing address.
func newErlMockPeer(addr string) *erlMockPeer {
	p := &erlMockPeer{}
	p.addr = addr
	return p
}

// Implement the minimal interface surface the mapper exercises:
// RoutingAddr supplies the per-IP key; OnClose records the close callback.
func (m *erlMockPeer) RoutingAddr() []byte { return []byte(m.addr) }
func (m *erlMockPeer) OnClose(f func())    { m.closer = f }

// TestTxHandlerErlClientMapper exercises erlClientMapper/erlIPClient
// bookkeeping directly: per-IP sharing of clients, separation by routing
// address, and closer propagation when all connections for an IP close.
func TestTxHandlerErlClientMapper(t *testing.T) {
	partitiontest.PartitionTest(t)
	t.Parallel()

	t.Run("Same routing address clients share erlIPClient", func(t *testing.T) {
		mapper := erlClientMapper{
			mapping:    make(map[string]*erlIPClient),
			maxClients: 4,
		}

		// two distinct peer objects, same routing address
		peer1 := newErlMockPeer("192.168.1.1")
		peer2 := newErlMockPeer("192.168.1.1")

		client1 := mapper.getClient(peer1)
		client2 := mapper.getClient(peer2)

		// Verify both peers got same erlIPClient
		require.Equal(t, client1, client2, "Expected same erlIPClient for same routing address")
		require.Equal(t, 1, len(mapper.mapping))

		// both underlying connections are tracked inside the shared client
		ipClient := mapper.mapping["192.168.1.1"]
		require.Equal(t, 2, len(ipClient.clients))
	})

	t.Run("Different routing addresses get different erlIPClients", func(t *testing.T) {
		mapper := erlClientMapper{
			mapping:    make(map[string]*erlIPClient),
			maxClients: 4,
		}

		peer1 := newErlMockPeer("192.168.1.1")
		peer2 := newErlMockPeer("192.168.1.2")

		client1 := mapper.getClient(peer1)
		client2 := mapper.getClient(peer2)

		// Verify peers got different erlIPClients
		require.NotEqual(t, client1, client2, "Expected different erlIPClients for different routing addresses")
		require.Equal(t, 2, len(mapper.mapping))
	})

	t.Run("Client cleanup on connection close", func(t *testing.T) {
		mapper := erlClientMapper{
			mapping:    make(map[string]*erlIPClient),
			maxClients: 4,
		}

		peer1 := newErlMockPeer("192.168.1.1")
		peer2 := newErlMockPeer("192.168.1.1")

		// Register clients for both peers
		mapper.getClient(peer1)
		mapper.getClient(peer2)

		// observe the shared erlIPClient's closer behavior
		ipClient := mapper.mapping["192.168.1.1"]
		closerCalled := false
		ipClient.OnClose(func() {
			closerCalled = true
		})

		require.Equal(t, 2, len(ipClient.clients))

		// Simulate connection close for peer1; one client remains,
		// so the IP-level closer must NOT fire yet
		peer1.closer()
		require.Equal(t, 1, len(ipClient.clients))
		require.False(t, closerCalled)

		// Simulate connection close for peer2; last client gone => closer fires
		peer2.closer()
		require.Equal(t, 0, len(ipClient.clients))
		require.True(t, closerCalled)
	})
}

// TestTxHandlerERLIPClient checks that ERL properly handles sender with the same and different addresses:
// Configure ERL in following way:
// 1. Small maxCapacity=10 fully shared by two IP senders (TxBacklogReservedCapacityPerPeer=5, IncomingConnectionsLimit=0)
// 2. Submit one from both IP senders to initialize per peer-queues and exhaust shared capacity
// 3. Make sure the third peer does not come through
// 4. Make sure extra messages from the first peer and second peer are accepted
func TestTxHandlerERLIPClient(t *testing.T) {
	partitiontest.PartitionTest(t)
	t.Parallel()

	// technically we don't need any users for this test
	// but we need to create the genesis accounts to prevent this warning:
	// "cannot start evaluator: overflowed subtracting rewards for block 1"
	_, _, genesis := makeTestGenesisAccounts(t, 0)
	genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr)
	ledgerName := fmt.Sprintf("%s-mem", t.Name())
	const inMem = true

	log := logging.TestingLog(t)
	log.SetLevel(logging.Panic)

	const backlogSize = 10 // to have targetRateRefreshTicks: bsize / 10 != 0 in NewREDCongestionManager
	cfg := config.GetDefaultLocal()
	cfg.TxIncomingFilteringFlags = 0 // disable duplicate filtering to simplify the test
	cfg.IncomingConnectionsLimit = 0 // disable incoming connections limit to have TxBacklogSize controlled
	cfg.EnableTxBacklogRateLimiting = true
	cfg.EnableTxBacklogAppRateLimiting = false
	cfg.TxBacklogServiceRateWindowSeconds = 100 // large window
	cfg.TxBacklogRateLimitingCongestionPct = 0  // always congested
	cfg.TxBacklogReservedCapacityPerPeer = 5    // 5 messages per peer (IP address in our case)
	cfg.TxBacklogSize = backlogSize
	l, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
	require.NoError(t, err)
	defer l.Close()

	handler, err := makeTestTxHandler(l, cfg)
	require.NoError(t, err)
	defer handler.txVerificationPool.Shutdown()
	defer close(handler.streamVerifierDropped)
	require.NotNil(t, handler.erl)
	require.Nil(t, handler.appLimiter)
	handler.erl.Start()
	defer handler.erl.Stop()

	var addr1, addr2 basics.Address
	crypto.RandBytes(addr1[:])
	crypto.RandBytes(addr2[:])

	tx := getTransaction(addr1, addr2, 1)

	signedTx := tx.Sign(keypair()) // some random key
	blob := protocol.Encode(&signedTx)
	// three senders on three distinct "IP" routing addresses
	sender1 := newErlMockPeer("1")
	sender2 := newErlMockPeer("2")
	sender3 := newErlMockPeer("3")

	// initialize peer queues
	action := handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender1})
	require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
	require.Equal(t, 1, len(handler.backlogQueue))

	action = handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender2})
	require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
	require.Equal(t, 2, len(handler.backlogQueue))

	// make sure the third peer does not come through
	// (shared capacity is exhausted, so its message is dropped: queue stays at 2)
	action = handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender3})
	require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
	require.Equal(t, 2, len(handler.backlogQueue))

	// make sure messages from other sender objects with the same IP are accepted
	// (new connections from addresses "1" and "2" map onto the existing per-IP clients)
	sender11 := newErlMockPeer("1")
	sender21 := newErlMockPeer("2")

	action = handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender11})
	require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
	require.Equal(t, 3, len(handler.backlogQueue))

	action = handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender21})
	require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
	require.Equal(t, 4, len(handler.backlogQueue))
}