|
| 1 | +(ns consul.agent |
| 2 | + (:require [cheshire.core :as json] |
| 3 | + [clojure.core.async :refer [go-loop timeout <!]] |
| 4 | + [clojure.string :as string] |
| 5 | + [clojure.tools.logging :as log] |
| 6 | + [org.httpkit.client :as http])) |
| 7 | + |
| 8 | + |
| 9 | +(def ^:dynamic *consul-url* "http://127.0.0.1:8500/shit") |
| 10 | +(def ^:private alive (atom #{})) |
| 11 | + |
| 12 | + |
| 13 | +(defn- default-callback [{:keys [status] :as response}] |
| 14 | + (when (or |
| 15 | + (nil? status) |
| 16 | + (>= status 400)) |
| 17 | + (log/warn "Error requesting consul agent: " response))) |
| 18 | + |
| 19 | + |
| 20 | +(defn- http-get [path] |
| 21 | + (log/debug "GET" (str *consul-url* "/v1/agent" path)) |
| 22 | + (http/request {:url (str *consul-url* "/v1/agent" path)})) |
| 23 | + |
| 24 | + |
| 25 | +(defn- http-put [path body callback] |
| 26 | + (log/debug "PUT" (str *consul-url* "/v1/agent" path) "\n" (json/encode body {:pretty true})) |
| 27 | + (http/request {:method :put |
| 28 | + :url (str *consul-url* "/v1/agent" path) |
| 29 | + :body (if body (json/encode body) "")} |
| 30 | + (or callback default-callback))) |
| 31 | + |
| 32 | + |
| 33 | +(defn service-list [] |
| 34 | + (http-get "/services")) |
| 35 | + |
| 36 | + |
| 37 | +(defn service-details [service-id] |
| 38 | + (http-get (str "/service/" service-id))) |
| 39 | + |
| 40 | + |
| 41 | +(defn service-register [request & [callback]] |
| 42 | + (http-put "/service/register" request callback)) |
| 43 | + |
| 44 | + |
| 45 | +(defn service-deregister [service-id & [callback]] |
| 46 | + (http-put (str "/service/deregister/" service-id) nil callback)) |
| 47 | + |
| 48 | + |
| 49 | +(defn check-update [check-id status & [callback]] |
| 50 | + {:pre [(#{:passing :warning :critical} status)]} |
| 51 | + (http-put (str "/check/update/" check-id) {:status status} callback)) |
| 52 | + |
| 53 | + |
| 54 | +(defn check-update-with-register |
| 55 | + "check update with registration if not registered yet" |
| 56 | + [check-id check-status register-request] |
| 57 | + (let [consul-url *consul-url*] |
| 58 | + (check-update check-id check-status |
| 59 | + (fn [{:keys [status body] :as response}] |
| 60 | + (log/debug "-->" status body) |
| 61 | + (if (and status |
| 62 | + (>= status 400) |
| 63 | + (string/includes? body (format "\"%s\"" check-id)) |
| 64 | + (string/includes? body "TTL")) |
| 65 | + (binding [*consul-url* consul-url] |
| 66 | + (service-register register-request |
| 67 | + (fn [{:keys [status] :as response}] |
| 68 | + (if (= status 200) |
| 69 | + (binding [*consul-url* consul-url] |
| 70 | + (check-update check-id check-status)) |
| 71 | + (default-callback response))))) |
| 72 | + (default-callback response)))))) |
| 73 | + |
| 74 | + |
| 75 | +(defn heartbeat |
| 76 | + "single heartbeat" |
| 77 | + [{:keys [id name address port ttl deregister-critical-service-after]}] |
| 78 | + (let [check-id (str id ":ttl-check")] |
| 79 | + (check-update-with-register check-id :passing |
| 80 | + {:id id |
| 81 | + :name (or name id) |
| 82 | + :address address |
| 83 | + :port port |
| 84 | + :check {:CheckId check-id |
| 85 | + :TTL ttl |
| 86 | + :DeregisterCriticalServiceAfter deregister-critical-service-after}}))) |
| 87 | + |
| 88 | + |
| 89 | +(defn start-heartbeat [{:keys [id interval-ms] :as params}] |
| 90 | + (service-deregister id) |
| 91 | + (swap! alive #(conj % [*consul-url* id])) |
| 92 | + (go-loop [] |
| 93 | + (when (@alive [*consul-url* id]) |
| 94 | + (heartbeat params) |
| 95 | + (<! (timeout interval-ms)) |
| 96 | + (recur)))) |
| 97 | + |
| 98 | + |
| 99 | +(defn stop-heartbeat [service-id] |
| 100 | + (swap! alive #(disj % [*consul-url* service-id])) |
| 101 | + (service-deregister service-id)) |
0 commit comments