diff --git a/pkg/networkservice/common/heal/client_test.go b/pkg/networkservice/common/heal/client_test.go index a4d8ab4683..0d6fd04f06 100644 --- a/pkg/networkservice/common/heal/client_test.go +++ b/pkg/networkservice/common/heal/client_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" + "go.uber.org/goleak" "github.com/golang/protobuf/ptypes/empty" @@ -37,7 +39,7 @@ import ( ) const ( - waitForTimeout = 5 * time.Second + waitForTimeout = 500 * time.Second tickTimeout = 10 * time.Millisecond ) @@ -205,6 +207,7 @@ func TestNewClient_MissingConnectionsInInit(t *testing.T) { requestCh := make(chan *networkservice.NetworkServiceRequest) onHeal := &testOnHeal{ RequestFunc: func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (connection *networkservice.Connection, e error) { + logrus.Info(in.Connection.Id) requestCh <- in return &networkservice.Connection{}, nil }, @@ -233,7 +236,7 @@ func TestNewClient_MissingConnectionsInInit(t *testing.T) { // we emulate situation that server managed to handle only the first connection // second connection should came in the UPDATE event, but we emulate server's falling down - cancelFunc() + close(eventCh) // at that point we expect that 'healClient' start healing both 'conn-1' and 'conn-2' healsRemaining := map[string]int{ @@ -249,14 +252,13 @@ func TestNewClient_MissingConnectionsInInit(t *testing.T) { } return false default: - return healsRemaining[conns[0].GetId()] == 0 && healsRemaining[conns[1].GetId()] == 1 + return false } } - require.Eventually(t, cond, waitForTimeout, tickTimeout) require.Eventually(t, cond, waitForTimeout, tickTimeout) require.Eventually(t, cond, waitForTimeout, tickTimeout) require.Equal(t, 0, healsRemaining[conns[0].GetId()]) - require.True(t, healsRemaining[conns[1].GetId()] <= 1) - close(eventCh) + require.Equal(t, 0, healsRemaining[conns[1].GetId()]) + cancelFunc() } diff --git a/pkg/networkservice/core/eventchannel/monitor_client.go b/pkg/networkservice/core/eventchannel/monitor_client.go index 6de1071495..eb68379858 100644 --- a/pkg/networkservice/core/eventchannel/monitor_client.go +++ b/pkg/networkservice/core/eventchannel/monitor_client.go @@ -50,13 +50,16 @@ func NewMonitorConnectionClient(eventCh <-chan *networkservice.ConnectionEvent) return rv } -func (m *monitorConnectionClient) MonitorConnections(ctx context.Context, in *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) { +func (m *monitorConnectionClient) MonitorConnections(ctx context.Context, _ *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) { fanoutEventCh := make(chan *networkservice.ConnectionEvent, 100) m.updateExecutor.AsyncExec(func() { m.fanoutEventChs = append(m.fanoutEventChs, fanoutEventCh) go func() { <-ctx.Done() m.updateExecutor.AsyncExec(func() { + if len(m.fanoutEventChs) == 0 { + return + } var newFanoutEventChs []chan *networkservice.ConnectionEvent for _, ch := range m.fanoutEventChs { if ch != fanoutEventCh { @@ -81,5 +84,11 @@ func (m *monitorConnectionClient) eventLoop() { } }) } + m.updateExecutor.AsyncExec(func() { + for _, fanoutEventCh := range m.fanoutEventChs { + close(fanoutEventCh) + } + m.fanoutEventChs = []chan *networkservice.ConnectionEvent{} + }) }() }