Skip to content

Commit

Permalink
Added execution order guarantee between connection event handling and…
Browse files Browse the repository at this point in the history
… fanoutEventCh closing

Signed-off-by: Sergey Semenov <[email protected]>
  • Loading branch information
lvfxx committed Mar 17, 2020
1 parent 183e644 commit 049b754
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
14 changes: 8 additions & 6 deletions pkg/networkservice/common/heal/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/sirupsen/logrus"

"go.uber.org/goleak"

"github.com/golang/protobuf/ptypes/empty"
Expand All @@ -37,7 +39,7 @@ import (
)

const (
waitForTimeout = 5 * time.Second
waitForTimeout = 500 * time.Second
tickTimeout = 10 * time.Millisecond
)

Expand Down Expand Up @@ -205,6 +207,7 @@ func TestNewClient_MissingConnectionsInInit(t *testing.T) {
requestCh := make(chan *networkservice.NetworkServiceRequest)
onHeal := &testOnHeal{
RequestFunc: func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (connection *networkservice.Connection, e error) {
logrus.Info(in.Connection.Id)
requestCh <- in
return &networkservice.Connection{}, nil
},
Expand Down Expand Up @@ -233,7 +236,7 @@ func TestNewClient_MissingConnectionsInInit(t *testing.T) {

// we emulate situation that server managed to handle only the first connection
// second connection should came in the UPDATE event, but we emulate server's falling down
cancelFunc()
close(eventCh)
// at that point we expect that 'healClient' start healing both 'conn-1' and 'conn-2'

healsRemaining := map[string]int{
Expand All @@ -249,14 +252,13 @@ func TestNewClient_MissingConnectionsInInit(t *testing.T) {
}
return false
default:
return healsRemaining[conns[0].GetId()] == 0 && healsRemaining[conns[1].GetId()] == 1
return false
}
}

require.Eventually(t, cond, waitForTimeout, tickTimeout)
require.Eventually(t, cond, waitForTimeout, tickTimeout)
require.Eventually(t, cond, waitForTimeout, tickTimeout)
require.Equal(t, 0, healsRemaining[conns[0].GetId()])
require.True(t, healsRemaining[conns[1].GetId()] <= 1)
close(eventCh)
require.Equal(t, 0, healsRemaining[conns[1].GetId()])
cancelFunc()
}
11 changes: 10 additions & 1 deletion pkg/networkservice/core/eventchannel/monitor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,16 @@ func NewMonitorConnectionClient(eventCh <-chan *networkservice.ConnectionEvent)
return rv
}

func (m *monitorConnectionClient) MonitorConnections(ctx context.Context, in *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
func (m *monitorConnectionClient) MonitorConnections(ctx context.Context, _ *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
fanoutEventCh := make(chan *networkservice.ConnectionEvent, 100)
m.updateExecutor.AsyncExec(func() {
m.fanoutEventChs = append(m.fanoutEventChs, fanoutEventCh)
go func() {
<-ctx.Done()
m.updateExecutor.AsyncExec(func() {
if len(m.fanoutEventChs) == 0 {
return
}
var newFanoutEventChs []chan *networkservice.ConnectionEvent
for _, ch := range m.fanoutEventChs {
if ch != fanoutEventCh {
Expand All @@ -81,5 +84,11 @@ func (m *monitorConnectionClient) eventLoop() {
}
})
}
m.updateExecutor.AsyncExec(func() {
for _, fanoutEventCh := range m.fanoutEventChs {
close(fanoutEventCh)
}
m.fanoutEventChs = []chan *networkservice.ConnectionEvent{}
})
}()
}

0 comments on commit 049b754

Please sign in to comment.