|
1 | 1 | import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache"; |
2 | | -import type { CacheValue } from "types/overrides"; |
3 | 2 | import { writeTags } from "utils/cache"; |
4 | 3 | import { fromReadableStream, toReadableStream } from "utils/stream"; |
5 | 4 | import { debug } from "./logger"; |
6 | 5 |
|
7 | | -const pendingWritePromiseMap = new Map< |
8 | | - string, |
9 | | - Promise<CacheValue<"composable">> |
10 | | ->(); |
| 6 | +const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>(); |
11 | 7 |
|
12 | 8 | export default { |
13 | 9 | async get(cacheKey: string) { |
14 | 10 | try { |
15 | | - // We first check if we have a pending write for this cache key |
16 | | - // If we do, we return the pending promise instead of fetching the cache |
17 | | - if (pendingWritePromiseMap.has(cacheKey)) { |
18 | | - const stored = pendingWritePromiseMap.get(cacheKey); |
19 | | - if (stored) { |
20 | | - return stored.then((entry) => ({ |
21 | | - ...entry, |
22 | | - value: toReadableStream(entry.value), |
23 | | - })); |
24 | | - } |
25 | | - } |
| 11 | + const stored = pendingWritePromiseMap.get(cacheKey); |
| 12 | + if (stored) return stored; |
| 13 | + |
26 | 14 | const result = await globalThis.incrementalCache.get( |
27 | 15 | cacheKey, |
28 | 16 | "composable", |
29 | 17 | ); |
30 | | - if (!result?.value?.value) { |
31 | | - return undefined; |
32 | | - } |
| 18 | + if (!result?.value?.value) return undefined; |
33 | 19 |
|
34 | 20 | debug("composable cache result", result); |
35 | 21 |
|
36 | | - // We need to check if the tags associated with this entry has been revalidated |
37 | 22 | if ( |
38 | 23 | globalThis.tagCache.mode === "nextMode" && |
39 | 24 | result.value.tags.length > 0 |
@@ -69,73 +54,77 @@ export default { |
69 | 54 | }, |
70 | 55 |
|
71 | 56 | async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) { |
72 | | - const promiseEntry = pendingEntry.then(async (entry) => ({ |
73 | | - ...entry, |
74 | | - value: await fromReadableStream(entry.value), |
75 | | - })); |
76 | | - pendingWritePromiseMap.set(cacheKey, promiseEntry); |
| 57 | + const teedPromise = pendingEntry.then((entry) => { |
| 58 | + // Optimization: We avoid consuming and stringifying the stream here, |
| 59 | + // because it creates double copies just to be discarded when this function |
| 60 | + // ends. This avoids unnecessary memory usage, and reduces GC pressure. |
| 61 | + const [stream1, stream2] = entry.value.tee(); |
| 62 | + return [ |
| 63 | + { ...entry, value: stream1 }, |
| 64 | + { ...entry, value: stream2 }, |
| 65 | + ] as const; |
| 66 | + }); |
77 | 67 |
|
78 | | - const entry = await promiseEntry.finally(() => { |
| 68 | + pendingWritePromiseMap.set( |
| 69 | + cacheKey, |
| 70 | + teedPromise.then(([entry]) => entry), |
| 71 | + ); |
| 72 | + |
| 73 | + const [, entryForStorage] = await teedPromise.finally(() => { |
79 | 74 | pendingWritePromiseMap.delete(cacheKey); |
80 | 75 | }); |
| 76 | + |
81 | 77 | await globalThis.incrementalCache.set( |
82 | 78 | cacheKey, |
83 | 79 | { |
84 | | - ...entry, |
85 | | - value: entry.value, |
| 80 | + ...entryForStorage, |
| 81 | + value: await fromReadableStream(entryForStorage.value), |
86 | 82 | }, |
87 | 83 | "composable", |
88 | 84 | ); |
| 85 | + |
89 | 86 | if (globalThis.tagCache.mode === "original") { |
90 | 87 | const storedTags = await globalThis.tagCache.getByPath(cacheKey); |
91 | | - const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag)); |
| 88 | + const tagsToWrite = []; |
| 89 | + for (const tag of entryForStorage.tags) { |
| 90 | + if (!storedTags.includes(tag)) { |
| 91 | + tagsToWrite.push({ tag, path: cacheKey }); |
| 92 | + } |
| 93 | + } |
92 | 94 | if (tagsToWrite.length > 0) { |
93 | | - await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey }))); |
| 95 | + await writeTags(tagsToWrite); |
94 | 96 | } |
95 | 97 | } |
96 | 98 | }, |
97 | 99 |
|
98 | | - async refreshTags() { |
99 | | - // We don't do anything for now, do we want to do something here ??? |
100 | | - return; |
101 | | - }, |
| 100 | + async refreshTags() {}, |
| 101 | + |
102 | 102 | async getExpiration(...tags: string[]) { |
103 | | - if (globalThis.tagCache.mode === "nextMode") { |
104 | | - return globalThis.tagCache.getLastRevalidated(tags); |
105 | | - } |
106 | | - // We always return 0 here, original tag cache are handled directly in the get part |
107 | | - // TODO: We need to test this more, i'm not entirely sure that this is working as expected |
108 | | - return 0; |
| 103 | + return globalThis.tagCache.mode === "nextMode" |
| 104 | + ? globalThis.tagCache.getLastRevalidated(tags) |
| 105 | + : 0; |
109 | 106 | }, |
| 107 | + |
110 | 108 | async expireTags(...tags: string[]) { |
111 | 109 | if (globalThis.tagCache.mode === "nextMode") { |
112 | 110 | return writeTags(tags); |
113 | 111 | } |
| 112 | + |
114 | 113 | const tagCache = globalThis.tagCache; |
115 | 114 | const revalidatedAt = Date.now(); |
116 | | - // For the original mode, we have more work to do here. |
117 | | - // We need to find all paths linked to to these tags |
118 | 115 | const pathsToUpdate = await Promise.all( |
119 | 116 | tags.map(async (tag) => { |
120 | 117 | const paths = await tagCache.getByTag(tag); |
121 | | - return paths.map((path) => ({ |
122 | | - path, |
123 | | - tag, |
124 | | - revalidatedAt, |
125 | | - })); |
| 118 | + return paths.map((path) => ({ path, tag, revalidatedAt })); |
126 | 119 | }), |
127 | 120 | ); |
128 | | - // We need to deduplicate paths, we use a set for that |
129 | | - const setToWrite = new Set<{ path: string; tag: string }>(); |
| 121 | + |
| 122 | + const dedupeMap = new Map(); |
130 | 123 | for (const entry of pathsToUpdate.flat()) { |
131 | | - setToWrite.add(entry); |
| 124 | + dedupeMap.set(`${entry.path}|${entry.tag}`, entry); |
132 | 125 | } |
133 | | - await writeTags(Array.from(setToWrite)); |
| 126 | + await writeTags(Array.from(dedupeMap.values())); |
134 | 127 | }, |
135 | 128 |
|
136 | | - // This one is necessary for older versions of next |
137 | | - async receiveExpiredTags(...tags: string[]) { |
138 | | - // This function does absolutely nothing |
139 | | - return; |
140 | | - }, |
| 129 | + async receiveExpiredTags() {}, |
141 | 130 | } satisfies ComposableCacheHandler; |
0 commit comments