Skip to content

Commit

Permalink
Added execution order guarantee between connection event handling and…
Browse files Browse the repository at this point in the history
… fanoutEventCh closing

Signed-off-by: Sergey Semenov <[email protected]>
  • Loading branch information
lvfxx authored and semenov-spirent committed Mar 17, 2020
1 parent 0b8962a commit b41b364
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
20 changes: 11 additions & 9 deletions pkg/networkservice/common/heal/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,27 +217,29 @@ func TestNewClient_MissingConnectionsInInit(t *testing.T) {

// we emulate situation that server managed to handle only the first connection
// second connection should came in the UPDATE event, but we emulate server's falling down
cancelFunc()
close(eventCh)
// at that point we expect that 'healClient' start healing both 'conn-1' and 'conn-2'

healedIDs := map[string]bool{}

healsRemaining := map[string]int{
conns[0].GetId(): 1,
conns[1].GetId(): 2,
}
cond := func() bool {
select {
case r := <-requestCh:
if _, ok := healedIDs[r.GetConnection().GetId()]; !ok {
healedIDs[r.GetConnection().GetId()] = true
if val, ok := healsRemaining[r.GetConnection().GetId()]; ok && val != 0 {
healsRemaining[r.GetConnection().GetId()]--
return true
}
return false
default:
return false
}
}

require.Eventually(t, cond, waitForTimeout, tickTimeout)
require.Eventually(t, cond, waitForTimeout, tickTimeout)

require.True(t, healedIDs[conns[0].GetId()])
require.True(t, healedIDs[conns[1].GetId()])
require.Eventually(t, cond, waitForTimeout, tickTimeout)
require.Equal(t, 0, healsRemaining[conns[0].GetId()])
require.Equal(t, 0, healsRemaining[conns[1].GetId()])
cancelFunc()
}
11 changes: 10 additions & 1 deletion pkg/networkservice/core/eventchannel/monitor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,16 @@ func NewMonitorConnectionClient(eventCh <-chan *networkservice.ConnectionEvent)
return rv
}

func (m *monitorConnectionClient) MonitorConnections(ctx context.Context, in *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
func (m *monitorConnectionClient) MonitorConnections(ctx context.Context, _ *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
fanoutEventCh := make(chan *networkservice.ConnectionEvent, 100)
m.updateExecutor.AsyncExec(func() {
m.fanoutEventChs = append(m.fanoutEventChs, fanoutEventCh)
go func() {
<-ctx.Done()
m.updateExecutor.AsyncExec(func() {
if len(m.fanoutEventChs) == 0 {
return
}
var newFanoutEventChs []chan *networkservice.ConnectionEvent
for _, ch := range m.fanoutEventChs {
if ch != fanoutEventCh {
Expand All @@ -81,5 +84,11 @@ func (m *monitorConnectionClient) eventLoop() {
}
})
}
m.updateExecutor.AsyncExec(func() {
for _, fanoutEventCh := range m.fanoutEventChs {
close(fanoutEventCh)
}
m.fanoutEventChs = []chan *networkservice.ConnectionEvent{}
})
}()
}

0 comments on commit b41b364

Please sign in to comment.