Skip to content

Commit 980767e

Browse files
committed
run the grouped-queue workers without bindings
1 parent d4b357b commit 980767e

File tree

2 files changed

+63
-31
lines changed

2 files changed

+63
-31
lines changed

server/src/instant/grouped_queue.clj

+36-30
Original file line numberDiff line numberDiff line change
@@ -151,38 +151,41 @@
151151
process-fn
152152
max-workers]
153153
:or {max-workers 2}}]
154-
(let [workers (atom #{})
154+
(let [executor (ua/make-virtual-thread-executor)
155+
workers (atom #{})
155156
;; Use a promise so we can access it in the `on-add` function
156157
grouped-queue (promise)
157158
on-add (fn []
158159
(when (< (count @workers) max-workers)
159-
(ua/vfuture
160-
(loop [worker-id (Object.)]
161-
(when (contains? (swap! workers
162-
(fn [workers]
163-
(if (= (count workers) max-workers)
164-
workers
165-
(conj workers worker-id))))
166-
worker-id)
167-
(try
168-
(loop []
169-
(when (process! @grouped-queue {:reserve-fn reserve-fn
170-
:process-fn process-fn})
171-
;; Continue processing items until the queue is empty
172-
(recur)))
173-
(catch Throwable t
174-
(tracer/record-exception-span! t {:name "grouped-queue/process-error"}))
175-
(finally
176-
(swap! workers disj worker-id)))
177-
;; One last check to prevent a race where something is added to the queue
178-
;; while we're removing ourselves from the workers
179-
(when (and (peek @grouped-queue)
180-
(< (count @workers) max-workers))
181-
(recur worker-id)))))))]
160+
(ua/worker-vfuture
161+
executor
162+
(loop [worker-id (Object.)]
163+
(when (contains? (swap! workers
164+
(fn [workers]
165+
(if (= (count workers) max-workers)
166+
workers
167+
(conj workers worker-id))))
168+
worker-id)
169+
(try
170+
(loop []
171+
(when (process! @grouped-queue {:reserve-fn reserve-fn
172+
:process-fn process-fn})
173+
;; Continue processing items until the queue is empty
174+
(recur)))
175+
(catch Throwable t
176+
(tracer/record-exception-span! t {:name "grouped-queue/process-error"}))
177+
(finally
178+
(swap! workers disj worker-id)))
179+
;; One last check to prevent a race where something is added to the queue
180+
;; while we're removing ourselves from the workers
181+
(when (and (peek @grouped-queue)
182+
(< (count @workers) max-workers))
183+
(recur worker-id)))))))]
182184
(deliver grouped-queue (create {:group-fn group-fn
183185
:on-add on-add}))
184186
{:grouped-queue @grouped-queue
185-
:get-worker-count (fn [] (count @workers))}))
187+
:get-worker-count (fn [] (count @workers))
188+
:virtual-thread-executor executor}))
186189

187190
(comment
188191
(def gq (create {:group-fn :k}))
@@ -205,6 +208,8 @@
205208
#_(Thread/sleep 10000)
206209
(println "done"))}))
207210

211+
(require 'clojure.tools.logging)
212+
208213
(defn test-grouped-queue []
209214
(let [finished (promise)
210215
started (promise)
@@ -217,15 +222,15 @@
217222
(inflight-queue-reserve (max 1 (rand-int 25)) iq))
218223
:process-fn (fn [_ workset]
219224
@started
220-
(tracer/record-info! {:name "workset"
221-
:attributes {:workset-count (count workset)
222-
:total total-items
223-
:worker-count ((:get-worker-count @gq))}})
225+
(clojure.tools.logging/info {:name "workset"
226+
:attributes {:workset-count (count workset)
227+
:total total-items
228+
:worker-count ((:get-worker-count @gq))}})
224229
(.addAndGet process-total (count workset))
225230
(when (zero? (.addAndGet total-items (- (count workset))))
226231
(deliver finished true))
227232
nil)
228-
:max-workers 100})
233+
:max-workers 1000})
229234
_ (deliver gq q)
230235

231236
wait (future
@@ -247,4 +252,5 @@
247252
(deliver started true)
248253
(tool/def-locals)
249254
@wait))
255+
250256
(test-grouped-queue))

server/src/instant/util/async.clj

+27-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@
4343
;; ---------------
4444
;; virtual-threads
4545

46-
(def ^ExecutorService default-virtual-thread-executor (Executors/newVirtualThreadPerTaskExecutor))
46+
(defn make-virtual-thread-executor ^ExecutorService []
47+
(Executors/newVirtualThreadPerTaskExecutor))
48+
49+
(defonce ^ExecutorService default-virtual-thread-executor (make-virtual-thread-executor))
4750

4851
(defn ^:private deref-future
4952
"Private function copied from clojure.core;
@@ -56,6 +59,29 @@
5659
(catch java.util.concurrent.TimeoutException _
5760
timeout-val))))
5861

62+
(defn worker-vfuture-call [^ExecutorService executor f]
63+
(let [fut (.submit executor ^Callable f)]
64+
(reify
65+
clojure.lang.IDeref
66+
(deref [_] (deref-future fut))
67+
clojure.lang.IBlockingDeref
68+
(deref [_ timeout-ms timeout-val]
69+
(deref-future fut timeout-ms timeout-val))
70+
clojure.lang.IPending
71+
(isRealized [_] (.isDone fut))
72+
java.util.concurrent.Future
73+
(get [_] (.get fut))
74+
(get [_ timeout unit] (.get fut timeout unit))
75+
(isCancelled [_] (.isCancelled fut))
76+
(isDone [_] (.isDone fut))
77+
(cancel [_ interrupt?]
78+
(.cancel fut interrupt?)))))
79+
80+
(defmacro worker-vfuture
81+
"Creates a "
82+
[^ExecutorService executor & body]
83+
`(worker-vfuture-call ~executor (^{:once true} fn* [] ~@body)))
84+
5985
;; Keeps track of child futures so that we can cancel them if the
6086
;; parent is canceled.
6187
(def ^:dynamic *child-vfutures* nil)

0 commit comments

Comments
 (0)