This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 6cf130f

chore: decompose modules and reuse import logic for expanding files

License: MIT
Signed-off-by: achingbrain <[email protected]>

1 parent d06e73b · commit 6cf130f

File tree

6 files changed (+275, -251 lines)


src/core/write/find-file-size.js

Lines changed: 0 additions & 15 deletions
This file was deleted.

src/core/write/import-node.js

Lines changed: 16 additions & 17 deletions
@@ -6,35 +6,34 @@ const values = require('pull-stream/sources/values')
 const collect = require('pull-stream/sinks/collect')
 const importer = require('ipfs-unixfs-engine').importer
 const {
-  limitStreamBytes
+  loadNode
 } = require('../utils')
 
-const importNode = (ipfs, parent, fileName, source, options, callback) => {
+const defaultOptions = {
+  progress: undefined,
+  hash: undefined,
+  cidVersion: undefined,
+  strategy: undefined
+}
+
+const importStream = (ipfs, source, options, callback) => {
+  options = Object.assign({}, defaultOptions, options)
+
   waterfall([
-    (done) => pull(
+    (cb) => pull(
       values([{
-        content: pull(
-          source,
-          limitStreamBytes(options.length)
-        )
+        content: pull(source)
       }]),
       importer(ipfs._ipld, {
         progress: options.progress,
         hashAlg: options.hash,
         cidVersion: options.cidVersion,
         strategy: options.strategy
       }),
-      collect(done)
+      collect(cb)
     ),
-    (results, done) => {
-      const imported = results[0]
-
-      done(null, {
-        size: imported.size,
-        multihash: imported.multihash
-      })
-    }
+    (results, cb) => loadNode(ipfs, results[0], cb)
   ], callback)
 }
 
-module.exports = importNode
+module.exports = importStream
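
For orientation, a minimal usage sketch of the refactored module (not part of the commit): it assumes an `ipfs` instance exposing an `_ipld` property is in scope and that the module is required from a sibling file, as src/core/write/index.js does below.

// Hypothetical caller, not from this commit
const importStream = require('./import-node')
const values = require('pull-stream/sources/values')

// `ipfs` is assumed to be an IPFS instance exposing `_ipld`
importStream(ipfs, values([Buffer.from('hello world')]), {}, (err, node) => {
  if (err) throw err

  // `node` is the DAGNode of the imported file's root, resolved via loadNode
  console.log(node.multihash)
})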

src/core/write/index.js

Lines changed: 8 additions & 1 deletion
@@ -9,6 +9,7 @@ const {
   traverseTo,
   addLink,
   updateTree,
+  limitStreamBytes,
   FILE_SEPARATOR
 } = require('../utils')
 const values = require('pull-stream/sources/values')
@@ -22,6 +23,7 @@ const isNode = require('detect-node')
 const fileReaderStream = require('filereader-stream')
 const isPullStream = require('is-pull-stream')
 const cat = require('pull-cat')
+const pull = require('pull-stream/pull')
 
 let fs
 
@@ -165,8 +167,13 @@ module.exports = function mfsWrite (ipfs) {
           ])
         }
 
+        source = pull(
+          source,
+          limitStreamBytes(options.length)
+        )
+
         log('Importing file', fileName)
-        importNode(ipfs, containingFolder, fileName, source, options, next)
+        importNode(ipfs, source, options, next)
       }
     },
 
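
As a rough illustration of the relocated byte limit (not part of the commit): this sketch assumes `limitStreamBytes` from ../utils caps a pull stream at the given byte count and is a pass-through when the count is undefined.

// Sketch only: cap a pull source before it reaches the importer
const pull = require('pull-stream/pull')
const values = require('pull-stream/sources/values')
const collect = require('pull-stream/sinks/collect')
const { limitStreamBytes } = require('../utils')

pull(
  values([Buffer.from('hello world')]),
  limitStreamBytes(5), // assumed to stop the stream after 5 bytes
  collect((err, buffers) => {
    // under that assumption, Buffer.concat(buffers).toString() === 'hello'
  })
)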

src/core/write/update-node.js

Lines changed: 12 additions & 218 deletions
@@ -6,34 +6,25 @@ const {
 } = UnixFs
 const pull = require('pull-stream/pull')
 const cat = require('pull-cat')
-const values = require('pull-stream/sources/values')
 const collect = require('pull-stream/sinks/collect')
 const pushable = require('pull-pushable')
 const map = require('pull-stream/throughs/map')
-const asyncMap = require('pull-stream/throughs/async-map')
 const filter = require('pull-stream/throughs/filter')
-const paramap = require('pull-paramap')
-const {
-  leafFirst
-} = require('pull-traverse')
 const waterfall = require('async/waterfall')
 const parallel = require('async/parallel')
-const findFileSize = require('./find-file-size')
 const {
-  DAGNode,
   DAGLink
 } = require('ipld-dag-pb')
 const log = require('debug')('mfs:write:update-node')
-const bs58 = require('bs58')
 const {
   limitStreamBytes,
-  addLink,
   createNode,
   zeros,
   loadNode,
   MAX_CHUNK_SIZE
 } = require('../utils')
-const importer = require('ipfs-unixfs-engine').importer
+const importNode = require('./import-node')
+const updateNodeBytes = require('./update-tree')
 
 const updateNode = (ipfs, cidToUpdate, source, options, callback) => {
   let offset = options.offset || 0
@@ -50,7 +41,8 @@ const updateNode = (ipfs, cidToUpdate, source, options, callback) => {
   waterfall([
     (done) => loadNode(ipfs, cidToUpdate, done),
     (node, done) => {
-      const fileSize = findFileSize(node)
+      const meta = unmarshal(node.data)
+      const fileSize = meta.fileSize()
 
       log(`Updating bytes ${streamStart}-${streamEnd} of ${fileSize} bytes from ${cidToUpdate.toBaseEncodedString()} with source`)
 
@@ -118,10 +110,10 @@ const updateNode = (ipfs, cidToUpdate, source, options, callback) => {
           // wait for both streams to end
          parallel([
             // set up pull stream for replacing bytes
-            (cb) => updateNodeBytes(node, fileSize, updateSource, cb),
+            (cb) => updateNodeBytes(ipfs, node, fileSize, streamStart, streamEnd, updateSource, options, cb),
 
             // setup pull stream for appending bytes
-            (cb) => appendNodeBytes(appendSource, cb)
+            (cb) => importNode(ipfs, appendSource, options, cb)
           ], next)
         },
         ([updatedNode, appendedNode], next) => {
@@ -143,8 +135,8 @@ const updateNode = (ipfs, cidToUpdate, source, options, callback) => {
               return createNode(ipfs, newMeta.marshal(), [], options, next)
             } else {
               // We expanded one DAGNode into two so create a tree
-              const link1 = new DAGLink('', updatedMeta.fileSize(), updatedNode.multihash)
-              const link2 = new DAGLink('', appendedMeta.fileSize(), appendedNode.multihash)
+              const link1 = new DAGLink('', updatedNode.data.length, updatedNode.multihash)
+              const link2 = new DAGLink('', appendedNode.data.length, appendedNode.multihash)
 
               const newMeta = new UnixFs(updatedMeta.type)
               newMeta.addBlockSize(updatedMeta.fileSize())
@@ -156,219 +148,21 @@ const updateNode = (ipfs, cidToUpdate, source, options, callback) => {
             }
 
             // if we added new bytes, add them to the root node of the original file
-            // this is consistent with the go implementation but probably broken
+            // this is consistent with the go implementation but probably not the right thing to do
 
             // update UnixFs metadata on the root node
             updatedMeta.addBlockSize(appendedMeta.fileSize())
 
-            return waterfall([
-              (cb) => DAGNode.create(updatedMeta.marshal(), updatedNode.links, cb),
-              (newNode, cb) => addLink(ipfs, {
-                parent: newNode,
-                child: appendedNode
-              }, cb)
-            ], next)
+            return createNode(ipfs, updatedMeta.marshal(), updatedNode.links.concat(
+              new DAGLink('', appendedNode.data.length, appendedNode.multihash)
+            ), options, next)
           }
 
           next(null, updatedNode)
        }
      ], done)
    }
  ], callback)
-
-  function appendNodeBytes (source, callback) {
-    waterfall([
-      (cb) => pull(
-        values([{
-          content: pull(source)
-        }]),
-        importer(ipfs._ipld, {
-          progress: options.progress,
-          hashAlg: options.hash,
-          cidVersion: options.cidVersion,
-          strategy: options.strategy
-        }),
-        collect(cb)
-      ),
-      (results, cb) => loadNode(ipfs, results[0], cb)
-    ], callback)
-  }
-
-  function updateNodeBytes (node, fileSize, source, callback) {
-    waterfall([
-      (cb) => pull(
-        source,
-        asyncMap((buffer, done) => {
-          // Find the DAGNodes that contain the data at the specified offset/length
-          // Merge the data and create new DAGNodes with the merged data
-          // Keep a record of the new CIDs and update the tree
-
-          pull(
-            leafFirst({
-              parent: null,
-              link: null,
-              index: null,
-              node,
-              nodeStart: streamPosition,
-              nodeEnd: fileSize
-            }, findDAGNodesWithRequestedData),
-            paramap(updateNodeData(buffer)),
-            filter(Boolean),
-            asyncMap((link, next) => {
-              if (!link.parent || link.index === undefined) {
-                return next(null, link)
-              }
-
-              // Create a new list of links
-              const links = link.parent.node.links.map((existingLink, index) => {
-                if (index === link.index) {
-                  return new DAGLink('', link.size, link.multihash)
-                }
-
-                return existingLink
-              })
-
-              // Update node's parent
-              waterfall([
-                // Create a DAGNode with the new data
-                (cb) => createNode(ipfs, link.parent.node.data, links, options, cb),
-                (newNode, cb) => {
-                  link.parent.node = newNode
-
-                  cb(null, link)
-                }
-              ], next)
-            }),
-            collect((error, results) => {
-              let updatedRoot
-
-              if (!error) {
-                updatedRoot = results[0]
-
-                while (updatedRoot.parent) {
-                  updatedRoot = updatedRoot.parent
-                }
-
-                if (updatedRoot.node) {
-                  updatedRoot = updatedRoot.node
-                }
-              }
-
-              offset += buffer.length
-
-              log(`Updated root is ${bs58.encode(updatedRoot.multihash)}`)
-
-              done(error, updatedRoot)
-            })
-          )
-        }),
-        collect((error, results) => cb(error, results && results[0]))
-      ),
-      (updatedNodeCID, cb) => loadNode(ipfs, updatedNodeCID, cb)
-    ], callback)
-  }
-
-  // Where we currently are in the existing file
-  let streamPosition = 0
-
-  // Returns a pull stream that will load the data from the children of the passed node
-  function findDAGNodesWithRequestedData ({ node }) {
-    const meta = unmarshal(node.data)
-
-    log(`Node links ${node.links.length}, block sizes ${meta.blockSizes}`)
-
-    const parent = {
-      node: node
-    }
-
-    // work out which child nodes contain the requested data
-    const filteredLinks = node.links
-      .map((link, index) => {
-        const child = {
-          parent,
-          link: link,
-          index: index,
-          nodeStart: streamPosition,
-          nodeEnd: streamPosition + meta.blockSizes[index]
-        }
-
-        streamPosition = child.nodeEnd
-
-        return child
-      })
-      .filter((child, index) => {
-        log('child.nodeStart', child.nodeStart, 'child.nodeEnd', child.nodeEnd, 'streamStart', streamStart, 'streamEnd', streamEnd)
-
-        return (streamStart >= child.nodeStart && streamStart < child.nodeEnd) || // child has begin byte
-          (streamEnd > child.nodeStart && streamEnd <= child.nodeEnd) || // child has end byte
-          (streamStart < child.nodeStart && streamEnd > child.nodeEnd) // child is between begin and end bytes
-      })
-
-    if (filteredLinks.length) {
-      // move stream position to the first node we're going to return data from
-      streamPosition = filteredLinks[0].nodeStart
-
-      log(`Updating links with index(es) ${filteredLinks.map(link => link.index).join(',')}`)
-    } else {
-      log(`No links to update`)
-    }
-
-    return pull(
-      values(filteredLinks),
-      paramap((child, cb) => {
-        loadNode(ipfs, child.link, (error, node) => {
-          cb(error, Object.assign({}, child, {
-            node
-          }))
-        })
-      })
-    )
-  }
-
-  function updateNodeData (newContent) {
-    return ({ parent, link, nodeStart, node, index }, done) => {
-      if (!node || !node.data) {
-        return done()
-      }
-
-      const meta = unmarshal(node.data)
-
-      if (!meta || !meta.data || !meta.data.length) {
-        return done()
-      }
-
-      const targetStart = streamStart - nodeStart
-      const sourceStart = 0
-      let sourceEnd = streamEnd - streamStart
-
-      if (meta.data.length < sourceEnd) {
-        // we need to write to another DAGNode so increment the streamStart
-        // by the number of bytes from buffer we've written
-        streamStart += meta.data.length
-      }
-
-      const newData = Buffer.from(meta.data)
-      newContent.copy(newData, targetStart, sourceStart, sourceEnd)
-
-      const nodeData = new UnixFs(meta.type, newData).marshal()
-
-      waterfall([
-        // Create a DAGNode with the new data
-        (cb) => createNode(ipfs, nodeData, [], options, cb),
-        (newNode, cb) => {
-          log(`Created DAGNode with new data with hash ${bs58.encode(newNode.multihash)} to replace ${bs58.encode(node.multihash)}`)
-
-          // Store the CID and friends so we can update it's parent's links
-          cb(null, {
-            parent: parent,
-            index: index,
-            multihash: newNode.multihash,
-            size: newNode.size
-          })
-        }
-      ], done)
-    }
-  }
 }
 
 module.exports = updateNode
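
As a reading aid (not part of the commit), a sketch of the two branches update-node.js now delegates to the extracted modules; the wrapper function and its parameter list are hypothetical, and the real code builds updateSource and appendSource from the incoming stream before reaching this point.

const parallel = require('async/parallel')
const importNode = require('./import-node')
const updateNodeBytes = require('./update-tree')

// Hypothetical wrapper: run the overwrite and the append concurrently,
// then let the caller stitch the two results into a new root node
function overwriteAndAppend (ipfs, node, fileSize, streamStart, streamEnd, updateSource, appendSource, options, callback) {
  parallel([
    // rewrite the bytes that fall inside the existing file
    (cb) => updateNodeBytes(ipfs, node, fileSize, streamStart, streamEnd, updateSource, options, cb),

    // import any bytes written past the end of the existing file
    (cb) => importNode(ipfs, appendSource, options, cb)
  ], callback)
}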

0 commit comments