|
| 1 | +(ns taoensso.sente.server-adapters.community.jetty |
| 2 | + "Sente adapter for `ring-jetty-adapter`, |
| 3 | + Ref. <https://github.com/ring-clojure/ring/tree/master/ring-jetty-adapter>. |
| 4 | + Adapted from <https://github.com/taoensso/sente/pull/426#issuecomment-1647231979>." |
| 5 | + {:author "Alex Gunnarson (@alexandergunnarson)"} |
| 6 | + (:require |
| 7 | + [ring.core.protocols :as ring-protocols] |
| 8 | + [ring.util.response :as ring-response] |
| 9 | + [ring.websocket :as ws] |
| 10 | + [ring.websocket.protocols :as ws.protocols] |
| 11 | + [taoensso.sente.interfaces :as i] |
| 12 | + [taoensso.timbre :as timbre]) |
| 13 | + |
| 14 | + (:import |
| 15 | + [org.eclipse.jetty.websocket.api WebSocketListener] |
| 16 | + [ring.websocket.protocols Socket])) |
| 17 | + |
| 18 | +;;;; WebSockets |
| 19 | + |
| 20 | +(extend-type WebSocketListener |
| 21 | + i/IServerChan |
| 22 | + (sch-open? [sch] (.isOpen sch)) |
| 23 | + (sch-close! [sch] (.close sch)) |
| 24 | + (sch-send! [sch-listener _websocket? msg] |
| 25 | + (ws.protocols/-send sch-listener msg) |
| 26 | + true)) |
| 27 | + |
| 28 | +(extend-type Socket |
| 29 | + i/IServerChan |
| 30 | + (sch-open? [sch] (ws.protocols/-open? sch)) |
| 31 | + (sch-close! [sch] (ws.protocols/-close sch nil nil)) |
| 32 | + (sch-send! [sch-socket _websocket? msg] |
| 33 | + (ws.protocols/-send sch-socket msg) |
| 34 | + true)) |
| 35 | + |
| 36 | +(defn- respond-ws |
| 37 | + [{:keys [websocket-subprotocols]} |
| 38 | + {:keys [on-close on-error on-msg on-open]}] |
| 39 | + |
| 40 | + {:ring.websocket/protocol (first websocket-subprotocols) |
| 41 | + :ring.websocket/listener |
| 42 | + (reify ws.protocols/Listener |
| 43 | + (on-close [_ sch status _] (on-close sch true status)) |
| 44 | + (on-error [_ sch error] (on-error sch true error)) |
| 45 | + (on-message [_ sch msg] (on-msg sch true msg)) |
| 46 | + (on-open [_ sch] (on-open sch true)) |
| 47 | + (on-pong [_ sch data]))}) |
| 48 | + |
| 49 | +;;;; Ajax |
| 50 | + |
| 51 | +(defprotocol ISenteJettyAjaxChannel |
| 52 | + (ajax-read! [sch])) |
| 53 | + |
| 54 | +(deftype SenteJettyAjaxChannel [resp-promise_ open?_ on-close adapter-opts] |
| 55 | + i/IServerChan |
| 56 | + (sch-send! [sch _websocket? msg] (deliver resp-promise_ msg) (i/sch-close! sch)) |
| 57 | + (sch-open? [sch] @open?_) |
| 58 | + (sch-close! [sch] |
| 59 | + (when (compare-and-set! open?_ true false) |
| 60 | + (deliver resp-promise_ nil) |
| 61 | + (when on-close (on-close sch false nil)) |
| 62 | + true)) |
| 63 | + |
| 64 | + ISenteJettyAjaxChannel |
| 65 | + (ajax-read! [_sch] |
| 66 | + (let [{:keys [ajax-resp-timeout-ms]} adapter-opts] |
| 67 | + (if ajax-resp-timeout-ms |
| 68 | + (deref resp-promise_ ajax-resp-timeout-ms nil) |
| 69 | + (deref resp-promise_))))) |
| 70 | + |
| 71 | +(defn- ajax-ch |
| 72 | + [{:keys [on-open on-close]} adapter-opts] |
| 73 | + (let [open?_ (atom true) |
| 74 | + sch (SenteJettyAjaxChannel. (promise) open?_ on-close adapter-opts)] |
| 75 | + (when on-open (on-open sch false)) |
| 76 | + sch)) |
| 77 | + |
| 78 | +(extend-protocol ring-protocols/StreamableResponseBody |
| 79 | + SenteJettyAjaxChannel |
| 80 | + (write-body-to-stream [body _response ^java.io.OutputStream output-stream] |
| 81 | + ;; Use `future` because `output-stream` might block the thread, |
| 82 | + ;; Ref. <https://github.com/ring-clojure/ring/issues/254#issuecomment-236048380> |
| 83 | + (future |
| 84 | + (try |
| 85 | + (.write output-stream (.getBytes ^String (ajax-read! body) java.nio.charset.StandardCharsets/UTF_8)) |
| 86 | + (.flush output-stream) |
| 87 | + (catch Throwable t |
| 88 | + (timbre/error t)) |
| 89 | + (finally |
| 90 | + (.close output-stream)))))) |
| 91 | + |
| 92 | +;;;; Adapter |
| 93 | + |
| 94 | +(deftype JettyServerChanAdapter [adapter-opts] |
| 95 | + i/IServerChanAdapter |
| 96 | + (ring-req->server-ch-resp [_ request callbacks-map] |
| 97 | + (if (ws/upgrade-request? request) |
| 98 | + (respond-ws request callbacks-map) |
| 99 | + (ring-response/response (ajax-ch callbacks-map adapter-opts))))) |
| 100 | + |
| 101 | +(defn get-sch-adapter |
| 102 | + "Returns an Jetty ServerChanAdapter. Options: |
| 103 | + `:ajax-resp-timeout-ms` - Max msecs to wait for Ajax responses (default 60 secs)" |
| 104 | + ([] (get-sch-adapter nil)) |
| 105 | + ([{:as opts |
| 106 | + :keys [ajax-resp-timeout-ms] |
| 107 | + :or {ajax-resp-timeout-ms (* 60 1000)}}] |
| 108 | + |
| 109 | + (JettyServerChanAdapter. |
| 110 | + (assoc opts |
| 111 | + :ajax-resp-timeout-ms ajax-resp-timeout-ms)))) |
0 commit comments