Skip to content

Commit

Permalink
Search for the error message we care to stop the for-loop
Browse files Browse the repository at this point in the history
  • Loading branch information
shackra committed Aug 28, 2024
1 parent 2d66da3 commit 02a98f4
Showing 1 changed file with 7 additions and 12 deletions.
19 changes: 7 additions & 12 deletions pkg/connectorrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
"os"
"os/signal"
"path/filepath"
"strings"
"time"

"golang.org/x/sync/semaphore"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1"
Expand Down Expand Up @@ -130,9 +129,9 @@ func (c *connectorRunner) run(ctx context.Context) error {

waitDuration := time.Second * 0
errCount := 0
commErrCount := 0
stopForLoop := false
var err error
for commErrCount < 5 {
for !stopForLoop {
select {
case <-ctx.Done():
return c.handleContextCancel(ctx)
Expand Down Expand Up @@ -197,24 +196,20 @@ func (c *connectorRunner) run(ctx context.Context) error {
defer sem.Release(1)
err := c.processTask(ctx, t)
if err != nil {
switch code := status.Code(err); code {
case codes.Canceled:
commErrCount++
default:
l.Info("Received gRPC error code", zap.Uint32("grpc_code", uint32(code)))
if strings.Contains(err.Error(), "grpc: the client connection is closing") {
stopForLoop = true
}
l.Error("runner: error processing task", zap.Error(err), zap.String("task_id", t.Id), zap.String("task_type", tasks.GetType(t).String()))
return
}
// reset the counter
commErrCount = 0
l.Debug("runner: task processed", zap.String("task_id", t.Id), zap.String("task_type", tasks.GetType(t).String()))
}(nextTask)

l.Debug("runner: dispatched task, waiting for next task", zap.Duration("wait_duration", waitDuration))
}
}

if commErrCount > 0 {
if stopForLoop {
return fmt.Errorf("Unable to communicate with gRPC server")
}

Expand Down

0 comments on commit 02a98f4

Please sign in to comment.