diff --git a/depot/containerstore/dependencymanager.go b/depot/containerstore/dependencymanager.go index e4755485..a630de88 100644 --- a/depot/containerstore/dependencymanager.go +++ b/depot/containerstore/dependencymanager.go @@ -3,6 +3,7 @@ package containerstore import ( "fmt" "net/url" + "sync" "time" "code.cloudfoundry.org/bytefmt" @@ -47,14 +48,20 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou completed := 0 mountChan := make(chan *cachedBindMount, total) errChan := make(chan error, total) + cancelChan := make(chan struct{}, 0) bindMounts := NewBindMounts(total) if total == 0 { return bindMounts, nil } + var wg sync.WaitGroup + for i := range mounts { + wg.Add(1) go func(mount *executor.CachedDependency) { + defer wg.Done() + limiterStart := time.Now() bm.downloadRateLimiter <- struct{}{} limiterTime := time.Now().Sub(limiterStart) @@ -64,7 +71,7 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou <-bm.downloadRateLimiter }() - cachedMount, err := bm.downloadCachedDependency(logger, mount, logConfig, metronClient) + cachedMount, err := bm.downloadCachedDependency(logger, mount, logConfig, metronClient, cancelChan) if err != nil { errChan <- err } else { @@ -76,6 +83,8 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou for { select { case err := <-errChan: + close(cancelChan) + wg.Wait() return bindMounts, err case cachedMount := <-mountChan: bindMounts.AddBindMount(cachedMount.CacheKey, cachedMount.BindMount) @@ -87,7 +96,7 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou } } -func (bm *dependencyManager) downloadCachedDependency(logger lager.Logger, mount *executor.CachedDependency, logConfig executor.LogConfig, metronClient loggingclient.IngressClient) (*cachedBindMount, error) { +func (bm *dependencyManager) downloadCachedDependency(logger lager.Logger, mount *executor.CachedDependency, logConfig executor.LogConfig, metronClient loggingclient.IngressClient, cancelChan <-chan struct{}) (*cachedBindMount, error) { sourceName, tags := logConfig.GetSourceNameAndTagsForLogging() if mount.LogSource != "" { sourceName = mount.LogSource @@ -114,7 +123,7 @@ func (bm *dependencyManager) downloadCachedDependency(logger lager.Logger, mount Algorithm: mount.ChecksumAlgorithm, Value: mount.ChecksumValue, }, - nil, + cancelChan, ) if err != nil { logger.Error("failed-fetching-cache-dependency", err, lager.Data{"download-url": downloadURL.String(), "cache-key": mount.CacheKey}) diff --git a/depot/containerstore/dependencymanager_test.go b/depot/containerstore/dependencymanager_test.go index b80ae889..ccd14021 100644 --- a/depot/containerstore/dependencymanager_test.go +++ b/depot/containerstore/dependencymanager_test.go @@ -3,6 +3,7 @@ package containerstore_test import ( "errors" "net/url" + "time" "code.cloudfoundry.org/cacheddownloader" "code.cloudfoundry.org/cacheddownloader/cacheddownloaderfakes" @@ -37,6 +38,45 @@ var _ = Describe("DependencyManager", func() { } }) + Context("when fetching one of the dependencies fails", func() { + var dependency2ReceivedCancel, dependency3ReceivedCancel bool + + BeforeEach(func() { + dependencies = append(dependencies, + executor.CachedDependency{CacheKey: "cache-key-3", LogSource: "log-source-3", From: "https://example.com:8080/download-3", To: "/var/data/buildpack-3"}, + ) + + cache.FetchAsDirectoryStub = func(logger lager.Logger, urlToFetch *url.URL, cacheKey string, checksum cacheddownloader.ChecksumInfoType, cancelChan <-chan struct{}) (dirPath string, size int64, err error) { + switch cacheKey { + case "cache-key-1": + time.Sleep(1 * time.Second) + return "", 0, errors.New("failed-to-fetch") + case "cache-key-2": + select { + case <-cancelChan: + dependency2ReceivedCancel = true + return "", 0, errors.New("canceled") + } + case "cache-key-3": + select { + case <-cancelChan: + dependency3ReceivedCancel = true + return "", 0, errors.New("canceled") + } + } + return "", 0, errors.New("unknown-cache-key") + } + }) + + It("stops downloading the rest of the dependencies", func() { + _, err := dependencyManager.DownloadCachedDependencies(logger, dependencies, logConfig, fakeClient) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed-to-fetch")) + Expect(dependency2ReceivedCancel).To(BeTrue()) + Expect(dependency3ReceivedCancel).To(BeTrue()) + }) + }) + Context("when fetching all of the dependencies succeeds", func() { var bindMounts containerstore.BindMounts