Skip to content

Commit

Permalink
Cache the tarballs in pacote
Browse files Browse the repository at this point in the history
Tell `fetch` not to cache its downloaded tarballs, leave that to us

Also see npm/cli#2976 for my first attempt & some discussions as to how
we got here.

Fix: #73
Fix: npm/cli#2160
Close: npm/cli#2976
Close: #74

EDIT(@isaacs): Updated to add test, allow make-fetch-happen to use the
cache, pipe into cache properly, and avoid cache double-writes.

PR-URL: #75
Credit: @mjsir911, @isaacs
Close: #75
Reviewed-by: @wraithgar, @mjsir911
  • Loading branch information
mjsir911 authored and isaacs committed Apr 22, 2021
1 parent 99738cd commit 500a34f
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 22 deletions.
55 changes: 40 additions & 15 deletions lib/fetcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const _istream = Symbol('_istream')
const _assertType = Symbol('_assertType')
const _tarballFromCache = Symbol('_tarballFromCache')
const _tarballFromResolved = Symbol.for('pacote.Fetcher._tarballFromResolved')
const _cacheFetches = Symbol.for('pacote.Fetcher._cacheFetches')

class FetcherBase {
constructor (spec, opts) {
Expand Down Expand Up @@ -166,25 +167,19 @@ class FetcherBase {
}

// private, should be overridden.
// Note that they should *not* calculate or check integrity, but *just*
// return the raw tarball data stream.
// Note that they should *not* calculate or check integrity or cache,
// but *just* return the raw tarball data stream.
[_tarballFromResolved] () {
throw this.notImplementedError
}

// public, should not be overridden
tarball () {
return this.tarballStream(stream => new Promise((res, rej) => {
const buf = []
stream.on('error', er => rej(er))
stream.on('end', () => {
const data = Buffer.concat(buf)
data.integrity = this.integrity && String(this.integrity)
data.resolved = this.resolved
data.from = this.from
return res(data)
})
stream.on('data', d => buf.push(d))
return this.tarballStream(stream => stream.concat().then(data => {
data.integrity = this.integrity && String(this.integrity)
data.resolved = this.resolved
data.from = this.from
return data
}))
}

Expand All @@ -194,6 +189,10 @@ class FetcherBase {
return cacache.get.stream.byDigest(this.cache, this.integrity, this.opts)
}

get [_cacheFetches] () {
return true
}

[_istream] (stream) {
// everyone will need one of these, either for verifying or calculating
// We always set it, because we might only have a weak legacy hex
Expand All @@ -203,7 +202,31 @@ class FetcherBase {
// gets to the point of re-setting the integrity.
const istream = ssri.integrityStream(this.opts)
istream.on('integrity', i => this.integrity = i)
return stream.on('error', er => istream.emit('error', er)).pipe(istream)
stream.on('error', er => istream.emit('error', er))

// if not caching this, just pipe through to the istream and return it
if (!this.opts.cache || !this[_cacheFetches])
return stream.pipe(istream)

// we have to return a stream that gets ALL the data, and proxies errors,
// but then pipe from the original tarball stream into the cache as well.
// To do this without losing any data, and since the cacache put stream
// is not a passthrough, we have to pipe from the original stream into
// the cache AFTER we pipe into the istream. Since the cache stream
// has an asynchronous flush to write its contents to disk, we need to
// defer the istream end until the cache stream ends.
stream.pipe(istream, { end: false })
const cstream = cacache.put.stream(
this.opts.cache,
`pacote:tarball:${this.from}`,
this.opts
)
stream.pipe(cstream)
// defer istream end until after cstream
// cache write errors should not crash the fetch, this is best-effort.
cstream.promise().catch(() => {}).then(() => istream.end())

return istream
}

pickIntegrityAlgorithm () {
Expand Down Expand Up @@ -232,7 +255,9 @@ class FetcherBase {
// An ENOENT trying to read a tgz file, for example, is Right Out.
isRetriableError (er) {
// TODO: check error class, once those are rolled out to our deps
return this.isDataCorruptionError(er) || er.code === 'ENOENT'
return this.isDataCorruptionError(er) ||
er.code === 'ENOENT' ||
er.code === 'EISDIR'
}

// Mostly internal, but has some uses
Expand Down
7 changes: 7 additions & 0 deletions lib/remote.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const Minipass = require('minipass')
// The default registry URL is a string of great magic.
const magic = /^https?:\/\/registry\.npmjs\.org\//

const _cacheFetches = Symbol.for('pacote.Fetcher._cacheFetches')
const _headers = Symbol('_headers')
class RemoteFetcher extends Fetcher {
constructor (spec, opts) {
Expand All @@ -21,6 +22,12 @@ class RemoteFetcher extends Fetcher {
this.pkgid = opts.pkgid ? opts.pkgid : `remote:${nameat}${this.resolved}`
}

// Don't need to cache tarball fetches in pacote, because make-fetch-happen
// will write into cacache anyway.
get [_cacheFetches] () {
return false
}

[_tarballFromResolved] () {
const stream = new Minipass()
const fetchOpts = {
Expand Down
22 changes: 22 additions & 0 deletions test/fetcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,28 @@ t.test('tarballFile', t => {
t.end()
})

t.test('fs read stream error', t => {
const fsm = require('fs-minipass')
const ReadStream = fsm.ReadStream
t.teardown(() => fsm.ReadStream = ReadStream)
fsm.ReadStream = class extends ReadStream {
emit (ev, data) {
if (ev === 'close')
super.emit('error', new Error('poop'))
else
super.emit(ev, data)
}
}

t.rejects(new FileFetcher(abbrevspec, {cache})
.tarballFile(target + '/err.tgz'), { message: 'poop' }, 'tarballFile')

t.rejects(new FileFetcher(abbrevspec, {cache})
.tarball(), { message: 'poop' }, 'tarball stream')

t.end()
})

t.test('fs write stream error', t => {
const fsm = require('fs-minipass')
const WriteStream = fsm.WriteStream
Expand Down
17 changes: 10 additions & 7 deletions test/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,29 @@ const FileFetcher = require('../lib/file.js')
const t = require('tap')
const { relative, resolve, basename } = require('path')
const npa = require('npm-package-arg')
const { promisify } = require('util')
const rimraf = promisify(require('rimraf'))
const mkdirp = require('mkdirp')
const me = t.testdir()
const me = t.testdir({cache: {}})
const cache = resolve(me, 'cache')
t.cleanSnapshot = str => str.split(process.cwd()).join('${CWD}')

const abbrev = resolve(__dirname, 'fixtures/abbrev-1.1.1.tgz')
const abbrevspec = `file:${relative(process.cwd(), abbrev)}`

t.test('basic', async t => {
const f = new FileFetcher(abbrevspec, {})
const f = new FileFetcher(abbrevspec, { cache })
t.same(f.types, ['file'])
const fm = await f.manifest()
t.matchSnapshot(fm, 'manifest')
t.equal(fm, f.package)
t.equal(await f.manifest(), fm, 'cached manifest')
t.matchSnapshot(await f.packument(), 'packument')
const pj = me + '/extract/package.json'
return t.resolveMatchSnapshot(f.extract(me + '/extract'), 'extract')
.then(() => t.matchSnapshot(require(pj), 'package.json extracted'))
t.matchSnapshot(await f.extract(me + '/extract'), 'extract')
t.matchSnapshot(require(pj), 'package.json extracted')
const fs = require('fs')
// just verify that the file is there.
t.same(fs.readdirSync(resolve(cache, 'content-v2/sha512/9e/77')), [
'bdfc8890fe1cc8858ea97439db06dcfb0e33d32ab634d0fff3bcf4a6e69385925eb1b86ac69d79ff56d4cd35f36d01f67dff546d7a192ccd4f6a7138a2d1',
], 'write cache content file')
})

const binString = resolve(__dirname, 'fixtures/bin-string.tgz')
Expand Down

0 comments on commit 500a34f

Please sign in to comment.