From c9ae4d6e77a0eb63c38bf0bbdf664d004b92d5e9 Mon Sep 17 00:00:00 2001 From: Maria Shaldybin Date: Mon, 15 Aug 2022 23:20:00 +0000 Subject: [PATCH] Cancel download of dependencies on failure Stop leaking disk space and goroutines. When one of the dependecies faisl to be downloaded cancel the rest of them and wait for goroutines to complete. --- depot/containerstore/dependencymanager.go | 15 +++++-- .../containerstore/dependencymanager_test.go | 40 +++++++++++++++++++ 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/depot/containerstore/dependencymanager.go b/depot/containerstore/dependencymanager.go index e4755485..a630de88 100644 --- a/depot/containerstore/dependencymanager.go +++ b/depot/containerstore/dependencymanager.go @@ -3,6 +3,7 @@ package containerstore import ( "fmt" "net/url" + "sync" "time" "code.cloudfoundry.org/bytefmt" @@ -47,14 +48,20 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou completed := 0 mountChan := make(chan *cachedBindMount, total) errChan := make(chan error, total) + cancelChan := make(chan struct{}, 0) bindMounts := NewBindMounts(total) if total == 0 { return bindMounts, nil } + var wg sync.WaitGroup + for i := range mounts { + wg.Add(1) go func(mount *executor.CachedDependency) { + defer wg.Done() + limiterStart := time.Now() bm.downloadRateLimiter <- struct{}{} limiterTime := time.Now().Sub(limiterStart) @@ -64,7 +71,7 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou <-bm.downloadRateLimiter }() - cachedMount, err := bm.downloadCachedDependency(logger, mount, logConfig, metronClient) + cachedMount, err := bm.downloadCachedDependency(logger, mount, logConfig, metronClient, cancelChan) if err != nil { errChan <- err } else { @@ -76,6 +83,8 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou for { select { case err := <-errChan: + close(cancelChan) + wg.Wait() return bindMounts, err case cachedMount := <-mountChan: bindMounts.AddBindMount(cachedMount.CacheKey, cachedMount.BindMount) @@ -87,7 +96,7 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou } } -func (bm *dependencyManager) downloadCachedDependency(logger lager.Logger, mount *executor.CachedDependency, logConfig executor.LogConfig, metronClient loggingclient.IngressClient) (*cachedBindMount, error) { +func (bm *dependencyManager) downloadCachedDependency(logger lager.Logger, mount *executor.CachedDependency, logConfig executor.LogConfig, metronClient loggingclient.IngressClient, cancelChan <-chan struct{}) (*cachedBindMount, error) { sourceName, tags := logConfig.GetSourceNameAndTagsForLogging() if mount.LogSource != "" { sourceName = mount.LogSource @@ -114,7 +123,7 @@ func (bm *dependencyManager) downloadCachedDependency(logger lager.Logger, mount Algorithm: mount.ChecksumAlgorithm, Value: mount.ChecksumValue, }, - nil, + cancelChan, ) if err != nil { logger.Error("failed-fetching-cache-dependency", err, lager.Data{"download-url": downloadURL.String(), "cache-key": mount.CacheKey}) diff --git a/depot/containerstore/dependencymanager_test.go b/depot/containerstore/dependencymanager_test.go index b80ae889..ccd14021 100644 --- a/depot/containerstore/dependencymanager_test.go +++ b/depot/containerstore/dependencymanager_test.go @@ -3,6 +3,7 @@ package containerstore_test import ( "errors" "net/url" + "time" "code.cloudfoundry.org/cacheddownloader" "code.cloudfoundry.org/cacheddownloader/cacheddownloaderfakes" @@ -37,6 +38,45 @@ var _ = Describe("DependencyManager", func() { } }) + Context("when fetching one of the dependencies fails", func() { + var dependency2ReceivedCancel, dependency3ReceivedCancel bool + + BeforeEach(func() { + dependencies = append(dependencies, + executor.CachedDependency{CacheKey: "cache-key-3", LogSource: "log-source-3", From: "https://example.com:8080/download-3", To: "/var/data/buildpack-3"}, + ) + + cache.FetchAsDirectoryStub = func(logger lager.Logger, urlToFetch *url.URL, cacheKey string, checksum cacheddownloader.ChecksumInfoType, cancelChan <-chan struct{}) (dirPath string, size int64, err error) { + switch cacheKey { + case "cache-key-1": + time.Sleep(1 * time.Second) + return "", 0, errors.New("failed-to-fetch") + case "cache-key-2": + select { + case <-cancelChan: + dependency2ReceivedCancel = true + return "", 0, errors.New("canceled") + } + case "cache-key-3": + select { + case <-cancelChan: + dependency3ReceivedCancel = true + return "", 0, errors.New("canceled") + } + } + return "", 0, errors.New("unknown-cache-key") + } + }) + + It("stops downloading the rest of the dependencies", func() { + _, err := dependencyManager.DownloadCachedDependencies(logger, dependencies, logConfig, fakeClient) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed-to-fetch")) + Expect(dependency2ReceivedCancel).To(BeTrue()) + Expect(dependency3ReceivedCancel).To(BeTrue()) + }) + }) + Context("when fetching all of the dependencies succeeds", func() { var bindMounts containerstore.BindMounts