From 0932bb2f737110f3d046dda3c3b6cf234262dda3 Mon Sep 17 00:00:00 2001 From: Rob Browning Date: Thu, 7 Sep 2023 11:30:21 -0500 Subject: [PATCH] (PDB-5696) query.monitor: accommodate query socket reuse Previously, any query a client issued after the first on a given connection might provoke a CancelledKeyException when it tried to register the query/socket/channel with the monitor because the selection key for a given channel for a given selector is unique, and a cancelled key is only cleaned up during the next call to select for the relevant selector. Any attempt to register a channel with a selector between the cancellation of the previous key for that channel and the clean up in the next select will provoke the CancelledKeyException. To address the issue, adjust the registration code to retry whenever it encounters that exception until it succeeds. While it's retrying, the monitor select loop, which will have been woken up by the previous query's forget, will issue a new select call, removing the canceled key. Fixes: https://github.com/puppetlabs/puppetdb/issues/3866 --- src/puppetlabs/puppetdb/query/monitor.clj | 50 ++++++++++++++++--- .../puppetdb/query/monitor_test.clj | 31 ++++++++++++ 2 files changed, 73 insertions(+), 8 deletions(-) diff --git a/src/puppetlabs/puppetdb/query/monitor.clj b/src/puppetlabs/puppetdb/query/monitor.clj index 4dc7168359..c78f971bf6 100644 --- a/src/puppetlabs/puppetdb/query/monitor.clj +++ b/src/puppetlabs/puppetdb/query/monitor.clj @@ -39,12 +39,16 @@ some cases customizably) time out. Every monitored query will have a SelectionKey associated with it, - and that key should be canceled by the monitor code once the query - has been abandoned or has timed out. + The key is cancelled during forget, but won't be removed from the + selector's cancelled set until the next call to select. During that + time, another query on the same socket/connection could try to + re-register the cancelled key. This will throw an exception, which + we suppress and retry until the select loop finally removes the + cancelled key, and we can re-register the socket. Every monitored query may also have a postgres pid associated with it, and whenever it does, that pid should be terminated (in - corrdination with the :terminated promise) once the query has been + coordination with the :terminated promise) once the query has been abandoned or has timed out. The terminated promise and :forget value coordinate between the @@ -56,7 +60,20 @@ The client socket monitoring depends on access to the jetty query response which (at least at the moment) provides indirect access to the java socket channel which can be read to determine whether the - client is still connected." + client is still connected. + + The current implementation is completely incompatible with http + \"pipelining\", but it looks like that is no longer a realistic + concern: + https://daniel.haxx.se/blog/2019/04/06/curl-says-bye-bye-to-pipelining/ + + If that turns out to be an incorrect assumption, then we'll have to + reevaluate the implementation and/or feasibility of the monitoring. + That's because so far, the only way we've found to detect a client + disconnection is to attempt to read a byte. At the moment, that's + acceptable because the client shouldn't be sending any data during + the response (which of course wouldn't be true with pipelining, + where it could be sending additional requests)." (:require [clojure.tools.logging :as log] @@ -331,12 +348,29 @@ (.join thread)) (not (.isAlive thread)))))) +(defn- register-selector + "Loops until the channel is registered with the selector while + ignoring canceled key exceptions, which should only occur when a + client is re-using the channel, and the previous query has called + forget (and cancelled the key), and the main select loop hasn't + removed it from the cancelled set yet. Returns the new + SelectionKey." + [channel selector ops] + (or (try + (.register ^SelectableChannel channel selector ops) + (catch CancelledKeyException _)) + (do + ;; The competing forget should have already called wakeup. + ;; REVIEW: 10? + (Thread/sleep 10) + (recur channel selector ops)))) + (defn stop-query-at-deadline-or-disconnect [{:keys [^Selector selector queries ^Thread thread] :as _monitor} id ^SelectableChannel channel deadline-ns db] (assert (.isAlive thread)) (assert (instance? SelectableChannel channel)) - (let [select-key (.register channel selector SelectionKey/OP_READ) + (let [select-key (register-selector channel selector SelectionKey/OP_READ) info {:query-id id :selection-key select-key :deadline-ns deadline-ns @@ -355,7 +389,7 @@ (-> prev (update :selector-keys assoc select-key info) (update :deadlines conj [[deadline-ns select-key] info])))) - (.wakeup selector) ;; so it can recompute deadline + (.wakeup selector) ;; to recompute deadline and start watching select-key select-key)) (defn register-pg-pid @@ -386,8 +420,6 @@ [{:keys [queries ^Selector selector ^Thread thread] :as _monitor} ^SelectionKey select-key] (assert (.isAlive thread)) - ;; After this some key methods will throw CancelledKeyException - (.cancel select-key) (let [maybe-await-termination (atom nil)] (swap! queries (fn [{:keys [selector-keys] :as state}] @@ -399,6 +431,8 @@ (update :selector-keys assoc select-key info) (update :deadlines assoc [deadline-ns select-key] info))) state))) + (.cancel select-key) + (.wakeup selector) ;; clear out the cancelled keys (see stop-query-at-...) (if (= ::timeout (some-> @maybe-await-termination (deref 2000 ::timeout))) ::timeout true))) diff --git a/test/puppetlabs/puppetdb/query/monitor_test.clj b/test/puppetlabs/puppetdb/query/monitor_test.clj index c91a4842a8..c7f6f451ce 100644 --- a/test/puppetlabs/puppetdb/query/monitor_test.clj +++ b/test/puppetlabs/puppetdb/query/monitor_test.clj @@ -1,6 +1,7 @@ (ns puppetlabs.puppetdb.query.monitor-test (:require [clj-http.client :as http] + [clojure.java.shell :refer [sh]] [clojure.string :as str] [clojure.test :refer :all] [murphy :refer [try!]] @@ -280,3 +281,33 @@ (update :info dissoc :query-id) (update-in [:info :pg-pid] int?)) summary))))))) + +(deftest connection-reuse + ;; Test multiple queries over the same connection. Run the test + ;; multiple times because the first problem we encountered could + ;; only occur during a narrow window in the monitor loop if a new + ;; request came in between select invocations, after the key had + ;; been cancelled. + ;; https://github.com/puppetlabs/puppetdb/issues/3866 + (with-puppetdb nil + (jdbc/with-db-transaction [] (add-certnames certnames)) + ;; Just use curl, since it's trivally easy to get it to do what we + ;; need, and we already require it for test setup (via ext/). (It + ;; looks like both clj-http and the JDK HttpClient have more + ;; indirect control over reuse.) + (let [nodes (-> (assoc *base-url* :prefix "/pdb/query/v4/nodes") + base-url->str-with-prefix) + cmd ["curl" "--no-progress-meter" "--show-error" "--fail-early" + "--fail" nodes "-o" "/dev/null" + "--next" "--fail" nodes "-o" "/dev/null" + "--next" "--fail" nodes "-o" "/dev/null"]] + (loop [i 0] + (let [{:keys [exit out err]} (apply sh cmd)] + (prn :loop [cmd exit err]) + (when (< i 10) + (if (is (= 0 exit)) + (recur (inc i)) + (do + (apply println "Failed:" cmd) + (print out) + (print err)))))))))