Job Manager changes #1820

Merged · 4 commits · Jun 7, 2022
17 changes: 17 additions & 0 deletions common/chunkStatusLogger.go
@@ -241,6 +241,7 @@ type ChunkStatusLoggerCloser interface {
GetCounts(td TransferDirection) []chunkStatusCount
GetPrimaryPerfConstraint(td TransferDirection, rc RetryCounter) PerfConstraint
FlushLog() // not close, because we had issues with writes coming in after this // TODO: see if that issue still exists
CloseLogger()
}

type RetryCounter interface {
@@ -315,6 +316,18 @@ func (csl *chunkStatusLogger) FlushLog() {
}
}

// CloseLogger closes the chunkStatusLogger's logging goroutine.
func (csl *chunkStatusLogger) CloseLogger() {
// Once the logger is closed, we log no more chunks.
csl.outputEnabled = false

/*
* No more chunks will ever be written, so let the logger's main goroutine know about this.
* Closing this channel makes the main goroutine exit its for-range loop.
*/
close(csl.unsavedEntries)
}

func (csl *chunkStatusLogger) main(chunkLogPath string) {
f, err := os.Create(chunkLogPath)
if err != nil {
@@ -332,18 +345,22 @@ func (csl *chunkStatusLogger) main(chunkLogPath string) {
defer doFlush()

alwaysFlushFromNowOn := false

// We will exit the following for-range loop after CloseLogger() closes the csl.unsavedEntries channel.
for x := range csl.unsavedEntries {
if x == nil {
alwaysFlushFromNowOn = true
doFlush()
csl.flushDone <- struct{}{}
continue // TODO can become break (or be moved to later if we close unsaved entries, once we figure out how we got stuff written to us after CloseLog was called)

}
_, _ = w.WriteString(fmt.Sprintf("%s,%d,%s,%s\n", x.Name, x.OffsetInFile(), x.reason, x.waitStart))
if alwaysFlushFromNowOn {
// TODO: remove when we figure out how we got stuff written to us after CloseLog was called. For now, this should handle those cases (if they still exist)
doFlush()
}

}
}
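
The shutdown added here is the standard Go close-to-terminate idiom: CloseLogger() disables further logging and closes unsavedEntries, and main()'s for-range loop drains what is buffered and exits. Below is a minimal standalone sketch of that idiom; the names are illustrative rather than AzCopy's, it uses atomic.Bool where the PR uses a plain field, and, as the TODOs above hint, producers must stop logging before close or a send on the closed channel would panic.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type entry struct{ name string }

type statusLogger struct {
	outputEnabled atomic.Bool  // analogous to csl.outputEnabled
	unsaved       chan *entry  // analogous to csl.unsavedEntries
	done          sync.WaitGroup
}

func newStatusLogger() *statusLogger {
	l := &statusLogger{unsaved: make(chan *entry, 1000)}
	l.outputEnabled.Store(true)
	l.done.Add(1)
	go l.main()
	return l
}

// log drops entries silently once the logger has been closed.
func (l *statusLogger) log(e *entry) {
	if !l.outputEnabled.Load() {
		return
	}
	l.unsaved <- e
}

// close disables further logging and closes the channel; main() then
// drains whatever is buffered and exits its for-range loop.
func (l *statusLogger) close() {
	l.outputEnabled.Store(false)
	close(l.unsaved)
	l.done.Wait()
}

func (l *statusLogger) main() {
	defer l.done.Done()
	for e := range l.unsaved { // exits after close(l.unsaved) is called
		fmt.Println("logged:", e.name)
	}
}

func main() {
	l := newStatusLogger()
	l.log(&entry{name: "chunk-1"})
	l.log(&entry{name: "chunk-2"})
	l.close()
}
```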

22 changes: 20 additions & 2 deletions common/singleChunkReader.go
@@ -227,14 +227,20 @@ func (cr *singleChunkReader) blockingPrefetch(fileReader io.ReaderAt, isRetry bo
// Must use "relaxed" RAM limit IFF this is a retry. Else, we can, in theory, get deadlock with all active goroutines blocked
// here doing retries, but no RAM _will_ become available because it's
// all used by queued chunkfuncs (that can't be processed because all goroutines are active).
cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.RAMToSchedule())
if cr.chunkLogger != nil {
cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.RAMToSchedule())
}

err := cr.cacheLimiter.WaitUntilAdd(cr.ctx, cr.length, func() bool { return isRetry })
if err != nil {
return err
}

// prepare to read
cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.DiskIO())
if cr.chunkLogger != nil {
cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.DiskIO())
}

targetBuffer := cr.slicePool.RentSlice(cr.length)

// read WITHOUT holding the "close" lock. While we don't have the lock, we mutate ONLY local variables, no instance state.
@@ -412,6 +418,18 @@ func (cr *singleChunkReader) Close() error {
// do the real work
cr.closeBuffer()
cr.isClosed = true

/*
* Set chunkLogger to nil so that the chunkStatusLogger can be GC'ed.
*
* TODO: We should not need to set this to nil explicitly, but today a yet-unknown reference to cr is
* leaking this "big" chunkStatusLogger memory, so we force it to be freed by dropping this reference.
*
* Note: Because we force-set this to nil, we safeguard against it by checking that chunkLogger is not nil
* at the respective call sites. At present those call sites are only in blockingPrefetch().
*/
cr.chunkLogger = nil

return nil
}
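
Nil-ing the field and nil-guarding its call sites is a common way to let a large object be collected while the holder itself may still be referenced somewhere. A small self-contained sketch of that pattern, using made-up names rather than the real singleChunkReader API:

```go
package main

import "log"

type chunkLogger interface {
	LogChunkStatus(id int, reason string)
}

type stdChunkLogger struct{}

func (stdChunkLogger) LogChunkStatus(id int, reason string) {
	log.Printf("chunk %d: %s", id, reason)
}

type reader struct {
	logger chunkLogger // may be nil after Close()
	id     int
}

func (r *reader) prefetch() {
	if r.logger != nil { // guard: the logger may already have been dropped
		r.logger.LogChunkStatus(r.id, "RAMToSchedule")
	}
	// ... acquire RAM, read from disk ...
	if r.logger != nil {
		r.logger.LogChunkStatus(r.id, "DiskIO")
	}
}

func (r *reader) Close() error {
	// Drop the reference so the (potentially large) logger is collectable
	// even if something still holds a reference to this reader.
	r.logger = nil
	return nil
}

func main() {
	r := &reader{logger: stdChunkLogger{}, id: 1}
	r.prefetch()
	_ = r.Close()
	r.prefetch() // safe: the nil checks make logging a no-op after Close()
}
```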

69 changes: 57 additions & 12 deletions jobsAdmin/JobsAdmin.go
100644 → 100755
@@ -24,7 +24,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/Azure/azure-storage-azcopy/v10/ste"
"os"
"path/filepath"
"runtime"
@@ -35,6 +34,8 @@
"sync/atomic"
"time"

"github.com/Azure/azure-storage-azcopy/v10/ste"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-azcopy/v10/common"
)
@@ -99,6 +100,9 @@ var JobsAdmin interface {
TryGetPerformanceAdvice(bytesInJob uint64, filesInJob uint32, fromTo common.FromTo, dir common.TransferDirection, p *ste.PipelineNetworkStats) []common.PerformanceAdvice

SetConcurrencySettingsToAuto()

// JobMgrCleanUp does the JobMgr cleanup.
JobMgrCleanUp(jobId common.JobID)
}

func initJobsAdmin(appCtx context.Context, concurrency ste.ConcurrencySettings, targetRateInMegaBitsPerSec float64, azcopyJobPlanFolder string, azcopyLogPathFolder string, providePerfAdvice bool) {
@@ -241,19 +245,20 @@ type jobsAdmin struct {
logger common.ILoggerCloser
jobIDToJobMgr jobIDToJobMgr // Thread-safe map from each JobID to its JobInfo
// Other global state can be stored in more fields here...
logDir string // Where log files are stored
planDir string // Initialize to directory where Job Part Plans are stored
appCtx context.Context
pacer ste.PacerAdmin
slicePool common.ByteSlicePooler
cacheLimiter common.CacheLimiter
fileCountLimiter common.CacheLimiter
concurrencyTuner ste.ConcurrencyTuner
commandLineMbpsCap float64
logDir string // Where log files are stored
planDir string // Initialize to directory where Job Part Plans are stored
appCtx context.Context
pacer ste.PacerAdmin
slicePool common.ByteSlicePooler
cacheLimiter common.CacheLimiter
fileCountLimiter common.CacheLimiter
concurrencyTuner ste.ConcurrencyTuner
commandLineMbpsCap float64
provideBenchmarkResults bool
cpuMonitor common.CPUMonitor
jobLogger common.ILoggerResetable
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

func (ja *jobsAdmin) NewJobPartPlanFileName(jobID common.JobID, partNumber common.PartNumber) ste.JobPartPlanFileName {
@@ -296,6 +301,39 @@ func (ja *jobsAdmin) JobMgrEnsureExists(jobID common.JobID,
})
}

// JobMgrCleanUp cleans up the jobMgr identified by the given jobId. It undoes what NewJobMgr() does; specifically, it:
// 1. Stops all goroutines started to process this job.
// 2. Releases the memory allocated for this JobMgr instance.
// Note: this is not thread safe, and only one goroutine should call it for a given job.
func (ja *jobsAdmin) JobMgrCleanUp(jobId common.JobID) {
// First thing get the jobMgr.
jm, found := ja.JobMgr(jobId)

if found {
/*
* Log at Info level so that we capture these messages in the job log file.
* These messages are useful for debugging and show how far the cleanup got.
*/
jm.Log(pipeline.LogInfo, "JobMgrCleanUp Enter")

// Delete the jobMgr from the jobIDToJobMgr map, so that subsequent lookups fail.
ja.DeleteJob(jobId)

jm.Log(pipeline.LogInfo, "Job deleted from jobMgr map")

/*
* The rest of the jobMgr-related cleanup is done by the DeferredCleanupJobMgr function.
* Now that we have removed the jobMgr from the map, no new caller can find it and hence cannot start any
* new activity using it. We clean up the jobMgr's resources in a deferred manner as a safety net,
* to allow processing of any messages that may still be in transit.
*
* NOTE: This is not strictly required, but we don't want to miss any in-transit messages, as some of the
* TODOs in the code suggest could happen.
*/
go jm.DeferredCleanupJobMgr()
}
}
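
The ordering in JobMgrCleanUp matters: delete the entry from the map first so no new caller can reach the jobMgr, then release its resources on a separate goroutine as a safety net for in-flight messages. Here is a generic sketch of that remove-then-deferred-cleanup shape, with hypothetical names (the real code delegates to jm.DeferredCleanupJobMgr()):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type job struct{ id string }

func (j *job) deferredCleanup() {
	// Give any in-flight messages a chance to drain, then free resources.
	time.Sleep(100 * time.Millisecond)
	fmt.Println("cleaned up job", j.id)
}

type registry struct {
	mu   sync.Mutex
	jobs map[string]*job
}

func (r *registry) cleanUp(id string) {
	r.mu.Lock()
	j, found := r.jobs[id]
	if found {
		delete(r.jobs, id) // step 1: no new caller can reach this job
	}
	r.mu.Unlock()
	if found {
		go j.deferredCleanup() // step 2: release resources off the hot path
	}
}

func main() {
	r := &registry{jobs: map[string]*job{"j1": {id: "j1"}}}
	r.cleanUp("j1")
	time.Sleep(200 * time.Millisecond) // let the deferred cleanup run
}
```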

func (ja *jobsAdmin) BytesOverWire() int64 {
return ja.pacer.GetTotalTraffic()
}
@@ -507,6 +545,13 @@ func (ja *jobsAdmin) TryGetPerformanceAdvice(bytesInJob uint64, filesInJob uint3
a := ste.NewPerformanceAdvisor(p, ja.commandLineMbpsCap, int64(megabitsPerSec), finalReason, finalConcurrency, dir, averageBytesPerFile, isToAzureFiles)
return a.GetAdvice()
}

// Structs for messageHandler

/* PerfAdjustment message. */
type jaPerfAdjustmentMsg struct {
Throughput int64 `json:"cap-mbps,string"`
}
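
The `,string` option in the struct tag above means the int64 travels as a quoted JSON string rather than a bare number, which is what encoding/json then expects when decoding the perf-adjustment message. A tiny standalone example (illustrative type name) showing the wire format this accepts:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type perfAdjustmentMsg struct {
	Throughput int64 `json:"cap-mbps,string"`
}

func main() {
	var m perfAdjustmentMsg
	// The value is quoted ("500"), because of the `,string` option.
	if err := json.Unmarshal([]byte(`{"cap-mbps":"500"}`), &m); err != nil {
		panic(err)
	}
	fmt.Println(m.Throughput) // 500
}
```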

func (ja *jobsAdmin) messageHandler(inputChan <-chan *common.LCMMsg) {
toBitsPerSec := func(megaBitsPerSec int64) int64 {
@@ -516,7 +561,7 @@ func (ja *jobsAdmin) messageHandler(inputChan <-chan *common.LCMMsg) {
const minIntervalBetweenPerfAdjustment = time.Minute
lastPerfAdjustTime := time.Now().Add(-2 * minIntervalBetweenPerfAdjustment)
var err error

for {
msg := <-inputChan
var msgType common.LCMMsgType
@@ -534,7 +579,7 @@ func (ja *jobsAdmin) messageHandler(inputChan <-chan *common.LCMMsg) {
if e := json.Unmarshal([]byte(msg.Req.Value), &perfAdjustmentReq); e != nil {
err = fmt.Errorf("parsing %s failed with %s", msg.Req.Value, e.Error())
}

if perfAdjustmentReq.Throughput < 0 {
err = fmt.Errorf("invalid value %d for cap-mbps. cap-mbps should be greater than 0",
perfAdjustmentReq.Throughput)
19 changes: 15 additions & 4 deletions ste/jobStatusManager.go
100644 → 100755
@@ -23,15 +23,16 @@ package ste
import (
"time"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-azcopy/v10/common"
)

type JobPartCreatedMsg struct {
TotalTransfers uint32
IsFinalPart bool
TotalTransfers uint32
IsFinalPart bool
TotalBytesEnumerated uint64
FileTransfers uint32
FolderTransfer uint32
FileTransfers uint32
FolderTransfer uint32
}

type xferDoneMsg = common.TransferDetail
@@ -41,6 +42,7 @@ type jobStatusManager struct {
listReq chan bool
partCreated chan JobPartCreatedMsg
xferDone chan xferDoneMsg
done chan struct{}
}

/* These functions should not fail */
@@ -61,6 +63,11 @@ func (jm *jobMgr) ResurrectSummary(js common.ListJobSummaryResponse) {
jm.jstm.js = js
}

func (jm *jobMgr) CleanupJobStatusMgr() {
jm.Log(pipeline.LogInfo, "CleanupJobStatusMgr called.")
jm.jstm.done <- struct{}{}
}

func (jm *jobMgr) handleStatusUpdateMessage() {
jstm := jm.jstm
js := &jstm.js
@@ -106,6 +113,10 @@
// There is no need to keep sending the same items over and over again
js.FailedTransfers = []common.TransferDetail{}
js.SkippedTransfers = []common.TransferDetail{}

case <-jstm.done:
jm.Log(pipeline.LogInfo, "Cleaning up JobStatusMgr.")
return
}
}
}
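
The new done channel follows the usual select-plus-done shutdown pattern: handleStatusUpdateMessage() gains a case that returns when CleanupJobStatusMgr() signals. A compact sketch of that pattern with illustrative names, simplified to a single work channel and using a WaitGroup only so the example can wait for the loop to exit:

```go
package main

import (
	"fmt"
	"sync"
)

type statusMgr struct {
	partCreated chan string
	done        chan struct{}
}

func (s *statusMgr) run(wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case p := <-s.partCreated:
			fmt.Println("part created:", p)
		case <-s.done: // shutdown signal, analogous to jstm.done
			fmt.Println("status manager shutting down")
			return
		}
	}
}

func main() {
	s := &statusMgr{partCreated: make(chan string), done: make(chan struct{})}
	var wg sync.WaitGroup
	wg.Add(1)
	go s.run(&wg)
	s.partCreated <- "part-0"
	s.done <- struct{}{} // unblocks the select; run() returns
	wg.Wait()
}
```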