Skip to content

Commit

Permalink
fix: Log early abort. Fixes #9573 (#9575)
Browse files Browse the repository at this point in the history
Signed-off-by: pengfei.ji <[email protected]>
Co-authored-by: pengfei.ji <[email protected]>
  • Loading branch information
jibuji and pengfei.ji authored Oct 10, 2022
1 parent f1bab89 commit 1fc6460
Showing 1 changed file with 31 additions and 1 deletion.
32 changes: 31 additions & 1 deletion util/logs/workflow-logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,27 @@ type sender interface {
Send(entry *workflowpkg.LogEntry) error
}

const maxTokenLength = 1024 * 1024
const startBufSize = 16 * 1024

func scanLinesOrGiveLong(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = bufio.ScanLines(data, atEOF)
if advance > 0 || token != nil || err != nil {
// bufio.ScanLines found something, use it
return
}

// bufio.ScanLines found nothing
// if our buffer is still a reasonable size, continue scanning for regular lines
if len(data) < maxTokenLength {
return
}

// our buffer is getting massive, stop waiting for line breaks and return data now
// this avoids bufio.ErrTooLong
return maxTokenLength, data[0:maxTokenLength], nil
}

func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient kubernetes.Interface, req request, sender sender) error {
wfInterface := wfClient.ArgoprojV1alpha1().Workflows(req.GetNamespace())
_, err := wfInterface.Get(ctx, req.GetName(), metav1.GetOptions{})
Expand Down Expand Up @@ -111,15 +132,24 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
logCtx.Error(err)
return
}

scanner := bufio.NewScanner(stream)
//give it more space for long line
scanner.Buffer(make([]byte, startBufSize), maxTokenLength)
//avoid bufio.ErrTooLong error when encounters a very very long line
scanner.Split(scanLinesOrGiveLong)
for scanner.Scan() {
select {
case <-ctx.Done():
return
default:
line := scanner.Text()
parts := strings.SplitN(line, " ", 2)
content := parts[1]
//on old version k8s, the line may contains no space, hence len(parts) would equal to 1
content := ""
if len(parts) > 1 {
content = parts[1]
}
timestamp, err := time.Parse(time.RFC3339, parts[0])
if err != nil {
logCtx.Errorf("unable to decode or infer timestamp from log line: %s", err)
Expand Down

0 comments on commit 1fc6460

Please sign in to comment.