From b3e417edf153d44f2f9154cd6563b78e84a95a1b Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Tue, 21 Oct 2025 17:32:09 +0200 Subject: [PATCH] refactor: abstract storage from default cache handler implementation --- src/run/handlers/use-cache-handler.ts | 464 ++++++++++++++------------ 1 file changed, 247 insertions(+), 217 deletions(-) diff --git a/src/run/handlers/use-cache-handler.ts b/src/run/handlers/use-cache-handler.ts index f5610a71af..ce4251e0b5 100644 --- a/src/run/handlers/use-cache-handler.ts +++ b/src/run/handlers/use-cache-handler.ts @@ -21,8 +21,8 @@ import { getTracer, withActiveSpan } from './tracer.cjs' // Most of this code is copied and adapted from Next.js default 'use cache' handler implementation // https://github.com/vercel/next.js/blob/84fde91e03918344c5d356986914ab68a5083462/packages/next/src/server/lib/cache-handlers/default.ts // this includes: -// - PrivateCacheEntry (with removed `isErrored` and `errorRetryCount` as those are not actually used there) -// - Main logic of .get and .set methods +// - PrivateCacheEntry (with removed `isErrored` and `errorRetryCount` as those are not actually used there) for default use cache handler +// - Main logic of .get and .set methods (especially for default cache handler which will use in-memory LRU cache) // Main difference is: // - Tag handling - default Next.js implementation handles tags in memory only, but we need to support tag // invalidation cross serverless instances, so we do use same persistent storage as we use for response and fetch cache @@ -30,14 +30,6 @@ import { getTracer, withActiveSpan } from './tracer.cjs' // and our serverless instances also can handle any page template so implementing it would not have good perf tradeoffs // - Addition of tracing -type PrivateCacheEntry = { - entry: CacheEntry - // compute size on set since we need to read size - // of the ReadableStream for LRU evicting - size: number -} - -type CacheHandleLRUCache = LRUCache type PendingSets = Map> type MultiVersionCacheHandler = CacheHandlerV2 & CacheHandlerWithUpdateTag @@ -60,22 +52,6 @@ const extendedGlobalThis = globalThis as typeof globalThis & { } } -function getLRUCache(): CacheHandleLRUCache { - if (extendedGlobalThis[LRU_CACHE_GLOBAL_KEY]) { - return extendedGlobalThis[LRU_CACHE_GLOBAL_KEY] - } - - const lruCache = new LRUCache({ - max: 1000, - maxSize: 50 * 1024 * 1024, // same as hardcoded default in Next.js - sizeCalculation: (value) => value.size, - }) - - extendedGlobalThis[LRU_CACHE_GLOBAL_KEY] = lruCache - - return lruCache -} - function getPendingSets(): PendingSets { if (extendedGlobalThis[PENDING_SETS_GLOBAL_KEY]) { return extendedGlobalThis[PENDING_SETS_GLOBAL_KEY] @@ -89,218 +65,272 @@ function getPendingSets(): PendingSets { // eslint-disable-next-line @typescript-eslint/no-empty-function const tmpResolvePendingBeforeCreatingAPromise = () => {} -export const NetlifyDefaultUseCacheHandler = { - get(cacheKey: string): ReturnType { - return withActiveSpan( - getTracer(), - 'DefaultUseCacheHandler.get', - async (span): ReturnType => { - getLogger().withFields({ cacheKey }).debug(`[NetlifyDefaultUseCacheHandler] get`) - span?.setAttributes({ - cacheKey, - }) - - const pendingPromise = getPendingSets().get(cacheKey) - if (pendingPromise) { - await pendingPromise - } - - const privateEntry = getLRUCache().get(cacheKey) - if (!privateEntry) { - getLogger() - .withFields({ cacheKey, status: 'MISS' }) - .debug(`[NetlifyDefaultUseCacheHandler] get result`) +function createUseCacheHandler({ + cacheHandlerName, + shouldCloneStreamForReturning, + storageGet, + storageSet, +}: { + cacheHandlerName: string + /** + * returning entry will cause stream to be consumed + * so we might need to clone it first if we store cache-entry in memory with stream instance + * to avoid consuming stream from storage + */ + shouldCloneStreamForReturning: boolean + storageGet: (key: string) => CacheEntry | undefined | Promise + storageSet: (key: string, entry: CacheEntry) => void | Promise +}): MultiVersionCacheHandler { + return { + get(cacheKey: string): ReturnType { + return withActiveSpan( + getTracer(), + `${cacheHandlerName}.get`, + async (span): ReturnType => { + getLogger().withFields({ cacheKey }).debug(`[${cacheHandlerName}] get`) span?.setAttributes({ - cacheStatus: 'miss', + cacheKey, }) - return undefined - } - - const { entry } = privateEntry - const ttl = (entry.timestamp + entry.revalidate * 1000 - Date.now()) / 1000 - if (ttl < 0) { - // In-memory caches should expire after revalidate time because it is - // unlikely that a new entry will be able to be used before it is dropped - // from the cache. + + const pendingPromise = getPendingSets().get(cacheKey) + if (pendingPromise) { + await pendingPromise + } + + const entry = await storageGet(cacheKey) + if (!entry) { + getLogger() + .withFields({ cacheKey, status: 'MISS' }) + .debug(`[${cacheHandlerName}] get result`) + span?.setAttributes({ + cacheStatus: 'miss', + }) + return undefined + } + + const ttl = (entry.timestamp + entry.revalidate * 1000 - Date.now()) / 1000 + if (ttl < 0) { + // In-memory caches should expire after revalidate time because it is + // unlikely that a new entry will be able to be used before it is dropped + // from the cache. + getLogger() + .withFields({ cacheKey, ttl, status: 'STALE' }) + .debug(`[${cacheHandlerName}] get result`) + span?.setAttributes({ + cacheStatus: 'expired, discarded', + ttl, + }) + return undefined + } + + const { stale, expired } = await isAnyTagStaleOrExpired(entry.tags, entry.timestamp) + + if (expired) { + getLogger() + .withFields({ cacheKey, ttl, status: 'EXPIRED BY TAG' }) + .debug(`[${cacheHandlerName}] get result`) + + span?.setAttributes({ + cacheStatus: 'expired tag, discarded', + ttl, + }) + return undefined + } + + let { revalidate, value } = entry + if (stale) { + revalidate = -1 + } + getLogger() - .withFields({ cacheKey, ttl, status: 'STALE' }) - .debug(`[NetlifyDefaultUseCacheHandler] get result`) + .withFields({ cacheKey, ttl, status: stale ? 'STALE' : 'HIT' }) + .debug(`[${cacheHandlerName}] get result`) span?.setAttributes({ - cacheStatus: 'expired, discarded', + cacheStatus: stale ? 'stale' : 'hit', ttl, }) - return undefined - } - - const { stale, expired } = await isAnyTagStaleOrExpired(entry.tags, entry.timestamp) - if (expired) { - getLogger() - .withFields({ cacheKey, ttl, status: 'EXPIRED BY TAG' }) - .debug(`[NetlifyDefaultUseCacheHandler] get result`) + if (shouldCloneStreamForReturning) { + // returning entry will cause stream to be consumed + // so we need to clone it first, so in-memory cache can + // be used again + const [returnStream, newSaved] = value.tee() + // mutate in-memory cache entry to one of tee'd streams as we did just consume its value + entry.value = newSaved + // use other tee'd stream for return value + value = returnStream + } + return { + ...entry, + revalidate, + value, + } + }, + ) + }, + set( + cacheKey: string, + pendingEntry: Promise, + ): ReturnType { + return withActiveSpan( + getTracer(), + `${cacheHandlerName}.set`, + async (span): ReturnType => { + getLogger().withFields({ cacheKey }).debug(`[${cacheHandlerName}] set`) span?.setAttributes({ - cacheStatus: 'expired tag, discarded', - ttl, + cacheKey, + }) + + let resolvePending: () => void = tmpResolvePendingBeforeCreatingAPromise + const pendingPromise = new Promise((resolve) => { + resolvePending = resolve }) - return undefined - } - - let { revalidate, value } = entry - if (stale) { - revalidate = -1 - } - - // returning entry will cause stream to be consumed - // so we need to clone it first, so in-memory cache can - // be used again - const [returnStream, newSaved] = value.tee() - entry.value = newSaved - - getLogger() - .withFields({ cacheKey, ttl, status: stale ? 'STALE' : 'HIT' }) - .debug(`[NetlifyDefaultUseCacheHandler] get result`) - span?.setAttributes({ - cacheStatus: stale ? 'stale' : 'hit', - ttl, - }) - - return { - ...entry, - revalidate, - value: returnStream, - } - }, - ) - }, - set( - cacheKey: string, - pendingEntry: Promise, - ): ReturnType { - return withActiveSpan( - getTracer(), - 'DefaultUseCacheHandler.set', - async (span): ReturnType => { - getLogger().withFields({ cacheKey }).debug(`[NetlifyDefaultUseCacheHandler]: set`) - span?.setAttributes({ - cacheKey, - }) - - let resolvePending: () => void = tmpResolvePendingBeforeCreatingAPromise - const pendingPromise = new Promise((resolve) => { - resolvePending = resolve - }) - - const pendingSets = getPendingSets() - - pendingSets.set(cacheKey, pendingPromise) - - const entry = await pendingEntry - - span?.setAttributes({ - cacheKey, - }) - - let size = 0 - try { - const [value, clonedValue] = entry.value.tee() - entry.value = value - const reader = clonedValue.getReader() - - for (let chunk; !(chunk = await reader.read()).done; ) { - size += Buffer.from(chunk.value).byteLength - } + + const pendingSets = getPendingSets() + + pendingSets.set(cacheKey, pendingPromise) + + const entry = await pendingEntry span?.setAttributes({ + cacheKey, tags: entry.tags, timestamp: entry.timestamp, revalidate: entry.revalidate, expire: entry.expire, }) - getLRUCache().set(cacheKey, { - entry, - size, + try { + await storageSet(cacheKey, entry) + } catch (error) { + getLogger().withError(error).error(`[${cacheHandlerName}.set] error`) + if (error instanceof Error) { + span?.recordException(error) + } + } finally { + resolvePending() + pendingSets.delete(cacheKey) + } + }, + ) + }, + async refreshTags(): Promise { + // we check tags on demand, so we don't need to do anything here + // additionally this is blocking and we do need to check tags in + // persisted storage, so if we would maintain in-memory tags manifests + // we would need to check more tags than current request needs + // while blocking pipeline + }, + getExpiration: function ( + // supporting both (...tags: string[]) and (tags: string[]) signatures + ...notNormalizedTags: string[] | string[][] + ): ReturnType { + return withActiveSpan( + getTracer(), + `${cacheHandlerName}.getExpiration`, + async (span): ReturnType => { + const tags = notNormalizedTags.flat() + + span?.setAttributes({ + tags, }) - } catch (error) { - getLogger().withError(error).error('[NetlifyDefaultUseCacheHandler.set] error') - } finally { - resolvePending() - pendingSets.delete(cacheKey) - } - }, - ) - }, - async refreshTags(): Promise { - // we check tags on demand, so we don't need to do anything here - // additionally this is blocking and we do need to check tags in - // persisted storage, so if we would maintain in-memory tags manifests - // we would need to check more tags than current request needs - // while blocking pipeline - }, - getExpiration: function ( - // supporting both (...tags: string[]) and (tags: string[]) signatures - ...notNormalizedTags: string[] | string[][] - ): ReturnType { - return withActiveSpan( - getTracer(), - 'DefaultUseCacheHandler.getExpiration', - async (span): ReturnType => { - const tags = notNormalizedTags.flat() - - span?.setAttributes({ - tags, - }) - - const expiration = await getMostRecentTagExpirationTimestamp(tags) - - getLogger() - .withFields({ tags, expiration }) - .debug(`[NetlifyDefaultUseCacheHandler] getExpiration`) - span?.setAttributes({ - expiration, - }) - - return expiration - }, - ) - }, - // this is for CacheHandlerV2 - expireTags(...tags: string[]): ReturnType { - return withActiveSpan( - getTracer(), - 'DefaultUseCacheHandler.expireTags', - async (span): ReturnType => { - getLogger().withFields({ tags }).debug(`[NetlifyDefaultUseCacheHandler] expireTags`) - span?.setAttributes({ - tags, - }) - - await markTagsAsStaleAndPurgeEdgeCache(tags) - }, - ) + + const expiration = await getMostRecentTagExpirationTimestamp(tags) + + getLogger().withFields({ tags, expiration }).debug(`[${cacheHandlerName}] getExpiration`) + span?.setAttributes({ + expiration, + }) + + return expiration + }, + ) + }, + // this is for CacheHandlerV2 + expireTags(...tags: string[]): ReturnType { + return withActiveSpan( + getTracer(), + `${cacheHandlerName}.expireTags`, + async (span): ReturnType => { + getLogger().withFields({ tags }).debug(`[${cacheHandlerName}] expireTags`) + span?.setAttributes({ + tags, + }) + + await markTagsAsStaleAndPurgeEdgeCache(tags) + }, + ) + }, + // this is for CacheHandlerV3 / Next 16 + updateTags( + tags: string[], + durations: RevalidateTagDurations, + ): ReturnType { + return withActiveSpan( + getTracer(), + 'DefaultUseCacheHandler.updateTags', + async (span): ReturnType => { + getLogger().withFields({ tags, durations }).debug(`[${cacheHandlerName}] updateTags`) + span?.setAttributes({ + tags, + durations: JSON.stringify(durations), + }) + await markTagsAsStaleAndPurgeEdgeCache(tags, durations) + }, + ) + }, + } satisfies MultiVersionCacheHandler +} + +type PrivateCacheEntry = { + entry: CacheEntry + // compute size on set since we need to read size + // of the ReadableStream for LRU evicting + size: number +} + +type CacheHandleLRUCache = LRUCache + +function getLRUCache(): CacheHandleLRUCache { + if (extendedGlobalThis[LRU_CACHE_GLOBAL_KEY]) { + return extendedGlobalThis[LRU_CACHE_GLOBAL_KEY] + } + + const lruCache = new LRUCache({ + max: 1000, + maxSize: 50 * 1024 * 1024, // same as hardcoded default in Next.js + sizeCalculation: (value) => value.size, + }) + + extendedGlobalThis[LRU_CACHE_GLOBAL_KEY] = lruCache + + return lruCache +} + +const NetlifyDefaultUseCacheHandler = createUseCacheHandler({ + cacheHandlerName: 'NetlifyDefaultUseCacheHandler', + // we store ReadableStream in in-memory LRU cache so we must ensure it stays usable + shouldCloneStreamForReturning: true, + storageGet: async (cacheKey: string) => { + return getLRUCache().get(cacheKey)?.entry }, - // this is for CacheHandlerV3 / Next 16 - updateTags( - tags: string[], - durations: RevalidateTagDurations, - ): ReturnType { - return withActiveSpan( - getTracer(), - 'DefaultUseCacheHandler.updateTags', - async (span): ReturnType => { - getLogger() - .withFields({ tags, durations }) - .debug(`[NetlifyDefaultUseCacheHandler] updateTags`) - span?.setAttributes({ - tags, - durations: JSON.stringify(durations), - }) - await markTagsAsStaleAndPurgeEdgeCache(tags, durations) - }, - ) + storageSet: async (cacheKey: string, entry: CacheEntry) => { + let size = 0 + const [value, clonedValue] = entry.value.tee() + entry.value = value + const reader = clonedValue.getReader() + + for (let chunk; !(chunk = await reader.read()).done; ) { + size += Buffer.from(chunk.value).byteLength + } + + getLRUCache().set(cacheKey, { + entry, + size, + }) }, -} satisfies MultiVersionCacheHandler +}) export function configureUseCacheHandlers() { extendedGlobalThis[cacheHandlersSymbol] = {