diff --git a/common/chunkStatusLogger.go b/common/chunkStatusLogger.go
index 134be24a7..573bfebf1 100644
--- a/common/chunkStatusLogger.go
+++ b/common/chunkStatusLogger.go
@@ -241,6 +241,7 @@ type ChunkStatusLoggerCloser interface {
 	GetCounts(td TransferDirection) []chunkStatusCount
 	GetPrimaryPerfConstraint(td TransferDirection, rc RetryCounter) PerfConstraint
 	FlushLog() // not close, because we had issues with writes coming in after this // TODO: see if that issue still exists
+	CloseLogger()
 }
 
 type RetryCounter interface {
@@ -315,6 +316,18 @@ func (csl *chunkStatusLogger) FlushLog() {
 	}
 }
 
+// CloseLogger closes the chunk logger thread.
+func (csl *chunkStatusLogger) CloseLogger() {
+	// Once the logger is closed, we log no more chunks.
+	csl.outputEnabled = false
+
+	/*
+	 * No more chunks will ever be written; let the main logger know about this.
+	 * On closing this channel the main logger will exit from its for-range loop.
+	 */
+	close(csl.unsavedEntries)
+}
+
 func (csl *chunkStatusLogger) main(chunkLogPath string) {
 	f, err := os.Create(chunkLogPath)
 	if err != nil {
@@ -332,18 +345,22 @@ func (csl *chunkStatusLogger) main(chunkLogPath string) {
 	defer doFlush()
 
 	alwaysFlushFromNowOn := false
+
+	// We will exit the following for-range loop after CloseLogger() closes the csl.unsavedEntries channel.
 	for x := range csl.unsavedEntries {
 		if x == nil {
 			alwaysFlushFromNowOn = true
 			doFlush()
 			csl.flushDone <- struct{}{}
 			continue // TODO can become break (or be moved to later if we close unsaved entries, once we figure out how we got stuff written to us after CloseLog was called)
+
 		}
 		_, _ = w.WriteString(fmt.Sprintf("%s,%d,%s,%s\n", x.Name, x.OffsetInFile(), x.reason, x.waitStart))
 		if alwaysFlushFromNowOn {
 			// TODO: remove when we figure out how we got stuff written to us after CloseLog was called. For now, this should handle those cases (if they still exist)
 			doFlush()
 		}
+
 	}
 }
diff --git a/common/singleChunkReader.go b/common/singleChunkReader.go
index 2e9d2f8d8..d362c2ea8 100644
--- a/common/singleChunkReader.go
+++ b/common/singleChunkReader.go
@@ -227,14 +227,20 @@ func (cr *singleChunkReader) blockingPrefetch(fileReader io.ReaderAt, isRetry bo
 	// Must use "relaxed" RAM limit IFF this is a retry. Else, we can, in theory, get deadlock with all active goroutines blocked
 	// here doing retries, but no RAM _will_ become available because its
 	// all used by queued chunkfuncs (that can't be processed because all goroutines are active).
-	cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.RAMToSchedule())
+	if cr.chunkLogger != nil {
+		cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.RAMToSchedule())
+	}
+
 	err := cr.cacheLimiter.WaitUntilAdd(cr.ctx, cr.length, func() bool { return isRetry })
 	if err != nil {
 		return err
 	}
 
 	// prepare to read
-	cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.DiskIO())
+	if cr.chunkLogger != nil {
+		cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.DiskIO())
+	}
+
 	targetBuffer := cr.slicePool.RentSlice(cr.length)
 
 	// read WITHOUT holding the "close" lock. While we don't have the lock, we mutate ONLY local variables, no instance state.
@@ -412,6 +418,18 @@ func (cr *singleChunkReader) Close() error {
 	// do the real work
 	cr.closeBuffer()
 	cr.isClosed = true
+
+	/*
+	 * Set chunkLogger to nil, so that chunkStatusLogger can be GC'ed.
+	 *
+	 * TODO: We should not need to explicitly set this to nil, but today we have a yet-unknown ref on cr which
+	 * is leaking this "big" chunkStatusLogger memory, so we cause that to be freed by force-dropping this ref.
+	 *
+	 * Note: We force-set this to nil, and we safeguard against it by checking that chunkLogger is not nil at the respective call sites.
+	 * At present this is called only from blockingPrefetch().
+	 */
+	cr.chunkLogger = nil
+
 	return nil
 }
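The CloseLogger/main pairing above leans on a standard Go property: a for-range over a channel drains any buffered entries and then exits once the channel is closed, so closing csl.unsavedEntries is enough to stop the logging goroutine without losing queued records. Below is a minimal, self-contained sketch of that shutdown pattern; the types and names are illustrative only, not the AzCopy implementation.

package main

import "fmt"

type logger struct {
	entries chan string
	done    chan struct{}
}

func newLogger() *logger {
	l := &logger{entries: make(chan string, 100), done: make(chan struct{})}
	go l.main()
	return l
}

func (l *logger) main() {
	// The range loop exits only after entries is closed AND fully drained,
	// so queued entries are still written out during shutdown.
	for e := range l.entries {
		fmt.Println("log:", e)
	}
	close(l.done)
}

func (l *logger) Log(e string) { l.entries <- e }

// Close must be called once, and no Log calls may follow it.
func (l *logger) Close() {
	close(l.entries)
	<-l.done // wait for the logging goroutine to finish draining
}

func main() {
	l := newLogger()
	l.Log("chunk 1 scheduled")
	l.Log("chunk 2 scheduled")
	l.Close()
}

As in the diff, the only contract is that nothing may send after the close; that is what the outputEnabled flag in CloseLogger is guarding.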
diff --git a/jobsAdmin/JobsAdmin.go b/jobsAdmin/JobsAdmin.go
old mode 100644
new mode 100755
index 87130391c..f71a48e68
--- a/jobsAdmin/JobsAdmin.go
+++ b/jobsAdmin/JobsAdmin.go
@@ -24,7 +24,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"github.com/Azure/azure-storage-azcopy/v10/ste"
 	"os"
 	"path/filepath"
 	"runtime"
@@ -35,6 +34,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/Azure/azure-storage-azcopy/v10/ste"
+
 	"github.com/Azure/azure-pipeline-go/pipeline"
 	"github.com/Azure/azure-storage-azcopy/v10/common"
 )
@@ -99,6 +100,9 @@ var JobsAdmin interface {
 	TryGetPerformanceAdvice(bytesInJob uint64, filesInJob uint32, fromTo common.FromTo, dir common.TransferDirection, p *ste.PipelineNetworkStats) []common.PerformanceAdvice
 
 	SetConcurrencySettingsToAuto()
+
+	// JobMgrCleanUp does the JobMgr cleanup.
+	JobMgrCleanUp(jobId common.JobID)
 }
 
 func initJobsAdmin(appCtx context.Context, concurrency ste.ConcurrencySettings, targetRateInMegaBitsPerSec float64, azcopyJobPlanFolder string, azcopyLogPathFolder string, providePerfAdvice bool) {
@@ -241,19 +245,20 @@ type jobsAdmin struct {
 	logger        common.ILoggerCloser
 	jobIDToJobMgr jobIDToJobMgr // Thread-safe map from each JobID to its JobInfo
 	// Other global state can be stored in more fields here...
-	logDir             string // Where log files are stored
-	planDir            string // Initialize to directory where Job Part Plans are stored
-	appCtx             context.Context
-	pacer              ste.PacerAdmin
-	slicePool          common.ByteSlicePooler
-	cacheLimiter       common.CacheLimiter
-	fileCountLimiter   common.CacheLimiter
-	concurrencyTuner   ste.ConcurrencyTuner
-	commandLineMbpsCap float64
+	logDir                  string // Where log files are stored
+	planDir                 string // Initialize to directory where Job Part Plans are stored
+	appCtx                  context.Context
+	pacer                   ste.PacerAdmin
+	slicePool               common.ByteSlicePooler
+	cacheLimiter            common.CacheLimiter
+	fileCountLimiter        common.CacheLimiter
+	concurrencyTuner        ste.ConcurrencyTuner
+	commandLineMbpsCap      float64
 	provideBenchmarkResults bool
 	cpuMonitor              common.CPUMonitor
 	jobLogger               common.ILoggerResetable
 }
+
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 func (ja *jobsAdmin) NewJobPartPlanFileName(jobID common.JobID, partNumber common.PartNumber) ste.JobPartPlanFileName {
@@ -296,6 +301,39 @@ func (ja *jobsAdmin) JobMgrEnsureExists(jobID common.JobID,
 	})
 }
 
+// JobMgrCleanUp cleans up the jobMgr identified by the given jobId. It undoes what NewJobMgr() does; basically it does the following:
+// 1. Stop all go routines started to process this job.
+// 2. Release the memory allocated for this JobMgr instance.
+// Note: this is not thread safe, and only one goroutine should call this for a job.
+func (ja *jobsAdmin) JobMgrCleanUp(jobId common.JobID) {
+	// First, get the jobMgr.
+	jm, found := ja.JobMgr(jobId)
+
+	if found {
+		/*
+		 * Log at Info level, so that we capture these messages in the job log file.
+		 * These log messages are useful for debugging and tell us how far the cleanup has progressed.
+		 */
+		jm.Log(pipeline.LogInfo, "JobMgrCleanUp Enter")
+
+		// Delete the jobMgr from the jobIDToJobMgr map, so that subsequent lookups will not find it.
+		ja.DeleteJob(jobId)
+
+		jm.Log(pipeline.LogInfo, "Job deleted from jobMgr map")
+
+		/*
+		 * The rest of the jobMgr-related cleanup is done by the DeferredCleanupJobMgr function.
+		 * Now that we have removed the jobMgr from the map, no new caller will find it and hence cannot start any
+		 * new activity using the jobMgr. We clean up the resources of the jobMgr in a deferred manner as a safety net
+		 * to allow processing of any messages that may be in transit.
+		 *
+		 * NOTE: This is not really required, but we don't want to miss any in-transit messages, as some of the TODOs in
+		 * the code suggest.
+		 */
+		go jm.DeferredCleanupJobMgr()
+	}
+}
+
 func (ja *jobsAdmin) BytesOverWire() int64 {
 	return ja.pacer.GetTotalTraffic()
 }
@@ -507,6 +545,13 @@ func (ja *jobsAdmin) TryGetPerformanceAdvice(bytesInJob uint64, filesInJob uint3
 	a := ste.NewPerformanceAdvisor(p, ja.commandLineMbpsCap, int64(megabitsPerSec), finalReason, finalConcurrency, dir, averageBytesPerFile, isToAzureFiles)
 	return a.GetAdvice()
 }
+
+// Structs for messageHandler
+
+/* PerfAdjustment message. */
+type jaPerfAdjustmentMsg struct {
+	Throughput int64 `json:"cap-mbps,string"`
+}
 
 func (ja *jobsAdmin) messageHandler(inputChan <-chan *common.LCMMsg) {
 	toBitsPerSec := func(megaBitsPerSec int64) int64 {
@@ -516,7 +561,7 @@ func (ja *jobsAdmin) messageHandler(inputChan <-chan *common.LCMMsg) {
 	const minIntervalBetweenPerfAdjustment = time.Minute
 	lastPerfAdjustTime := time.Now().Add(-2 * minIntervalBetweenPerfAdjustment)
 	var err error
-	
+
 	for {
 		msg := <-inputChan
 		var msgType common.LCMMsgType
@@ -534,7 +579,7 @@ func (ja *jobsAdmin) messageHandler(inputChan <-chan *common.LCMMsg) {
 			if e := json.Unmarshal([]byte(msg.Req.Value), &perfAdjustmentReq); e != nil {
 				err = fmt.Errorf("parsing %s failed with %s", msg.Req.Value, e.Error())
 			}
-			
+
 			if perfAdjustmentReq.Throughput < 0 {
 				err = fmt.Errorf("invalid value %d for cap-mbps. cap-mpbs should be greater than 0", perfAdjustmentReq.Throughput)
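JobMgrCleanUp's ordering matters: the jobMgr is removed from the jobIDToJobMgr map first, so no new caller can reach it, and only then is the heavyweight teardown run on a separate goroutine as a safety net for in-transit messages. Below is a rough sketch of that remove-then-deferred-teardown pattern, using a hypothetical registry type rather than the real JobsAdmin.

package main

import (
	"fmt"
	"sync"
	"time"
)

type job struct{ id string }

func (j *job) teardown() { fmt.Println("tearing down", j.id) } // stop goroutines, close logs, etc.

type registry struct {
	mu   sync.Mutex
	jobs map[string]*job
}

// CleanUp removes the job from the registry first, so no new caller can find it,
// then releases its resources on a separate goroutine after a grace period.
func (r *registry) CleanUp(id string) {
	r.mu.Lock()
	j, ok := r.jobs[id]
	if ok {
		delete(r.jobs, id)
	}
	r.mu.Unlock()
	if !ok {
		return
	}
	go func() {
		time.Sleep(time.Second) // grace period for in-flight messages (the diff waits much longer)
		j.teardown()
	}()
}

func main() {
	r := &registry{jobs: map[string]*job{"j1": {id: "j1"}}}
	r.CleanUp("j1")
	time.Sleep(2 * time.Second) // demo only: give the deferred teardown time to run
}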
diff --git a/ste/jobStatusManager.go b/ste/jobStatusManager.go
old mode 100644
new mode 100755
index 6b264d9bc..f4114dcc7
--- a/ste/jobStatusManager.go
+++ b/ste/jobStatusManager.go
@@ -23,15 +23,16 @@ package ste
 import (
 	"time"
 
+	"github.com/Azure/azure-pipeline-go/pipeline"
 	"github.com/Azure/azure-storage-azcopy/v10/common"
 )
 
 type JobPartCreatedMsg struct {
-	TotalTransfers uint32
-	IsFinalPart    bool
+	TotalTransfers       uint32
+	IsFinalPart          bool
 	TotalBytesEnumerated uint64
-	FileTransfers  uint32
-	FolderTransfer uint32
+	FileTransfers        uint32
+	FolderTransfer       uint32
 }
 
 type xferDoneMsg = common.TransferDetail
@@ -41,6 +42,7 @@ type jobStatusManager struct {
 	listReq     chan bool
 	partCreated chan JobPartCreatedMsg
 	xferDone    chan xferDoneMsg
+	done        chan struct{}
 }
 
 /* These functions should not fail */
@@ -61,6 +63,11 @@ func (jm *jobMgr) ResurrectSummary(js common.ListJobSummaryResponse) {
 	jm.jstm.js = js
 }
 
+func (jm *jobMgr) CleanupJobStatusMgr() {
+	jm.Log(pipeline.LogInfo, "CleanupJobStatusMgr called.")
+	jm.jstm.done <- struct{}{}
+}
+
 func (jm *jobMgr) handleStatusUpdateMessage() {
 	jstm := jm.jstm
 	js := &jstm.js
@@ -106,6 +113,10 @@ func (jm *jobMgr) handleStatusUpdateMessage() {
 			// There is no need to keep sending the same items over and over again
 			js.FailedTransfers = []common.TransferDetail{}
 			js.SkippedTransfers = []common.TransferDetail{}
+
+		case <-jstm.done:
+			jm.Log(pipeline.LogInfo, "Cleaning up JobStatusMgr.")
+			return
 		}
 	}
 }
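The done channel added to jobStatusManager follows the usual select-with-shutdown shape: the status goroutine keeps multiplexing its work channels until something arrives on done, then returns so it can be collected along with the jobMgr. A condensed sketch of that shape, with invented message types rather than the real jobStatusManager fields:

package main

import (
	"fmt"
	"time"
)

type statusMgr struct {
	partCreated chan int
	done        chan struct{}
}

func (s *statusMgr) run() {
	total := 0
	for {
		select {
		case n := <-s.partCreated:
			total += n
			fmt.Println("parts so far:", total)
		case <-s.done:
			// Shutdown requested: return so the goroutine (and what it references) can be collected.
			fmt.Println("status manager exiting")
			return
		}
	}
}

func main() {
	s := &statusMgr{partCreated: make(chan int, 10), done: make(chan struct{}, 1)}
	go s.run()
	s.partCreated <- 3
	s.done <- struct{}{}               // a single value into a buffered channel, as in the diff
	time.Sleep(100 * time.Millisecond) // demo only: give the goroutine time to exit and print
}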
diff --git a/ste/mgr-JobMgr.go b/ste/mgr-JobMgr.go
old mode 100644
new mode 100755
index 4ff6fabe2..2920b83ae
--- a/ste/mgr-JobMgr.go
+++ b/ste/mgr-JobMgr.go
@@ -101,6 +101,10 @@ type IJobMgr interface {
 	SuccessfulBytesInActiveFiles() uint64
 	CancelPauseJobOrder(desiredJobStatus common.JobStatus) common.CancelPauseResumeResponse
 	IsDaemon() bool
+
+	// Cleanup Functions
+	DeferredCleanupJobMgr()
+	CleanupJobStatusMgr()
 }
 
 // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -137,6 +141,12 @@ func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx conte
 	jstm.listReq = make(chan bool)
 	jstm.partCreated = make(chan JobPartCreatedMsg, 100)
 	jstm.xferDone = make(chan xferDoneMsg, 1000)
+	jstm.done = make(chan struct{}, 1)
+	// Different logger for each job.
+	if jobLogger == nil {
+		jobLogger = common.NewJobLogger(jobID, common.ELogLevel.Debug(), logFileFolder, "" /* logFileNameSuffix */)
+		jobLogger.OpenLog()
+	}
 	jm := jobMgr{jobID: jobID, jobPartMgrs: newJobPartToJobPartMgr(), include: map[string]int{}, exclude: map[string]int{},
 		httpClient:           NewAzcopyHTTPClient(concurrency.MaxIdleConnections),
@@ -147,6 +157,7 @@ func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx conte
 		pipelineNetworkStats: newPipelineNetworkStats(tuner), // let the stats coordinate with the concurrency tuner
 		initMu:               &sync.Mutex{},
 		jobPartProgress:      jobPartProgressCh,
+		reportCancelCh:       make(chan struct{}, 1),
 		coordinatorChannels: CoordinatorChannels{
 			partsChannel:     partsCh,
 			normalTransferCh: normalTransferCh,
@@ -158,12 +169,15 @@ func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx conte
 			lowTransferCh:    lowTransferCh,
 			normalChunckCh:   normalChunkCh,
 			lowChunkCh:       lowChunkCh,
+			closeTransferCh:  make(chan struct{}, 100),
+			scheduleCloseCh:  make(chan struct{}, 1),
 		},
 		poolSizingChannels: poolSizingChannels{ // all deliberately unbuffered, because pool sizer routine works in lock-step with these - processing them as they happen, never catching up on populated buffer later
 			entryNotificationCh: make(chan struct{}),
 			exitNotificationCh:  make(chan struct{}),
 			scalebackRequestCh:  make(chan struct{}),
 			requestSlowTuneCh:   make(chan struct{}),
+			done:                make(chan struct{}, 1),
 		},
 		concurrencyTuner: tuner,
 		pacer:            pacer,
@@ -288,6 +302,10 @@ type jobMgr struct {
 	httpClient *http.Client
 
 	jobPartMgrs jobPartToJobPartMgr // The map of part #s to JobPartMgrs
+
+	// reportCancelCh to close the report thread.
+	reportCancelCh chan struct{}
+
 	// partsDone keep the count of completed part of the Job.
 	partsDone uint32
 	// throughput  common.CountPerSecond // TODO: Set LastCheckedTime to now
@@ -586,62 +604,76 @@ func (jm *jobMgr) reportJobPartDoneHandler() {
 	shouldLog := jm.ShouldLog(pipeline.LogInfo)
 
 	for {
-		partProgressInfo := <-jm.jobPartProgress
-		jobPart0Mgr, ok := jm.jobPartMgrs.Get(0)
-		if !ok {
-			jm.Panic(fmt.Errorf("Failed to find Job %v, Part #0", jm.jobID))
-		}
-		part0Plan := jobPart0Mgr.Plan()
-		jobStatus := part0Plan.JobStatus() // status of part 0 is status of job as a whole
-		partsDone := atomic.AddUint32(&jm.partsDone, 1)
-		jobProgressInfo.transfersCompleted += partProgressInfo.transfersCompleted
-		jobProgressInfo.transfersSkipped += partProgressInfo.transfersSkipped
-		jobProgressInfo.transfersFailed += partProgressInfo.transfersFailed
-
-		if partProgressInfo.completionChan != nil {
-			close(partProgressInfo.completionChan)
-		}
+		select {
+		case <-jm.reportCancelCh:
+			jobPart0Mgr, ok := jm.jobPartMgrs.Get(0)
+			if ok {
+				part0plan := jobPart0Mgr.Plan()
+				if part0plan.JobStatus() == common.EJobStatus.InProgress() ||
+					part0plan.JobStatus() == common.EJobStatus.Cancelling() {
+					jm.Panic(fmt.Errorf("reportCancelCh received cancel event while job still not completed, Job(%s) in state: %s",
+						jm.jobID.String(), part0plan.JobStatus()))
+				}
+			} else {
+				jm.Log(pipeline.LogError, "part0Plan of job invalid")
+			}
+			jm.Log(pipeline.LogInfo, "reportJobPartDoneHandler done called")
+			return

-		// If the last part is still awaited or other parts all still not complete,
-		// JobPart 0 status is not changed (unless we are cancelling)
-		haveFinalPart = atomic.LoadInt32(&jm.atomicFinalPartOrderedIndicator) == 1
-		allKnownPartsDone := partsDone == jm.jobPartMgrs.Count()
-		isCancelling := jobStatus == common.EJobStatus.Cancelling()
-		shouldComplete := allKnownPartsDone && (haveFinalPart || isCancelling)
-		if shouldComplete {
-			partDescription := "all parts of entire Job"
-			if !haveFinalPart {
-				partDescription = "known parts of incomplete Job"
+		case partProgressInfo := <-jm.jobPartProgress:
+			jobPart0Mgr, ok := jm.jobPartMgrs.Get(0)
+			if !ok {
+				jm.Panic(fmt.Errorf("Failed to find Job %v, Part #0", jm.jobID))
 			}
-			if shouldLog {
-				jm.Log(pipeline.LogInfo, fmt.Sprintf("%s %s successfully completed, cancelled or paused", partDescription, jm.jobID.String()))
+			part0Plan := jobPart0Mgr.Plan()
+			jobStatus := part0Plan.JobStatus() // status of part 0 is status of job as a whole
+			partsDone := atomic.AddUint32(&jm.partsDone, 1)
+			jobProgressInfo.transfersCompleted += partProgressInfo.transfersCompleted
+			jobProgressInfo.transfersSkipped += partProgressInfo.transfersSkipped
+			jobProgressInfo.transfersFailed += partProgressInfo.transfersFailed
+
+			if partProgressInfo.completionChan != nil {
+				close(partProgressInfo.completionChan)
 			}
-			switch part0Plan.JobStatus() {
-			case common.EJobStatus.Cancelling():
-				part0Plan.SetJobStatus(common.EJobStatus.Cancelled())
+			// If the last part is still awaited or other parts all still not complete,
+			// JobPart 0 status is not changed (unless we are cancelling)
+			haveFinalPart = atomic.LoadInt32(&jm.atomicFinalPartOrderedIndicator) == 1
+			allKnownPartsDone := partsDone == jm.jobPartMgrs.Count()
+			isCancelling := jobStatus == common.EJobStatus.Cancelling()
+			shouldComplete := allKnownPartsDone && (haveFinalPart || isCancelling)
+			if shouldComplete {
+				partDescription := "all parts of entire Job"
+				if !haveFinalPart {
+					partDescription = "known parts of incomplete Job"
+				}
 				if shouldLog {
-					jm.Log(pipeline.LogInfo, fmt.Sprintf("%s %v successfully cancelled", partDescription, jm.jobID))
+					jm.Log(pipeline.LogInfo, fmt.Sprintf("%s %s successfully completed, cancelled or paused", partDescription, jm.jobID.String()))
 				}
-			case common.EJobStatus.InProgress():
-				part0Plan.SetJobStatus((common.EJobStatus).EnhanceJobStatusInfo(jobProgressInfo.transfersSkipped > 0,
-					jobProgressInfo.transfersFailed > 0,
-					jobProgressInfo.transfersCompleted > 0))
-			}
-			// reset counters
-			atomic.StoreUint32(&jm.partsDone, 0)
-			jobProgressInfo = jobPartProgressInfo{}
+				switch part0Plan.JobStatus() {
+				case common.EJobStatus.Cancelling():
+					part0Plan.SetJobStatus(common.EJobStatus.Cancelled())
+					if shouldLog {
+						jm.Log(pipeline.LogInfo, fmt.Sprintf("%s %v successfully cancelled", partDescription, jm.jobID))
+					}
+				case common.EJobStatus.InProgress():
+					part0Plan.SetJobStatus((common.EJobStatus).EnhanceJobStatusInfo(jobProgressInfo.transfersSkipped > 0,
+						jobProgressInfo.transfersFailed > 0,
+						jobProgressInfo.transfersCompleted > 0))
+				}
-			// flush logs
-			jm.chunkStatusLogger.FlushLog() // TODO: remove once we sort out what will be calling CloseLog (currently nothing)
-			if allKnownPartsDone {
-				common.GetLifecycleMgr().ReportAllJobPartsDone()
-			}
-		} // Else log and wait for next part to complete
+				// reset counters
+				atomic.StoreUint32(&jm.partsDone, 0)
+				jobProgressInfo = jobPartProgressInfo{}
-		if shouldLog {
-			jm.Log(pipeline.LogInfo, fmt.Sprintf("is part of Job which %d total number of parts done ", partsDone))
+				// flush logs
+				jm.chunkStatusLogger.FlushLog() // TODO: remove once we sort out what will be calling CloseLog (currently nothing)
+			} // Else log and wait for next part to complete
+
+			if shouldLog {
+				jm.Log(pipeline.LogInfo, fmt.Sprintf("is part of Job which %d total number of parts done ", partsDone))
+			}
 		}
 	}
 }
@@ -672,10 +704,54 @@ func (jm *jobMgr) CloseLog() {
 	jm.chunkStatusLogger.FlushLog()
 }
 
+// DeferredCleanupJobMgr cleans up all the jobMgr resources.
+// Warning: DeferredCleanupJobMgr should be called only from JobMgrCleanUp().
+// This function is neither thread-safe nor idempotent, so if DeferredCleanupJobMgr is called
+// multiple times it may get stuck because the receiving channels are already closed, whereas JobMgrCleanUp()
+// is safe in the sense that it performs the cleanup only once.
+//
+// TODO: Add a JobsAdmin reference to each JobMgr so that under no circumstances can JobsAdmin be freed
+// while a jobMgr is still running, and have JobsAdmin track the number of JobMgrs running at any time.
+// At that point DeferredCleanupJobMgr() will delete the jobMgr from the jobsAdmin map.
+func (jm *jobMgr) DeferredCleanupJobMgr() {
+	jm.Log(pipeline.LogInfo, "DeferredCleanupJobMgr called")
+
+	time.Sleep(60 * time.Second)
+
+	jm.Log(pipeline.LogInfo, "DeferredCleanupJobMgr out of sleep")
+
+	// Call jm.Cancel to signal to the routines that the work is done.
+	// This will take care of any jobPartMgr release.
+	jm.Cancel()
+
+	// Cleanup the JobStatusMgr go routine.
+	jm.CleanupJobStatusMgr()
+
+	// Transfer Thread Cleanup.
+	jm.cleanupTransferRoutine()
+
+	// Remove JobPartsMgr from jobPartMgr kv.
+	jm.deleteJobPartsMgrs()
+
+	// Close chunk status logger.
+	jm.cleanupChunkStatusLogger()
+	jm.Log(pipeline.LogInfo, "DeferredCleanupJobMgr Exit, Closing the log")
+
+	// Sleep for some time so that all go routines are done with cleanup and have logged their progress in the job log.
+	time.Sleep(60 * time.Second)
+
+	jm.logger.CloseLog()
+}
+
 func (jm *jobMgr) ChunkStatusLogger() common.ChunkStatusLogger {
 	return jm.chunkStatusLogger
 }
 
+func (jm *jobMgr) cleanupChunkStatusLogger() {
+	jm.chunkStatusLogger.FlushLog()
+	jm.chunkStatusLogger.CloseLogger()
+}
+
 // PartsDone returns the number of the Job's parts that are either completed or failed
 // func (jm *jobMgr) PartsDone() uint32 { return atomic.LoadUint32(&jm.partsDone) }
@@ -697,6 +773,8 @@ type XferChannels struct {
 	lowTransferCh   <-chan IJobPartTransferMgr // Read-only
 	normalChunckCh  chan chunkFunc             // Read-write
 	lowChunkCh      chan chunkFunc             // Read-write
+	closeTransferCh chan struct{}
+	scheduleCloseCh chan struct{}
 }
 
 type poolSizingChannels struct {
@@ -704,6 +782,7 @@ type poolSizingChannels struct {
 	exitNotificationCh  chan struct{}
 	scalebackRequestCh  chan struct{}
 	requestSlowTuneCh   chan struct{}
+	done                chan struct{}
 }
 
 /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -744,6 +823,26 @@ func (jm *jobMgr) QueueJobParts(jpm IJobPartMgr) {
 	jm.coordinatorChannels.partsChannel <- jpm
 }
 
+// deleteJobPartsMgrs removes jobPartMgrs from the jobPartToJobPartMgr kv.
+func (jm *jobMgr) deleteJobPartsMgrs() {
+	jm.Log(pipeline.LogInfo, "deleteJobPartsMgrs enter")
+	jm.jobPartMgrs.Iterate(false, func(k common.PartNumber, v IJobPartMgr) {
+		v.Close()
+		delete(jm.jobPartMgrs.m, k)
+	})
+	jm.Log(pipeline.LogInfo, "deleteJobPartsMgrs exit")
+}
+
+// cleanupTransferRoutine closes all the transfer threads.
+// Note: the channels are buffered so that, if any thread is somehow missing (down), the sender does not get stuck.
+func (jm *jobMgr) cleanupTransferRoutine() {
+	jm.reportCancelCh <- struct{}{}
+	jm.xferChannels.scheduleCloseCh <- struct{}{}
+	for cc := 0; cc < jm.concurrency.TransferInitiationPoolSize.Value; cc++ {
+		jm.xferChannels.closeTransferCh <- struct{}{}
+	}
+}
+
 // worker that sizes the chunkProcessor pool, dynamically if necessary
 
 func (jm *jobMgr) poolSizer() {
@@ -784,10 +883,15 @@ func (jm *jobMgr) poolSizer() {
 		} else if actualConcurrency > targetConcurrency {
 			hasHadTimeToStablize = false
 			jm.poolSizingChannels.scalebackRequestCh <- struct{}{}
+		} else if actualConcurrency == 0 && targetConcurrency == 0 {
+			jm.Log(pipeline.LogInfo, "Exits Pool sizer")
+			return
 		}
 
 		// wait for something to happen (maybe ack from the worker of the change, else a timer interval)
 		select {
+		case <-jm.poolSizingChannels.done:
+			targetConcurrency = 0
 		case <-jm.poolSizingChannels.entryNotificationCh:
 			// new worker has started
 			actualConcurrency++
@@ -802,7 +906,7 @@ func (jm *jobMgr) poolSizer() {
 			throughputMonitoringInterval = expandedMonitoringInterval
 			slowTuneCh = nil // so we won't keep running this case at the expense of others)
 		case <-time.After(throughputMonitoringInterval):
-			if actualConcurrency == targetConcurrency { // scalebacks can take time. Don't want to do any tuning if actual is not yet aligned to target
+			if targetConcurrency != 0 && actualConcurrency == targetConcurrency { // scalebacks can take time. Don't want to do any tuning if actual is not yet aligned to target
 				bytesOnWire := jm.pacer.GetTotalTraffic()
 				if hasHadTimeToStablize {
 					// throughput has had time to stabilize since last change, so we can meaningfully measure and act on throughput
@@ -840,15 +944,22 @@ func (jm *jobMgr) RequestTuneSlowly() {
 func (jm *jobMgr) scheduleJobParts() {
 	startedPoolSizer := false
 	for {
-		jobPart := <-jm.xferChannels.partsChannel
+		select {
+		case <-jm.xferChannels.scheduleCloseCh:
+			jm.Log(pipeline.LogInfo, "ScheduleJobParts done called")
+			jm.poolSizingChannels.done <- struct{}{}
+			return
+
+		case jobPart := <-jm.xferChannels.partsChannel:
 
-		if !startedPoolSizer {
-			// spin up a GR to co-ordinate dynamic sizing of the main pool
-			// It will automatically spin up the right number of chunk processors
-			go jm.poolSizer()
-			startedPoolSizer = true
+			if !startedPoolSizer {
+				// spin up a GR to co-ordinate dynamic sizing of the main pool
+				// It will automatically spin up the right number of chunk processors
+				go jm.poolSizer()
+				startedPoolSizer = true
+			}
+			jobPart.ScheduleTransfers(jm.Context())
 		}
-		jobPart.ScheduleTransfers(jm.Context())
 	}
 }
@@ -905,8 +1016,13 @@ func (jm *jobMgr) transferProcessor(workerID int) {
 	for {
 		// No scaleback check here, because this routine runs only in a small number of goroutines, so no need to kill them off
 		select {
+		case <-jm.xferChannels.closeTransferCh:
+			jm.Log(pipeline.LogInfo, "transferProcessor done called")
+			return
+
 		case jptm := <-jm.xferChannels.normalTransferCh:
 			startTransfer(jptm)
+
 		default:
 			select {
 			case jptm := <-jm.xferChannels.lowTransferCh:
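cleanupTransferRoutine stops one scheduler goroutine plus TransferInitiationPoolSize transfer workers by writing one token per receiver into buffered channels; the buffering means the sender cannot block even if a worker has already gone away. A small sketch of that fan-out shutdown follows (worker count and names are illustrative, and the real code also cancels the job context beforehand):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 4
	work := make(chan int, 16)
	closeCh := make(chan struct{}, workers) // buffered: one token per worker, so the sender never blocks

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-closeCh:
					fmt.Println("worker", id, "stopping")
					return
				case j := <-work:
					fmt.Println("worker", id, "processed", j)
				}
			}
		}(i)
	}

	work <- 1
	work <- 2

	// Broadcast shutdown: one token per worker. Anything still queued in work is
	// abandoned, which is why cancelling the job first matters in the real flow.
	for i := 0; i < workers; i++ {
		closeCh <- struct{}{}
	}
	wg.Wait()
}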
diff --git a/ste/mgr-JobPartMgr.go b/ste/mgr-JobPartMgr.go
index ceda812c7..0b5cae224 100644
--- a/ste/mgr-JobPartMgr.go
+++ b/ste/mgr-JobPartMgr.go
@@ -877,6 +877,18 @@ func (jpm *jobPartMgr) Close() {
 	jpm.httpHeaders = common.ResourceHTTPHeaders{}
 	jpm.metadata = common.Metadata{}
 	jpm.preserveLastModifiedTime = false
+
+	/*
+	 * Set pipeline to nil, so that jpm/JobMgr can be GC'ed.
+	 *
+	 * TODO: We should not need to explicitly set this to nil, but today we have a yet-unknown ref on the pipeline which
+	 * is leaking JobMgr memory, so we cause that to be freed by force-dropping this ref.
+	 *
+	 * Note: Force-setting this to nil can technically result in crashes since the containing object is still around,
+	 * but we should be protected against that because we do this Close in a deferred manner, at least a few minutes after the job completes.
+	 */
+	jpm.pipeline = nil
+
 	// TODO: Delete file?
 	/*if err := os.Remove(jpm.planFile.Name()); err != nil {
 		jpm.Panic(fmt.Errorf("error removing Job Part Plan file %s. Error=%v", jpm.planFile.Name(), err))