Skip to content

Commit

Permalink
fix add dual-upload for livelogs show in beta (#166)
Browse files Browse the repository at this point in the history
The scrollback beta broke live logs
  • Loading branch information
ChristopherHX authored Jan 23, 2024
1 parent 2a2c5ff commit 7068fc3
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 50 deletions.
9 changes: 8 additions & 1 deletion actionsrunner/worker_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ func (wc *DefaultWorkerContext) Init() {

if hasResultsEndpoint && strings.EqualFold(jobreq.MessageType, "RunnerJobRequest") {
wc.JobLogger.IsResults = true
jobVssConnection.TenantURL = resultsEndpoint.Value
con := *jobVssConnection
con.TenantURL = resultsEndpoint.Value
wc.JobLogger.ResultsConnection = &con
wc.JobLogger.Logger = &logger.BufferedLiveLogger{
LiveLogger: &logger.WebsocketLiveloggerWithFallback{
JobRequest: jobreq,
Expand All @@ -166,6 +168,11 @@ func (wc *DefaultWorkerContext) Init() {
},
}
} else {
if hasResultsEndpoint {
con := *jobVssConnection
con.TenantURL = resultsEndpoint.Value
wc.JobLogger.ResultsConnection = &con
}
wc.JobLogger.Logger = &logger.BufferedLiveLogger{
LiveLogger: &logger.WebsocketLiveloggerWithFallback{
JobRequest: jobreq,
Expand Down
119 changes: 70 additions & 49 deletions protocol/logger/job_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logger
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -71,14 +72,14 @@ func (logger *WebsocketLivelogger) Connect() error {
HTTPClient: logger.Connection.HttpClient(),
HTTPHeader: http.Header{
"Authorization": []string{"Bearer " + logger.Connection.Token},
"User-Agent": []string{"github-act-runner/1.0.0"},
"User-Agent": []string{"github-act-runner/1.0.0"},
},
})
return err
}

func (logger *WebsocketLivelogger) SendLog(lines *protocol.TimelineRecordFeedLinesWrapper) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
return wsjson.Write(ctx, logger.ws, lines)
}
Expand Down Expand Up @@ -239,23 +240,26 @@ func (logger *BufferedLiveLogger) SendLog(wrapper *protocol.TimelineRecordFeedLi
}

type JobLogger struct {
JobRequest *protocol.AgentJobRequestMessage
Connection *protocol.VssConnection
TimelineRecords *protocol.TimelineRecordWrapper
CurrentRecord int64
CurrentLine int64
JobBuffer bytes.Buffer
CurrentBuffer bytes.Buffer
linefeedregex *regexp.Regexp
Logger LiveLogger
lineBuffer []byte
IsResults bool
ChangeId int64
CurrentJobLine int64
FirstBlock bool
FirstJobBlock bool
linesync sync.Mutex
loggersync sync.Mutex
JobRequest *protocol.AgentJobRequestMessage
Connection *protocol.VssConnection
ResultsConnection *protocol.VssConnection
TimelineRecords *protocol.TimelineRecordWrapper
CurrentRecord int64
CurrentLine int64
JobBuffer bytes.Buffer
CurrentBuffer bytes.Buffer
ResultsJobBuffer bytes.Buffer
ResultsCurrentBuffer bytes.Buffer
linefeedregex *regexp.Regexp
Logger LiveLogger
lineBuffer []byte
IsResults bool
ChangeId int64
CurrentJobLine int64
FirstBlock bool
FirstJobBlock bool
linesync sync.Mutex
loggersync sync.Mutex
}

func (logger *JobLogger) Write(p []byte) (n int, err error) {
Expand Down Expand Up @@ -296,6 +300,8 @@ func (logger *JobLogger) MoveNextExt(startNextRecord bool) *protocol.TimelineRec
logger.uploadBlock(cur, true)
logger.CurrentRecord++
logger.CurrentBuffer.Reset()
logger.ResultsCurrentBuffer.Reset()
logger.CurrentLine = 0
if c := logger.current(); c != nil && startNextRecord {
c.Start()
return c
Expand All @@ -305,22 +311,21 @@ func (logger *JobLogger) MoveNextExt(startNextRecord bool) *protocol.TimelineRec
}

func (logger *JobLogger) uploadBlock(cur *protocol.TimelineRecord, finalBlock bool) {
if finalBlock && logger.CurrentBuffer.Len() > 0 || logger.IsResults && (finalBlock || logger.CurrentBuffer.Len() > 2*1024*1024) {
if logger.IsResults {
rs := &results.ResultsService{
Connection: logger.Connection,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
rs.UploadResultsStepLogAsync(ctx, logger.JobRequest.Plan.PlanID, logger.JobRequest.JobID, cur.ID, &logger.CurrentBuffer, int64(logger.CurrentBuffer.Len()), logger.FirstBlock, finalBlock, logger.CurrentLine)
logger.FirstBlock = false
logger.CurrentBuffer.Reset()
} else if finalBlock {
if logid, err := logger.Connection.UploadLogFile(logger.JobRequest.Timeline.ID, logger.JobRequest, logger.CurrentBuffer.String()); err == nil {
cur.Log = &protocol.TaskLogReference{ID: logid}
}
if !logger.IsResults && finalBlock && logger.CurrentBuffer.Len() > 0 {
if logid, err := logger.Connection.UploadLogFile(logger.JobRequest.Timeline.ID, logger.JobRequest, logger.CurrentBuffer.String()); err == nil {
cur.Log = &protocol.TaskLogReference{ID: logid}
}
}
if logger.ResultsConnection != nil && (finalBlock || logger.ResultsCurrentBuffer.Len() > 2*1024*1024) {
rs := &results.ResultsService{
Connection: logger.ResultsConnection,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
rs.UploadResultsStepLogAsync(ctx, logger.JobRequest.Plan.PlanID, logger.JobRequest.JobID, cur.ID, &logger.ResultsCurrentBuffer, int64(logger.ResultsCurrentBuffer.Len()), logger.FirstBlock, finalBlock, logger.CurrentLine)
logger.FirstBlock = false
logger.ResultsCurrentBuffer.Reset()
}
}

func (logger *JobLogger) Finish() {
Expand All @@ -330,21 +335,22 @@ func (logger *JobLogger) Finish() {
}

func (logger *JobLogger) uploadJobBlob(finalBlock bool) {
if (finalBlock && logger.JobBuffer.Len() > 0 || logger.IsResults && (finalBlock || logger.JobBuffer.Len() > 2*1024*1024)) && len(logger.TimelineRecords.Value) > 0 {
if logger.IsResults {
if !logger.IsResults && finalBlock && logger.JobBuffer.Len() > 0 && len(logger.TimelineRecords.Value) > 0 {
if logid, err := logger.Connection.UploadLogFile(logger.JobRequest.Timeline.ID, logger.JobRequest, logger.JobBuffer.String()); err == nil {
logger.TimelineRecords.Value[0].Log = &protocol.TaskLogReference{ID: logid}
_ = logger.update()
}
}
if logger.ResultsConnection != nil && (finalBlock || logger.ResultsJobBuffer.Len() > 2*1024*1024) {
if logger.ResultsConnection != nil {
rs := &results.ResultsService{
Connection: logger.Connection,
Connection: logger.ResultsConnection,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
rs.UploadResultsJobLogAsync(ctx, logger.JobRequest.Plan.PlanID, logger.JobRequest.JobID, &logger.JobBuffer, int64(logger.JobBuffer.Len()), logger.FirstJobBlock, finalBlock, logger.CurrentJobLine)
rs.UploadResultsJobLogAsync(ctx, logger.JobRequest.Plan.PlanID, logger.JobRequest.JobID, &logger.ResultsJobBuffer, int64(logger.ResultsJobBuffer.Len()), logger.FirstJobBlock, finalBlock, logger.CurrentJobLine)
logger.FirstJobBlock = false
logger.JobBuffer.Reset()
} else if finalBlock {
if logid, err := logger.Connection.UploadLogFile(logger.JobRequest.Timeline.ID, logger.JobRequest, logger.JobBuffer.String()); err == nil {
logger.TimelineRecords.Value[0].Log = &protocol.TaskLogReference{ID: logid}
_ = logger.update()
}
logger.ResultsJobBuffer.Reset()
}
}
}
Expand All @@ -356,7 +362,8 @@ func (logger *JobLogger) Update() error {
}

func (logger *JobLogger) update() error {
if logger.IsResults {
var errResults, errVss error
if logger.ResultsConnection != nil {
logger.ChangeId++
updatereq := &results.StepsUpdateRequest{}
updatereq.ChangeOrder = logger.ChangeId
Expand All @@ -367,13 +374,17 @@ func (logger *JobLogger) update() error {
updatereq.Steps[i] = results.ConvertTimelineRecordToStep(*rec)
}
rs := &results.ResultsService{
Connection: logger.Connection,
Connection: logger.ResultsConnection,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
return rs.UpdateWorkflowStepsAsync(ctx, updatereq)
errResults = rs.UpdateWorkflowStepsAsync(ctx, updatereq)
}

if !logger.IsResults {
errVss = logger.Connection.UpdateTimeLine(logger.JobRequest.Timeline.ID, logger.JobRequest, logger.TimelineRecords)
}
return logger.Connection.UpdateTimeLine(logger.JobRequest.Timeline.ID, logger.JobRequest, logger.TimelineRecords)
return errors.Join(errResults, errVss)
}

func (logger *JobLogger) Append(record protocol.TimelineRecord) *protocol.TimelineRecord {
Expand Down Expand Up @@ -410,12 +421,22 @@ func (logger *JobLogger) Log(lines string) {
logger.FirstJobBlock = true
}
lines = logger.linefeedregex.ReplaceAllString(strings.TrimSuffix(lines, "\r\n"), "\n")
_, _ = logger.JobBuffer.WriteString(lines + "\n")
if !logger.IsResults {
_, _ = logger.JobBuffer.WriteString(lines + "\n")
}
if logger.ResultsConnection != nil {
_, _ = logger.ResultsJobBuffer.WriteString(lines + "\n")
}
cur := logger.current()
if cur == nil {
return
}
_, _ = logger.CurrentBuffer.WriteString(lines + "\n")
if !logger.IsResults {
_, _ = logger.CurrentBuffer.WriteString(lines + "\n")
}
if logger.ResultsConnection != nil {
_, _ = logger.ResultsCurrentBuffer.WriteString(lines + "\n")
}
cline := logger.CurrentLine
wrapper := &protocol.TimelineRecordFeedLinesWrapper{
StartLine: &cline,
Expand Down

0 comments on commit 7068fc3

Please sign in to comment.