diff --git a/awa-mirage.opam b/awa-conduit.opam similarity index 88% rename from awa-mirage.opam rename to awa-conduit.opam index 78eced5..b40da05 100644 --- a/awa-mirage.opam +++ b/awa-conduit.opam @@ -20,9 +20,9 @@ depends: [ "cstruct" {>= "1.9.0"} "mtime" "lwt" - "mirage-flow" {>= "2.0.0"} + "conduit" "mirage-clock" {>= "3.0.0"} "logs" ] synopsis: "SSH implementation in OCaml" -description: """The OpenSSH protocol implemented in OCaml.""" \ No newline at end of file +description: """The OpenSSH protocol implemented in OCaml.""" diff --git a/conduit/awa_conduit.ml b/conduit/awa_conduit.ml new file mode 100644 index 0000000..92703c0 --- /dev/null +++ b/conduit/awa_conduit.ml @@ -0,0 +1,185 @@ +type endpoint = + { authenticator : Awa.Keys.authenticator option + ; user : string + ; key : Awa.Hostkey.priv + ; req : Awa.Ssh.channel_request } + +module Make + (IO : Conduit.IO) + (Conduit : Conduit.S + with type input = Cstruct.t + and type output = Cstruct.t + and type +'a io = 'a IO.t) + (M : Mirage_clock.MCLOCK) += struct + let return x = IO.return x + let ( >>= ) x f = IO.bind x f + let ( >>| ) x f = x >>= fun x -> return (f x) + let ( >>? ) x f = x >>= function + | Ok x -> f x + | Error _ as err -> return err + + let reword_error f = function + | Ok _ as v -> v + | Error err -> Error (f err) + + type 'flow protocol_with_ssh = { + mutable ssh : Awa.Client.t ; + mutable uid : int32 option ; + mutable exited : int32 option ; + mutable closed : bool ; + raw : Cstruct.t ; + flow : 'flow ; + queue : (char, Bigarray.int8_unsigned_elt) Ke.Rke.t ; + } + + let src = Logs.Src.create "conduit-ssh" + + module Log = (val Logs.src_log src : Logs.LOG) + + module Make_protocol + (Flow : Conduit.PROTOCOL + with type input = Conduit.input + and type output = Conduit.output + and type +'a io = 'a IO.t) = + struct + type input = Conduit.input + type output = Conduit.output + type +'a io = 'a Conduit.io + + type nonrec endpoint = Flow.endpoint * endpoint + + type flow = Flow.flow protocol_with_ssh + + type error = + [ `Flow of Flow.error + | `SSH of string + | `Closed_by_peer + | `Handshake_aborted ] + + let pp_error : error Fmt.t = fun ppf -> function + | `Flow err -> Flow.pp_error ppf err + | `SSH err -> Fmt.string ppf err + | `Closed_by_peer -> Fmt.string ppf "Closed by peer" + | `Handshake_aborted -> Fmt.string ppf "Handshake aborted" + + let flow_error err = `Flow err + + let writev flow cs = + let rec one v = + if Cstruct.len v = 0 then return (Ok ()) + else Flow.send flow v >>? fun len -> + one (Cstruct.shift v len) + and go = function + | [] -> return (Ok ()) + | x :: r -> one x >>? fun () -> go r in + go cs + + let blit src src_off dst dst_off len = + let src = Cstruct.to_bigarray src in + Bigstringaf.blit src ~src_off dst ~dst_off ~len + + let write queue v = + Log.debug (fun m -> m "Got %S." (Cstruct.to_string v)) ; + Ke.Rke.N.push queue ~blit ~length:Cstruct.len ~off:0 v + + let handle_event t = function + | `Established uid -> t.uid <- Some uid + | `Channel_data (uid, data) -> + if Option.(fold ~none:false ~some:(Int32.equal uid) t.uid) + then write t.queue data else () + | `Channel_eof uid -> + if Option.(fold ~none:false ~some:(Int32.equal uid) t.uid) + then t.closed <- true else () + | `Channel_exit_status (uid, n) -> + if Option.(fold ~none:false ~some:(Int32.equal uid) t.uid) + then t.exited <- Some n else () + | `Disconnected -> t.uid <- None + + let rec handle t = + Flow.recv t.flow t.raw >>| reword_error flow_error >>? function + | `End_of_flow -> + Log.debug (fun m -> m "Underlying connection returns [`End_of_flow].") ; + t.uid <- None ; + t.closed <- true ; + return (Ok ()) + | `Input len -> + let raw = Cstruct.sub t.raw 0 len in + match t.uid, Awa.Client.incoming t.ssh (Mtime.of_uint64_ns (M.elapsed_ns ())) raw with + | _, Error err -> return (Error (`SSH err)) + | None, Ok (ssh, out, events) -> + List.iter (handle_event t) events ; t.ssh <- ssh ; + writev t.flow out >>| reword_error flow_error >>? fun () -> + if Option.is_none t.uid && not t.closed + then handle t else return (Ok ()) + | Some _, Ok (ssh, out, events) -> + List.iter (handle_event t) events ; t.ssh <- ssh ; + writev t.flow out >>| reword_error flow_error >>? fun () -> + return (Ok ()) + + let connect (edn, { authenticator; user; key; req; }) = + Log.debug (fun m -> m "Start a SSH connection with a peer.") ; + Flow.connect edn >>| reword_error flow_error >>? fun flow -> + Log.debug (fun m -> m "Connected to our peer.") ; + let ssh, bufs = Awa.Client.make ?authenticator ~user key in + Log.debug (fun m -> m "SSH State initialized.") ; + let raw = Cstruct.create 0x1000 in + let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.Char in + Log.debug (fun m -> m "Start a handshake SSH.") ; + writev flow bufs >>| reword_error flow_error >>? fun () -> + let t = { ssh; uid= None; closed= false; exited= None; flow; raw; queue; } in + handle t >>? fun () -> + match t.uid with + | None -> t.closed <- true ; return (Error `Handshake_aborted) + | Some uid -> + Log.debug (fun m -> m "Handshake is done.") ; + match Awa.Client.outgoing_request t.ssh ~id:uid req with + | Error err -> return (Error (`SSH err)) + | Ok (ssh, out) -> + t.ssh <- ssh ; writev flow [ out ] >>| reword_error flow_error >>? fun () -> + return (Ok t) + + let blit src src_off dst dst_off len = + let dst = Cstruct.to_bigarray dst in + Bigstringaf.blit src ~src_off dst ~dst_off ~len + + let rec recv t raw = + Log.debug (fun m -> m "Start to read incoming data.") ; + match Ke.Rke.N.peek t.queue with + | [] -> + if not t.closed + then handle t >>? fun () -> recv t raw + else return (Ok `End_of_flow) + | _ -> + let max = Cstruct.len raw in + let len = min (Ke.Rke.length t.queue) max in + Ke.Rke.N.keep_exn t.queue ~blit ~length:Cstruct.len ~off:0 ~len raw ; + Ke.Rke.N.shift_exn t.queue len ; + return (Ok (`Input len)) + + let send t raw = + if t.closed + then return (Error `Closed_by_peer) + else + ( Log.debug (fun m -> m "Start encrypt outgoing data.\n%!" ) + ; Log.debug (fun m -> m "Send %S." (Cstruct.to_string raw)) + ; match Awa.Client.outgoing_data t.ssh raw with + | Ok (ssh, out) -> + writev t.flow out >>| reword_error flow_error >>? fun () -> + t.ssh <- ssh ; return (Ok (Cstruct.len raw)) + | Error err -> + return (Error (`SSH err)) ) + + let close t = + t.closed <- true ; Flow.close t.flow >>| reword_error flow_error + end + + let protocol_with_ssh : + type edn flow. + (edn, flow) Conduit.protocol -> + (edn * endpoint, flow protocol_with_ssh) Conduit.protocol = + fun protocol -> + let module Flow = (val (Conduit.impl protocol)) in + let module M = Make_protocol (Flow) in + Conduit.register ~protocol:(module M) +end diff --git a/conduit/awa_conduit.mli b/conduit/awa_conduit.mli new file mode 100644 index 0000000..c402883 --- /dev/null +++ b/conduit/awa_conduit.mli @@ -0,0 +1,19 @@ +type endpoint = + { authenticator : Awa.Keys.authenticator option + ; user : string + ; key : Awa.Hostkey.priv + ; req : Awa.Ssh.channel_request } + +module Make + (IO : Conduit.IO) + (Conduit : Conduit.S + with type input = Cstruct.t + and type output = Cstruct.t + and type +'a io = 'a IO.t) + (M : Mirage_clock.MCLOCK) : sig + type 'flow protocol_with_ssh + + val protocol_with_ssh : + ('edn, 'flow) Conduit.protocol -> + ('edn * endpoint, 'flow protocol_with_ssh) Conduit.protocol +end diff --git a/conduit/dune b/conduit/dune new file mode 100644 index 0000000..95c8ec2 --- /dev/null +++ b/conduit/dune @@ -0,0 +1,4 @@ +(library + (name awa_conduit) + (public_name awa-conduit) + (libraries logs mtime bigstringaf ke mirage-clock conduit awa)) diff --git a/mirage/awa_mirage.ml b/mirage/awa_mirage.ml deleted file mode 100644 index 465fa7f..0000000 --- a/mirage/awa_mirage.ml +++ /dev/null @@ -1,137 +0,0 @@ -open Lwt - -module Make (F : Mirage_flow.S) (M : Mirage_clock.MCLOCK) = struct - - module FLOW = F - - type error = [ `Msg of string - | `Read of F.error - | `Write of F.write_error ] - type write_error = [ Mirage_flow.write_error | error ] - - let pp_error ppf = function - | `Msg e -> Fmt.string ppf e - | `Read e -> F.pp_error ppf e - | `Write e -> F.pp_write_error ppf e - - let pp_write_error ppf = function - | #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e - | #error as e -> pp_error ppf e - - type flow = { - flow : FLOW.flow ; - mutable state : [ `Active of Awa.Client.t | `Eof | `Error of error ] - } - - let write_flow t buf = - FLOW.write t.flow buf >>= function - | Ok () -> Lwt.return (Ok ()) - | Error w -> t.state <- `Error (`Write w) ; Lwt.return (Error (`Write w)) - - let writev_flow t bufs = - Lwt_list.fold_left_s (fun r d -> - match r with - | Error e -> Lwt.return (Error e) - | Ok () -> write_flow t d) - (Ok ()) bufs - - let read_react t = - match t.state with - | `Eof | `Error _ -> Lwt.return (Error ()) - | `Active _ -> - FLOW.read t.flow >>= function - | Error e -> t.state <- `Error (`Read e) ; Lwt.return (Error ()) - | Ok `Eof -> t.state <- `Eof ; Lwt.return (Error ()) - | Ok (`Data data) -> - match t.state with - | `Active ssh -> - begin match Awa.Client.incoming ssh (Mtime.of_uint64_ns (M.elapsed_ns ())) data with - | Error msg -> t.state <- `Error (`Msg msg) ; Lwt.return (Error ()) - | Ok (ssh', out, events) -> - let state' = if List.mem `Disconnected events then `Eof else `Active ssh' in - t.state <- state'; - writev_flow t out >>= fun _ -> - Lwt.return (Ok events) - end - | _ -> Lwt.return (Error ()) - - let rec drain_handshake t = - read_react t >>= function - | Ok es -> - begin match t.state, List.filter (function `Established _ -> true | _ -> false) es with - | `Eof, _ -> Lwt.return (Error (`Msg "disconnected")) - | `Error e, _ -> Lwt.return (Error e) - | `Active _, [ `Established id ] -> Lwt.return (Ok id) - | `Active _, _ -> drain_handshake t - end - | Error () -> match t.state with - | `Error e -> Lwt.return (Error e) - | `Eof -> Lwt.return (Error (`Msg "disconnected")) - | `Active _ -> assert false - - let rec read t = - read_react t >>= function - | Ok events -> - let r = List.fold_left (fun acc e -> - match acc, e with - | `Data d, `Channel_data (_, more) -> `Data (Cstruct.append d more) - (* TODO verify that received on same channel! *) - | `Data d, _ -> `Data d - | `Nothing, `Channel_data (_, data) -> `Data data - | `Nothing, `Channel_eof _ -> `Eof - | `Nothing, `Disconnected -> `Eof - | a, _ -> a) - `Nothing events - in - begin match r with - | `Nothing -> read t - | `Data _ | `Eof as r -> Lwt.return (Ok r) - end - | Error () -> match t.state with - | `Error e -> Lwt.return (Error e) - | `Eof -> Lwt.return (Ok `Eof) - | `Active _ -> assert false - - let close t = - (* TODO ssh session teardown (send some protocol messages) *) - FLOW.close t.flow >|= fun () -> - t.state <- `Eof - - let writev t bufs = - let open Lwt_result.Infix in - match t.state with - | `Active ssh -> - Lwt_list.fold_left_s (fun r data -> - match r with - | Error e -> Lwt.return (Error e) - | Ok ssh -> - match Awa.Client.outgoing_data ssh data with - | Ok (ssh', datas) -> - t.state <- `Active ssh'; - writev_flow t datas >|= fun () -> - ssh' - | Error msg -> - t.state <- `Error (`Msg msg) ; - Lwt.return (Error (`Msg msg))) - (Ok ssh) bufs >|= fun _ -> () - | `Eof -> Lwt.return (Error `Closed) - | `Error e -> Lwt.return (Error (e :> write_error)) - - let write t buf = writev t [buf] - - let client_of_flow ?authenticator ~user key req flow = - let open Lwt_result.Infix in - let client, msgs = Awa.Client.make ?authenticator ~user key in - let t = { - flow = flow ; - state = `Active client ; - } in - writev_flow t msgs >>= fun () -> - drain_handshake t >>= fun id -> - (* TODO that's a bit hardcoded... *) - let ssh = match t.state with `Active t -> t | _ -> assert false in - (match Awa.Client.outgoing_request ssh ~id req with - | Error msg -> t.state <- `Error (`Msg msg) ; Lwt.return (Error (`Msg msg)) - | Ok (ssh', data) -> t.state <- `Active ssh' ; write_flow t data) >|= fun () -> - t -end diff --git a/mirage/awa_mirage.mli b/mirage/awa_mirage.mli deleted file mode 100644 index 6a0a0be..0000000 --- a/mirage/awa_mirage.mli +++ /dev/null @@ -1,30 +0,0 @@ -(** Effectful operations using Mirage for pure SSH. *) - -(** SSH module given a flow *) -module Make (F : Mirage_flow.S) (M : Mirage_clock.MCLOCK) : sig - - module FLOW : Mirage_flow.S - - (** possible errors: incoming alert, processing failure, or a - problem in the underlying flow. *) - type error = [ `Msg of string - | `Read of F.error - | `Write of F.write_error ] - - type write_error = [ `Closed | error ] - (** The type for write errors. *) - - (** we provide the FLOW interface *) - include Mirage_flow.S - with type error := error - and type write_error := write_error - - (** [client_of_flow ~authenticator ~user key channel_request flow] upgrades the - existing connection to SSH, mutually authenticates, opens a channel and - sends the channel request. *) - val client_of_flow : ?authenticator:Awa.Keys.authenticator -> user:string -> - Awa.Hostkey.priv -> Awa.Ssh.channel_request -> FLOW.flow -> - (flow, error) result Lwt.t - -end - with module FLOW = F diff --git a/mirage/dune b/mirage/dune deleted file mode 100644 index c5ee28d..0000000 --- a/mirage/dune +++ /dev/null @@ -1,5 +0,0 @@ -(library - (name awa_mirage) - (public_name awa-mirage) - (wrapped false) - (libraries awa mirage-flow mirage-clock lwt mtime logs))