Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion ghproxy/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package(default_visibility = ["//visibility:public"])

load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library", "go_test")
load("//prow:def.bzl", "prow_image", "prow_push")

NAME = "ghproxy"
Expand Down Expand Up @@ -67,3 +67,13 @@ filegroup(
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

# Unit tests for ghproxy. They are embedded into the main library target
# (rather than linked against it) so they can exercise unexported
# identifiers from the ghproxy package.
go_test(
    name = "go_default_test",
    srcs = ["ghproxy_test.go"],
    embed = [":go_default_library"],
    deps = [
        "//prow/github:go_default_library",
        "@com_github_sirupsen_logrus//:go_default_library",
    ],
)
2 changes: 2 additions & 0 deletions ghproxy/ghcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ go_library(
"@com_github_peterbourgon_diskv//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
"@io_k8s_apimachinery//pkg/util/errors:go_default_library",
"@org_golang_x_sync//semaphore:go_default_library",
],
)
Expand Down
112 changes: 102 additions & 10 deletions ghproxy/ghcache/ghcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ package ghcache

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"

"github.com/gomodule/redigo/redis"
"github.com/gregjones/httpcache"
"github.com/gregjones/httpcache/diskcache"
Expand Down Expand Up @@ -66,6 +74,10 @@ const (
// which metrics should be recorded if set. If unset, the sha256sum of
// the Authorization header will be used.
TokenBudgetIdentifierHeader = "X-PROW-GHCACHE-TOKEN-BUDGET-IDENTIFIER"

// TokenExpiryAtHeader includes a date at which the passed token expires and all associated caches
// can be cleaned up. Its value must be in RFC3339 format.
TokenExpiryAtHeader = "X-PROW-TOKEN-EXPIRES-AT"
)

func CacheModeIsFree(mode CacheResponseMode) bool {
Expand Down Expand Up @@ -214,7 +226,7 @@ const LogMessageWithDiskPartitionFields = "Not using a partitioned cache because
// NewDiskCache creates a GitHub cache RoundTripper that is backed by a disk
// cache.
// It supports a partitioned cache.
func NewDiskCache(delegate http.RoundTripper, cacheDir string, cacheSizeGB, maxConcurrency int, legacyDisablePartitioningByAuthHeader bool) http.RoundTripper {
func NewDiskCache(delegate http.RoundTripper, cacheDir string, cacheSizeGB, maxConcurrency int, legacyDisablePartitioningByAuthHeader bool, cachePruneInterval time.Duration) http.RoundTripper {
if legacyDisablePartitioningByAuthHeader {
diskCache := diskcache.NewWithDiskv(
diskv.New(diskv.Options{
Expand All @@ -223,7 +235,7 @@ func NewDiskCache(delegate http.RoundTripper, cacheDir string, cacheSizeGB, maxC
CacheSizeMax: uint64(cacheSizeGB) * uint64(1000000000), // convert G to B
}))
return NewFromCache(delegate,
func(partitionKey string) httpcache.Cache {
func(partitionKey string, _ *time.Time) httpcache.Cache {
logrus.WithField("cache-base-path", path.Join(cacheDir, "data", partitionKey)).
WithField("cache-temp-path", path.Join(cacheDir, "temp", partitionKey)).
Warning(LogMessageWithDiskPartitionFields)
Expand All @@ -232,37 +244,117 @@ func NewDiskCache(delegate http.RoundTripper, cacheDir string, cacheSizeGB, maxC
maxConcurrency,
)
}

go func() {
for range time.NewTicker(cachePruneInterval).C {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc there's some gotcha with this construct that leaks tickers?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if ghproxy uses interrupts, prefer interrupts.Tick()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the Ticker never gets garbage collected; however, it runs as long as the binary does, so that doesn't matter

prune(cacheDir)
}
}()
return NewFromCache(delegate,
func(partitionKey string) httpcache.Cache {
func(partitionKey string, expiresAt *time.Time) httpcache.Cache {
basePath := path.Join(cacheDir, "data", partitionKey)
tempDir := path.Join(cacheDir, "temp", partitionKey)
if err := writecachePartitionMetadata(basePath, tempDir, expiresAt); err != nil {
logrus.WithError(err).Warn("Failed to write cache metadata file, pruning will not work")
}
return diskcache.NewWithDiskv(
diskv.New(diskv.Options{
BasePath: path.Join(cacheDir, "data", partitionKey),
TempDir: path.Join(cacheDir, "temp", partitionKey),
BasePath: basePath,
TempDir: tempDir,
CacheSizeMax: uint64(cacheSizeGB) * uint64(1000000000), // convert G to B
}))
},
maxConcurrency,
)
}

// prune walks the disk cache and removes every cache partition whose
// metadata file records an expiry time that has already passed. Partitions
// without a readable metadata file are left untouched, since they most
// likely belong to tokens that never expire (e.g. PATs) — see
// writecachePartitionMetadata.
func prune(baseDir string) {
	// All of this would be easier if the structure was base/partition/{data,temp}
	// but because of compatibility we can not change it.
	for _, dir := range []string{"data", "temp"} {
		base := path.Join(baseDir, dir)
		cachePartitionCandidates, err := os.ReadDir(base)
		if err != nil {
			logrus.WithError(err).Warn("os.ReadDir failed")
			// no continue, os.ReadDir returns partial results if it encounters an error
		}
		for _, cachePartitionCandidate := range cachePartitionCandidates {
			if !cachePartitionCandidate.IsDir() {
				continue
			}
			metadataPath := path.Join(base, cachePartitionCandidate.Name(), cachePartitionMetadataFileName)

			// Read optimistically and just ignore errors: a missing or
			// unreadable metadata file means we must not prune this partition.
			raw, err := os.ReadFile(metadataPath)
			if err != nil {
				continue
			}
			var metadata cachePartitionMetadata
			if err := json.Unmarshal(raw, &metadata); err != nil {
				logrus.WithError(err).WithField("filepath", metadataPath).Error("failed to deserialize metadata file")
				continue
			}
			if metadata.ExpiresAt.After(time.Now()) {
				continue
			}
			partitionPath := filepath.Dir(metadataPath)
			logrus.WithField("path", partitionPath).WithField("expiresAt", metadata.ExpiresAt.String()).Info("Cleaning up expired cache partition")
			if err := os.RemoveAll(partitionPath); err != nil {
				logrus.WithError(err).WithField("path", partitionPath).Error("failed to delete expired cache partition")
			}
		}
	}
}

func writecachePartitionMetadata(basePath, tempDir string, expiresAt *time.Time) error {
// No expiry header for the token was passed, likely it is a PAT which never expires.
if expiresAt == nil {
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this lead to leaks? Why not failsafe to writing metadata that expires at time.Now().Add(time.Hour) or something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it won't, and we can't do that. The whole reason the expiry information is passed on from the client is that token validity varies and, in the case of a PAT, the token never expires, which is signaled by an empty expiresAt. If we unconditionally added 1h here, we would always delete all caches after one hour; we can't do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see - this is to handle PAT. Perhaps a comment would be good to clarify this since it's implicit behavior?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I would have expected "no expiry header" -> "no call to writing metadata" rather than "no expiry header" -> "pass an invalid date" -> "do nothing" but maybe that's just me being confused by it all

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I guess it would not hurt to have a default cache TTL for PAT entries, too, since they could hit the same issues that apps auth hits, on a smaller machine or with fewer inodes free? Setting the TTL to a week or something should not cause adverse effects.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment and made it a pointer to further clarify this might not be set. The TTL is for the entire cache, not individual entries so we can never evict a PAT cache

}
metadata := cachePartitionMetadata{ExpiresAt: metav1.Time{Time: *expiresAt}}
serialized, err := json.Marshal(metadata)
if err != nil {
return fmt.Errorf("failed to serialize: %w", err)
}

var errs []error
for _, destBase := range []string{basePath, tempDir} {
if err := os.MkdirAll(destBase, 0755); err != nil {
errs = append(errs, fmt.Errorf("failed to create dir %s: %w", destBase, err))
}
dest := path.Join(destBase, cachePartitionMetadataFileName)
if err := ioutil.WriteFile(dest, serialized, 0644); err != nil {
errs = append(errs, fmt.Errorf("failed to write %s: %w", dest, err))
}
}

return utilerrors.NewAggregate(errs)
}

// cachePartitionMetadataFileName is the name of the per-partition metadata
// file written by writecachePartitionMetadata and read back by prune.
const cachePartitionMetadataFileName = ".cache_metadata.json"

// cachePartitionMetadata is the JSON-serialized content of a cache
// partition's metadata file.
type cachePartitionMetadata struct {
	// ExpiresAt is the expiry time of the token that owns this partition;
	// once it has passed, prune deletes the partition.
	ExpiresAt metav1.Time `json:"expires_at"`
}

// NewMemCache creates a GitHub cache RoundTripper that is backed by a memory
// cache.
// It supports a partitioned cache.
func NewMemCache(delegate http.RoundTripper, maxConcurrency int) http.RoundTripper {
return NewFromCache(delegate,
func(_ string) httpcache.Cache { return httpcache.NewMemoryCache() },
func(_ string, _ *time.Time) httpcache.Cache { return httpcache.NewMemoryCache() },
maxConcurrency)
}

// CachePartitionCreator creates a new cache partition using the given key
type CachePartitionCreator func(partitionKey string) httpcache.Cache
type CachePartitionCreator func(partitionKey string, expiresAt *time.Time) httpcache.Cache

// NewFromCache creates a GitHub cache RoundTripper that is backed by the
// specified httpcache.Cache implementation.
func NewFromCache(delegate http.RoundTripper, cache CachePartitionCreator, maxConcurrency int) http.RoundTripper {
hasher := ghmetrics.NewCachingHasher()
return newPartitioningRoundTripper(func(partitionKey string) http.RoundTripper {
cacheTransport := httpcache.NewTransport(cache(partitionKey))
return newPartitioningRoundTripper(func(partitionKey string, expiresAt *time.Time) http.RoundTripper {
cacheTransport := httpcache.NewTransport(cache(partitionKey, expiresAt))
cacheTransport.Transport = newThrottlingTransport(maxConcurrency, upstreamTransport{delegate: delegate, hasher: hasher})
return &requestCoalescer{
keys: make(map[string]*responseWaiter),
Expand All @@ -284,6 +376,6 @@ func NewRedisCache(delegate http.RoundTripper, redisAddress string, maxConcurren
}
redisCache := rediscache.NewWithClient(conn)
return NewFromCache(delegate,
func(_ string) httpcache.Cache { return redisCache },
func(_ string, _ *time.Time) httpcache.Cache { return redisCache },
maxConcurrency)
}
23 changes: 21 additions & 2 deletions ghproxy/ghcache/partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"fmt"
"net/http"
"sync"
"time"

"github.com/sirupsen/logrus"
)

type roundTripperCreator func(partitionKey string) http.RoundTripper
type roundTripperCreator func(partitionKey string, expiresAt *time.Time) http.RoundTripper

// partitioningRoundTripper is a http.RoundTripper
var _ http.RoundTripper = &partitioningRoundTripper{}
Expand All @@ -49,15 +50,33 @@ func getCachePartition(r *http.Request) string {
return fmt.Sprintf("%x", sha256.Sum256([]byte(r.Header.Get("Authorization"))))
}

// getExpiry returns the token expiry time carried in the TokenExpiryAtHeader
// header of the request, parsed as RFC3339. It returns nil when the header is
// absent or its value cannot be parsed; a parse failure is additionally
// logged with request context so misbehaving clients can be identified.
func getExpiry(r *http.Request) *time.Time {
	headerValue := r.Header.Get(TokenExpiryAtHeader)
	if headerValue == "" {
		return nil
	}
	expiry, parseErr := time.Parse(time.RFC3339, headerValue)
	if parseErr != nil {
		logrus.WithError(parseErr).WithFields(logrus.Fields{
			"path":       r.URL.Path,
			"raw_value":  headerValue,
			"user-agent": r.Header.Get("User-Agent"),
		}).Errorf("failed to parse value of %s header as RFC3339 time", TokenExpiryAtHeader)
		return nil
	}
	return &expiry
}

func (prt *partitioningRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
cachePartition := getCachePartition(r)
expiresAt := getExpiry(r)

prt.lock.Lock()
roundTripper, found := prt.roundTrippers[cachePartition]
if !found {
logrus.WithField("cache-parition-key", cachePartition).Info("Creating a new cache for partition")
cachePartitionsCounter.WithLabelValues(cachePartition).Add(1)
prt.roundTrippers[cachePartition] = prt.roundTripperCreator(cachePartition)
prt.roundTrippers[cachePartition] = prt.roundTripperCreator(cachePartition, expiresAt)
roundTripper = prt.roundTrippers[cachePartition]
}
prt.lock.Unlock()
Expand Down
3 changes: 2 additions & 1 deletion ghproxy/ghcache/partitioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"sync"
"testing"
"time"
)

type fakeRoundTripperCreator struct {
Expand All @@ -28,7 +29,7 @@ type fakeRoundTripperCreator struct {
roundTrippers map[string]*fakeRoundTripper
}

func (frtc *fakeRoundTripperCreator) createRoundTripper(partitionKey string) http.RoundTripper {
func (frtc *fakeRoundTripperCreator) createRoundTripper(partitionKey string, expiresAt *time.Time) http.RoundTripper {
frtc.lock.Lock()
defer frtc.lock.Unlock()
_, alreadyExists := frtc.roundTrippers[partitionKey]
Expand Down
26 changes: 15 additions & 11 deletions ghproxy/ghproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,6 @@ func main() {
logrus.Warningf("The deprecated `--legacy-disable-disk-cache-partitions-by-auth-header` flags value is `true`. If you are a bigger Prow setup, you should copy your existing cache directory to the directory mentioned in the `%s` messages to warm up the partitioned-by-auth-header cache, then set the flag to false. If you are a smaller Prow setup or just started using ghproxy you can just unconditionally set it to `false`.", ghcache.LogMessageWithDiskPartitionFields)
}

var cache http.RoundTripper
if o.redisAddress != "" {
cache = ghcache.NewRedisCache(apptokenequalizer.New(http.DefaultTransport), o.redisAddress, o.maxConcurrency)
} else if o.dir == "" {
cache = ghcache.NewMemCache(apptokenequalizer.New(http.DefaultTransport), o.maxConcurrency)
} else {
cache = ghcache.NewDiskCache(apptokenequalizer.New(http.DefaultTransport), o.dir, o.sizeGB, o.maxConcurrency, o.diskCacheDisableAuthHeaderPartitioning)
go diskMonitor(o.pushGatewayInterval, o.dir)
}

pprof.Instrument(o.instrumentationOptions)
defer interrupts.WaitForGracefulShutdown()
metrics.ExposeMetrics("ghproxy", config.PushGateway{
Expand All @@ -180,7 +170,7 @@ func main() {
ServeMetrics: o.serveMetrics,
}, o.instrumentationOptions.MetricsPort)

proxy := newReverseProxy(o.upstreamParsed, cache, 30*time.Second)
proxy := proxy(o, http.DefaultTransport, time.Hour)
server := &http.Server{Addr: ":" + strconv.Itoa(o.port), Handler: proxy}

health := pjutil.NewHealthOnPort(o.instrumentationOptions.HealthPort)
Expand All @@ -189,6 +179,20 @@ func main() {
interrupts.ListenAndServe(server, 30*time.Second)
}

// proxy builds the reverse-proxy handler for ghproxy. The cache backend is
// chosen from the options: Redis when a redis address is configured, an
// in-memory cache when no cache directory is set, and otherwise a disk
// cache (which also starts the disk-usage metrics monitor). The upstream
// transport and the disk-cache prune interval are injected to make the
// construction testable.
func proxy(o *options, upstreamTransport http.RoundTripper, diskCachePruneInterval time.Duration) http.Handler {
	var cache http.RoundTripper
	switch {
	case o.redisAddress != "":
		cache = ghcache.NewRedisCache(apptokenequalizer.New(upstreamTransport), o.redisAddress, o.maxConcurrency)
	case o.dir == "":
		cache = ghcache.NewMemCache(apptokenequalizer.New(upstreamTransport), o.maxConcurrency)
	default:
		cache = ghcache.NewDiskCache(apptokenequalizer.New(upstreamTransport), o.dir, o.sizeGB, o.maxConcurrency, o.diskCacheDisableAuthHeaderPartitioning, diskCachePruneInterval)
		go diskMonitor(o.pushGatewayInterval, o.dir)
	}

	return newReverseProxy(o.upstreamParsed, cache, 30*time.Second)
}

func newReverseProxy(upstreamURL *url.URL, transport http.RoundTripper, timeout time.Duration) http.Handler {
proxy := httputil.NewSingleHostReverseProxy(upstreamURL)
// Wrap the director to change the upstream request 'Host' header to the
Expand Down
Loading