Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions depot/containerstore/dependencymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package containerstore
import (
"fmt"
"net/url"
"sync"
"time"

"code.cloudfoundry.org/bytefmt"
Expand Down Expand Up @@ -47,14 +48,20 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou
completed := 0
mountChan := make(chan *cachedBindMount, total)
errChan := make(chan error, total)
cancelChan := make(chan struct{}, 0)
bindMounts := NewBindMounts(total)

if total == 0 {
return bindMounts, nil
}

var wg sync.WaitGroup

for i := range mounts {
wg.Add(1)
go func(mount *executor.CachedDependency) {
defer wg.Done()

limiterStart := time.Now()
bm.downloadRateLimiter <- struct{}{}
limiterTime := time.Now().Sub(limiterStart)
Expand All @@ -64,7 +71,7 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou
<-bm.downloadRateLimiter
}()

cachedMount, err := bm.downloadCachedDependency(logger, mount, logConfig, metronClient)
cachedMount, err := bm.downloadCachedDependency(logger, mount, logConfig, metronClient, cancelChan)
if err != nil {
errChan <- err
} else {
Expand All @@ -76,6 +83,8 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou
for {
select {
case err := <-errChan:
close(cancelChan)
wg.Wait()
return bindMounts, err
case cachedMount := <-mountChan:
bindMounts.AddBindMount(cachedMount.CacheKey, cachedMount.BindMount)
Expand All @@ -87,7 +96,7 @@ func (bm *dependencyManager) DownloadCachedDependencies(logger lager.Logger, mou
}
}

func (bm *dependencyManager) downloadCachedDependency(logger lager.Logger, mount *executor.CachedDependency, logConfig executor.LogConfig, metronClient loggingclient.IngressClient) (*cachedBindMount, error) {
func (bm *dependencyManager) downloadCachedDependency(logger lager.Logger, mount *executor.CachedDependency, logConfig executor.LogConfig, metronClient loggingclient.IngressClient, cancelChan <-chan struct{}) (*cachedBindMount, error) {
sourceName, tags := logConfig.GetSourceNameAndTagsForLogging()
if mount.LogSource != "" {
sourceName = mount.LogSource
Expand All @@ -114,7 +123,7 @@ func (bm *dependencyManager) downloadCachedDependency(logger lager.Logger, mount
Algorithm: mount.ChecksumAlgorithm,
Value: mount.ChecksumValue,
},
nil,
cancelChan,
)
if err != nil {
logger.Error("failed-fetching-cache-dependency", err, lager.Data{"download-url": downloadURL.String(), "cache-key": mount.CacheKey})
Expand Down
40 changes: 40 additions & 0 deletions depot/containerstore/dependencymanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package containerstore_test
import (
"errors"
"net/url"
"time"

"code.cloudfoundry.org/cacheddownloader"
"code.cloudfoundry.org/cacheddownloader/cacheddownloaderfakes"
Expand Down Expand Up @@ -37,6 +38,45 @@ var _ = Describe("DependencyManager", func() {
}
})

Context("when fetching one of the dependencies fails", func() {
var dependency2ReceivedCancel, dependency3ReceivedCancel bool

BeforeEach(func() {
dependencies = append(dependencies,
executor.CachedDependency{CacheKey: "cache-key-3", LogSource: "log-source-3", From: "https://example.com:8080/download-3", To: "/var/data/buildpack-3"},
)

cache.FetchAsDirectoryStub = func(logger lager.Logger, urlToFetch *url.URL, cacheKey string, checksum cacheddownloader.ChecksumInfoType, cancelChan <-chan struct{}) (dirPath string, size int64, err error) {
switch cacheKey {
case "cache-key-1":
time.Sleep(1 * time.Second)
return "", 0, errors.New("failed-to-fetch")
case "cache-key-2":
select {
case <-cancelChan:
dependency2ReceivedCancel = true
return "", 0, errors.New("canceled")
}
case "cache-key-3":
select {
case <-cancelChan:
dependency3ReceivedCancel = true
return "", 0, errors.New("canceled")
}
}
return "", 0, errors.New("unknown-cache-key")
}
})

It("stops downloading the rest of the dependencies", func() {
_, err := dependencyManager.DownloadCachedDependencies(logger, dependencies, logConfig, fakeClient)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError("failed-to-fetch"))
Expect(dependency2ReceivedCancel).To(BeTrue())
Expect(dependency3ReceivedCancel).To(BeTrue())
})
})

Context("when fetching all of the dependencies succeeds", func() {
var bindMounts containerstore.BindMounts

Expand Down