Skip to content

Commit

Permalink
chore_: fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
friofry committed Oct 4, 2024
1 parent fb0c1ca commit ff86a39
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 36 deletions.
43 changes: 24 additions & 19 deletions health-manager/blockchain_health_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

// BlockchainFullStatus contains the full status of the blockchain, including provider statuses.
type BlockchainFullStatus struct {
Status rpcstatus.ProviderStatus `json:"status"`
StatusPerChainPerProvider map[uint64]map[string]*rpcstatus.ProviderStatus `json:"statusPerChainPerProvider"`
Status rpcstatus.ProviderStatus `json:"status"`
StatusPerChainPerProvider map[uint64]map[string]rpcstatus.ProviderStatus `json:"statusPerChainPerProvider"`
}

// BlockchainHealthManager manages the state of all providers and aggregates their statuses.
Expand Down Expand Up @@ -59,20 +59,20 @@ func (b *BlockchainHealthManager) Start(ctx context.Context) {
providerCtx, cancel := context.WithCancel(ctx)
b.cancelFuncs[chainID] = cancel

// Subscribe to provider status and ensure Subscribe returns a bidirectional channel
statusCh := phm.Subscribe()
b.wg.Add(1)
go func(phm *ProvidersHealthManager, statusCh <-chan struct{}, ctx context.Context) {
go func(phm *ProvidersHealthManager, statusCh chan struct{}, providerCtx context.Context) {
defer func() {
// Unsubscribe from the provider when the goroutine exits
b.Unsubscribe(statusCh)
b.wg.Done()
phm.Unsubscribe(statusCh)
}()
for {
select {
case <-statusCh:
// When the provider updates its status, check the statuses of all providers
b.aggregateAndUpdateStatus()
case <-ctx.Done():
b.aggregateAndUpdateStatus(providerCtx)
case <-providerCtx.Done():
// Stop processing when the context is cancelled
return
}
Expand All @@ -84,17 +84,18 @@ func (b *BlockchainHealthManager) Start(ctx context.Context) {
// Stop stops the event processing and unsubscribes.
func (b *BlockchainHealthManager) Stop() {
b.mu.Lock()
defer b.mu.Unlock()

if !b.started {
return
}
b.started = false

for chainID, cancel := range b.cancelFuncs {
for _, cancel := range b.cancelFuncs {
cancel()
delete(b.cancelFuncs, chainID)
}
clear(b.cancelFuncs)

b.mu.Unlock()
b.wg.Wait()
}

Expand All @@ -116,17 +117,15 @@ func (b *BlockchainHealthManager) Unsubscribe(ch chan struct{}) {
for i, subscriber := range b.subscribers {
if subscriber == ch {
b.subscribers = append(b.subscribers[:i], b.subscribers[i+1:]...)
close(ch) // Close the channel to free resources
close(ch)
break
}
}
}

// aggregateAndUpdateStatus collects statuses from all providers and updates the overall status.
func (b *BlockchainHealthManager) aggregateAndUpdateStatus() {
func (b *BlockchainHealthManager) aggregateAndUpdateStatus(ctx context.Context) {
b.mu.Lock()
defer b.mu.Unlock()

// Store the previous status for comparison
previousStatus := b.aggregator.GetAggregatedStatus()

Expand All @@ -142,16 +141,22 @@ func (b *BlockchainHealthManager) aggregateAndUpdateStatus() {
// Get the new aggregated status
newStatus := b.aggregator.GetAggregatedStatus()

// If the new status differs from the previous one, emit a notification
if newStatus.Status != previousStatus.Status {
b.emitBlockchainHealthStatus()
b.mu.Unlock()
if newStatus.Status == previousStatus.Status {
return
}
b.emitBlockchainHealthStatus(ctx)
}

// emitBlockchainHealthStatus sends a notification to all subscribers about the new blockchain status.
func (b *BlockchainHealthManager) emitBlockchainHealthStatus() {
func (b *BlockchainHealthManager) emitBlockchainHealthStatus(ctx context.Context) {
b.mu.RLock()
defer b.mu.RUnlock()
for _, subscriber := range b.subscribers {
select {
case <-ctx.Done():
// Stop sending notifications when the context is cancelled
return
case subscriber <- struct{}{}:
default:
// Skip notification if the subscriber's channel is full
Expand All @@ -163,7 +168,7 @@ func (b *BlockchainHealthManager) GetFullStatus() BlockchainFullStatus {
b.mu.RLock()
defer b.mu.RUnlock()

statusPerChainPerProvider := make(map[uint64]map[string]*rpcstatus.ProviderStatus)
statusPerChainPerProvider := make(map[uint64]map[string]rpcstatus.ProviderStatus)

for chainID, phm := range b.providers {
providerStatuses := phm.GetStatuses()
Expand Down
188 changes: 183 additions & 5 deletions health-manager/blockchain_health_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *BlockchainHealthManagerSuite) TestStatusUpdateNotification() {
{Name: "providerName", Timestamp: time.Now(), Err: nil},
})

s.waitForUpdate(ch, rpcstatus.StatusUp, 10*time.Millisecond)
s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond)
}

// Test getting the full status
Expand Down Expand Up @@ -97,16 +97,76 @@ func (s *BlockchainHealthManagerSuite) TestConcurrency() {
var wg sync.WaitGroup

chainsCount := 100
providersCount := 1000
providersCount := 100
for i := 1; i <= chainsCount; i++ {
phm := NewProvidersHealthManager(uint64(i))
s.manager.RegisterProvidersHealthManager(phm)
}
s.manager.Start(s.ctx)
ctx, _ := context.WithTimeout(s.ctx, 3*time.Second)
s.manager.Start(ctx)

s.Equal(rpcstatus.StatusUnknown, s.manager.Status().Status, "Expected blockchain status to be unknown")
ch := s.manager.Subscribe()
ctx := context.Background()

for i := 1; i <= chainsCount; i++ {
wg.Add(1)
go func(chainID uint64) {
defer func() {
wg.Done()
}()
phm := s.manager.providers[chainID]
for j := 0; j < providersCount; j++ {
err := errors.New("connection error")
if j == providersCount-1 {
err = nil
}
name := fmt.Sprintf("provider-%d", j)
phm.Update(ctx, []rpcstatus.RpcCallStatus{
{Name: name, Timestamp: time.Now(), Err: err},
})
}
}(uint64(i))
}

wg.Wait()
s.waitForUpdate(ch, rpcstatus.StatusUp, 2*time.Second)
s.Equal(rpcstatus.StatusUp, s.manager.Status().Status, "Expected blockchain status to be up")
}

func (s *BlockchainHealthManagerSuite) TestConcurrentSubscriptionUnsubscription() {
var wg sync.WaitGroup
subscribersCount := 100
s.manager.Start(s.ctx)

// Concurrently add and remove subscribers
for i := 0; i < subscribersCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
subCh := s.manager.Subscribe()
time.Sleep(100 * time.Millisecond)
s.manager.Unsubscribe(subCh)
}()
}

wg.Wait()
// After all subscribers are removed, there should be no active subscribers
s.Equal(0, len(s.manager.subscribers), "Expected no subscribers after unsubscription")
}

func (s *BlockchainHealthManagerSuite) TestStopWithConcurrentUpdates() {
var wg sync.WaitGroup

chainsCount := 50
providersCount := 30
for i := 1; i <= chainsCount; i++ {
phm := NewProvidersHealthManager(uint64(i))
s.manager.RegisterProvidersHealthManager(phm)
}
ctx, cancel := context.WithCancel(s.ctx)
s.manager.Start(ctx)

// Perform updates
for i := 1; i <= chainsCount; i++ {
wg.Add(1)
go func(chainID uint64) {
Expand All @@ -125,11 +185,129 @@ func (s *BlockchainHealthManagerSuite) TestConcurrency() {
}(uint64(i))
}

time.Sleep(500 * time.Millisecond)
cancel() // Stop the manager while updates are happening

wg.Wait()
s.waitForUpdate(ch, rpcstatus.StatusUp, 10*time.Second)
s.manager.Stop()

// Ensure that after Stop(), all contexts have been canceled
s.Equal(false, s.manager.started, "Expected manager to be stopped")
}
func (s *BlockchainHealthManagerSuite) TestRaceConditionStatusUpdate() {
var wg sync.WaitGroup
chainsCount := 10
providersCount := 100
for i := 1; i <= chainsCount; i++ {
phm := NewProvidersHealthManager(uint64(i))
s.manager.RegisterProvidersHealthManager(phm)
}
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
s.manager.Start(ctx)

for i := 1; i <= chainsCount; i++ {
wg.Add(1)
go func(chainID uint64) {
defer wg.Done()
phm := s.manager.providers[chainID]
for j := 0; j < providersCount; j++ {
err := errors.New("connection error")
if j == providersCount-1 {
err = nil
}
name := fmt.Sprintf("provider-%d", j)
go phm.Update(ctx, []rpcstatus.RpcCallStatus{
{Name: name, Timestamp: time.Now(), Err: err},
})
}
}(uint64(i))
}

wg.Wait()

// Ensure that no data races occurred and the status is consistent
s.Equal(rpcstatus.StatusUp, s.manager.Status().Status, "Expected blockchain status to be up")
}

func (s *BlockchainHealthManagerSuite) TestMultipleStartAndStop() {
s.manager.Start(s.ctx)
s.manager.Stop()

s.manager.Start(s.ctx)
s.manager.Stop()

// Ensure that the manager is in a clean state after multiple starts and stops
s.Equal(false, s.manager.started, "Expected manager to be stopped")
s.Equal(0, len(s.manager.cancelFuncs), "Expected no cancel functions after stop")
}

func (s *BlockchainHealthManagerSuite) TestUnsubscribeWhileEmittingStatus() {
// Создаем экземпляр BlockchainHealthManager и регистрируем менеджер провайдеров
phm := NewProvidersHealthManager(1)
s.manager.RegisterProvidersHealthManager(phm)

// Запускаем менеджер
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
s.manager.Start(ctx)

// Подписываемся на уведомления
subscriber := s.manager.Subscribe()

// Удаляем подписчика (Unsubscribe)
s.manager.Unsubscribe(subscriber)

// Попробуем отправить уведомление после отписки
s.manager.aggregateAndUpdateStatus(ctx, phm.ChainID())

// Проверяем, что отписанный канал закрыт и уведомление не отправлено
select {
case _, ok := <-subscriber:
s.False(ok, "Канал подписчика должен быть закрыт")
default:
s.Fail("Канал подписчика не был закрыт")
}
}
func (s *BlockchainHealthManagerSuite) TestUnsubscribeOneOfMultipleSubscribers() {
// Создаем экземпляр BlockchainHealthManager и регистрируем менеджер провайдеров
phm := NewProvidersHealthManager(1)
s.manager.RegisterProvidersHealthManager(phm)

// Запускаем менеджер
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
s.manager.Start(ctx)

// Подписываемся двумя подписчиками
subscriber1 := s.manager.Subscribe()
subscriber2 := s.manager.Subscribe()

// Отписываем первого подписчика
s.manager.Unsubscribe(subscriber1)

err := errors.New("connection error")
phm.Update(ctx, []rpcstatus.RpcCallStatus{
{Name: "provider-1", Timestamp: time.Now(), Err: err},
})

// Проверяем, что первый подписчик не получил уведомление
select {
case _, ok := <-subscriber1:
s.False(ok, "Канал первого подписчика должен быть закрыт")
default:
s.Fail("Канал первого подписчика не был закрыт")
}

// Проверяем, что второй подписчик получил уведомление
select {
case <-subscriber2:
// Ожидаем получение уведомления вторым подписчиком
case <-time.After(100 * time.Millisecond):
s.Fail("Второй подписчик должен был получить уведомление")
}
}

func TestBlockchainHealthManagerSuite(t *testing.T) {
suite.Run(t, new(BlockchainHealthManagerSuite))
}
Loading

0 comments on commit ff86a39

Please sign in to comment.