diff --git a/docs/sources/reference/components/faro/faro.receiver.md b/docs/sources/reference/components/faro/faro.receiver.md index 0a5462cf1f9..20e94799314 100644 --- a/docs/sources/reference/components/faro/faro.receiver.md +++ b/docs/sources/reference/components/faro/faro.receiver.md @@ -61,11 +61,13 @@ You can use the following blocks with `faro.receiver`: | [`server`][server] | Configures the HTTP server. | no | | `server` > [`rate_limiting`][rate_limiting] | Configures rate limiting for the HTTP server. | no | | [`sourcemaps`][sourcemaps] | Configures sourcemap retrieval. | no | +| `sourcemaps` > [`cache`][cache] | Configures sourcemap caching behavior. | no | | `sourcemaps` > [`location`][location] | Configures on-disk location for sourcemap retrieval. | no | The > symbol indicates deeper levels of nesting. For example, `sourcemaps` > `location` refers to a `location` block defined inside a `sourcemaps` block. +[cache]: #cache [location]: #location [output]: #output [rate_limiting]: #rate_limiting @@ -149,7 +151,7 @@ The `sourcemaps` block configures how to retrieve sourcemaps. Sourcemaps are then used to transform file and line information from minified code into the file and line information from the original source code. | Name | Type | Description | Default | Required | -|-------------------------|----------------|--------------------------------------------|---------|----------| +| ----------------------- | -------------- | ------------------------------------------ | ------- | -------- | | `download` | `bool` | Whether to download sourcemaps. | `true` | no | | `download_from_origins` | `list(string)` | Which origins to download sourcemaps from. | `["*"]` | no | | `download_timeout` | `duration` | Timeout when downloading sourcemaps. | `"1s"` | no | @@ -168,6 +170,25 @@ Setting `download_timeout` to `"0s"` disables timeouts. To retrieve sourcemaps from disk instead of the network, specify one or more [`location` blocks][location]. 
When `location` blocks are provided, they're checked first for sourcemaps before falling back to downloading. +#### `cache` + +The `cache` block configures sourcemap caching behavior. + +| Name | Type | Description | Default | Required | +| ------------------------ | ---------- | ----------------------------------------------------------------------------------------- | ------- | -------- | +| `cleanup_check_interval` | `duration` | How often {{< param "PRODUCT_NAME" >}} checks cached sourcemaps for cleanup. | `"30s"` | no | +| `error_cleanup_interval` | `duration` | How long {{< param "PRODUCT_NAME" >}} waits before retrying a failed sourcemap retrieval. | `"1h"` | no | +| `ttl` | `duration` | How long {{< param "PRODUCT_NAME" >}} keeps an unused sourcemap in the cache. | `inf` | no | + +By default, {{< param "PRODUCT_NAME" >}} keeps sourcemaps in memory indefinitely. +Set `ttl` to remove sourcemaps that are not accessed within the specified duration. + +{{< param "PRODUCT_NAME" >}} caches errors that occur while downloading or parsing a sourcemap. +Use `error_cleanup_interval` to control how long these errors remain cached. + +Cached sourcemaps are checked for cleanup every 30 seconds by default. +Set `cleanup_check_interval` to adjust this frequency. + #### `location` The `location` block declares a location where sourcemaps are stored on the filesystem. @@ -223,7 +244,7 @@ The template value is replaced with the release value provided by the [Faro Web * `faro_receiver_request_message_bytes` (histogram): Size (in bytes) of HTTP requests received from clients. * `faro_receiver_response_message_bytes` (histogram): Size (in bytes) of HTTP responses sent to clients. * `faro_receiver_inflight_requests` (gauge): Current number of inflight requests. -* `faro_receiver_sourcemap_cache_size` (counter): Number of items in sourcemap cache per origin. +* `faro_receiver_sourcemap_cache_size` (gauge): Number of items in sourcemap cache per origin. 
* `faro_receiver_sourcemap_downloads_total` (counter): Total number of sourcemap downloads performed per origin and status. * `faro_receiver_sourcemap_file_reads_total` (counter): Total number of sourcemap retrievals using the filesystem per origin and status. * `faro_receiver_rate_limiter_active_app` (gauge): Number of active applications with rate limiters. Inactive limiters are cleaned up every 10 minutes. diff --git a/internal/component/faro/receiver/arguments.go b/internal/component/faro/receiver/arguments.go index e2879411170..e8a9c0ef7bb 100644 --- a/internal/component/faro/receiver/arguments.go +++ b/internal/component/faro/receiver/arguments.go @@ -3,6 +3,7 @@ package receiver import ( "encoding" "fmt" + "math" "time" "github.com/alecthomas/units" @@ -76,6 +77,7 @@ type SourceMapsArguments struct { Download bool `alloy:"download,attr,optional"` DownloadFromOrigins []string `alloy:"download_from_origins,attr,optional"` DownloadTimeout time.Duration `alloy:"download_timeout,attr,optional"` + Cache *CacheArguments `alloy:"cache,block,optional"` Locations []LocationArguments `alloy:"location,block,optional"` } @@ -84,6 +86,23 @@ func (s *SourceMapsArguments) SetToDefault() { Download: true, DownloadFromOrigins: []string{"*"}, DownloadTimeout: time.Second, + Cache: &CacheArguments{}, + } + s.Cache.SetToDefault() +} + +// CacheArguments configures sourcemap caching behavior. 
+type CacheArguments struct { + TTL time.Duration `alloy:"ttl,attr,optional"` + ErrorCleanupInterval time.Duration `alloy:"error_cleanup_interval,attr,optional"` + CleanupCheckInterval time.Duration `alloy:"cleanup_check_interval,attr,optional"` +} + +func (c *CacheArguments) SetToDefault() { + *c = CacheArguments{ + TTL: time.Duration(math.MaxInt64), + ErrorCleanupInterval: time.Hour, + CleanupCheckInterval: time.Second * 30, } } diff --git a/internal/component/faro/receiver/receiver.go b/internal/component/faro/receiver/receiver.go index 5dd6dce6d69..441d7dc4fad 100644 --- a/internal/component/faro/receiver/receiver.go +++ b/internal/component/faro/receiver/receiver.go @@ -131,13 +131,20 @@ func (c *Component) Update(args component.Arguments) error { c.handler.Update(newArgs.Server) - c.lazySourceMaps.SetInner(newSourceMapsStore( + // Stop old store's cleanup if there is one + c.lazySourceMaps.Stop() + + innerStore := newSourceMapsStore( log.With(c.log, "subcomponent", "handler"), newArgs.SourceMaps, c.sourceMapsMetrics, nil, // Use default HTTP client. nil, // Use default FS implementation. 
- )) + ) + c.lazySourceMaps.SetInner(innerStore) + + // Start cleanup for new store + c.lazySourceMaps.Start() c.logs.SetReceivers(newArgs.Output.Logs) c.traces.SetConsumers(newArgs.Output.Traces) @@ -243,3 +250,21 @@ func (vs *varSourceMapsStore) SetInner(inner sourceMapsStore) { vs.inner = inner } + +func (vs *varSourceMapsStore) Start() { + vs.mut.RLock() + defer vs.mut.RUnlock() + + if vs.inner != nil { + vs.inner.Start() + } +} + +func (vs *varSourceMapsStore) Stop() { + vs.mut.RLock() + defer vs.mut.RUnlock() + + if vs.inner != nil { + vs.inner.Stop() + } +} diff --git a/internal/component/faro/receiver/sourcemaps.go b/internal/component/faro/receiver/sourcemaps.go index 0041e4520f0..f6ff7214b7d 100644 --- a/internal/component/faro/receiver/sourcemaps.go +++ b/internal/component/faro/receiver/sourcemaps.go @@ -2,6 +2,7 @@ package receiver import ( "bytes" + "context" "fmt" "io" "io/fs" @@ -13,6 +14,7 @@ import ( "strings" "sync" "text/template" + "time" "github.com/go-kit/log" "github.com/go-sourcemap/sourcemap" @@ -28,6 +30,8 @@ import ( // transforming minified source locations to the original source location. type sourceMapsStore interface { GetSourceMap(sourceURL string, release string) (*sourcemap.Consumer, error) + Start() + Stop() } // Stub interfaces for easier mocking. 
@@ -67,14 +71,14 @@ func (fs osFileService) ReadFile(name string) ([]byte, error) { } type sourceMapMetrics struct { - cacheSize *prometheus.CounterVec + cacheSize *prometheus.GaugeVec downloads *prometheus.CounterVec fileReads *prometheus.CounterVec } func newSourceMapMetrics(reg prometheus.Registerer) *sourceMapMetrics { m := &sourceMapMetrics{ - cacheSize: prometheus.NewCounterVec(prometheus.CounterOpts{ + cacheSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "faro_receiver_sourcemap_cache_size", Help: "number of items in source map cache, per origin", }, []string{"origin"}), @@ -88,7 +92,7 @@ func newSourceMapMetrics(reg prometheus.Registerer) *sourceMapMetrics { }, []string{"origin", "status"}), } - m.cacheSize = util.MustRegisterOrGet(reg, m.cacheSize).(*prometheus.CounterVec) + m.cacheSize = util.MustRegisterOrGet(reg, m.cacheSize).(*prometheus.GaugeVec) m.downloads = util.MustRegisterOrGet(reg, m.downloads).(*prometheus.CounterVec) m.fileReads = util.MustRegisterOrGet(reg, m.fileReads).(*prometheus.CounterVec) return m @@ -99,6 +103,16 @@ type sourcemapFileLocation struct { pathTemplate *template.Template } +type timeSource interface { + Now() time.Time +} + +type realTimeSource struct{} + +func (realTimeSource) Now() time.Time { + return time.Now() +} + type sourceMapsStoreImpl struct { log log.Logger cli httpClient @@ -107,8 +121,18 @@ type sourceMapsStoreImpl struct { metrics *sourceMapMetrics locs []*sourcemapFileLocation - cacheMut sync.Mutex - cache map[string]*sourcemap.Consumer + cacheMut sync.Mutex + cache map[string]*cachedSourceMap + timeSource timeSource + cleanupCtx context.Context + cleanupCancel context.CancelFunc + cleanupWg sync.WaitGroup + isStarted bool +} + +type cachedSourceMap struct { + consumer *sourcemap.Consumer + lastUsed time.Time } // newSourceMapStore creates an implementation of sourceMapsStore. 
The returned @@ -141,27 +165,28 @@ func newSourceMapsStore(log log.Logger, args SourceMapsArguments, metrics *sourc } return &sourceMapsStoreImpl{ - log: log, - cli: cli, - fs: fs, - args: args, - cache: make(map[string]*sourcemap.Consumer), - metrics: metrics, - locs: locs, + log: log, + cli: cli, + fs: fs, + args: args, + cache: make(map[string]*cachedSourceMap), + metrics: metrics, + locs: locs, + timeSource: realTimeSource{}, } } func (store *sourceMapsStoreImpl) GetSourceMap(sourceURL string, release string) (*sourcemap.Consumer, error) { - // TODO(rfratto): GetSourceMap is weak to transient errors, since it always - // caches the result, even when there's an error. This means that transient - // errors will be cached forever, preventing source maps from being retrieved. - store.cacheMut.Lock() defer store.cacheMut.Unlock() cacheKey := fmt.Sprintf("%s__%s", sourceURL, release) - if sm, ok := store.cache[cacheKey]; ok { - return sm, nil + if cached, ok := store.cache[cacheKey]; ok { + if cached != nil { + cached.lastUsed = store.timeSource.Now() + return cached.consumer, nil + } + return nil, nil } content, sourceMapURL, err := store.getSourceMapContent(sourceURL, release) @@ -177,11 +202,113 @@ func (store *sourceMapsStoreImpl) GetSourceMap(sourceURL string, release string) return nil, err } level.Info(store.log).Log("msg", "successfully parsed source map", "url", sourceMapURL, "release", release) - store.cache[cacheKey] = consumer + store.cache[cacheKey] = &cachedSourceMap{ + consumer: consumer, + lastUsed: store.timeSource.Now(), + } store.metrics.cacheSize.WithLabelValues(getOrigin(sourceURL)).Inc() return consumer, nil } +func (store *sourceMapsStoreImpl) CleanOldCacheEntries() { + store.cacheMut.Lock() + defer store.cacheMut.Unlock() + + ttl := store.args.Cache.TTL + for key, cached := range store.cache { + if cached != nil && cached.lastUsed.Before(store.timeSource.Now().Add(-ttl)) { + srcUrl := strings.SplitN(key, "__", 2)[0] + origin := 
getOrigin(srcUrl) + store.metrics.cacheSize.WithLabelValues(origin).Dec() + delete(store.cache, key) + } + } +} + +func (store *sourceMapsStoreImpl) CleanCachedErrors() { + store.cacheMut.Lock() + defer store.cacheMut.Unlock() + + for key, cached := range store.cache { + if cached == nil { + delete(store.cache, key) + } + } +} + +// Start begins the cleanup routines based on configured cache intervals. +func (store *sourceMapsStoreImpl) Start() { + store.cacheMut.Lock() + defer store.cacheMut.Unlock() + + if store.isStarted { + return + } + store.isStarted = true + + cacheConfig := store.args.Cache + if cacheConfig == nil { + return + } + + store.cleanupCtx, store.cleanupCancel = context.WithCancel(context.Background()) + + if d := cacheConfig.CleanupCheckInterval; d > 0 { + store.cleanupWg.Add(1) + go func(interval time.Duration) { + defer store.cleanupWg.Done() + store.CleanOldCacheEntries() + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-store.cleanupCtx.Done(): + return + case <-ticker.C: + store.CleanOldCacheEntries() + } + } + }(d) + } + + if d := cacheConfig.ErrorCleanupInterval; d > 0 { + store.cleanupWg.Add(1) + go func(interval time.Duration) { + defer store.cleanupWg.Done() + store.CleanCachedErrors() + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-store.cleanupCtx.Done(): + return + case <-ticker.C: + store.CleanCachedErrors() + } + } + }(d) + } +} + +// Stop terminates all cleanup goroutines and waits for them to finish. 
+func (store *sourceMapsStoreImpl) Stop() {
+	store.cacheMut.Lock()
+	if !store.isStarted {
+		store.cacheMut.Unlock()
+		return
+	}
+	store.isStarted = false
+	cancel := store.cleanupCancel
+	store.cleanupCancel = nil
+	store.cacheMut.Unlock()
+	// Wait with cacheMut released: the cleanup goroutines lock it, so waiting
+	// under it can deadlock. cleanupCtx is kept; they read it until they exit.
+	if cancel != nil {
+		cancel()
+	}
+	store.cleanupWg.Wait()
+}
+
 func (store *sourceMapsStoreImpl) getSourceMapContent(sourceURL string, release string) (content []byte, sourceMapURL string, err error) {
 	// Attempt to find the source map in the filesystem first.
 	for _, loc := range store.locs {
diff --git a/internal/component/faro/receiver/sourcemaps_test.go b/internal/component/faro/receiver/sourcemaps_test.go
index 50fc15455cd..b19eec69350 100644
--- a/internal/component/faro/receiver/sourcemaps_test.go
+++ b/internal/component/faro/receiver/sourcemaps_test.go
@@ -9,13 +9,24 @@ import (
 	"os"
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/grafana/alloy/internal/component/faro/receiver/internal/payload"
 	alloyutil "github.com/grafana/alloy/internal/util"
+	"github.com/grafana/pyroscope/ebpf/util" // NOTE(review): used for util.TestLogger; alloyutil (imported above) likely provides the same helper — confirm and drop this dependency
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 )
 
+// mockTimeSource is a test helper for controlling time.
+type mockTimeSource struct {
+	now time.Time
+}
+
+func (m *mockTimeSource) Now() time.Time {
+	return m.now
+}
+
 func Test_traceContextKeptWhenStacktraceDefined(t *testing.T) {
 	input := &payload.Exception{
 		Stacktrace: &payload.Stacktrace{},
@@ -631,6 +642,154 @@ func Test_sourceMapsStoreImpl_RealWorldPathValidation(t *testing.T) {
 	require.Empty(t, fileService.reads, "should not read file when stat fails")
 }
 
+// TestSourceMapsStoreImpl_CleanCachedErrors checks that only nil (failed) cache entries are purged.
+func TestSourceMapsStoreImpl_CleanCachedErrors(t *testing.T) {
+	tt := []struct {
+		name              string
+		cache             map[string]*cachedSourceMap
+		expectedCacheSize int
+	}{
+		{
+			name: "should remove cached error",
+			cache: map[string]*cachedSourceMap{
+				"http://shouldRemoveCachedErrors.com__v1": nil,
+			},
+			expectedCacheSize: 0,
+		},
+		{
+			name: "should not remove from map if no errors",
+			cache: map[string]*cachedSourceMap{
+				"http://shouldNotRemoveFromCache.com__v2": {},
+			},
+			expectedCacheSize: 1,
+		},
+		{
+			name: "should not remove multiple entries if no errors",
+			cache: map[string]*cachedSourceMap{
+				"http://shouldNotRemoveFromCache.com__v1": {},
+				"http://shouldNotRemoveFromCache.com__v2": {},
+			},
+			expectedCacheSize: 2,
+		},
+		{
+			name: "should remove only cached errors",
+			cache: map[string]*cachedSourceMap{
+				"http://shouldNotRemoveFromCache.com__v1": nil,
+				"http://shouldNotRemoveFromCache.com__v2": {},
+			},
+			expectedCacheSize: 1,
+		},
+	}
+
+	logger := util.TestLogger(t)
+
+	for _, tc := range tt {
+		reg := prometheus.NewRegistry()
+		metrics := newSourceMapMetrics(reg)
+
+		store := &sourceMapsStoreImpl{
+			log:        logger,
+			args:       SourceMapsArguments{Cache: &CacheArguments{TTL: 5 * time.Minute}},
+			metrics:    metrics,
+			cli:        &mockHTTPClient{},
+			fs:         newTestFileService(),
+			cache:      tc.cache,
+			timeSource: &mockTimeSource{now: time.Now()},
+		}
+
+		t.Run(tc.name, func(t *testing.T) {
+			store.CleanCachedErrors()
+			require.Equal(t, tc.expectedCacheSize, len(store.cache))
+		})
+	}
+}
+
+// TestSourceMapsStoreImpl_CleanOldCachedEntries verifies TTL-based eviction.
+// Every timestamp derives from one fixed instant, and expired entries sit clearly
+// past the TTL, because eviction uses a strict Before comparison against now-TTL.
+func TestSourceMapsStoreImpl_CleanOldCachedEntries(t *testing.T) {
+	now := time.Date(2024, time.January, 1, 0, 0, 0, 0, time.UTC)
+
+	tt := []struct {
+		name              string
+		cache             map[string]*cachedSourceMap
+		timeSource        *mockTimeSource
+		cacheTimeout      time.Duration
+		expectedCacheSize int
+	}{
+		{
+			name: "should clear entry from cache if too old",
+			cache: map[string]*cachedSourceMap{
+				"http://shouldRemoveCachedErrors.com__v1": {lastUsed: now},
+			},
+			timeSource:        &mockTimeSource{now: now.Add(6 * time.Minute)},
+			cacheTimeout:      5 * time.Minute,
+			expectedCacheSize: 0,
+		},
+		{
+			name: "should not clear entry from cache if not too old",
+			cache: map[string]*cachedSourceMap{
+				"http://shouldRemoveCachedErrors.com__v1": {lastUsed: now},
+			},
+			timeSource:        &mockTimeSource{now: now.Add(3 * time.Minute)},
+			cacheTimeout:      5 * time.Minute,
+			expectedCacheSize: 1,
+		},
+		{
+			name: "should clear only old entries from cache",
+			cache: map[string]*cachedSourceMap{
+				"http://shouldRemoveCachedErrors.com__v1": {lastUsed: now},
+				"http://shouldRemoveCachedErrors.com__v2": {lastUsed: now.Add(-6 * time.Minute)},
+			},
+			timeSource:        &mockTimeSource{now: now},
+			cacheTimeout:      5 * time.Minute,
+			expectedCacheSize: 1,
+		},
+		{
+			name: "should not clear multiple entries",
+			cache: map[string]*cachedSourceMap{
+				"http://shouldRemoveCachedErrors.com__v1": {lastUsed: now.Add(-3 * time.Minute)},
+				"http://shouldRemoveCachedErrors.com__v2": {lastUsed: now.Add(-4 * time.Minute)},
+			},
+			timeSource:        &mockTimeSource{now: now},
+			cacheTimeout:      5 * time.Minute,
+			expectedCacheSize: 2,
+		},
+		{
+			name: "should clear multiple old entries from cache",
+			cache: map[string]*cachedSourceMap{
+				"http://shouldRemoveCachedErrors.com__v1": {lastUsed: now.Add(-10 * time.Minute)},
+				"http://shouldRemoveCachedErrors.com__v2": {lastUsed: now.Add(-7 * time.Minute)},
+			},
+			timeSource:        &mockTimeSource{now: now},
+			cacheTimeout:      5 * time.Minute,
+			expectedCacheSize: 0,
+		},
+	}
+
+	logger := util.TestLogger(t)
+
+	for _, tc := range tt {
+		reg := prometheus.NewRegistry()
+		metrics := newSourceMapMetrics(reg)
+
+		store := &sourceMapsStoreImpl{
+			log:        logger,
+			args:       SourceMapsArguments{Cache: &CacheArguments{TTL: tc.cacheTimeout}},
+			metrics:    metrics,
+			cli:        &mockHTTPClient{},
+			fs:         newTestFileService(),
+			cache:      tc.cache,
+			timeSource: tc.timeSource,
+		}
+
+		t.Run(tc.name, func(t *testing.T) {
+			store.CleanOldCacheEntries()
+			require.Equal(t, tc.expectedCacheSize, len(store.cache))
+		})
+	}
+}
+
 type mockHTTPClient struct {
 	responses []struct {
 		*http.Response