18
18
t (timeout max-time)]
19
19
(let [[v p] (alts! [in t])]
20
20
(cond
21
- ; ; timed out, so process what we got
21
+ ; ; timed out, so process what we got
22
22
(= p t)
23
23
(do
24
24
(when (seq buf)
25
25
(>! out buf))
26
26
(recur [] (timeout max-time)))
27
27
28
- ; ; `in` is closed, so process what we got
28
+ ; ; `in` is closed, so process what we got
29
29
(nil? v)
30
30
(do
31
31
(when (seq buf)
32
32
(>! out buf))
33
33
(close! out))
34
34
35
- ; ; we will fill up the batch, so process it
35
+ ; ; we will fill up the batch, so process it
36
36
(== lim-1 (count buf))
37
37
(do
38
38
(>! out (conj buf v))
39
39
(recur [] (timeout max-time)))
40
40
41
- ; ; accumulate the buffer
41
+ ; ; accumulate the buffer
42
42
:else
43
43
(recur (conj buf v) t))))))
44
44
64
64
([fetch-fn] (start! fetch-fn nil ))
65
65
([fetch-fn opts]
66
66
(let [{:keys [max-batch-size max-batch-time buffer-size]
67
- :or {max-batch-size Integer/MAX_VALUE, max-batch-time 5 , buffer-size 1000 }} opts
67
+ :or {max-batch-size Integer/MAX_VALUE, max-batch-time 5 , buffer-size 1000 }} opts
68
68
c-in (chan buffer-size)
69
69
c-batches (chan )
70
70
promises (atom {})
76
76
ps @promises]
77
77
78
78
(cond
79
- ; ; got a batch to process
79
+ ; ; got a batch to process
80
80
(seq ids)
81
81
(do
82
82
(log/debug " processing id" ids)
86
86
e))]
87
87
(if (instance? Exception res)
88
88
(doseq [k ids]
89
- ; ; Deliver the exception but clear the cache
89
+ ; ; Deliver the exception but clear the cache
90
90
(deliver (get ps k) res)
91
91
(swap! promises dissoc k))
92
92
(doseq [[k v] (zipmap ids res)]
93
- ; ; Deliver the value
93
+ ; ; Deliver the value
94
94
(deliver (get ps k) v))))
95
95
(recur ))
96
96
97
- ; ; chan is closed
97
+ ; ; chan is closed
98
98
(nil? ids)
99
99
(log/debug " c-batches closed" )
100
100
101
- ; ; keep listening
101
+ ; ; keep listening
102
102
:else
103
103
(recur ))))
104
104
176
176
(stop! loader)
177
177
178
178
(with-open [ld (start! batch-fn {})]
179
- (println ) (deref (load-many ld [" alice" " bob" ])))
179
+ (doto (deref (load-many ld [" alice" " bob" ]))
180
+ println))
180
181
181
182
(with-open [ld (start! batch-fn {})]
182
- (println ( deref (load-one ld " clair" ) ))))
183
+ (deref (load-one ld " clair" ))))
0 commit comments