From 7d2777d789530e2104b6d9252cdca0d6f8078acb Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Fri, 9 Jan 2015 14:44:31 +0000 Subject: [PATCH 1/2] Copy out-bound data into pre-shared pages The original motivation for this is to make TLS support work. Currently, TLS copies the encrypted buffers into a large Io_page and TCP sends segment-sized Cstruct views of this buffer to the network. If any of these overlap a page boundary, the send will fail. With this patch, TLS can send ordinary unaligned buffers to TCP (avoiding a copy). The copy now happens in Netif. This has some other advantages: - It avoids splitting requests across multiple grants (before, we sent the IP header and payload in separate pages). - It avoids the security problem of sharing unrelated data that happens to be in the same page. Now, netback will only ever see data explicitly sent to it. - It allows the sender to reuse pages as soon as Netif.write returns. Before, the pages were queued and the application could change them even as netback was reading them. Behaviour should now be deterministic. - It's faster (132 MB/s -> 181 MB/s for an x86_64 unikernel running under Xen in VirtualBox on my laptop). --- CHANGES | 4 + _tags | 1 + lib/netif.ml | 246 ++++++++++++++++++++++++++++++++++----------------- 3 files changed, 172 insertions(+), 79 deletions(-) diff --git a/CHANGES b/CHANGES index 82b3256..d02d378 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,7 @@ +* When waiting for space in the transmit queue, we would sometimes fail to notice when + space became available. +* Copy out-bound data into pre-shared pages for performance, security and simplicity. + 1.2.0 (2014-12-17): * Add profiling tracepoints and labels (#13). Introduces a dependency on `mirage-profile`. * New `opam` file present in source repository for OPAM 1.2 workflow. 
diff --git a/_tags b/_tags index 0156d2a..3748a81 100644 --- a/_tags +++ b/_tags @@ -26,3 +26,4 @@ true: annot, bin_annot : pkg_xen-gnt # OASIS_STOP true: annot, bin_annot, debug, principal +true: warn(A-27-32-34-37), strict_sequence diff --git a/lib/netif.ml b/lib/netif.ml index e0af711..241b8eb 100644 --- a/lib/netif.ml +++ b/lib/netif.ml @@ -131,6 +131,87 @@ module TX = struct return (rx_gnt, fring, client) end +module Shared_page_pool : sig + type t + val make : (Gnt.gntref -> Io_page.t -> unit) -> t + val block_size : int + + val use : t -> (Gnt.gntref -> Cstruct.t -> ('a * unit Lwt.t) Lwt.t) -> ('a * unit Lwt.t) Lwt.t + (** [use t fn] calls [fn gref block] with a free shared block of memory. + * The function should return a thread that indicates when the request has + * been added to the queue, by returning a result value and a second thread + * indicating when the block can be returned to the pool. *) + + val blocks_needed : int -> int + + val shutdown : t -> unit +end = struct + type block = Gnt.gntref * Cstruct.t + type t = { + grant : Gnt.gntref -> Io_page.t -> unit; + mutable blocks : block list; + mutable in_use : int; + mutable shutdown : bool; + } + + let page_size = Io_page.round_to_page_size 1 + let block_size = page_size / 2 + + let make grant = { grant; blocks = []; shutdown = false; in_use = 0 } + + let shutdown t = + t.shutdown <- true; + if t.in_use = 0 then ( + t.blocks |> List.iter (fun (gref, block) -> + if block.Cstruct.off = 0 then ( + Gnt.Gntshr.end_access gref; + Gnt.Gntshr.put gref; + ) + ); + t.blocks <- [] + ) + (* Otherwise, shutdown gets called again when in_use becomes 0 *) + + let alloc t = + let page = Io_page.get 1 in + (* (the Xen version of caml_alloc_pages clears the page, so we don't have to) *) + lwt gnt = Gnt.Gntshr.get () in + t.grant gnt page; + return (gnt, Io_page.to_cstruct page) + + let put t block = + t.blocks <- block :: t.blocks; + t.in_use <- t.in_use - 1; + if t.in_use = 0 && t.shutdown then shutdown t + + let 
use t fn = + if t.shutdown then + failwith "Shared_page_pool.use after shutdown"; + lwt (gntref, block) as grant = + match t.blocks with + | [] -> + (* Frames normally fit within 2048 bytes, so we split each page in half. *) + lwt gntref, page = alloc t in + let b1 = Cstruct.sub page 0 block_size in + let b2 = Cstruct.shift page block_size in + t.blocks <- (gntref, b2) :: t.blocks; + return (gntref, b1) + | hd :: tl -> + t.blocks <- tl; + return hd in + t.in_use <- t.in_use + 1; + lwt (_, release) as result = + try_lwt fn gntref block + with ex -> + put t grant; + raise ex in + Lwt.on_termination release (fun () -> put t grant); + return result + + let blocks_needed bytes = + (bytes + block_size - 1) / block_size +end + type features = { sg: bool; gso_tcpv4: bool; @@ -154,6 +235,7 @@ type transport = { tx_client: (TX.response,int) Lwt_ring.Front.t; tx_gnt: Gnt.gntref; tx_mutex: Lwt_mutex.t; (* Held to avoid signalling between fragments *) + tx_pool: Shared_page_pool.t; rx_fring: (RX.response,int) Ring.Rpc.Front.t; rx_client: (RX.response,int) Lwt_ring.Front.t; rx_map: (int, Gnt.gntref * Io_page.t) Hashtbl.t; @@ -191,7 +273,7 @@ let plug_inner id = Printf.printf "Netfront.create: id=%d domid=%d\n%!" 
id backend_id; (* Allocate a transmit and receive ring, and event channel for them *) lwt (rx_gnt, rx_fring, rx_client) = RX.create (id, backend_id) in - lwt (tx_gnt, tx_fring, tx_client) = TX.create (id, backend_id) in + lwt (tx_gnt, _tx_fring, tx_client) = TX.create (id, backend_id) in let tx_mutex = Lwt_mutex.create () in let evtchn = Eventchn.bind_unbound_port h backend_id in let evtchn_port = Eventchn.to_int evtchn in @@ -237,8 +319,10 @@ let plug_inner id = features.sg features.gso_tcpv4 features.rx_copy features.rx_flip features.smart_poll; Eventchn.unmask h evtchn; let stats = { rx_pkts=0l;rx_bytes=0L;tx_pkts=0l;tx_bytes=0L } in + let grant_tx_page = Gnt.Gntshr.grant_access ~domid:backend_id ~writable:false in + let tx_pool = Shared_page_pool.make grant_tx_page in (* Register callback activation *) - return { id; backend_id; tx_client; tx_gnt; tx_mutex; + return { id; backend_id; tx_client; tx_gnt; tx_mutex; tx_pool; rx_gnt; rx_fring; rx_client; rx_map; stats; evtchn; mac; backend; features; } @@ -360,97 +444,101 @@ let connect id = to Xenstore? 
XXX *) let disconnect t = printf "Netif: disconnect\n%!"; + Shared_page_pool.shutdown t.t.tx_pool; Hashtbl.remove devices t.t.id; return () -let page_size = Io_page.round_to_page_size 1 - -(* Push a single page to the ring, but no event notification *) -let write_request ?size ~flags nf page = - let len = Cstruct.len page in - if page.Cstruct.off + len > page_size then begin - (* netback rejects packets that cross page boundaries *) - let msg = - Printf.sprintf "Invalid page: offset=%d, length=%d" page.Cstruct.off len in - print_endline msg; - Lwt.fail (Failure msg) - end else - lwt gref = Gnt.Gntshr.get () in - (* This grants access to the *base* data pointer of the page *) - (* XXX: another place where we peek inside the cstruct *) - Gnt.Gntshr.grant_access ~domid:nf.t.backend_id ~writable:false gref page.Cstruct.buffer; - let size = match size with |None -> len |Some s -> s in - (* XXX: another place where we peek inside the cstruct *) - nf.t.stats.tx_pkts <- Int32.succ nf.t.stats.tx_pkts; - nf.t.stats.tx_bytes <- Int64.add nf.t.stats.tx_bytes (Int64.of_int size); - let offset = page.Cstruct.off in - lwt replied = Lwt_ring.Front.write nf.t.tx_client - (TX.Proto_64.write ~id:gref ~gref:(Int32.of_int gref) ~offset ~flags ~size) in - (* request has been written; when replied returns we have a reply *) - let replied = - try_lwt - lwt _ = replied in - Gnt.Gntshr.end_access gref; - Gnt.Gntshr.put gref; - return () - with Lwt_ring.Shutdown -> - Gnt.Gntshr.put gref; - fail Lwt_ring.Shutdown - | e -> - Gnt.Gntshr.end_access gref; - Gnt.Gntshr.put gref; - fail e in - return replied - -(* Transmit a packet from buffer, with offset and length *) -let rec write_already_locked nf page = - try_lwt - lwt th = write_request ~flags:0 nf page in - Lwt_ring.Front.push nf.t.tx_client (notify nf.t); - lwt () = th in - (* all fragments acknowledged, resources cleaned up *) - return () - with | Lwt_ring.Shutdown -> write_already_locked nf page - -let write nf page = - 
Lwt_mutex.with_lock nf.t.tx_mutex - (fun () -> - write_already_locked nf page - ) +(** Copy from src to dst until src is exhausted or dst is full. + * Returns the number of bytes copied and the remaining data from src, if any. *) +(* TODO: replace this with Cstruct.buffer once that's released. *) +let blitv src dst = + let rec aux dst n = function + | [] -> n, [] + | hd::tl -> + let avail = Cstruct.len dst in + let first = Cstruct.len hd in + if first <= avail then ( + Cstruct.blit hd 0 dst 0 first; + aux (Cstruct.shift dst first) (n + first) tl + ) else ( + Cstruct.blit hd 0 dst 0 avail; + let rest_hd = Cstruct.shift hd avail in + (n + avail, rest_hd :: tl) + ) in + aux dst 0 src + +(* Push up to one block's worth of data to the ring, but without sending an + * event notification. Once the data has been added to the ring, returns the + * remaining (unsent) data and a thread which will return when the data has + * been ack'd by netback. *) +let write_request ?size ~flags nf datav = + Shared_page_pool.use nf.t.tx_pool (fun gref shared_block -> + let len, datav = blitv datav shared_block in + (* [size] includes extra pages to follow later *) + let size = match size with |None -> len |Some s -> s in + nf.t.stats.tx_pkts <- Int32.succ nf.t.stats.tx_pkts; + nf.t.stats.tx_bytes <- Int64.add nf.t.stats.tx_bytes (Int64.of_int size); + lwt replied = Lwt_ring.Front.write nf.t.tx_client + (TX.Proto_64.write ~id:gref ~gref:(Int32.of_int gref) ~offset:shared_block.Cstruct.off ~flags ~size) in + (* request has been written; when replied returns we have a reply *) + let release = replied >>= fun _ -> return () in + return (datav, release) + ) + +(* Transmit a packet from buffer, with offset and length. + * The buffer's data must fit in a single block. 
*) +let write_already_locked nf datav = + lwt remaining, th = write_request ~flags:0 nf datav in + assert (Cstruct.lenv remaining = 0); + Lwt_ring.Front.push nf.t.tx_client (notify nf.t); + return th (* Transmit a packet from a list of pages *) -let writev nf pages = +let writev_no_retry nf datav = + let size = Cstruct.lenv datav in + let numneeded = Shared_page_pool.blocks_needed size in Lwt_mutex.with_lock nf.t.tx_mutex (fun () -> - let numneeded = List.length pages in lwt () = Lwt_ring.Front.wait_for_free nf.t.tx_client numneeded in - match pages with - |[] -> return () - |[page] -> - (* If there is only one page, then just write it normally *) - write_already_locked nf page - |first_page::other_pages -> + match numneeded with + | 0 -> return (return ()) + | 1 -> + (* If there is only one block, then just write it normally *) + write_already_locked nf datav + | n -> (* For Xen Netfront, the first fragment contains the entire packet - * length, which is the backend will use to consume the remaining + * length, which the backend will use to consume the remaining * fragments until the full length is satisfied *) - let size = Cstruct.lenv pages in - lwt first_th = - write_request ~flags:TX.Proto_64.flag_more_data ~size nf first_page in - let rec xmit = function - | [] -> return [] - | hd :: [] -> - lwt th = write_request ~flags:0 nf hd in - return [ th ] - | hd :: tl -> - lwt next_th = write_request ~flags:TX.Proto_64.flag_more_data nf hd in - lwt rest = xmit tl in - return (next_th :: rest) in - lwt rest_th = xmit other_pages in + lwt datav, first_th = + write_request ~flags:TX.Proto_64.flag_more_data ~size nf datav in + let rec xmit datav = function + | 0 -> return [] + | 1 -> + lwt datav, th = write_request ~flags:0 nf datav in + assert (Cstruct.lenv datav = 0); + return [ th ] + | n -> + lwt datav, next_th = write_request ~flags:TX.Proto_64.flag_more_data nf datav in + lwt rest = xmit datav (n - 1) in + return (next_th :: rest) in + lwt rest_th = xmit datav (n - 
1) in (* All fragments are now written, we can now notify the backend *) Lwt_ring.Front.push nf.t.tx_client (notify nf.t); - return () + return (Lwt.join (first_th :: rest_th)) ) +let rec writev nf datav = + lwt released = + try_lwt writev_no_retry nf datav + with Lwt_ring.Shutdown -> return (fail Lwt_ring.Shutdown) in + Lwt.on_failure released (function + | Lwt_ring.Shutdown -> ignore (writev nf datav) + | ex -> raise ex + ); + return () + +let write nf data = writev nf [data] + let wait_for_plug nf = Printf.printf "Wait for plug...\n"; Lwt_mutex.with_lock nf.l (fun () -> From dd078415b6f4a21ddb6389fec6932b332302d615 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Thu, 15 Jan 2015 15:15:45 +0000 Subject: [PATCH 2/2] Drop Travis tests for OCaml 4.00 --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 9a057c8..1957db9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,4 +2,3 @@ language: c script: bash -ex .travis-ci.sh env: - OCAML_VERSION=4.01.0 OPAM_VERSION=1.1.0 - - OCAML_VERSION=4.00.1 OPAM_VERSION=1.1.0