Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* cache integrity and size events so late listeners still get them
* pass expected integrity to cacache
* pass integrityEmitter to cacache to avoid a redundant integrity stream
* remove in-memory buffering in favor of full time streaming
  • Loading branch information
wraithgar authored and lukekarrys committed May 19, 2022
1 parent 632ce87 commit 7b2b77a
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 111 deletions.
138 changes: 43 additions & 95 deletions node_modules/make-fetch-happen/lib/cache/entry.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
const { Request, Response } = require('minipass-fetch')
const Minipass = require('minipass')
const MinipassCollect = require('minipass-collect')
const MinipassFlush = require('minipass-flush')
const MinipassPipeline = require('minipass-pipeline')
const cacache = require('cacache')
const url = require('url')

const CachingMinipassPipeline = require('../pipeline.js')
const CachePolicy = require('./policy.js')
const cacheKey = require('./key.js')
const remote = require('../remote.js')

const hasOwnProperty = (obj, prop) => Object.prototype.hasOwnProperty.call(obj, prop)

// maximum amount of data we will buffer into memory
// if we'll exceed this, we switch to streaming
const MAX_MEM_SIZE = 5 * 1024 * 1024 // 5MB

// allow list for request headers that will be written to the cache index
// note: we will also store any request headers
// that are named in a response's vary header
Expand Down Expand Up @@ -256,13 +251,12 @@ class CacheEntry {
}

const size = this.response.headers.get('content-length')
const fitsInMemory = !!size && Number(size) < MAX_MEM_SIZE
const shouldBuffer = this.options.memoize !== false && fitsInMemory
const cacheOpts = {
algorithms: this.options.algorithms,
metadata: getMetadata(this.request, this.response, this.options),
size,
memoize: fitsInMemory && this.options.memoize,
integrity: this.options.integrity,
integrityEmitter: this.response.body.hasIntegrityEmitter && this.response.body,
}

let body = null
Expand All @@ -275,52 +269,31 @@ class CacheEntry {
cacheWriteReject = reject
})

body = new MinipassPipeline(new MinipassFlush({
body = new CachingMinipassPipeline({ events: ['integrity', 'size'] }, new MinipassFlush({
flush () {
return cacheWritePromise
},
}))

let abortStream, onResume
if (shouldBuffer) {
// if the result fits in memory, use a collect stream to gather
// the response and write it to cacache while also passing it through
// to the user
onResume = () => {
const collector = new MinipassCollect.PassThrough()
abortStream = collector
collector.on('collect', (data) => {
// TODO if the cache write fails, log a warning but return the response anyway
cacache.put(this.options.cachePath, this.key, data, cacheOpts)
.then(cacheWriteResolve, cacheWriteReject)
})
body.unshift(collector)
body.unshift(this.response.body)
}
} else {
// if it does not fit in memory, create a tee stream and use
// that to pipe to both the cache and the user simultaneously
onResume = () => {
const tee = new Minipass()
const cacheStream = cacache.put.stream(this.options.cachePath, this.key, cacheOpts)
abortStream = cacheStream
tee.pipe(cacheStream)
// TODO if the cache write fails, log a warning but return the response anyway
cacheStream.promise().then(cacheWriteResolve, cacheWriteReject)
body.unshift(tee)
body.unshift(this.response.body)
}
// this is always true since if we aren't reusing the one from the remote fetch, we
// are using the one from cacache
body.hasIntegrityEmitter = true

const onResume = () => {
const tee = new Minipass()
const cacheStream = cacache.put.stream(this.options.cachePath, this.key, cacheOpts)
// re-emit the integrity and size events on our new response body so they can be reused
cacheStream.on('integrity', i => body.emit('integrity', i))
cacheStream.on('size', s => body.emit('size', s))
// stick a flag on here so downstream users will know if they can expect integrity events
tee.pipe(cacheStream)
// TODO if the cache write fails, log a warning but return the response anyway
cacheStream.promise().then(cacheWriteResolve, cacheWriteReject)
body.unshift(tee)
body.unshift(this.response.body)
}

body.once('resume', onResume)
body.once('end', () => body.removeListener('resume', onResume))
this.response.body.on('error', (err) => {
// the abortStream will either be a MinipassCollect if we buffer
// or a cacache write stream, either way be sure to listen for
// errors from the actual response and avoid writing data that we
// know to be invalid to the cache
abortStream.destroy(err)
})
} else {
await cacache.index.insert(this.options.cachePath, this.key, null, cacheOpts)
}
Expand All @@ -331,7 +304,7 @@ class CacheEntry {
// the header anyway
this.response.headers.set('x-local-cache', encodeURIComponent(this.options.cachePath))
this.response.headers.set('x-local-cache-key', encodeURIComponent(this.key))
this.response.headers.set('x-local-cache-mode', shouldBuffer ? 'buffer' : 'stream')
this.response.headers.set('x-local-cache-mode', 'stream')
this.response.headers.set('x-local-cache-status', status)
this.response.headers.set('x-local-cache-time', new Date().toISOString())
const newResponse = new Response(body, {
Expand All @@ -346,9 +319,6 @@ class CacheEntry {
// use the cached data to create a response and return it
async respond (method, options, status) {
let response
const size = Number(this.response.headers.get('content-length'))
const fitsInMemory = !!size && size < MAX_MEM_SIZE
const shouldBuffer = this.options.memoize !== false && fitsInMemory
if (method === 'HEAD' || [301, 308].includes(this.response.status)) {
// if the request is a HEAD, or the response is a redirect,
// then the metadata in the entry already includes everything
Expand All @@ -358,66 +328,44 @@ class CacheEntry {
// we're responding with a full cached response, so create a body
// that reads from cacache and attach it to a new Response
const body = new Minipass()
const removeOnResume = () => body.removeListener('resume', onResume)
let onResume
if (shouldBuffer) {
onResume = async () => {
removeOnResume()
try {
const content = await cacache.get.byDigest(
const headers = { ...this.policy.responseHeaders() }
const onResume = () => {
const cacheStream = cacache.get.stream.byDigest(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
)
cacheStream.on('error', async (err) => {
cacheStream.pause()
if (err.code === 'EINTEGRITY') {
await cacache.rm.content(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
)
body.end(content)
} catch (err) {
if (err.code === 'EINTEGRITY') {
await cacache.rm.content(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
)
}
if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') {
await CacheEntry.invalidate(this.request, this.options)
}
body.emit('error', err)
}
}
} else {
onResume = () => {
const cacheStream = cacache.get.stream.byDigest(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
)
cacheStream.on('error', async (err) => {
cacheStream.pause()
if (err.code === 'EINTEGRITY') {
await cacache.rm.content(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
)
}
if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') {
await CacheEntry.invalidate(this.request, this.options)
}
body.emit('error', err)
cacheStream.resume()
})
cacheStream.pipe(body)
}
if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') {
await CacheEntry.invalidate(this.request, this.options)
}
body.emit('error', err)
cacheStream.resume()
})
// emit the integrity and size events based on our metadata so we're consistent
body.emit('integrity', this.entry.integrity)
body.emit('size', Number(headers['content-length']))
cacheStream.pipe(body)
}

body.once('resume', onResume)
body.once('end', removeOnResume)
body.once('end', () => body.removeListener('resume', onResume))
response = new Response(body, {
url: this.entry.metadata.url,
counter: options.counter,
status: 200,
headers: {
...this.policy.responseHeaders(),
},
headers,
})
}

response.headers.set('x-local-cache', encodeURIComponent(this.options.cachePath))
response.headers.set('x-local-cache-hash', encodeURIComponent(this.entry.integrity))
response.headers.set('x-local-cache-key', encodeURIComponent(this.key))
response.headers.set('x-local-cache-mode', shouldBuffer ? 'buffer' : 'stream')
response.headers.set('x-local-cache-mode', 'stream')
response.headers.set('x-local-cache-status', status)
response.headers.set('x-local-cache-time', new Date(this.entry.metadata.time).toUTCString())
return response
Expand Down
41 changes: 41 additions & 0 deletions node_modules/make-fetch-happen/lib/pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict'

const MinipassPipeline = require('minipass-pipeline')

class CachingMinipassPipeline extends MinipassPipeline {
#events = []
#data = new Map()

constructor (opts, ...streams) {
// CRITICAL: do NOT pass the streams to the call to super(), this will start
// the flow of data and potentially cause the events we need to catch to emit
// before we've finished our own setup. instead we call super() with no args,
// finish our setup, and then push the streams into ourselves to start the
// data flow
super()
this.#events = opts.events

/* istanbul ignore next - coverage disabled because this is pointless to test here */
if (streams.length) {
this.push(...streams)
}
}

on (event, handler) {
if (this.#events.includes(event) && this.#data.has(event)) {
return handler(...this.#data.get(event))
}

return super.on(event, handler)
}

emit (event, ...data) {
if (this.#events.includes(event)) {
this.#data.set(event, data)
}

return super.emit(event, ...data)
}
}

module.exports = CachingMinipassPipeline
13 changes: 11 additions & 2 deletions node_modules/make-fetch-happen/lib/remote.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const Minipass = require('minipass')
const MinipassPipeline = require('minipass-pipeline')
const fetch = require('minipass-fetch')
const promiseRetry = require('promise-retry')
const ssri = require('ssri')

const CachingMinipassPipeline = require('./pipeline.js')
const getAgent = require('./agent.js')
const pkg = require('../package.json')

Expand Down Expand Up @@ -53,7 +53,16 @@ const remoteFetch = (request, options) => {
// we got a 200 response and the user has specified an expected
// integrity value, so wrap the response in an ssri stream to verify it
const integrityStream = ssri.integrityStream({ integrity: _opts.integrity })
res = new fetch.Response(new MinipassPipeline(res.body, integrityStream), res)
const pipeline = new CachingMinipassPipeline({
events: ['integrity', 'size'],
}, res.body, integrityStream)
// we also propagate the integrity and size events out to the pipeline so we can use
// this new response body as an integrityEmitter for cacache
integrityStream.on('integrity', i => pipeline.emit('integrity', i))
integrityStream.on('size', s => pipeline.emit('size', s))
res = new fetch.Response(pipeline, res)
// set an explicit flag so we know if our response body will emit integrity and size
res.body.hasIntegrityEmitter = true
}

res.headers.set('x-fetch-attempts', attemptNum)
Expand Down
8 changes: 4 additions & 4 deletions node_modules/make-fetch-happen/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "make-fetch-happen",
"version": "10.1.3",
"version": "10.1.5",
"description": "Opinionated, caching, retrying fetch client",
"main": "lib/index.js",
"files": [
Expand Down Expand Up @@ -37,7 +37,7 @@
"license": "ISC",
"dependencies": {
"agentkeepalive": "^4.2.1",
"cacache": "^16.0.2",
"cacache": "^16.1.0",
"http-cache-semantics": "^4.1.0",
"http-proxy-agent": "^5.0.0",
"https-proxy-agent": "^5.0.0",
Expand All @@ -55,7 +55,7 @@
},
"devDependencies": {
"@npmcli/eslint-config": "^3.0.1",
"@npmcli/template-oss": "3.4.3",
"@npmcli/template-oss": "3.5.0",
"mkdirp": "^1.0.4",
"nock": "^13.2.4",
"rimraf": "^3.0.2",
Expand All @@ -73,6 +73,6 @@
},
"templateOSS": {
"//@npmcli/template-oss": "This file is partially managed by @npmcli/template-oss. Edits may be overwritten.",
"version": "3.4.3"
"version": "3.5.0"
}
}
18 changes: 9 additions & 9 deletions package-lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
"libnpmsearch": "^5.0.2",
"libnpmteam": "^4.0.2",
"libnpmversion": "^3.0.1",
"make-fetch-happen": "^10.1.3",
"make-fetch-happen": "^10.1.5",
"minipass": "^3.1.6",
"minipass-pipeline": "^1.2.4",
"mkdirp": "^1.0.4",
Expand Down Expand Up @@ -4609,13 +4609,13 @@
"peer": true
},
"node_modules/make-fetch-happen": {
"version": "10.1.3",
"resolved": "https://registry.npmjs.org/make-fetch-happen/-/make-fetch-happen-10.1.3.tgz",
"integrity": "sha512-s/UjmGjUHn9m52cctFhN2ITObbT+axoUhgeir8xGrOlPbKDyJsdhQzb8PGncPQQ28uduHybFJ6Iumy2OZnreXw==",
"version": "10.1.5",
"resolved": "https://registry.npmjs.org/make-fetch-happen/-/make-fetch-happen-10.1.5.tgz",
"integrity": "sha512-mucOj2H0Jn/ax7H9K9T1bf0p1nn/mBFa551Os7ed9xRfLEx20aZhZeLslmRYfAaAqXZUGipcs+m5KOKvOH0XKA==",
"inBundle": true,
"dependencies": {
"agentkeepalive": "^4.2.1",
"cacache": "^16.0.2",
"cacache": "^16.1.0",
"http-cache-semantics": "^4.1.0",
"http-proxy-agent": "^5.0.0",
"https-proxy-agent": "^5.0.0",
Expand Down Expand Up @@ -13269,12 +13269,12 @@
"peer": true
},
"make-fetch-happen": {
"version": "10.1.3",
"resolved": "https://registry.npmjs.org/make-fetch-happen/-/make-fetch-happen-10.1.3.tgz",
"integrity": "sha512-s/UjmGjUHn9m52cctFhN2ITObbT+axoUhgeir8xGrOlPbKDyJsdhQzb8PGncPQQ28uduHybFJ6Iumy2OZnreXw==",
"version": "10.1.5",
"resolved": "https://registry.npmjs.org/make-fetch-happen/-/make-fetch-happen-10.1.5.tgz",
"integrity": "sha512-mucOj2H0Jn/ax7H9K9T1bf0p1nn/mBFa551Os7ed9xRfLEx20aZhZeLslmRYfAaAqXZUGipcs+m5KOKvOH0XKA==",
"requires": {
"agentkeepalive": "^4.2.1",
"cacache": "^16.0.2",
"cacache": "^16.1.0",
"http-cache-semantics": "^4.1.0",
"http-proxy-agent": "^5.0.0",
"https-proxy-agent": "^5.0.0",
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
"libnpmsearch": "^5.0.2",
"libnpmteam": "^4.0.2",
"libnpmversion": "^3.0.1",
"make-fetch-happen": "^10.1.3",
"make-fetch-happen": "^10.1.5",
"minipass": "^3.1.6",
"minipass-pipeline": "^1.2.4",
"mkdirp": "^1.0.4",
Expand Down

0 comments on commit 7b2b77a

Please sign in to comment.