diff --git a/ocurrent-plugin/connection.ml b/ocurrent-plugin/connection.ml index 957d9efe..49a5ebcb 100644 --- a/ocurrent-plugin/connection.ml +++ b/ocurrent-plugin/connection.ml @@ -29,6 +29,13 @@ type t = { max_pipeline : int; } +(* Replace this with the version in capnp-rpc 1.2, once released *) +let await_settled_exn t = + Capability.wait_until_settled t >|= fun () -> + match Capability.problem t with + | None -> () + | Some e -> Fmt.failwith "%a" Capnp_rpc.Exception.pp e + (* Return a proxy to the scheduler, starting a new connection if we don't currently have a working one. *) let sched ~job t = @@ -44,7 +51,7 @@ let sched ~job t = Lwt.catch (fun () -> Sturdy_ref.connect_exn conn.sr >>= fun cap -> - Capability.wait_until_settled cap >|= fun () -> + await_settled_exn cap >|= fun () -> cap ) (fun ex -> @@ -107,11 +114,11 @@ let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~swi let build_job = Cluster_api.Ticket.job ticket in stage := `Get_ticket ticket; (* Allow the user to cancel it now. *) Prometheus.Gauge.inc_one Metrics.queue_get_ticket; - Capability.wait_until_settled ticket >>= fun () -> + await_settled_exn ticket >>= fun () -> Prometheus.Gauge.dec_one Metrics.queue_get_ticket; Current.Job.log job "Waiting for worker..."; Prometheus.Gauge.inc_one Metrics.queue_get_worker; - Capability.wait_until_settled build_job >>= fun () -> + await_settled_exn build_job >>= fun () -> Prometheus.Gauge.dec_one Metrics.queue_get_worker; Capability.dec_ref ticket; stage := `Got_worker;