Skip to content

Commit

Permalink
Run the receive-queue workers without bindings (#709)
Browse files Browse the repository at this point in the history
  • Loading branch information
dwwoelfel authored Jan 14, 2025
1 parent d4b357b commit a970c83
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 31 deletions.
66 changes: 36 additions & 30 deletions server/src/instant/grouped_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -151,38 +151,41 @@
process-fn
max-workers]
:or {max-workers 2}}]
(let [workers (atom #{})
(let [executor (ua/make-virtual-thread-executor)
workers (atom #{})
;; Use a promise so we can access it in the `on-add` function
grouped-queue (promise)
on-add (fn []
(when (< (count @workers) max-workers)
(ua/vfuture
(loop [worker-id (Object.)]
(when (contains? (swap! workers
(fn [workers]
(if (= (count workers) max-workers)
workers
(conj workers worker-id))))
worker-id)
(try
(loop []
(when (process! @grouped-queue {:reserve-fn reserve-fn
:process-fn process-fn})
;; Continue processing items until the queue is empty
(recur)))
(catch Throwable t
(tracer/record-exception-span! t {:name "grouped-queue/process-error"}))
(finally
(swap! workers disj worker-id)))
;; One last check to prevent a race where something is added to the queue
;; while we're removing ourselves from the workers
(when (and (peek @grouped-queue)
(< (count @workers) max-workers))
(recur worker-id)))))))]
(ua/worker-vfuture
executor
(loop [worker-id (Object.)]
(when (contains? (swap! workers
(fn [workers]
(if (= (count workers) max-workers)
workers
(conj workers worker-id))))
worker-id)
(try
(loop []
(when (process! @grouped-queue {:reserve-fn reserve-fn
:process-fn process-fn})
;; Continue processing items until the queue is empty
(recur)))
(catch Throwable t
(tracer/record-exception-span! t {:name "grouped-queue/process-error"}))
(finally
(swap! workers disj worker-id)))
;; One last check to prevent a race where something is added to the queue
;; while we're removing ourselves from the workers
(when (and (peek @grouped-queue)
(< (count @workers) max-workers))
(recur worker-id)))))))]
(deliver grouped-queue (create {:group-fn group-fn
:on-add on-add}))
{:grouped-queue @grouped-queue
:get-worker-count (fn [] (count @workers))}))
:get-worker-count (fn [] (count @workers))
:virtual-thread-executor executor}))

(comment
(def gq (create {:group-fn :k}))
Expand All @@ -205,6 +208,8 @@
#_(Thread/sleep 10000)
(println "done"))}))

(require 'clojure.tools.logging)

(defn test-grouped-queue []
(let [finished (promise)
started (promise)
Expand All @@ -217,15 +222,15 @@
(inflight-queue-reserve (max 1 (rand-int 25)) iq))
:process-fn (fn [_ workset]
@started
(tracer/record-info! {:name "workset"
:attributes {:workset-count (count workset)
:total total-items
:worker-count ((:get-worker-count @gq))}})
(clojure.tools.logging/info {:name "workset"
:attributes {:workset-count (count workset)
:total total-items
:worker-count ((:get-worker-count @gq))}})
(.addAndGet process-total (count workset))
(when (zero? (.addAndGet total-items (- (count workset))))
(deliver finished true))
nil)
:max-workers 100})
:max-workers 1000})
_ (deliver gq q)

wait (future
Expand All @@ -247,4 +252,5 @@
(deliver started true)
(tool/def-locals)
@wait))

(test-grouped-queue))
29 changes: 28 additions & 1 deletion server/src/instant/util/async.clj
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@
;; ---------------
;; virtual-threads

(def ^ExecutorService default-virtual-thread-executor (Executors/newVirtualThreadPerTaskExecutor))
(defn make-virtual-thread-executor ^ExecutorService []
(Executors/newVirtualThreadPerTaskExecutor))

(defonce ^ExecutorService default-virtual-thread-executor (make-virtual-thread-executor))

(defn ^:private deref-future
"Private function copied from clojure.core;
Expand All @@ -56,6 +59,30 @@
(catch java.util.concurrent.TimeoutException _
timeout-val))))

(defn worker-vfuture-call [^ExecutorService executor f]
(let [fut (.submit executor ^Callable f)]
(reify
clojure.lang.IDeref
(deref [_] (deref-future fut))
clojure.lang.IBlockingDeref
(deref [_ timeout-ms timeout-val]
(deref-future fut timeout-ms timeout-val))
clojure.lang.IPending
(isRealized [_] (.isDone fut))
java.util.concurrent.Future
(get [_] (.get fut))
(get [_ timeout unit] (.get fut timeout unit))
(isCancelled [_] (.isCancelled fut))
(isDone [_] (.isDone fut))
(cancel [_ interrupt?]
(.cancel fut interrupt?)))))

(defmacro worker-vfuture
"Creates a vfuture that does not propagate bindings and does not
track immediate children. Useful for starting a background worker."
[^ExecutorService executor & body]
`(worker-vfuture-call ~executor (^{:once true} fn* [] ~@body)))

;; Keeps track of child futures so that we can cancel them if the
;; parent is canceled.
(def ^:dynamic *child-vfutures* nil)
Expand Down

0 comments on commit a970c83

Please sign in to comment.