@@ -10,7 +10,8 @@ const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
 const series = require('async/series')
 const log = require('debug')('ipfs:mfs:core:utils:add-link')
 const UnixFS = require('ipfs-unixfs')
-const Bucket = require('hamt-sharding')
+const Bucket = require('hamt-sharding/src/bucket')
+const loadNode = require('./load-node')
 
 const defaultOptions = {
   parent: undefined,
@@ -78,11 +79,29 @@ const addLink = (context, options, callback) => {
     return convertToShardedDirectory(context, options, callback)
   }
 
-  log('Adding to regular directory')
+  log(`Adding ${options.name} to regular directory`)
 
   addToDirectory(context, options, callback)
 }
 
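+// the parent directory has outgrown a plain directory: re-import all of its
+// existing links plus the new child as the contents of a HAMT shard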
+const convertToShardedDirectory = (context, options, callback) => {
+  createShard(context, options.parent.links.map(link => ({
+    name: link.name,
+    size: link.size,
+    multihash: link.cid.buffer
+  })).concat({
+    name: options.name,
+    size: options.size,
+    multihash: options.cid.buffer
+  }), {}, (err, result) => {
+    if (!err) {
+      log('Converted directory to sharded directory', result.cid.toBaseEncodedString())
+    }
+
+    callback(err, result)
+  })
+}
+
 const addToDirectory = (context, options, callback) => {
   waterfall([
     (done) => {
@@ -112,125 +131,116 @@ const addToDirectory = (context, options, callback) => {
   ], callback)
 }
 
-const addToShardedDirectory = async (context, options, callback) => {
-  const bucket = new Bucket({
-    hashFn: DirSharded.hashFn
-  })
-  const position = await bucket._findNewBucketAndPos(options.name)
-  const prefix = position.pos
+const addToShardedDirectory = (context, options, callback) => {
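+  // rebuild this level of the HAMT in memory, hash the new name to find its
+  // bucket, then record the chain of buckets from that leaf back up to the
+  // root so updateShard can walk down it and rewrite each shard node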
+  return waterfall([
+    (cb) => recreateHamtLevel(options.parent.links, cb),
+    (rootBucket, cb) => findPosition(options.name, rootBucket, (err, position) => cb(err, { rootBucket, position })),
+    ({ rootBucket, position }, cb) => {
+      // the path to the root bucket
+      let path = [{
+        position: position.pos,
+        bucket: position.bucket
+      }]
+      let currentBucket = position.bucket
+
+      while (currentBucket !== rootBucket) {
+        path.push({
+          bucket: currentBucket,
+          position: currentBucket._posAtParent
+        })
+
+        currentBucket = currentBucket._parent
+      }
+
+      cb(null, {
+        rootBucket,
+        path
+      })
+    },
+    ({ rootBucket, path }, cb) => updateShard(context, options.parent, rootBucket, path, {
+      name: options.name,
+      cid: options.cid,
+      size: options.size
+    }, options, (err, results = {}) => cb(err, { rootBucket, node: results.node })),
+    ({ rootBucket, node }, cb) => updateHamtDirectory(context, node.links, rootBucket, options, cb)
+  ], callback)
+}
+
+const updateShard = (context, parent, rootBucket, positions, child, options, callback) => {
+  const {
+    bucket,
+    position
+  } = positions.pop()
+
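+  // entries in a shard node are keyed by two uppercase hex digits of their
+  // bucket position, prepended to the entry name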
+  const prefix = position
     .toString('16')
     .toUpperCase()
     .padStart(2, '0')
     .substring(0, 2)
 
-  const existingSubShard = options.parent.links
-    .filter(link => link.name === prefix)
-    .pop()
-
-  if (existingSubShard) {
-    log(`Descending into sub-shard ${prefix} to add link ${options.name}`)
-
-    return addLink(context, {
-      ...options,
-      parent: null,
-      parentCid: existingSubShard.cid
-    }, (err, { cid, node }) => {
-      if (err) {
-        return callback(err)
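+  // find an existing entry occupying this prefix that is not the child itself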
+  const link = parent.links
+    .find(link => link.name.substring(0, 2) === prefix && link.name !== `${prefix}${child.name}`)
+
+  return waterfall([
+    (cb) => {
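+      // name longer than two chars: a file already lives at this prefix, so
+      // it and the new child collide and must move into a new sub-shard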
+      if (link && link.name.length > 2) {
+        log(`Converting existing file ${link.name} into sub-shard for ${child.name}`)
+
+        return waterfall([
+          (done) => createShard(context, [{
+            name: link.name.substring(2),
+            size: link.size,
+            multihash: link.cid.buffer
+          }, {
+            name: child.name,
+            size: child.size,
+            multihash: child.cid.buffer
+          }], {}, done),
+          ({ node: { links: [shard] } }, done) => {
+            return context.ipld.get(shard.cid, (err, result) => {
+              done(err, { cid: shard.cid, node: result && result.value })
+            })
+          },
+          ({ cid, node }, cb) => updateShardParent(context, bucket, parent, link.name, node, cid, prefix, options, cb)
+        ], cb)
       }
 
-      // make sure parent is updated with new sub-shard cid
-      addToDirectory(context, {
-        ...options,
-        parent: options.parent,
-        parentCid: options.parentCid,
-        name: prefix,
-        size: node.size,
-        cid: cid
-      }, callback)
-    })
-  }
-
-  const existingFile = options.parent.links
-    .filter(link => link.name.substring(2) === options.name)
-    .pop()
-
-  if (existingFile) {
-    log(`Updating file ${existingFile.name}`)
-
-    return addToDirectory(context, {
-      ...options,
-      name: existingFile.name
-    }, callback)
-  }
-
-  const existingUnshardedFile = options.parent.links
-    .filter(link => link.name.substring(0, 2) === prefix)
-    .pop()
-
-  if (existingUnshardedFile) {
-    log(`Replacing file ${existingUnshardedFile.name} with sub-shard`)
-
-    return createShard(context, [{
-      name: existingUnshardedFile.name.substring(2),
-      size: existingUnshardedFile.size,
-      multihash: existingUnshardedFile.cid.buffer
-    }, {
-      name: options.name,
-      size: options.size,
-      multihash: options.cid.buffer
-    }], {
-      root: false
-    }, (err, result) => {
-      if (err) {
-        return callback(err)
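+      // name of exactly two chars: the prefix points at an existing
+      // sub-shard, so load it, mirror its links into the in-memory buckets
+      // and recurse one level down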
+      if (link && link.name.length === 2) {
+        log(`Descending into sub-shard`, child.name)
+
+        return waterfall([
+          (cb) => loadNode(context, link, cb),
+          ({ node }, cb) => {
+            Promise.all(
+              node.links.map(link => {
+                if (link.name.length === 2) {
+                  // add a bucket for the subshard of this subshard
+                  const pos = parseInt(link.name, 16)
+
+                  bucket._putObjectAt(pos, new Bucket({
+                    hashFn: DirSharded.hashFn
+                  }, bucket, pos))
+
+                  return Promise.resolve()
+                }
+
+                // add to the root and let changes cascade down
+                return rootBucket.put(link.name.substring(2), true)
+              })
+            )
+              .then(() => cb(null, { node }))
+              .catch(error => cb(error))
+          },
+          ({ node }, cb) => updateShard(context, node, bucket, positions, child, options, cb),
+          ({ cid, node }, cb) => updateShardParent(context, bucket, parent, link.name, node, cid, prefix, options, cb)
+        ], cb)
       }
 
-      const newShard = result.node.links[0]
-
-      waterfall([
-        (done) => DAGNode.rmLink(options.parent, existingUnshardedFile.name, done),
-        (parent, done) => DAGNode.addLink(parent, newShard, done),
-        (parent, done) => {
-          // Persist the new parent DAGNode
-          context.ipld.put(parent, {
-            version: options.cidVersion,
-            format: options.codec,
-            hashAlg: options.hashAlg,
-            hashOnly: !options.flush
-          }, (error, cid) => done(error, {
-            node: parent,
-            cid
-          }))
-        }
-      ], callback)
-    })
-  }
-
-  log(`Appending ${prefix + options.name} to shard`)
-
-  return addToDirectory(context, {
-    ...options,
-    name: prefix + options.name
-  }, callback)
-}
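+      // no collision at this prefix: add or overwrite the child here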
+      log(`Adding or replacing file`, prefix + child.name)
 
-const convertToShardedDirectory = (context, options, callback) => {
-  createShard(context, options.parent.links.map(link => ({
-    name: link.name,
-    size: link.size,
-    multihash: link.cid.buffer
-  })).concat({
-    name: options.name,
-    size: options.size,
-    multihash: options.cid.buffer
-  }), {}, (err, result) => {
-    if (!err) {
-      log('Converted directory to sharded directory', result.cid.toBaseEncodedString())
+      updateShardParent(context, bucket, parent, prefix + child.name, child, child.cid, prefix + child.name, options, cb)
     }
-
-    callback(err, result)
-  })
+  ], callback)
 }
 
 const createShard = (context, contents, options, callback) => {
@@ -267,4 +277,84 @@ const createShard = (context, contents, options, callback) => {
   )
 }
 
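+// remove the link called `name` from the shard node (if any), add a link
+// called `prefix` pointing at `cid`, then re-serialize the node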
+const updateShardParent = (context, bucket, parent, name, node, cid, prefix, options, callback) => {
+  waterfall([
+    (done) => {
+      if (name) {
+        if (name === prefix) {
+          log(`Updating link ${name} in shard parent`)
+        } else {
+          log(`Removing link ${name} from shard parent, adding link ${prefix}`)
+        }
+
+        return DAGNode.rmLink(parent, name, done)
+      }
+
+      log(`Adding link ${prefix} to shard parent`)
+      done(null, parent)
+    },
+    (parent, done) => DAGNode.addLink(parent, new DAGLink(prefix, node.size, cid), done),
+    (parent, done) => updateHamtDirectory(context, parent.links, bucket, options, done)
+  ], callback)
+}
+
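+// the UnixFS data of a 'hamt-sharded-directory' node carries the bucket
+// bitfield, fanout and hash type, so it is rebuilt whenever the links change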
+const updateHamtDirectory = (context, links, bucket, options, callback) => {
+  // update parent with new bit field
+  waterfall([
+    (cb) => {
+      const data = Buffer.from(bucket._children.bitField().reverse())
+      const dir = new UnixFS('hamt-sharded-directory', data)
+      dir.fanout = bucket.tableSize()
+      dir.hashType = DirSharded.hashFn.code
+
+      DAGNode.create(dir.marshal(), links, cb)
+    },
+    (parent, done) => {
+      // Persist the new parent DAGNode
+      context.ipld.put(parent, {
+        version: options.cidVersion,
+        format: options.codec,
+        hashAlg: options.hashAlg,
+        hashOnly: !options.flush
+      }, (error, cid) => done(error, {
+        node: parent,
+        cid
+      }))
+    }
+  ], callback)
+}
+
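+// links with two-character names are sub-shards and are re-attached at their
+// bucket positions; anything longer is an entry whose trimmed name is
+// re-hashed into place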
+const recreateHamtLevel = (links, callback) => {
+  // recreate this level of the HAMT
+  const bucket = new Bucket({
+    hashFn: DirSharded.hashFn
+  })
+
+  Promise.all(
+    links.map(link => {
+      if (link.name.length === 2) {
+        const pos = parseInt(link.name, 16)
+
+        bucket._putObjectAt(pos, new Bucket({
+          hashFn: DirSharded.hashFn
+        }, bucket, pos))
+
+        return Promise.resolve()
+      }
+
+      return bucket.put(link.name.substring(2), true)
+    })
+  )
+    .then(() => callback(null, bucket))
+    .catch(error => callback(error))
+}
+
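+// hash `name` to find its bucket and position, then insert it so the
+// in-memory buckets (and the bitfields derived from them) account for the
+// new entry; `true` is just a placeholder value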
+const findPosition = async (name, bucket, callback) => {
+  const position = await bucket._findNewBucketAndPos(name)
+
+  await bucket.put(name, true)
+
+  callback(null, position)
+}
+
 module.exports = addLink