@@ -33,8 +33,7 @@ const Destructible = require('destructible')
33
33
const Amalgamator = require ( 'amalgamate' )
34
34
const Locker = require ( 'amalgamate/locker' )
35
35
36
- const callbackify = require ( 'prospective/callbackify' )
37
-
36
+ const cadence = require ( 'cadence' )
38
37
const packet = require ( './packet' )
39
38
40
39
// Inheritence.
@@ -54,7 +53,8 @@ const rescue = require('rescue')
54
53
const ascension = require ( 'ascension' )
55
54
56
55
const mvcc = {
57
- riffle : require ( 'riffle' )
56
+ riffle : require ( 'riffle' ) ,
57
+ satiate : require ( 'satiate' )
58
58
}
59
59
60
60
// TODO Let's see if we can get throught his without having to worry about
@@ -63,54 +63,50 @@ function encode (buffer) {
63
63
return Buffer . isBuffer ( buffer ) ? buffer : Buffer . from ( buffer )
64
64
}
65
65
66
- class Paginator {
67
- constructor ( iterator , constraint ) {
68
- this . _iterator = iterator
69
- this . _constraint = constraint
70
- this . _keyAsBuffer = constraint . options . keyAsBuffer
71
- this . _valueAsBuffer = constraint . options . valueAsBuffer
72
- this . _keys = constraint . options . keys
73
- this . _values = constraint . options . values
74
- this . _items = [ ]
75
- this . _index = 0
76
- }
66
+ function Paginator ( iterator , constraint ) {
67
+ this . _iterator = mvcc . satiate ( iterator , 1 )
68
+ this . _constraint = constraint
69
+ this . _keyAsBuffer = constraint . options . keyAsBuffer
70
+ this . _valueAsBuffer = constraint . options . valueAsBuffer
71
+ this . _keys = constraint . options . keys
72
+ this . _values = constraint . options . values
73
+ this . _items = [ ]
74
+ this . _index = 0
75
+ }
77
76
78
- async next ( ) {
79
- for ( ; ; ) {
80
- if ( this . _items . length != this . _index ) {
81
- const item = this . _items [ this . _index ++ ]
82
- if ( this . _constraint . included ( item ) ) {
83
- const result = new Array ( 2 )
84
- if ( this . _keys ) {
85
- result [ 0 ] = this . _keyAsBuffer ? item . parts [ 1 ] : item . parts [ 1 ] . toString ( )
86
- }
87
- if ( this . _values ) {
88
- result [ 1 ] = this . _valueAsBuffer ? item . parts [ 2 ] : item . parts [ 2 ] . toString ( )
89
- }
90
- return result
91
- } else {
92
- return [ ]
93
- }
94
- } else {
95
- let items = null
96
- const trampoline = new Trampoline
97
- this . _iterator . next ( trampoline , $items => items = $items )
98
- while ( trampoline . seek ( ) ) {
99
- await trampoline . shift ( )
100
- }
101
- if ( this . _iterator . done ) {
102
- return [ ]
103
- } else {
104
- this . _items = items
105
- this . _index = 0
106
- }
77
+ Paginator . prototype . next = cadence ( function ( step ) {
78
+ if ( this . _items . length != this . _index ) {
79
+ const item = this . _items [ this . _index ++ ]
80
+ if ( this . _constraint . included ( item ) ) {
81
+ const result = new Array ( 2 )
82
+ if ( this . _keys ) {
83
+ result [ 0 ] = this . _keyAsBuffer ? item . parts [ 1 ] : item . parts [ 1 ] . toString ( )
84
+ }
85
+ if ( this . _values ) {
86
+ result [ 1 ] = this . _valueAsBuffer ? item . parts [ 2 ] : item . parts [ 2 ] . toString ( )
107
87
}
88
+ return result
89
+ } else {
90
+ return [ ]
108
91
}
109
92
}
93
+ let items = null
94
+ step ( function ( ) {
95
+ const trampoline = new Trampoline
96
+ this . _iterator . next ( trampoline , $items => items = $items )
97
+ trampoline . bounce ( step ( ) )
98
+ } , function ( ) {
99
+ if ( this . _iterator . done ) {
100
+ return [ ]
101
+ } else {
102
+ this . _items = items
103
+ this . _index = 0
104
+ this . next ( step ( ) )
105
+ }
106
+ } )
107
+ } )
110
108
111
- release ( ) {
112
- this . _iterator [ 'return' ] ( )
113
- }
109
+ Paginator . prototype . release = function ( ) {
114
110
}
115
111
116
112
// An implementation of the LevelDOWN `Iterator` object.
@@ -137,9 +133,9 @@ util.inherits(Iterator, AbstractIterator)
137
133
// iterator modules.
138
134
139
135
//
140
- Iterator . prototype . _next = callbackify ( function ( ) {
141
- return this . _paginator . next ( )
142
- } )
136
+ Iterator . prototype . _next = function ( callback ) {
137
+ this . _paginator . next ( callback )
138
+ }
143
139
144
140
Iterator . prototype . _seek = function ( target ) {
145
141
const paginator = this . _paginator
@@ -168,7 +164,7 @@ Locket.prototype._serializeKey = encode
168
164
169
165
Locket . prototype . _serializeValue = encode
170
166
171
- Locket . prototype . _open = callbackify ( async function ( options ) {
167
+ Locket . prototype . _open = cadence ( function ( step , options ) {
172
168
this . _locker = new Locker ( { heft : coalesce ( options . heft , 1024 * 1024 ) } )
173
169
// TODO What is the behavior if you close while opening, or open while
174
170
// closing?
@@ -209,10 +205,15 @@ Locket.prototype._open = callbackify(async function (options) {
209
205
...this . _options ,
210
206
...options
211
207
} )
212
- await this . _amalgamator . ready
213
- await this . _amalgamator . count ( )
214
- await this . _amalgamator . locker . rotate ( )
215
- return [ ]
208
+ step ( function ( ) {
209
+ return this . _amalgamator . ready
210
+ } , function ( ) {
211
+ return this . _amalgamator . count ( )
212
+ } , function ( ) {
213
+ return this . _amalgamator . locker . rotate ( )
214
+ } , function ( ) {
215
+ return [ ]
216
+ } )
216
217
} )
217
218
218
219
// Iteration of the database requires merging the results from the deep storage
@@ -244,20 +245,21 @@ Locket.prototype._iterator = function (options) {
244
245
}
245
246
246
247
// TODO Maybe just leave this?
247
- Locket . prototype . _get = callbackify ( async function ( key , options ) {
248
+ Locket . prototype . _get = cadence ( function ( step , key , options ) {
248
249
const constraint = constrain ( Buffer . compare , encode , {
249
250
gte : key , keys : true , values : true , keyAsBuffer : true , valueAsBuffer : true
250
251
} )
251
252
const snapshot = this . _locker . snapshot ( )
252
253
const paginator = this . _paginator ( constraint , snapshot )
253
- // TODO How do I reuse Cursor.found out of Riffle et. al.? Eh, no good way
254
- // since we have to advance, merge, dilute, etc. anyway.
255
- const next = await paginator . next ( )
256
- this . _locker . release ( snapshot )
257
- if ( next . length != 0 && Buffer . compare ( next [ 0 ] , key ) == 0 ) {
258
- return [ options . asBuffer ? next [ 1 ] : next [ 1 ] . toString ( ) ]
259
- }
260
- throw new Error ( 'NotFoundError: not found' )
254
+ step ( function ( ) {
255
+ paginator . next ( step ( ) )
256
+ } , [ ] , function ( next ) {
257
+ this . _locker . release ( snapshot )
258
+ if ( next . length != 0 && Buffer . compare ( next [ 0 ] , key ) == 0 ) {
259
+ return [ options . asBuffer ? next [ 1 ] : next [ 1 ] . toString ( ) ]
260
+ }
261
+ throw new Error ( 'NotFoundError: not found' )
262
+ } )
261
263
} )
262
264
263
265
Locket . prototype . _put = function ( key , value , options , callback ) {
@@ -273,16 +275,19 @@ Locket.prototype._del = function (key, options, callback) {
273
275
// filter it. It does however mean at least two writes for every `put` or `del`
274
276
// and I suspect that common usage is ingle `put` or `del`, so going to include
275
277
// the count in ever record, it is only 32-bits.
276
- Locket . prototype . _batch = callbackify ( async function ( batch , options ) {
278
+ Locket . prototype . _batch = cadence ( function ( step , batch , options ) {
277
279
const version = ++ this . _version
278
280
this . _versions [ version ] = true
279
281
const mutator = this . _locker . mutator ( )
280
- await this . _amalgamator . merge ( mutator , batch , true )
281
- this . _locker . commit ( mutator )
282
- return [ ]
282
+ step ( function ( ) {
283
+ return this . _amalgamator . merge ( mutator , batch , true )
284
+ } , function ( ) {
285
+ this . _locker . commit ( mutator )
286
+ return [ ]
287
+ } )
283
288
} )
284
289
285
- Locket . prototype . _approximateSize = callbackify ( async function ( from , to ) {
290
+ Locket . prototype . _approximateSize = cadence ( function ( from , to ) {
286
291
const constraint = constrain ( Buffer . compare , encode , { gte : from , lte : to } )
287
292
let approximateSize = 0
288
293
for ( const items of this . _whatever ( ) ) {
@@ -294,7 +299,10 @@ Locket.prototype._approximateSize = callbackify(async function (from, to) {
294
299
} )
295
300
296
301
// TODO Countdown through the write queue.
297
- Locket . prototype . _close = callbackify ( async function ( ) {
298
- await this . _amalgamator . destructible . destroy ( ) . rejected
299
- return [ ]
302
+ Locket . prototype . _close = cadence ( function ( step ) {
303
+ step ( function ( ) {
304
+ return this . _amalgamator . destructible . destroy ( ) . rejected
305
+ } , function ( ) {
306
+ return [ ]
307
+ } )
300
308
} )
0 commit comments