From e696946adf4df4358f8511b70208e96783c6730d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 20 Nov 2024 08:16:39 -0500 Subject: [PATCH 01/11] ERL: Use IP address instead of IP:port pair --- util/rateLimit.go | 17 ++++++++++------- util/rateLimit_test.go | 28 +++++++++++++++++----------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/util/rateLimit.go b/util/rateLimit.go index 3fea7f8912..a6d7b4dead 100644 --- a/util/rateLimit.go +++ b/util/rateLimit.go @@ -42,7 +42,7 @@ type ElasticRateLimiter struct { MaxCapacity int CapacityPerReservation int sharedCapacity capacityQueue - capacityByClient map[ErlClient]capacityQueue + capacityByClient map[string]capacityQueue clientLock deadlock.RWMutex // CongestionManager and enable flag cm CongestionManager @@ -53,6 +53,7 @@ type ElasticRateLimiter struct { // ErlClient clients must support OnClose for reservation closing type ErlClient interface { OnClose(func()) + RoutingAddr() []byte } // capacity is an empty structure used for loading and draining queues @@ -122,7 +123,7 @@ func NewElasticRateLimiter( ret := ElasticRateLimiter{ MaxCapacity: maxCapacity, CapacityPerReservation: reservedCapacity, - capacityByClient: map[ErlClient]capacityQueue{}, + capacityByClient: map[string]capacityQueue{}, sharedCapacity: capacityQueue(make(chan capacity, maxCapacity)), congestionControlCounter: conmanCount, } @@ -178,7 +179,7 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard, var isCMEnabled bool // get the client's queue erl.clientLock.RLock() - q, exists = erl.capacityByClient[c] + q, exists = erl.capacityByClient[string(c.RoutingAddr())] isCMEnabled = erl.enableCM erl.clientLock.RUnlock() @@ -234,7 +235,8 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard, func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, error) { erl.clientLock.Lock() defer erl.clientLock.Unlock() - if _, exists := erl.capacityByClient[c]; exists { + addr := string(c.RoutingAddr()) + if _, exists := erl.capacityByClient[addr]; exists { return capacityQueue(nil), errERLReservationExists } // guard against overprovisioning, if there is less than a reservedCapacity amount left @@ -244,7 +246,7 @@ func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, erro } // make capacity for the provided client q := capacityQueue(make(chan capacity, erl.CapacityPerReservation)) - erl.capacityByClient[c] = q + erl.capacityByClient[addr] = q // create a thread to drain the capacity from sharedCapacity in a blocking way // and move it to the reservation, also in a blocking way go func() { @@ -261,12 +263,13 @@ func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, erro func (erl *ElasticRateLimiter) closeReservation(c ErlClient) { erl.clientLock.Lock() defer erl.clientLock.Unlock() - q, exists := erl.capacityByClient[c] + addr := string(c.RoutingAddr()) + q, exists := erl.capacityByClient[addr] // guard clauses, and preventing the ElasticRateLimiter from draining its own sharedCapacity if !exists || q == erl.sharedCapacity { return } - delete(erl.capacityByClient, c) + delete(erl.capacityByClient, addr) // start a routine to consume capacity from the closed reservation, and return it to the sharedCapacity go func() { for i := 0; i < erl.CapacityPerReservation; i++ { diff --git a/util/rateLimit_test.go b/util/rateLimit_test.go index 8888bfcf4c..938ba6a657 100644 --- a/util/rateLimit_test.go +++ b/util/rateLimit_test.go @@ -38,6 +38,10 @@ func (c mockClient) OnClose(func()) { return } +func (c mockClient) RoutingAddr() []byte { + return []byte(c) +} + func TestNewElasticRateLimiter(t *testing.T) { partitiontest.PartitionTest(t) erl := NewElasticRateLimiter(100, 10, time.Second, nil) @@ -49,6 +53,7 @@ func TestNewElasticRateLimiter(t *testing.T) { func TestElasticRateLimiterCongestionControlled(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") + clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(3, 2, time.Second, nil) // give the ERL a congestion controler with well defined behavior for testing erl.cm = mockCongestionControl{} @@ -57,24 +62,24 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) { // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 1, len(erl.capacityByClient[client])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) erl.EnableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.Error(t, err) erl.DisableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) } @@ -132,46 +137,47 @@ func TestZeroSizeReservations(t *testing.T) { func TestConsumeReleaseCapacity(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") + clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(4, 3, time.Second, nil) c1, _, err := erl.ConsumeCapacity(client) // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 2, len(erl.capacityByClient[client])) + assert.Equal(t, 2, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 1, len(erl.capacityByClient[client])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) // remember this capacity, as it is a shared capacity c4, _, err := erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.Error(t, err) // now release the capacity and observe the items return to the correct places err = c1.Release() - assert.Equal(t, 1, len(erl.capacityByClient[client])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) // now release the capacity and observe the items return to the correct places err = c4.Release() - assert.Equal(t, 1, len(erl.capacityByClient[client])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) From 39911118fa9c3e956b77a6c08d66bce2e476b484 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 20 Nov 2024 08:27:42 -0500 Subject: [PATCH 02/11] fix linter --- util/rateLimit_test.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/util/rateLimit_test.go b/util/rateLimit_test.go index 938ba6a657..7c09ee3e9a 100644 --- a/util/rateLimit_test.go +++ b/util/rateLimit_test.go @@ -53,7 +53,6 @@ func TestNewElasticRateLimiter(t *testing.T) { func TestElasticRateLimiterCongestionControlled(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") - clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(3, 2, time.Second, nil) // give the ERL a congestion controler with well defined behavior for testing erl.cm = mockCongestionControl{} @@ -62,24 +61,24 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) { // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) erl.EnableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.Error(t, err) erl.DisableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) } @@ -137,47 +136,46 @@ func TestZeroSizeReservations(t *testing.T) { func TestConsumeReleaseCapacity(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") - clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(4, 3, time.Second, nil) c1, _, err := erl.ConsumeCapacity(client) // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 2, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 2, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) // remember this capacity, as it is a shared capacity c4, _, err := erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.Error(t, err) // now release the capacity and observe the items return to the correct places err = c1.Release() - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) // now release the capacity and observe the items return to the correct places err = c4.Release() - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) From 145a7f7b51b7ed68063bf6cd33b8a55ba8e8673d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 14 Feb 2025 11:54:49 -0500 Subject: [PATCH 03/11] Revert "fix linter" This reverts commit 39911118fa9c3e956b77a6c08d66bce2e476b484. --- util/rateLimit_test.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/util/rateLimit_test.go b/util/rateLimit_test.go index 4b3f4007dc..638bc58467 100644 --- a/util/rateLimit_test.go +++ b/util/rateLimit_test.go @@ -53,6 +53,7 @@ func TestNewElasticRateLimiter(t *testing.T) { func TestElasticRateLimiterCongestionControlled(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") + clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(3, 2, time.Second, nil) // give the ERL a congestion controler with well defined behavior for testing erl.cm = mockCongestionControl{} @@ -61,24 +62,24 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) { // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) erl.EnableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.Error(t, err) erl.DisableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) } @@ -136,46 +137,47 @@ func TestZeroSizeReservations(t *testing.T) { func TestConsumeReleaseCapacity(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") + clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(4, 3, time.Second, nil) c1, _, err := erl.ConsumeCapacity(client) // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 2, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 2, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) // remember this capacity, as it is a shared capacity c4, _, err := erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.Error(t, err) // now release the capacity and observe the items return to the correct places err = c1.Release() - assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) // now release the capacity and observe the items return to the correct places err = c4.Release() - assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) From b6abe5af71e7caaea9884f298671895317be6a54 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 14 Feb 2025 11:54:56 -0500 Subject: [PATCH 04/11] Revert "ERL: Use IP address instead of IP:port pair" This reverts commit e696946adf4df4358f8511b70208e96783c6730d. --- util/rateLimit.go | 17 +++++++---------- util/rateLimit_test.go | 28 +++++++++++----------------- 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/util/rateLimit.go b/util/rateLimit.go index b062c2cafc..8406711418 100644 --- a/util/rateLimit.go +++ b/util/rateLimit.go @@ -42,7 +42,7 @@ type ElasticRateLimiter struct { MaxCapacity int CapacityPerReservation int sharedCapacity capacityQueue - capacityByClient map[string]capacityQueue + capacityByClient map[ErlClient]capacityQueue clientLock deadlock.RWMutex // CongestionManager and enable flag cm CongestionManager @@ -53,7 +53,6 @@ type ElasticRateLimiter struct { // ErlClient clients must support OnClose for reservation closing type ErlClient interface { OnClose(func()) - RoutingAddr() []byte } // capacity is an empty structure used for loading and draining queues @@ -123,7 +122,7 @@ func NewElasticRateLimiter( ret := ElasticRateLimiter{ MaxCapacity: maxCapacity, CapacityPerReservation: reservedCapacity, - capacityByClient: map[string]capacityQueue{}, + capacityByClient: map[ErlClient]capacityQueue{}, sharedCapacity: capacityQueue(make(chan capacity, maxCapacity)), congestionControlCounter: conmanCount, } @@ -179,7 +178,7 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard, var isCMEnabled bool // get the client's queue erl.clientLock.RLock() - q, exists = erl.capacityByClient[string(c.RoutingAddr())] + q, exists = erl.capacityByClient[c] isCMEnabled = erl.enableCM erl.clientLock.RUnlock() @@ -235,8 +234,7 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard, func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, error) { erl.clientLock.Lock() defer erl.clientLock.Unlock() - addr := string(c.RoutingAddr()) - if _, exists := erl.capacityByClient[addr]; exists { + if _, exists := erl.capacityByClient[c]; exists { return capacityQueue(nil), errERLReservationExists } // guard against overprovisioning, if there is less than a reservedCapacity amount left @@ -246,7 +244,7 @@ func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, erro } // make capacity for the provided client q := capacityQueue(make(chan capacity, erl.CapacityPerReservation)) - erl.capacityByClient[addr] = q + erl.capacityByClient[c] = q // create a thread to drain the capacity from sharedCapacity in a blocking way // and move it to the reservation, also in a blocking way go func() { @@ -263,13 +261,12 @@ func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, erro func (erl *ElasticRateLimiter) closeReservation(c ErlClient) { erl.clientLock.Lock() defer erl.clientLock.Unlock() - addr := string(c.RoutingAddr()) - q, exists := erl.capacityByClient[addr] + q, exists := erl.capacityByClient[c] // guard clauses, and preventing the ElasticRateLimiter from draining its own sharedCapacity if !exists || q == erl.sharedCapacity { return } - delete(erl.capacityByClient, addr) + delete(erl.capacityByClient, c) // start a routine to consume capacity from the closed reservation, and return it to the sharedCapacity go func() { for i := 0; i < erl.CapacityPerReservation; i++ { diff --git a/util/rateLimit_test.go b/util/rateLimit_test.go index 638bc58467..5a62cac5b5 100644 --- a/util/rateLimit_test.go +++ b/util/rateLimit_test.go @@ -38,10 +38,6 @@ func (c mockClient) OnClose(func()) { return } -func (c mockClient) RoutingAddr() []byte { - return []byte(c) -} - func TestNewElasticRateLimiter(t *testing.T) { partitiontest.PartitionTest(t) erl := NewElasticRateLimiter(100, 10, time.Second, nil) @@ -53,7 +49,6 @@ func TestNewElasticRateLimiter(t *testing.T) { func TestElasticRateLimiterCongestionControlled(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") - clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(3, 2, time.Second, nil) // give the ERL a congestion controler with well defined behavior for testing erl.cm = mockCongestionControl{} @@ -62,24 +57,24 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) { // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) erl.EnableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.Error(t, err) erl.DisableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) } @@ -137,47 +132,46 @@ func TestZeroSizeReservations(t *testing.T) { func TestConsumeReleaseCapacity(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") - clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(4, 3, time.Second, nil) c1, _, err := erl.ConsumeCapacity(client) // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 2, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 2, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) // remember this capacity, as it is a shared capacity c4, _, err := erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.Error(t, err) // now release the capacity and observe the items return to the correct places err = c1.Release() - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[client])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) // now release the capacity and observe the items return to the correct places err = c4.Release() - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) From 038936e9c5fa8dc733a595ad1fbfabd235ef8dbc Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 14 Feb 2025 15:54:23 -0500 Subject: [PATCH 05/11] ip-based ERL client --- data/txHandler.go | 121 +++++++++++++++++++++++++++++++++-------- data/txHandler_test.go | 93 +++++++++++++++++++++++++++++++ 2 files changed, 191 insertions(+), 23 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 65fd869d7e..53aeba22c5 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -38,6 +38,7 @@ import ( "github.com/algorand/go-algorand/util" "github.com/algorand/go-algorand/util/execpool" "github.com/algorand/go-algorand/util/metrics" + "github.com/algorand/go-deadlock" ) var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled) @@ -130,6 +131,7 @@ type TxHandler struct { streamVerifierChan chan execpool.InputJob streamVerifierDropped chan *verify.UnverifiedTxnSigJob erl *util.ElasticRateLimiter + erlClientMapper erlClientMapper appLimiter *appRateLimiter appLimiterBacklogThreshold int appLimiterCountERLDrops bool @@ -206,6 +208,10 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second, txBacklogDroppedCongestionManagement, ) + handler.erlClientMapper = erlClientMapper{ + mapping: make(map[string]*erlIPClient), + maxClients: opts.Config.MaxConnectionsPerIP, + } } if opts.Config.EnableTxBacklogAppRateLimiting { handler.appLimiter = makeAppRateLimiter( @@ -623,32 +629,97 @@ func (handler *TxHandler) incomingMsgDupCheck(data []byte) (*crypto.Digest, bool return msgKey, false } +// erlClientMapper handles erlIPClient creation from erlClient +// in order to map multiple clients to a single IP address. +// Then that meta erlIPClient is supposed to be supplied to ERL +type erlClientMapper struct { + m deadlock.RWMutex + mapping map[string]*erlIPClient + maxClients int +} + +// getClient returns erlIPClient for a given sender +func (mp *erlClientMapper) getClient(sender network.DisconnectableAddressablePeer) util.ErlClient { + addr := string(sender.RoutingAddr()) + ec := sender.(util.ErlClient) + + mp.m.Lock() + ipClient, has := mp.mapping[addr] + if !has { + ipClient = &erlIPClient{ + clients: make(map[util.ErlClient]struct{}, mp.maxClients), + } + mp.mapping[addr] = ipClient + } + mp.m.Unlock() + + ipClient.register(ec) + return ipClient +} + +type erlIPClient struct { + util.ErlClient + m deadlock.RWMutex + clients map[util.ErlClient]struct{} + closer func() +} + +func (eic *erlIPClient) OnClose(f func()) { + eic.closer = f +} + +// register registers a new client to the erlIPClient +// by adding a helper closer function to track connection closures +func (eic *erlIPClient) register(ec util.ErlClient) { + eic.m.Lock() + _, has := eic.clients[ec] + defer eic.m.Unlock() + if has { + // this peer is known => noop + return + } + eic.clients[ec] = struct{}{} + + ec.OnClose(func() { + eic.connClosed(ec) + }) +} + +// connClosed is called when a connection is closed so that +// erlIPClient removes the client from its list of clients +// and calls the closer function if there are no more clients +func (eic *erlIPClient) connClosed(ec util.ErlClient) { + eic.m.Lock() + delete(eic.clients, ec) + empty := len(eic.clients) == 0 + eic.m.Unlock() + if empty && eic.closer != nil { + eic.closer() + eic.closer = nil + } +} + // incomingMsgErlCheck runs the rate limiting check on a sender. // Returns: // - the capacity guard returned by the elastic rate limiter // - a boolean indicating if the sender is rate limited -func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectableAddressablePeer) (*util.ErlCapacityGuard, bool) { - var capguard *util.ErlCapacityGuard - var isCMEnabled bool - var err error - if handler.erl != nil { - congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue)) - // consume a capacity unit - // if the elastic rate limiter cannot vend a capacity, the error it returns - // is sufficient to indicate that we should enable Congestion Control, because - // an issue in vending capacity indicates the underlying resource (TXBacklog) is full - capguard, isCMEnabled, err = handler.erl.ConsumeCapacity(sender.(util.ErlClient)) - if err != nil || // did ERL ask to enable congestion control? - (!isCMEnabled && congestedERL) { // is CM not currently enabled, but queue is congested? - handler.erl.EnableCongestionControl() - // if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such - transactionMessagesDroppedFromBacklog.Inc(nil) - return capguard, true - } - // if the backlog Queue has 50% of its buffer back, turn congestion control off - if !congestedERL { - handler.erl.DisableCongestionControl() - } +func (handler *TxHandler) incomingMsgErlCheck(sender util.ErlClient) (*util.ErlCapacityGuard, bool) { + congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue)) + // consume a capacity unit + // if the elastic rate limiter cannot vend a capacity, the error it returns + // is sufficient to indicate that we should enable Congestion Control, because + // an issue in vending capacity indicates the underlying resource (TXBacklog) is full + capguard, isCMEnabled, err := handler.erl.ConsumeCapacity(sender) + if err != nil || // did ERL ask to enable congestion control? + (!isCMEnabled && congestedERL) { // is CM not currently enabled, but queue is congested? + handler.erl.EnableCongestionControl() + // if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such + transactionMessagesDroppedFromBacklog.Inc(nil) + return capguard, true + } + // if the backlog Queue has 50% of its buffer back, turn congestion control off + if !congestedERL { + handler.erl.DisableCongestionControl() } return capguard, false } @@ -738,7 +809,11 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net return network.OutgoingMessage{Action: network.Ignore} } - capguard, shouldDrop := handler.incomingMsgErlCheck(rawmsg.Sender) + var capguard *util.ErlCapacityGuard + if handler.erl != nil { + client := handler.erlClientMapper.getClient(rawmsg.Sender) + capguard, shouldDrop = handler.incomingMsgErlCheck(client) + } accepted := false defer func() { // if we failed to put the item onto the backlog, we should release the capacity if any diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 67e82468fa..4626623204 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -51,6 +51,7 @@ import ( "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" + "github.com/algorand/go-algorand/util" "github.com/algorand/go-algorand/util/execpool" "github.com/algorand/go-algorand/util/metrics" ) @@ -2822,3 +2823,95 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) { require.Equal(t, outmsg.Action, network.Disconnect) }) } + +// Create mock types to satisfy interfaces +type erlMockPeer struct { + network.DisconnectableAddressablePeer + util.ErlClient + addr string + closer func() +} + +func newErlMockPeer(addr string) *erlMockPeer { + return &erlMockPeer{ + addr: addr, + } +} + +// Implement required interface methods +func (m *erlMockPeer) RoutingAddr() []byte { return []byte(m.addr) } +func (m *erlMockPeer) OnClose(f func()) { m.closer = f } + +func TestTxHandlerErlClientMapper(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + t.Run("Same routing address clients share erlIPClient", func(t *testing.T) { + mapper := erlClientMapper{ + mapping: make(map[string]*erlIPClient), + maxClients: 4, + } + + peer1 := newErlMockPeer("192.168.1.1") + peer2 := newErlMockPeer("192.168.1.1") + + client1 := mapper.getClient(peer1) + client2 := mapper.getClient(peer2) + + // Verify both peers got same erlIPClient + require.Equal(t, client1, client2, "Expected same erlIPClient for same routing address") + require.Equal(t, 1, len(mapper.mapping)) + + ipClient := mapper.mapping["192.168.1.1"] + require.Equal(t, 2, len(ipClient.clients)) + }) + + t.Run("Different routing addresses get different erlIPClients", func(t *testing.T) { + mapper := erlClientMapper{ + mapping: make(map[string]*erlIPClient), + maxClients: 4, + } + + peer1 := newErlMockPeer("192.168.1.1") + peer2 := newErlMockPeer("192.168.1.2") + + client1 := mapper.getClient(peer1) + client2 := mapper.getClient(peer2) + + // Verify peers got different erlIPClients + require.NotEqual(t, client1, client2, "Expected different erlIPClients for different routing addresses") + require.Equal(t, 2, len(mapper.mapping)) + }) + + t.Run("Client cleanup on connection close", func(t *testing.T) { + mapper := erlClientMapper{ + mapping: make(map[string]*erlIPClient), + maxClients: 4, + } + + peer1 := newErlMockPeer("192.168.1.1") + peer2 := newErlMockPeer("192.168.1.1") + + // Register clients for both peers + mapper.getClient(peer1) + mapper.getClient(peer2) + + ipClient := mapper.mapping["192.168.1.1"] + closerCalled := false + ipClient.OnClose(func() { + closerCalled = true + }) + + require.Equal(t, 2, len(ipClient.clients)) + + // Simulate connection close for peer1 + peer1.closer() + require.Equal(t, 1, len(ipClient.clients)) + require.False(t, closerCalled) + + // Simulate connection close for peer2 + peer2.closer() + require.Equal(t, 0, len(ipClient.clients)) + require.True(t, closerCalled) + }) +} From 5948fcc60689e17625995a38c04836752a31158e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 14 Feb 2025 17:49:13 -0500 Subject: [PATCH 06/11] fix closer data race --- data/txHandler.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 53aeba22c5..0b1db364db 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "time" "github.com/algorand/go-algorand/config" @@ -661,11 +662,11 @@ type erlIPClient struct { util.ErlClient m deadlock.RWMutex clients map[util.ErlClient]struct{} - closer func() + closer atomic.Value } func (eic *erlIPClient) OnClose(f func()) { - eic.closer = f + eic.closer.Store(f) } // register registers a new client to the erlIPClient @@ -693,9 +694,14 @@ func (eic *erlIPClient) connClosed(ec util.ErlClient) { delete(eic.clients, ec) empty := len(eic.clients) == 0 eic.m.Unlock() - if empty && eic.closer != nil { - eic.closer() - eic.closer = nil + // if no elements left, call the closer + // use atomic Swap in order to retrieve the closer and call it once + if empty { + // atomic.Value.Swap does not like nil values so use a typed nil + // and cast it to func() in order to compare with nil + if closer := eic.closer.Swap((func())(nil)).(func()); closer != nil { + closer() + } } } From e8f32773821f09e3965428a952d380d8f52737c3 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Sat, 15 Feb 2025 15:29:28 -0500 Subject: [PATCH 07/11] do not drop message if backlog congested but ERL accepted --- data/txHandler.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 0b1db364db..3a0bbe3450 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -716,15 +716,15 @@ func (handler *TxHandler) incomingMsgErlCheck(sender util.ErlClient) (*util.ErlC // is sufficient to indicate that we should enable Congestion Control, because // an issue in vending capacity indicates the underlying resource (TXBacklog) is full capguard, isCMEnabled, err := handler.erl.ConsumeCapacity(sender) - if err != nil || // did ERL ask to enable congestion control? - (!isCMEnabled && congestedERL) { // is CM not currently enabled, but queue is congested? + if err != nil { // did ERL ask to enable congestion control? handler.erl.EnableCongestionControl() // if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such transactionMessagesDroppedFromBacklog.Inc(nil) return capguard, true - } - // if the backlog Queue has 50% of its buffer back, turn congestion control off - if !congestedERL { + } else if !isCMEnabled && congestedERL { // is CM not currently enabled, but queue is congested? + handler.erl.EnableCongestionControl() + } else if !congestedERL { + // if the backlog Queue has 50% of its buffer back, turn congestion control off handler.erl.DisableCongestionControl() } return capguard, false From 33ae225684cd3a8372aadf514bfd4cd9b110b1cc Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Sun, 16 Feb 2025 16:10:30 -0500 Subject: [PATCH 08/11] IP-based ERL correctness test --- data/txHandler_test.go | 83 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 4626623204..2b415342c9 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -2915,3 +2915,86 @@ func TestTxHandlerErlClientMapper(t *testing.T) { require.True(t, closerCalled) }) } + +// TestTxHandlerERLIPClient checks that ERL properly handles sender with the same and different addresses: +// Configure ERL in following way: +// 1. Small maxCapacity=10 fully shared by two IP senders (TxBacklogReservedCapacityPerPeer=5, IncomingConnectionsLimit=0) +// 2. Submit one from both IP senders to initalize per peer-queues and exhaust shared capacity +// 3. Make sure the third peer does not come through +// 4. Make sure extra messages from the first peer and second peer are accepted +func TestTxHandlerERLIPClient(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + // technically we don't need any users for this test + // but we need to create the genesis accounts to prevent this warning: + // "cannot start evaluator: overflowed subtracting rewards for block 1" + _, _, genesis := makeTestGenesisAccounts(t, 0) + genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr) + ledgerName := fmt.Sprintf("%s-mem", t.Name()) + const inMem = true + + log := logging.TestingLog(t) + log.SetLevel(logging.Panic) + + const backlogSize = 10 // to have targetRateRefreshTicks: bsize / 10 != 0 in NewREDCongestionManager + cfg := config.GetDefaultLocal() + cfg.TxIncomingFilteringFlags = 0 // disable duplicate filtering to simplify the test + cfg.IncomingConnectionsLimit = 0 // disable incoming connections limit to have TxBacklogSize controlled + cfg.EnableTxBacklogRateLimiting = true + cfg.EnableTxBacklogAppRateLimiting = false + cfg.TxBacklogServiceRateWindowSeconds = 100 // large window + cfg.TxBacklogRateLimitingCongestionPct = 0 // always congested + cfg.TxBacklogReservedCapacityPerPeer = 5 // 5 messages per peer (IP address in our case) + cfg.TxBacklogSize = backlogSize + l, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg) + require.NoError(t, err) + defer l.Close() + + handler, err := makeTestTxHandler(l, cfg) + require.NoError(t, err) + defer handler.txVerificationPool.Shutdown() + defer close(handler.streamVerifierDropped) + require.NotNil(t, handler.erl) + require.Nil(t, handler.appLimiter) + handler.erl.Start() + defer handler.erl.Stop() + + var addr1, addr2 basics.Address + crypto.RandBytes(addr1[:]) + crypto.RandBytes(addr2[:]) + + tx := getTransaction(addr1, addr2, 1) + + signedTx := tx.Sign(keypair()) // some random key + blob := protocol.Encode(&signedTx) + sender1 := newErlMockPeer("1") + sender2 := newErlMockPeer("2") + sender3 := newErlMockPeer("3") + + // initialize peer queues + action := handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender1}) + require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action) + require.Equal(t, 1, len(handler.backlogQueue)) + + action = handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender2}) + require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action) + require.Equal(t, 2, len(handler.backlogQueue)) + + // make sure the third peer does not come through + action = handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender3}) + require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action) + require.Equal(t, 2, len(handler.backlogQueue)) + + // make sure messages from other sender objects with the same IP are accepted + sender11 := newErlMockPeer("1") + sender21 := newErlMockPeer("2") + + action = handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender11}) + require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action) + require.Equal(t, 3, len(handler.backlogQueue)) + + action = handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender21}) + require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action) + require.Equal(t, 4, len(handler.backlogQueue)) +} From a33ad31c7c1ff3a2586b22459aa0a8cc5a8ac106 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 20 Feb 2025 13:05:55 -0500 Subject: [PATCH 09/11] CR fixes: possible race cond with closer --- data/txHandler.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 3a0bbe3450..6d229efa5a 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -23,7 +23,6 @@ import ( "fmt" "io" "sync" - "sync/atomic" "time" "github.com/algorand/go-algorand/config" @@ -662,11 +661,13 @@ type erlIPClient struct { util.ErlClient m deadlock.RWMutex clients map[util.ErlClient]struct{} - closer atomic.Value + closer func() } func (eic *erlIPClient) OnClose(f func()) { - eic.closer.Store(f) + eic.m.Lock() + defer eic.m.Unlock() + eic.closer = f } // register registers a new client to the erlIPClient @@ -691,17 +692,13 @@ func (eic *erlIPClient) register(ec util.ErlClient) { // and calls the closer function if there are no more clients func (eic *erlIPClient) connClosed(ec util.ErlClient) { eic.m.Lock() + defer eic.m.Unlock() delete(eic.clients, ec) empty := len(eic.clients) == 0 - eic.m.Unlock() // if no elements left, call the closer - // use atomic Swap in order to retrieve the closer and call it once - if empty { - // atomic.Value.Swap does not like nil values so use a typed nil - // and cast it to func() in order to compare with nil - if closer := eic.closer.Swap((func())(nil)).(func()); closer != nil { - closer() - } + if empty && eic.closer != nil { + eic.closer() + eic.closer = nil } } From 8dc7867c4c0cd8816b09b7e5a76b2486bfa9cb8b Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Thu, 20 Feb 2025 14:05:46 -0500 Subject: [PATCH 10/11] Update data/txHandler.go Co-authored-by: John Jannotti --- data/txHandler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 5f9851887a..ccf277fb8c 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -667,9 +667,8 @@ func (eic *erlIPClient) OnClose(f func()) { // by adding a helper closer function to track connection closures func (eic *erlIPClient) register(ec util.ErlClient) { eic.m.Lock() - _, has := eic.clients[ec] defer eic.m.Unlock() - if has { + if _, has := eic.clients[ec]; has { // this peer is known => noop return } From 686aaedba540c2a32e09e5578de1b7503f721d09 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 21 Feb 2025 12:37:42 -0500 Subject: [PATCH 11/11] use RLock, lookup, Lock, lookup access pattern for mp.mapping data --- data/txHandler.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index ccf277fb8c..1df98a6a2f 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -636,7 +636,27 @@ func (mp *erlClientMapper) getClient(sender network.DisconnectableAddressablePee addr := string(sender.RoutingAddr()) ec := sender.(util.ErlClient) + // check if the client is already known + // typically one client sends lots of messages so more much more reads than writes. + // handle with a quick read lock, and if not found, create a new one with a write lock + mp.m.RLock() + ipClient, has := mp.mapping[addr] + mp.m.RUnlock() + + if !has { + ipClient = mp.getClientByAddr(addr) + } + + ipClient.register(ec) + return ipClient +} + +// getClientByAddr is internal helper to get or create a new erlIPClient +// with write lock held +func (mp *erlClientMapper) getClientByAddr(addr string) *erlIPClient { mp.m.Lock() + defer mp.m.Unlock() + ipClient, has := mp.mapping[addr] if !has { ipClient = &erlIPClient{ @@ -644,9 +664,6 @@ func (mp *erlClientMapper) getClient(sender network.DisconnectableAddressablePee } mp.mapping[addr] = ipClient } - mp.m.Unlock() - - ipClient.register(ec) return ipClient }