diff --git a/pkg/networkservice/common/heal/client_test.go b/pkg/networkservice/common/heal/client_test.go index 0e11d828b..0e56a6594 100644 --- a/pkg/networkservice/common/heal/client_test.go +++ b/pkg/networkservice/common/heal/client_test.go @@ -217,16 +217,18 @@ func TestNewClient_MissingConnectionsInInit(t *testing.T) { // we emulate situation that server managed to handle only the first connection // second connection should came in the UPDATE event, but we emulate server's falling down - cancelFunc() + close(eventCh) // at that point we expect that 'healClient' start healing both 'conn-1' and 'conn-2' - healedIDs := map[string]bool{} - + healsRemaining := map[string]int{ + conns[0].GetId(): 1, + conns[1].GetId(): 2, + } cond := func() bool { select { case r := <-requestCh: - if _, ok := healedIDs[r.GetConnection().GetId()]; !ok { - healedIDs[r.GetConnection().GetId()] = true + if val, ok := healsRemaining[r.GetConnection().GetId()]; ok && val != 0 { + healsRemaining[r.GetConnection().GetId()]-- return true } return false @@ -234,10 +236,10 @@ func TestNewClient_MissingConnectionsInInit(t *testing.T) { return false } } - require.Eventually(t, cond, waitForTimeout, tickTimeout) require.Eventually(t, cond, waitForTimeout, tickTimeout) - - require.True(t, healedIDs[conns[0].GetId()]) - require.True(t, healedIDs[conns[1].GetId()]) + require.Eventually(t, cond, waitForTimeout, tickTimeout) + require.Equal(t, 0, healsRemaining[conns[0].GetId()]) + require.Equal(t, 0, healsRemaining[conns[1].GetId()]) + cancelFunc() } diff --git a/pkg/networkservice/core/eventchannel/monitor_client.go b/pkg/networkservice/core/eventchannel/monitor_client.go index 6de107149..eb6837985 100644 --- a/pkg/networkservice/core/eventchannel/monitor_client.go +++ b/pkg/networkservice/core/eventchannel/monitor_client.go @@ -50,13 +50,16 @@ func NewMonitorConnectionClient(eventCh <-chan *networkservice.ConnectionEvent) return rv } -func (m *monitorConnectionClient) MonitorConnections(ctx context.Context, in *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) { +func (m *monitorConnectionClient) MonitorConnections(ctx context.Context, _ *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) { fanoutEventCh := make(chan *networkservice.ConnectionEvent, 100) m.updateExecutor.AsyncExec(func() { m.fanoutEventChs = append(m.fanoutEventChs, fanoutEventCh) go func() { <-ctx.Done() m.updateExecutor.AsyncExec(func() { + if len(m.fanoutEventChs) == 0 { + return + } var newFanoutEventChs []chan *networkservice.ConnectionEvent for _, ch := range m.fanoutEventChs { if ch != fanoutEventCh { @@ -81,5 +84,11 @@ func (m *monitorConnectionClient) eventLoop() { } }) } + m.updateExecutor.AsyncExec(func() { + for _, fanoutEventCh := range m.fanoutEventChs { + close(fanoutEventCh) + } + m.fanoutEventChs = []chan *networkservice.ConnectionEvent{} + }) }() }