Skip to content

Commit

Permalink
Rework timeout using MutexGroup
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <[email protected]>
  • Loading branch information
Vladimir Popov committed Oct 20, 2020
1 parent 411d788 commit 4f7fb71
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 144 deletions.
82 changes: 43 additions & 39 deletions pkg/networkservice/common/timeout/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ import (

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/fifosync"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/multiexecutor"
)

type timeoutServer struct {
ctx context.Context
connections timerMap
executor *multiexecutor.Executor
mutexGroup fifosync.MutexGroup
}

type timer struct {
Expand All @@ -46,39 +47,41 @@ type timer struct {
// NewServer - creates a new NetworkServiceServer chain element that implements timeout of expired connections.
func NewServer(ctx context.Context) networkservice.NetworkServiceServer {
return &timeoutServer{
executor: multiexecutor.NewExecutor(ctx),
ctx: ctx,
}
}

func (t *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) {
logEntry := log.Entry(ctx).WithField("timeoutServer", "Request")

connID := request.GetConnection().GetId()
<-t.executor.AsyncExec(connID, func() {
if timer, ok := t.connections.Load(connID); ok {
if !timer.timer.Stop() {
logEntry.Warnf("connection has been timed out, re requesting: %v", connID)
}
close(timer.stopCh)
t.connections.Delete(connID)
}

conn, err = next.Server(ctx).Request(ctx, request)
if err != nil {
return
t.mutexGroup.Lock(connID)
defer t.mutexGroup.Unlock(connID)

if timer, ok := t.connections.Load(connID); ok {
if !timer.timer.Stop() {
logEntry.Warnf("connection has been timed out, re requesting: %v", connID)
}
close(timer.stopCh)
t.connections.Delete(connID)
}

var timer *timer
timer, err = t.createTimer(ctx, conn)
if err != nil {
if _, closeErr := next.Server(ctx).Close(ctx, conn); closeErr != nil {
err = errors.Wrapf(err, "error attempting to close failed connection %v: %+v", connID, closeErr)
}
return
conn, err = next.Server(ctx).Request(ctx, request)
if err != nil {
return
}

var timer *timer
timer, err = t.createTimer(ctx, conn)
if err != nil {
if _, closeErr := next.Server(ctx).Close(ctx, conn); closeErr != nil {
err = errors.Wrapf(err, "error attempting to close failed connection %v: %+v", connID, closeErr)
}
return
}

t.connections.Store(connID, timer)
})
t.connections.Store(connID, timer)

return conn, err
}
Expand All @@ -92,35 +95,36 @@ func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Co
}

conn = conn.Clone()
ctx = extend.WithValuesFromContext(context.Background(), ctx)
timerCtx := extend.WithValuesFromContext(context.Background(), t.ctx)

timer := &timer{
stopCh: make(chan struct{}, 1),
}
timer.timer = time.AfterFunc(time.Until(expireTime), func() {
t.executor.AsyncExec(conn.GetId(), func() {
select {
case <-timer.stopCh:
logEntry.Warnf("timer has been already stopped: %v", conn.GetId())
default:
if err := t.close(ctx, conn); err != nil {
logEntry.Errorf("failed to close timed out connection: %v %+v", conn.GetId(), err)
}
t.mutexGroup.Lock(conn.GetId())
defer t.mutexGroup.Unlock(conn.GetId())

select {
case <-timer.stopCh:
logEntry.Warnf("timer has been already stopped: %v", conn.GetId())
default:
if err := t.close(timerCtx, conn, next.Server(ctx)); err != nil {
logEntry.Errorf("failed to close timed out connection: %v %+v", conn.GetId(), err)
}
})
}
})

return timer, nil
}

func (t *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (_ *empty.Empty, err error) {
<-t.executor.AsyncExec(conn.GetId(), func() {
err = t.close(ctx, conn)
})
return &empty.Empty{}, err
t.mutexGroup.Lock(conn.GetId())
defer t.mutexGroup.Unlock(conn.GetId())

return &empty.Empty{}, t.close(ctx, conn, next.Server(ctx))
}

func (t *timeoutServer) close(ctx context.Context, conn *networkservice.Connection) error {
func (t *timeoutServer) close(ctx context.Context, conn *networkservice.Connection, nextServer networkservice.NetworkServiceServer) error {
logEntry := log.Entry(ctx).WithField("timeoutServer", "close")

timer, ok := t.connections.Load(conn.GetId())
Expand All @@ -133,6 +137,6 @@ func (t *timeoutServer) close(ctx context.Context, conn *networkservice.Connecti
close(timer.stopCh)
t.connections.Delete(conn.GetId())

_, err := next.Server(ctx).Close(ctx, conn)
_, err := nextServer.Close(ctx, conn)
return err
}
105 changes: 0 additions & 105 deletions pkg/tools/multiexecutor/multi_executor.go

This file was deleted.

0 comments on commit 4f7fb71

Please sign in to comment.