diff --git a/ghproxy/BUILD.bazel b/ghproxy/BUILD.bazel index 2914c15dffc9..42d3d411deca 100644 --- a/ghproxy/BUILD.bazel +++ b/ghproxy/BUILD.bazel @@ -1,6 +1,6 @@ package(default_visibility = ["//visibility:public"]) -load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library", "go_test") load("//prow:def.bzl", "prow_image", "prow_push") NAME = "ghproxy" @@ -67,3 +67,13 @@ filegroup( tags = ["automanaged"], visibility = ["//visibility:public"], ) + +go_test( + name = "go_default_test", + srcs = ["ghproxy_test.go"], + embed = [":go_default_library"], + deps = [ + "//prow/github:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + ], +) diff --git a/ghproxy/ghcache/BUILD.bazel b/ghproxy/ghcache/BUILD.bazel index b2b019cf7d48..3002ae5a614e 100644 --- a/ghproxy/ghcache/BUILD.bazel +++ b/ghproxy/ghcache/BUILD.bazel @@ -18,6 +18,8 @@ go_library( "@com_github_peterbourgon_diskv//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", + "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", + "@io_k8s_apimachinery//pkg/util/errors:go_default_library", "@org_golang_x_sync//semaphore:go_default_library", ], ) diff --git a/ghproxy/ghcache/ghcache.go b/ghproxy/ghcache/ghcache.go index e1209c7a2d3a..ed322c78429e 100644 --- a/ghproxy/ghcache/ghcache.go +++ b/ghproxy/ghcache/ghcache.go @@ -27,12 +27,20 @@ package ghcache import ( "context" + "encoding/json" + "fmt" + "io/ioutil" "net/http" + "os" "path" + "path/filepath" "strconv" "strings" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "github.com/gomodule/redigo/redis" "github.com/gregjones/httpcache" "github.com/gregjones/httpcache/diskcache" @@ -66,6 +74,10 @@ const ( // which metrics should be recorded if set. If unset, the sha256sum of // the Authorization header will be used. 
TokenBudgetIdentifierHeader = "X-PROW-GHCACHE-TOKEN-BUDGET-IDENTIFIER" + + // TokenExpiryAtHeader includes a date at which the passed token expires and all associated caches + // can be cleaned up. Its value must be in RFC3339 format. + TokenExpiryAtHeader = "X-PROW-TOKEN-EXPIRES-AT" ) func CacheModeIsFree(mode CacheResponseMode) bool { @@ -214,7 +226,7 @@ const LogMessageWithDiskPartitionFields = "Not using a partitioned cache because // NewDiskCache creates a GitHub cache RoundTripper that is backed by a disk // cache. // It supports a partitioned cache. -func NewDiskCache(delegate http.RoundTripper, cacheDir string, cacheSizeGB, maxConcurrency int, legacyDisablePartitioningByAuthHeader bool) http.RoundTripper { +func NewDiskCache(delegate http.RoundTripper, cacheDir string, cacheSizeGB, maxConcurrency int, legacyDisablePartitioningByAuthHeader bool, cachePruneInterval time.Duration) http.RoundTripper { if legacyDisablePartitioningByAuthHeader { diskCache := diskcache.NewWithDiskv( diskv.New(diskv.Options{ @@ -223,7 +235,7 @@ func NewDiskCache(delegate http.RoundTripper, cacheDir string, cacheSizeGB, maxC CacheSizeMax: uint64(cacheSizeGB) * uint64(1000000000), // convert G to B })) return NewFromCache(delegate, - func(partitionKey string) httpcache.Cache { + func(partitionKey string, _ *time.Time) httpcache.Cache { logrus.WithField("cache-base-path", path.Join(cacheDir, "data", partitionKey)). WithField("cache-temp-path", path.Join(cacheDir, "temp", partitionKey)). 
Warning(LogMessageWithDiskPartitionFields) @@ -232,12 +244,23 @@ func NewDiskCache(delegate http.RoundTripper, cacheDir string, cacheSizeGB, maxC maxConcurrency, ) } + + go func() { + for range time.NewTicker(cachePruneInterval).C { + prune(cacheDir) + } + }() return NewFromCache(delegate, - func(partitionKey string) httpcache.Cache { + func(partitionKey string, expiresAt *time.Time) httpcache.Cache { + basePath := path.Join(cacheDir, "data", partitionKey) + tempDir := path.Join(cacheDir, "temp", partitionKey) + if err := writecachePartitionMetadata(basePath, tempDir, expiresAt); err != nil { + logrus.WithError(err).Warn("Failed to write cache metadata file, pruning will not work") + } return diskcache.NewWithDiskv( diskv.New(diskv.Options{ - BasePath: path.Join(cacheDir, "data", partitionKey), - TempDir: path.Join(cacheDir, "temp", partitionKey), + BasePath: basePath, + TempDir: tempDir, CacheSizeMax: uint64(cacheSizeGB) * uint64(1000000000), // convert G to B })) }, @@ -245,24 +268,93 @@ func NewDiskCache(delegate http.RoundTripper, cacheDir string, cacheSizeGB, maxC ) } +func prune(baseDir string) { + // All of this would be easier if the structure was base/partition/{data,temp} + // but because of compatibility we can not change it. 
+ for _, dir := range []string{"data", "temp"} { + base := path.Join(baseDir, dir) + cachePartitionCandidates, err := os.ReadDir(base) + if err != nil { + logrus.WithError(err).Warn("os.ReadDir failed") + // no continue, os.ReadDir returns partial results if it encounters an error + } + for _, cachePartitionCandidate := range cachePartitionCandidates { + if !cachePartitionCandidate.IsDir() { + continue + } + metadataPath := path.Join(base, cachePartitionCandidate.Name(), cachePartitionMetadataFileName) + + // Read optimistically and just ignore errors + raw, err := ioutil.ReadFile(metadataPath) + if err != nil { + continue + } + var metadata cachePartitionMetadata + if err := json.Unmarshal(raw, &metadata); err != nil { + logrus.WithError(err).WithField("filepath", metadataPath).Error("failed to deserialize metadata file") + continue + } + if metadata.ExpiresAt.After(time.Now()) { + continue + } + partitionPath := filepath.Dir(metadataPath) + logrus.WithField("path", partitionPath).WithField("expiresAt", metadata.ExpiresAt.String()).Info("Cleaning up expired cache partition") + if err := os.RemoveAll(partitionPath); err != nil { + logrus.WithError(err).WithField("path", partitionPath).Error("failed to delete expired cache partition") + } + } + } +} + +func writecachePartitionMetadata(basePath, tempDir string, expiresAt *time.Time) error { + // No expiry header for the token was passed, likely it is a PAT which never expires. 
+ if expiresAt == nil { + return nil + } + metadata := cachePartitionMetadata{ExpiresAt: metav1.Time{Time: *expiresAt}} + serialized, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to serialize: %w", err) + } + + var errs []error + for _, destBase := range []string{basePath, tempDir} { + if err := os.MkdirAll(destBase, 0755); err != nil { + errs = append(errs, fmt.Errorf("failed to create dir %s: %w", destBase, err)) + } + dest := path.Join(destBase, cachePartitionMetadataFileName) + if err := ioutil.WriteFile(dest, serialized, 0644); err != nil { + errs = append(errs, fmt.Errorf("failed to write %s: %w", dest, err)) + } + } + + return utilerrors.NewAggregate(errs) +} + +const cachePartitionMetadataFileName = ".cache_metadata.json" + +type cachePartitionMetadata struct { + ExpiresAt metav1.Time `json:"expires_at"` +} + // NewMemCache creates a GitHub cache RoundTripper that is backed by a memory // cache. // It supports a partitioned cache. func NewMemCache(delegate http.RoundTripper, maxConcurrency int) http.RoundTripper { return NewFromCache(delegate, - func(_ string) httpcache.Cache { return httpcache.NewMemoryCache() }, + func(_ string, _ *time.Time) httpcache.Cache { return httpcache.NewMemoryCache() }, maxConcurrency) } // CachePartitionCreator creates a new cache partition using the given key -type CachePartitionCreator func(partitionKey string) httpcache.Cache +type CachePartitionCreator func(partitionKey string, expiresAt *time.Time) httpcache.Cache // NewFromCache creates a GitHub cache RoundTripper that is backed by the // specified httpcache.Cache implementation. 
func NewFromCache(delegate http.RoundTripper, cache CachePartitionCreator, maxConcurrency int) http.RoundTripper { hasher := ghmetrics.NewCachingHasher() - return newPartitioningRoundTripper(func(partitionKey string) http.RoundTripper { - cacheTransport := httpcache.NewTransport(cache(partitionKey)) + return newPartitioningRoundTripper(func(partitionKey string, expiresAt *time.Time) http.RoundTripper { + cacheTransport := httpcache.NewTransport(cache(partitionKey, expiresAt)) cacheTransport.Transport = newThrottlingTransport(maxConcurrency, upstreamTransport{delegate: delegate, hasher: hasher}) return &requestCoalescer{ keys: make(map[string]*responseWaiter), @@ -284,6 +376,6 @@ func NewRedisCache(delegate http.RoundTripper, redisAddress string, maxConcurren } redisCache := rediscache.NewWithClient(conn) return NewFromCache(delegate, - func(_ string) httpcache.Cache { return redisCache }, + func(_ string, _ *time.Time) httpcache.Cache { return redisCache }, maxConcurrency) } diff --git a/ghproxy/ghcache/partitioner.go b/ghproxy/ghcache/partitioner.go index 16cf1d22cc05..2831893d1b38 100644 --- a/ghproxy/ghcache/partitioner.go +++ b/ghproxy/ghcache/partitioner.go @@ -21,11 +21,12 @@ import ( "fmt" "net/http" "sync" + "time" "github.com/sirupsen/logrus" ) -type roundTripperCreator func(partitionKey string) http.RoundTripper +type roundTripperCreator func(partitionKey string, expiresAt *time.Time) http.RoundTripper // partitioningRoundTripper is a http.RoundTripper var _ http.RoundTripper = &partitioningRoundTripper{} @@ -49,15 +50,33 @@ func getCachePartition(r *http.Request) string { return fmt.Sprintf("%x", sha256.Sum256([]byte(r.Header.Get("Authorization")))) } +func getExpiry(r *http.Request) *time.Time { + raw := r.Header.Get(TokenExpiryAtHeader) + if raw == "" { + return nil + } + parsed, err := time.Parse(time.RFC3339, raw) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "path": r.URL.Path, + "raw_value": raw, + "user-agent": 
r.Header.Get("User-Agent"), + }).Errorf("failed to parse value of %s header as RFC3339 time", TokenExpiryAtHeader) + return nil + } + return &parsed +} + func (prt *partitioningRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { cachePartition := getCachePartition(r) + expiresAt := getExpiry(r) prt.lock.Lock() roundTripper, found := prt.roundTrippers[cachePartition] if !found { logrus.WithField("cache-parition-key", cachePartition).Info("Creating a new cache for partition") cachePartitionsCounter.WithLabelValues(cachePartition).Add(1) - prt.roundTrippers[cachePartition] = prt.roundTripperCreator(cachePartition) + prt.roundTrippers[cachePartition] = prt.roundTripperCreator(cachePartition, expiresAt) roundTripper = prt.roundTrippers[cachePartition] } prt.lock.Unlock() diff --git a/ghproxy/ghcache/partitioner_test.go b/ghproxy/ghcache/partitioner_test.go index b4d8c373698d..6e7b1727fbcf 100644 --- a/ghproxy/ghcache/partitioner_test.go +++ b/ghproxy/ghcache/partitioner_test.go @@ -20,6 +20,7 @@ import ( "net/http" "sync" "testing" + "time" ) type fakeRoundTripperCreator struct { @@ -28,7 +29,7 @@ type fakeRoundTripperCreator struct { roundTrippers map[string]*fakeRoundTripper } -func (frtc *fakeRoundTripperCreator) createRoundTripper(partitionKey string) http.RoundTripper { +func (frtc *fakeRoundTripperCreator) createRoundTripper(partitionKey string, expiresAt *time.Time) http.RoundTripper { frtc.lock.Lock() defer frtc.lock.Unlock() _, alreadyExists := frtc.roundTrippers[partitionKey] diff --git a/ghproxy/ghproxy.go b/ghproxy/ghproxy.go index 3e11d6e88602..9c9d5078c6df 100644 --- a/ghproxy/ghproxy.go +++ b/ghproxy/ghproxy.go @@ -160,16 +160,6 @@ func main() { logrus.Warningf("The deprecated `--legacy-disable-disk-cache-partitions-by-auth-header` flags value is `true`. 
If you are a bigger Prow setup, you should copy your existing cache directory to the directory mentioned in the `%s` messages to warm up the partitioned-by-auth-header cache, then set the flag to false. If you are a smaller Prow setup or just started using ghproxy you can just unconditionally set it to `false`.", ghcache.LogMessageWithDiskPartitionFields) } - var cache http.RoundTripper - if o.redisAddress != "" { - cache = ghcache.NewRedisCache(apptokenequalizer.New(http.DefaultTransport), o.redisAddress, o.maxConcurrency) - } else if o.dir == "" { - cache = ghcache.NewMemCache(apptokenequalizer.New(http.DefaultTransport), o.maxConcurrency) - } else { - cache = ghcache.NewDiskCache(apptokenequalizer.New(http.DefaultTransport), o.dir, o.sizeGB, o.maxConcurrency, o.diskCacheDisableAuthHeaderPartitioning) - go diskMonitor(o.pushGatewayInterval, o.dir) - } - pprof.Instrument(o.instrumentationOptions) defer interrupts.WaitForGracefulShutdown() metrics.ExposeMetrics("ghproxy", config.PushGateway{ @@ -180,7 +170,7 @@ func main() { ServeMetrics: o.serveMetrics, }, o.instrumentationOptions.MetricsPort) - proxy := newReverseProxy(o.upstreamParsed, cache, 30*time.Second) + proxy := proxy(o, http.DefaultTransport, time.Hour) server := &http.Server{Addr: ":" + strconv.Itoa(o.port), Handler: proxy} health := pjutil.NewHealthOnPort(o.instrumentationOptions.HealthPort) @@ -189,6 +179,20 @@ func main() { interrupts.ListenAndServe(server, 30*time.Second) } +func proxy(o *options, upstreamTransport http.RoundTripper, diskCachePruneInterval time.Duration) http.Handler { + var cache http.RoundTripper + if o.redisAddress != "" { + cache = ghcache.NewRedisCache(apptokenequalizer.New(upstreamTransport), o.redisAddress, o.maxConcurrency) + } else if o.dir == "" { + cache = ghcache.NewMemCache(apptokenequalizer.New(upstreamTransport), o.maxConcurrency) + } else { + cache = ghcache.NewDiskCache(apptokenequalizer.New(upstreamTransport), o.dir, o.sizeGB, o.maxConcurrency, 
o.diskCacheDisableAuthHeaderPartitioning, diskCachePruneInterval) + go diskMonitor(o.pushGatewayInterval, o.dir) + } + + return newReverseProxy(o.upstreamParsed, cache, 30*time.Second) +} + func newReverseProxy(upstreamURL *url.URL, transport http.RoundTripper, timeout time.Duration) http.Handler { proxy := httputil.NewSingleHostReverseProxy(upstreamURL) // Wrap the director to change the upstream request 'Host' header to the diff --git a/ghproxy/ghproxy_test.go b/ghproxy/ghproxy_test.go new file mode 100644 index 000000000000..cad31289a67d --- /dev/null +++ b/ghproxy/ghproxy_test.go @@ -0,0 +1,133 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "bytes" + "crypto/rand" + "crypto/rsa" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/url" + "os" + "path" + "testing" + "time" + + "github.com/sirupsen/logrus" + "k8s.io/test-infra/prow/github" +) + +func TestDiskCachePruning(t *testing.T) { + cacheDir := t.TempDir() + o := &options{ + dir: cacheDir, + maxConcurrency: 25, + pushGatewayInterval: time.Minute, + upstreamParsed: &url.URL{}, + } + + expiryDuration := 5 * time.Second + roundTripper := func(r *http.Request) (*http.Response, error) { + switch r.URL.Path { + case "/app": + return jsonResponse(github.App{Slug: "app-slug"}, 200) + case "/app/installations": + return jsonResponse([]github.AppInstallation{{Account: github.User{Login: "org"}}}, 200) + case "/app/installations/0/access_tokens": + return jsonResponse(github.AppInstallationToken{Token: "abc", ExpiresAt: time.Now().Add(expiryDuration)}, 201) + case "/repos/org/repo/git/refs/dev": + return jsonResponse(github.GetRefResult{}, 200) + default: + return nil, fmt.Errorf("got unexpected request for %s", r.URL.Path) + } + } + + rsaKey, err := rsa.GenerateKey(rand.Reader, 512) + if err != nil { + t.Fatalf("Failed to generate RSA key: %v", err) + } + + server := httptest.NewServer(proxy(o, httpRoundTripper(roundTripper), time.Second)) + t.Cleanup(server.Close) + _, _, client := github.NewClientFromOptions(logrus.Fields{}, github.ClientOptions{ + MaxRetries: 1, + Censor: func(b []byte) []byte { return b }, + AppID: "123", + AppPrivateKey: func() *rsa.PrivateKey { return rsaKey }, + Bases: []string{server.URL}, + GraphqlEndpoint: server.URL, + }) + + if _, err := client.GetRef("org", "repo", "dev"); err != nil { + t.Fatalf("GetRef failed: %v", err) + } + + numberPartitions, err := getNumberOfCachePartitions(cacheDir) + if err != nil { + t.Fatalf("failed to get number of cache partitions: %v", err) + } + if numberPartitions != 2 { + t.Errorf("expected two cache partitions, one for the app and one for 
the app installation, got %d", numberPartitions) + } + + time.Sleep(2 * expiryDuration) + + numberPartitions, err = getNumberOfCachePartitions(cacheDir) + if err != nil { + t.Fatalf("failed to get number of cache partitions: %v", err) + } + if numberPartitions != 1 { + t.Errorf("expected one cache partition for the app as the one for the installation should be cleaned up, got %d", numberPartitions) + } +} + +func getNumberOfCachePartitions(cacheDir string) (int, error) { + var result int + for _, suffix := range []string{"temp", "data"} { + entries, err := os.ReadDir(path.Join(cacheDir, suffix)) + if err != nil { + return result, fmt.Errorf("failed to list: %w", err) + } + if result == 0 { + result = len(entries) + continue + } + if n := len(entries); n != result { + return result, fmt.Errorf("temp and datadir don't have the same number of partitions: %d vs %d", result, n) + } + } + + return result, nil +} + +type httpRoundTripper func(*http.Request) (*http.Response, error) + +func (rt httpRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + return rt(r) +} + +func jsonResponse(body interface{}, statusCode int) (*http.Response, error) { + serialized, err := json.Marshal(body) + if err != nil { + return nil, err + } + return &http.Response{StatusCode: statusCode, Body: io.NopCloser(bytes.NewBuffer(serialized)), Header: http.Header{}}, nil +} diff --git a/prow/github/app_auth_roundtripper.go b/prow/github/app_auth_roundtripper.go index 2804697124a8..a7a405ac2499 100644 --- a/prow/github/app_auth_roundtripper.go +++ b/prow/github/app_auth_roundtripper.go @@ -78,9 +78,10 @@ func (arr *appsRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) } func (arr *appsRoundTripper) addAppAuth(r *http.Request) *appsAuthError { + expiresAt := time.Now().UTC().Add(10 * time.Minute) token, err := jwt.NewWithClaims(jwt.SigningMethodRS256, &jwt.StandardClaims{ IssuedAt: jwt.NewTime(float64(time.Now().Unix())), - ExpiresAt: 
jwt.NewTime(float64(time.Now().UTC().Add(10 * time.Minute).Unix())), + ExpiresAt: jwt.NewTime(float64(expiresAt.Unix())), Issuer: arr.appID, }).SignedString(arr.privateKey()) if err != nil { @@ -88,6 +89,7 @@ func (arr *appsRoundTripper) addAppAuth(r *http.Request) *appsAuthError { } r.Header.Set("Authorization", "Bearer "+token) + r.Header.Set(ghcache.TokenExpiryAtHeader, expiresAt.Format(time.RFC3339)) // We call the /app endpoint to resolve the slug, so we can't set it there if r.URL.Path == "/app" { @@ -113,12 +115,13 @@ func extractOrgFromContext(ctx context.Context) string { func (arr *appsRoundTripper) addAppInstallationAuth(r *http.Request) *appsAuthError { org := extractOrgFromContext(r.Context()) - token, err := arr.installationTokenFor(org) + token, expiresAt, err := arr.installationTokenFor(org) if err != nil { return &appsAuthError{err} } r.Header.Set("Authorization", "Bearer "+token) + r.Header.Set(ghcache.TokenExpiryAtHeader, expiresAt.Format(time.RFC3339)) slug, err := arr.getSlug() if err != nil { return &appsAuthError{err} @@ -131,18 +134,18 @@ func (arr *appsRoundTripper) addAppInstallationAuth(r *http.Request) *appsAuthEr return nil } -func (arr *appsRoundTripper) installationTokenFor(org string) (string, error) { +func (arr *appsRoundTripper) installationTokenFor(org string) (string, time.Time, error) { installationID, err := arr.installationIDFor(org) if err != nil { - return "", fmt.Errorf("failed to get installation id for org %s: %w", org, err) + return "", time.Time{}, fmt.Errorf("failed to get installation id for org %s: %w", org, err) } - token, err := arr.getTokenForInstallation(installationID) + token, expiresAt, err := arr.getTokenForInstallation(installationID) if err != nil { - return "", fmt.Errorf("failed to get an installation token for org %s: %w", org, err) + return "", time.Time{}, fmt.Errorf("failed to get an installation token for org %s: %w", org, err) } - return token, nil + return token, expiresAt, nil } // 
installationIDFor returns the installation id for the given org. Unfortunately, @@ -190,13 +193,13 @@ func (arr *appsRoundTripper) installationIDFor(org string) (int64, error) { return id.ID, nil } -func (arr *appsRoundTripper) getTokenForInstallation(installation int64) (string, error) { +func (arr *appsRoundTripper) getTokenForInstallation(installation int64) (string, time.Time, error) { arr.tokenLock.RLock() token, found := arr.tokens[installation] arr.tokenLock.RUnlock() if found && token.ExpiresAt.Add(-time.Minute).After(time.Now()) { - return token.Token, nil + return token.Token, token.ExpiresAt, nil } arr.tokenLock.Lock() @@ -205,12 +208,12 @@ func (arr *appsRoundTripper) getTokenForInstallation(installation int64) (string // Check again in case a concurrent routine got a token while we waited for the lock token, found = arr.tokens[installation] if found && token.ExpiresAt.Add(-time.Minute).After(time.Now()) { - return token.Token, nil + return token.Token, token.ExpiresAt, nil } token, err := arr.githubClient.getAppInstallationToken(installation) if err != nil { - return "", fmt.Errorf("failed to get installation token from GitHub: %w", err) + return "", time.Time{}, fmt.Errorf("failed to get installation token from GitHub: %w", err) } if arr.tokens == nil { @@ -218,7 +221,7 @@ func (arr *appsRoundTripper) getTokenForInstallation(installation int64) (string } arr.tokens[installation] = token - return token.Token, nil + return token.Token, token.ExpiresAt, nil } func (arr *appsRoundTripper) getSlug() (string, error) { diff --git a/prow/github/client.go b/prow/github/client.go index db3479ad18a4..5125fba81fae 100644 --- a/prow/github/client.go +++ b/prow/github/client.go @@ -682,6 +682,8 @@ func NewAppsAuthClientWithFields(fields logrus.Fields, censor func([]byte) []byt // NewClientFromOptions creates a new client from the options we expose. This method should be used over the more-specific ones. 
func NewClientFromOptions(fields logrus.Fields, options ClientOptions) (TokenGenerator, UserGenerator, Client) { + options = options.Default() + // Will be nil if github app authentication is used if options.GetToken == nil { options.GetToken = func() []byte { return nil } @@ -737,7 +739,10 @@ func NewClientFromOptions(fields logrus.Fields, options ClientOptions) (TokenGen // Use github apps auth for git actions // https://docs.github.com/en/free-pro-team@latest/developers/apps/authenticating-with-github-apps#http-based-git-access-by-an-installation= - tokenGenerator = appsTransport.installationTokenFor + tokenGenerator = func(org string) (string, error) { + res, _, err := appsTransport.installationTokenFor(org) + return res, err + } userGenerator = func() (string, error) { return "x-access-token", nil } @@ -1110,7 +1115,6 @@ func (c *client) requestRetryWithContext(ctx context.Context, method, path, acce } else if errors.Is(err, &appsAuthError{}) { c.logger.WithError(err).Error("Stopping retry due to appsAuthError") return resp, err - } else if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return resp, err } else {