Skip to content
This repository was archived by the owner on Oct 1, 2021. It is now read-only.

Commit 2b14578

Browse files
authored
feat: add migration 9 to migrate pins to the datastore and back (#15)
Stores pins in the datastore for greatly increased speed of access. Borrows the pin-set code from js-ipfs to perform the reverse migration.
1 parent ca69f8b commit 2b14578

File tree

12 files changed

+650
-49
lines changed

12 files changed

+650
-49
lines changed

.aegir.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ module.exports = {
44
webpack: {
55
node: {
66
// this is needed until level stops using node buffers in browser code
7-
Buffer: true
7+
Buffer: true,
8+
9+
// needed by cbor, binary-parse-stream and nofilter
10+
stream: true
811
}
912
}
1013
}

README.md

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Migration tool for JS IPFS Repo
1+
# Migration tool for JS IPFS Repo <!-- omit in toc -->
22

33
[![Travis CI](https://flat.badgen.net/travis/ipfs/js-ipfs-repo-migrations)](https://travis-ci.com/ipfs/js-ipfs-repo-migrations)
44
[![codecov](https://codecov.io/gh/ipfs/js-ipfs-repo-migrations/branch/master/graph/badge.svg)](https://codecov.io/gh/ipfs/js-ipfs-repo-migrations)
@@ -15,30 +15,36 @@
1515
1616
This package is inspired by the [go-ipfs repo migration tool](https://github.com/ipfs/fs-repo-migrations/)
1717

18-
## Lead Maintainer
18+
## Lead Maintainer <!-- omit in toc -->
1919

20-
[Adam Uhlíř](https://github.com/auhau/)
20+
[Alex Potsides](http://github.com/achingbrain)
2121

22-
## Table of Contents
22+
## Table of Contents <!-- omit in toc -->
2323

2424
- [Background](#background)
2525
- [Install](#install)
2626
- [npm](#npm)
2727
- [Use in Node.js](#use-in-nodejs)
2828
- [Use in a browser with browserify, webpack or any other bundler](#use-in-a-browser-with-browserify-webpack-or-any-other-bundler)
29-
- [Use in a browser Using a script tag](#use-in-a-browser-using-a-script-tag)
3029
- [Usage](#usage)
3130
- [API](#api)
31+
- [`.migrate(path, toVersion, {ignoreLock, repoOptions, onProgress, isDryRun}) -> Promise<void>`](#migratepath-toversion-ignorelock-repooptions-onprogress-isdryrun---promisevoid)
32+
- [`onProgress(migration, counter, totalMigrations)`](#onprogressmigration-counter-totalmigrations)
33+
- [`.revert(path, toVersion, {ignoreLock, repoOptions, onProgress, isDryRun}) -> Promise<void>`](#revertpath-toversion-ignorelock-repooptions-onprogress-isdryrun---promisevoid)
34+
- [`getLatestMigrationVersion() -> int`](#getlatestmigrationversion---int)
3235
- [CLI](#cli)
3336
- [Creating a new migration](#creating-a-new-migration)
3437
- [Architecture of a migration](#architecture-of-a-migration)
38+
- [`.migrate(repoPath, repoOptions)`](#migraterepopath-repooptions)
39+
- [`.revert(repoPath, repoOptions)`](#revertrepopath-repooptions)
3540
- [Browser vs. NodeJS environments](#browser-vs-nodejs-environments)
3641
- [Guidelines](#guidelines)
3742
- [Integration with js-ipfs](#integration-with-js-ipfs)
43+
- [Tests](#tests)
3844
- [Empty migrations](#empty-migrations)
3945
- [Migrations matrix](#migrations-matrix)
4046
- [Developer](#developer)
41-
- [Module versioning notes](#module-versioning-notes)
47+
- [Module versioning notes](#module-versioning-notes)
4248
- [Contribute](#contribute)
4349
- [License](#license)
4450

@@ -266,7 +272,9 @@ This will create an empty migration with the next version.
266272

267273
| IPFS repo version | JS IPFS version |
268274
| -----------------: |:----------------:|
269-
| 7 | v0.0.0 - latest |
275+
| 7 | v0.0.0 |
276+
| 8 | v0.48.0 |
277+
| 9 | v0.49.0 |
270278

271279
## Developer
272280

migrations/migration-9/index.js

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
'use strict'
2+
3+
const CID = require('cids')
4+
const dagpb = require('ipld-dag-pb')
5+
const cbor = require('cbor')
6+
const multicodec = require('multicodec')
7+
const multibase = require('multibase')
8+
const pinset = require('./pin-set')
9+
const { createStore, cidToKey, PIN_DS_KEY, PinTypes } = require('./utils')
10+
11+
/**
 * Forward migration: walks the pin-set DAG whose root multihash is stored
 * under PIN_DS_KEY, and writes each pin as a cbor-encoded record into the
 * pins datastore, then deletes the old root block and datastore key.
 *
 * @param {Object} blockstore - store holding the pin-set DAG blocks
 * @param {Object} datastore - store holding the PIN_DS_KEY -> root multihash entry
 * @param {Object} pinstore - destination datastore for the migrated pins
 * @returns {Promise<void>}
 */
async function pinsToDatastore (blockstore, datastore, pinstore) {
  const mh = await datastore.get(PIN_DS_KEY)
  const rootCid = new CID(mh)

  const pinRootBuf = await blockstore.get(cidToKey(rootCid))
  const pinRoot = dagpb.util.deserialize(pinRootBuf)

  for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.recursive)) {
    // only record non-default fields to keep the stored records small
    const pin = {}

    if (cid.version !== 0) {
      // fix: the original read an undefined `version` binding here which
      // would have thrown a ReferenceError for any CIDv1 pin
      pin.version = cid.version
    }

    if (cid.codec !== 'dag-pb') {
      pin.codec = multicodec.getNumber(cid.codec)
    }

    await pinstore.put(cidToKey(cid), cbor.encode(pin))
  }

  for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.direct)) {
    const pin = {
      // direct pins do not pin their children
      depth: 0
    }

    if (cid.version !== 0) {
      pin.version = cid.version
    }

    if (cid.codec !== 'dag-pb') {
      pin.codec = multicodec.getNumber(cid.codec)
    }

    await pinstore.put(cidToKey(cid), cbor.encode(pin))
  }

  // the pins now live in the pinstore, so drop the old DAG root and pointer
  await blockstore.delete(cidToKey(rootCid))
  await datastore.delete(PIN_DS_KEY)
}
51+
52+
/**
 * Reverse migration: reads cbor pin records from the pinstore, rebuilds the
 * go-ipfs compatible pin-set DAG in the blockstore and stores the root CID's
 * multihash back under PIN_DS_KEY.
 *
 * @param {Object} blockstore - destination store for the rebuilt pin-set DAG
 * @param {Object} datastore - store that receives the PIN_DS_KEY entry
 * @param {Object} pinstore - datastore holding the cbor pin records
 * @returns {Promise<void>}
 */
async function pinsToDAG (blockstore, datastore, pinstore) {
  const recursivePins = []
  const directPins = []

  for await (const { key, value } of pinstore.query({})) {
    const pin = cbor.decode(value)
    // version/codec were omitted when they had default values on the way in;
    // parenthesised to make the && / || precedence explicit
    const codec = (pin.codec && multicodec.getName(pin.codec)) || 'dag-pb'
    // the final key segment is the multibase (prefix 'b') encoded multihash
    const cid = new CID(pin.version || 0, codec, multibase.decode('b' + key.toString().split('/').pop()))

    if (pin.depth === 0) {
      directPins.push(cid)
    } else {
      recursivePins.push(cid)
    }
  }

  const pinRoot = new dagpb.DAGNode(Buffer.alloc(0), [
    await pinset.storeSet(blockstore, PinTypes.recursive, recursivePins),
    await pinset.storeSet(blockstore, PinTypes.direct, directPins)
  ])
  const buf = pinRoot.serialize()
  const cid = await dagpb.util.cid(buf, {
    cidVersion: 0,
    hashAlg: multicodec.SHA2_256
  })
  await blockstore.put(cidToKey(cid), buf)
  await datastore.put(PIN_DS_KEY, cid.multihash)
}
79+
80+
/**
 * Opens the blocks, datastore and pins stores for the repo, runs `fn` with
 * them and guarantees all three are closed again afterwards.
 *
 * @param {string} repoPath - path to the IPFS repo
 * @param {Object} options - repo/store options passed to createStore
 * @param {Function} fn - invoked as fn(blockstore, datastore, pinstore)
 * @returns {Promise<void>}
 */
async function process (repoPath, options, fn) {
  const blocks = await createStore(repoPath, 'blocks', options)
  const data = await createStore(repoPath, 'datastore', options)
  const pins = await createStore(repoPath, 'pins', options)

  await blocks.open()
  await data.open()
  await pins.open()

  try {
    await fn(blocks, data, pins)
  } finally {
    // close in reverse order of opening
    await pins.close()
    await data.close()
    await blocks.close()
  }
}
97+
98+
module.exports = {
99+
version: 9,
100+
description: 'Migrates pins to datastore',
101+
migrate: (repoPath, options = {}) => {
102+
return process(repoPath, options, pinsToDatastore)
103+
},
104+
revert: (repoPath, options = {}) => {
105+
return process(repoPath, options, pinsToDAG)
106+
}
107+
}

migrations/migration-9/pin-set.js

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
'use strict'
2+
3+
const CID = require('cids')
4+
const protobuf = require('protons')
5+
const fnv1a = require('fnv1a')
6+
const varint = require('varint')
7+
const dagpb = require('ipld-dag-pb')
8+
const { DAGNode, DAGLink } = dagpb
9+
const multicodec = require('multicodec')
10+
const pbSchema = require('./pin.proto')
11+
const { Buffer } = require('buffer')
12+
const { cidToKey, DEFAULT_FANOUT, MAX_ITEMS, EMPTY_KEY } = require('./utils')
13+
14+
const pb = protobuf(pbSchema)
15+
16+
/**
 * Returns the default base-encoded string form of the given multihash.
 *
 * @param {Buffer|CID} hash - multihash (or CID-compatible value) to encode
 * @returns {string}
 */
function toB58String (hash) {
  const cid = new CID(hash)

  return cid.toBaseEncodedString()
}
19+
20+
/**
 * Decodes and validates the pin-set header from a set root node.
 *
 * rootNode.Data should be a buffer of the format:
 * < varint(headerLength) | header | itemData... >
 *
 * @param {DAGNode} rootNode - the pin-set root node to read
 * @returns {{header: Object, data: Buffer}} decoded header plus trailing item data
 * @throws {Error} when the header length, version or fanout is invalid
 */
function readHeader (rootNode) {
  const rootData = rootNode.Data
  const headerLength = varint.decode(rootData)
  // varint.decode.bytes is stateful - it is set by the decode call above
  // and must be read immediately
  const varintLength = varint.decode.bytes

  if (varintLength <= 0) {
    throw new Error('Invalid Set header length')
  }

  if (varintLength + headerLength > rootData.length) {
    throw new Error('Impossibly large set header length')
  }

  const headerBytes = rootData.slice(varintLength, headerLength + varintLength)
  const header = pb.Set.decode(headerBytes)

  if (header.version !== 1) {
    throw new Error(`Unsupported Set version: ${header.version}`)
  }

  if (header.fanout > rootNode.Links.length) {
    throw new Error('Impossibly large fanout')
  }

  return {
    header,
    data: rootData.slice(headerLength + varintLength)
  }
}
51+
52+
/**
 * Maps a pin key to a deterministic 32-bit bin hash; the seed (the depth of
 * the fanout tree) makes each level distribute pins differently.
 *
 * @param {number} seed - the fanout tree depth used to seed the hash
 * @param {Buffer|CID} key - the pin key to hash
 * @returns {number} fnv1a hash of the seeded key
 */
function hash (seed, key) {
  const seedBuf = Buffer.alloc(4)
  seedBuf.writeUInt32LE(seed, 0)

  const input = Buffer.concat([
    seedBuf,
    Buffer.from(toB58String(key))
  ])

  return fnv1a(input.toString('binary'))
}
60+
61+
/**
 * Yields the multihash of every pin reachable from a pin-set node,
 * recursing into any non-empty fanout bins.
 *
 * @param {Object} blockstore - store the fanout bin blocks are read from
 * @param {DAGNode} node - the pin-set node to walk
 * @yields {Buffer} pin multihashes
 */
async function * walkItems (blockstore, node) {
  const pbh = readHeader(node)

  for (const [index, link] of node.Links.entries()) {
    if (index < pbh.header.fanout) {
      // the first pbh.header.fanout links are fanout bins - a bin that is
      // not 'empty' is another pin-set node whose links we recurse into
      const linkHash = link.Hash

      if (!EMPTY_KEY.equals(linkHash.buffer)) {
        const binBuf = await blockstore.get(cidToKey(linkHash))
        const binNode = dagpb.util.deserialize(binBuf)

        yield * walkItems(blockstore, binNode)
      }
    } else {
      // links after the fanout section are pins themselves
      yield link.Hash
    }
  }
}
86+
87+
/**
 * Yields every pin stored in the named sub-set (e.g. recursive or direct)
 * of the given pin root node.
 *
 * @param {Object} blockstore - store the pin-set blocks are read from
 * @param {DAGNode} rootNode - the pin root node
 * @param {string} name - the name of the link to follow from the root
 * @yields {Buffer} pin multihashes
 * @throws {Error} when the root has no link with the given name
 */
async function * loadSet (blockstore, rootNode, name) {
  const link = rootNode.Links.find(l => l.Name === name)

  if (!link) {
    throw new Error('No link found with name ' + name)
  }

  const setBuf = await blockstore.get(cidToKey(link.Hash))
  const setNode = dagpb.util.deserialize(setBuf)

  yield * walkItems(blockstore, setNode)
}
99+
100+
/**
 * Builds a pin-set DAG from `items` using go-ipfs' fanout scheme and
 * resolves to the root DAGNode of the set.
 *
 * @param {Object} blockstore - store the intermediate fanout bin nodes are written to
 * @param {Array<{key: *, data: Buffer|null}>} items - the pins to store
 * @returns {Promise<DAGNode>} the root node of the set
 */
function storeItems (blockstore, items) {
  return storePins(items, 0)

  // recursively stores pins, distributing them into fanout bins when there
  // are too many for a single node
  async function storePins (pins, depth) {
    const pbHeader = pb.Set.encode({
      version: 1,
      fanout: DEFAULT_FANOUT,
      seed: depth
    })
    const headerBuf = Buffer.concat([
      Buffer.from(varint.encode(pbHeader.length)), pbHeader
    ])
    const fanoutLinks = []

    // every bin starts out pointing at the canonical empty key
    for (let i = 0; i < DEFAULT_FANOUT; i++) {
      fanoutLinks.push(new DAGLink('', 1, EMPTY_KEY))
    }

    if (pins.length <= MAX_ITEMS) {
      const nodes = pins
        .map(item => {
          return ({
            link: new DAGLink('', 1, item.key),
            data: item.data || Buffer.alloc(0)
          })
        })
        // sorting makes any ordering of `pins` produce the same DAGNode
        .sort((a, b) => Buffer.compare(a.link.Hash.buffer, b.link.Hash.buffer))

      const rootLinks = fanoutLinks.concat(nodes.map(item => item.link))
      const rootData = Buffer.concat(
        [headerBuf].concat(nodes.map(item => item.data))
      )

      return new DAGNode(rootData, rootLinks)
    } else {
      // If the array of pins is > MAX_ITEMS, we:
      //  - distribute the pins among `DEFAULT_FANOUT` bins
      //  - create a DAGNode for each bin
      //  - add each pin as a DAGLink to that bin
      //  - create a root DAGNode
      //  - add each bin as a DAGLink
      // (using go-ipfs' "wasteful but simple" approach for consistency)
      // https://github.com/ipfs/go-ipfs/blob/master/pin/set.go#L57

      const bins = pins.reduce((bins, pin) => {
        const n = hash(depth, pin.key) % DEFAULT_FANOUT
        bins[n] = n in bins ? bins[n].concat([pin]) : [pin]
        return bins
      }, [])

      let idx = 0
      for (const bin of bins) {
        const child = await storePins(bin, depth + 1)

        await storeChild(child, idx)

        idx++
      }

      return new DAGNode(headerBuf, fanoutLinks)
    }

    // serializes a bin node, writes it to the blockstore and records its
    // link in the fanout table
    async function storeChild (child, binIdx) {
      const buf = dagpb.util.serialize(child)
      // fix: dagpb.util.cid returns a Promise (see the awaited call in
      // storeSet) - the original passed the unresolved Promise to
      // cidToKey/DAGLink; also dropped an `opts` object that was never used
      const cid = await dagpb.util.cid(buf, {
        cidVersion: 0,
        hashAlg: multicodec.SHA2_256
      })
      await blockstore.put(cidToKey(cid), buf)

      fanoutLinks[binIdx] = new DAGLink('', child.size, cid)
    }
  }
}
183+
184+
/**
 * Stores the passed CIDs as a pin-set of the given type and returns a
 * DAGLink, named after the type, that points at the set root.
 *
 * @param {Object} blockstore - store the pin-set blocks are written to
 * @param {string} type - the set name, used as the link name
 * @param {Array} cids - the pins to store
 * @returns {Promise<DAGLink>} link to the root of the stored set
 */
async function storeSet (blockstore, type, cids) {
  const items = cids.map(cid => ({
    key: cid,
    data: null
  }))

  const rootNode = await storeItems(blockstore, items)
  const buf = rootNode.serialize(rootNode)
  const cid = await dagpb.util.cid(buf, {
    cidVersion: 0,
    hashAlg: multicodec.SHA2_256
  })

  await blockstore.put(cidToKey(cid), buf)

  return new DAGLink(type, rootNode.size, cid)
}
201+
202+
module.exports = {
203+
loadSet,
204+
storeSet
205+
}

0 commit comments

Comments
 (0)