diff --git a/.travis.yml b/.travis.yml index 9a057c8..1957db9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,4 +2,3 @@ language: c script: bash -ex .travis-ci.sh env: - OCAML_VERSION=4.01.0 OPAM_VERSION=1.1.0 - - OCAML_VERSION=4.00.1 OPAM_VERSION=1.1.0 diff --git a/CHANGES b/CHANGES index 82b3256..d02d378 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,7 @@ +* When waiting for space in the transmit queue, we would sometimes fail to notice when + space became available. +* Copy out-bound data into pre-shared pages for performance, security and simplicity. + 1.2.0 (2014-12-17): * Add profiling tracepoints and labels (#13). Introduces a dependency on `mirage-profile`. * New `opam` file present in source repository for OPAM 1.2 workflow. diff --git a/_tags b/_tags index 0156d2a..3748a81 100644 --- a/_tags +++ b/_tags @@ -26,3 +26,4 @@ true: annot, bin_annot : pkg_xen-gnt # OASIS_STOP true: annot, bin_annot, debug, principal +true: warn(A-27-32-34-37), strict_sequence diff --git a/lib/netif.ml b/lib/netif.ml index e0af711..241b8eb 100644 --- a/lib/netif.ml +++ b/lib/netif.ml @@ -131,6 +131,87 @@ module TX = struct return (rx_gnt, fring, client) end +module Shared_page_pool : sig + type t + val make : (Gnt.gntref -> Io_page.t -> unit) -> t (** [make grant] creates an empty pool. [grant] is called with a fresh grant reference and each page the pool allocates. *) + val block_size : int (** Size in bytes of one block: half a page. *) + + val use : t -> (Gnt.gntref -> Cstruct.t -> ('a * unit Lwt.t) Lwt.t) -> ('a * unit Lwt.t) Lwt.t + (** [use t fn] calls [fn gref block] with a free shared block of memory. + * The function should return a thread that indicates when the request has + * been added to the queue, by returning a result value and a second thread + * indicating when the block can be returned to the pool. If [fn] fails, the block is returned to the pool immediately. Raises [Failure] if called after [shutdown]. 
*) + + val blocks_needed : int -> int (** [blocks_needed bytes] is the number of blocks required to hold [bytes] bytes, rounding up. *) + + val shutdown : t -> unit (** Marks the pool as shut down and releases its grants: at once if no block is in use, otherwise when the last block in use is returned. *) +end = struct + type block = Gnt.gntref * Cstruct.t + type t = { + grant : Gnt.gntref -> Io_page.t -> unit; + mutable blocks : block list; (* blocks currently free *) + mutable in_use : int; (* number of blocks currently handed out by [use] *) + mutable shutdown : bool; + } + + let page_size = Io_page.round_to_page_size 1 + let block_size = page_size / 2 + + let make grant = { grant; blocks = []; shutdown = false; in_use = 0 } + + let shutdown t = + t.shutdown <- true; + if t.in_use = 0 then ( + t.blocks |> List.iter (fun (gref, block) -> + if block.Cstruct.off = 0 then ( (* both halves of a page carry the same grant ref; release it once, via the half at offset 0 (assumes [Io_page.to_cstruct] yields offset 0 - TODO confirm) *) + Gnt.Gntshr.end_access gref; + Gnt.Gntshr.put gref; + ) + ); + t.blocks <- [] + ) + (* Otherwise, shutdown gets called again when in_use becomes 0 *) + + let alloc t = + let page = Io_page.get 1 in + (* (the Xen version of caml_alloc_pages clears the page, so we don't have to) *) + lwt gnt = Gnt.Gntshr.get () in + t.grant gnt page; + return (gnt, Io_page.to_cstruct page) + + let put t block = + t.blocks <- block :: t.blocks; + t.in_use <- t.in_use - 1; + if t.in_use = 0 && t.shutdown then shutdown t + + let use t fn = + if t.shutdown then + failwith "Shared_page_pool.use after shutdown"; + lwt (gntref, block) as grant = + match t.blocks with + | [] -> + (* Frames normally fit within 2048 bytes, so we split each page in half. 
*) + lwt gntref, page = alloc t in + let b1 = Cstruct.sub page 0 block_size in + let b2 = Cstruct.shift page block_size in + t.blocks <- (gntref, b2) :: t.blocks; (* keep the second half for a later caller *) + return (gntref, b1) + | hd :: tl -> + t.blocks <- tl; + return hd in + t.in_use <- t.in_use + 1; + lwt (_, release) as result = + try_lwt fn gntref block + with ex -> + put t grant; + raise ex in + Lwt.on_termination release (fun () -> put t grant); (* recycle the block whether [release] succeeds or fails *) + return result + + let blocks_needed bytes = + (bytes + block_size - 1) / block_size +end + type features = { sg: bool; gso_tcpv4: bool; @@ -154,6 +235,7 @@ type transport = { tx_client: (TX.response,int) Lwt_ring.Front.t; tx_gnt: Gnt.gntref; tx_mutex: Lwt_mutex.t; (* Held to avoid signalling between fragments *) + tx_pool: Shared_page_pool.t; (* shared blocks that outgoing data is copied into *) rx_fring: (RX.response,int) Ring.Rpc.Front.t; rx_client: (RX.response,int) Lwt_ring.Front.t; rx_map: (int, Gnt.gntref * Io_page.t) Hashtbl.t; @@ -191,7 +273,7 @@ let plug_inner id = Printf.printf "Netfront.create: id=%d domid=%d\n%!" 
id backend_id; (* Allocate a transmit and receive ring, and event channel for them *) lwt (rx_gnt, rx_fring, rx_client) = RX.create (id, backend_id) in - lwt (tx_gnt, tx_fring, tx_client) = TX.create (id, backend_id) in + lwt (tx_gnt, _tx_fring, tx_client) = TX.create (id, backend_id) in let tx_mutex = Lwt_mutex.create () in let evtchn = Eventchn.bind_unbound_port h backend_id in let evtchn_port = Eventchn.to_int evtchn in @@ -237,8 +319,10 @@ let plug_inner id = features.sg features.gso_tcpv4 features.rx_copy features.rx_flip features.smart_poll; Eventchn.unmask h evtchn; let stats = { rx_pkts=0l;rx_bytes=0L;tx_pkts=0l;tx_bytes=0L } in + let grant_tx_page = Gnt.Gntshr.grant_access ~domid:backend_id ~writable:false in (* transmit pages are granted read-only to the backend domain *) + let tx_pool = Shared_page_pool.make grant_tx_page in (* Register callback activation *) - return { id; backend_id; tx_client; tx_gnt; tx_mutex; + return { id; backend_id; tx_client; tx_gnt; tx_mutex; tx_pool; rx_gnt; rx_fring; rx_client; rx_map; stats; evtchn; mac; 
backend; features; } @@ -360,97 +444,101 @@ let connect id = to Xenstore? XXX *) let disconnect t = printf "Netif: disconnect\n%!"; + Shared_page_pool.shutdown t.t.tx_pool; (* stop handing out transmit blocks; their grants are released once no block is in use *) Hashtbl.remove devices t.t.id; return () -let page_size = Io_page.round_to_page_size 1 - -(* Push a single page to the ring, but no event notification *) -let write_request ?size ~flags nf page = - let len = Cstruct.len page in - if page.Cstruct.off + len > page_size then begin - (* netback rejects packets that cross page boundaries *) - let msg = - Printf.sprintf "Invalid page: offset=%d, length=%d" page.Cstruct.off len in - print_endline msg; - Lwt.fail (Failure msg) - end else - lwt gref = Gnt.Gntshr.get () in - (* This grants access to the *base* data pointer of the page *) - (* XXX: another place where we peek inside the cstruct *) - Gnt.Gntshr.grant_access ~domid:nf.t.backend_id ~writable:false gref page.Cstruct.buffer; - let size = match size with |None -> len |Some s -> s in - (* XXX: another place where we peek inside the cstruct *) - nf.t.stats.tx_pkts <- Int32.succ nf.t.stats.tx_pkts; - nf.t.stats.tx_bytes <- Int64.add nf.t.stats.tx_bytes (Int64.of_int size); - let offset = page.Cstruct.off in - lwt replied = Lwt_ring.Front.write nf.t.tx_client - (TX.Proto_64.write ~id:gref ~gref:(Int32.of_int gref) ~offset ~flags ~size) in - (* request has been written; when replied returns we have a reply *) - let replied = - try_lwt - lwt _ = replied in - Gnt.Gntshr.end_access gref; - Gnt.Gntshr.put gref; - return () - with Lwt_ring.Shutdown -> - Gnt.Gntshr.put gref; - fail Lwt_ring.Shutdown - | e -> - Gnt.Gntshr.end_access gref; - Gnt.Gntshr.put gref; - fail e in - return replied - -(* Transmit a packet from buffer, with offset and length *) -let rec write_already_locked nf page = - try_lwt - lwt th = write_request ~flags:0 nf page in - Lwt_ring.Front.push nf.t.tx_client (notify nf.t); - lwt () = th in - (* all fragments acknowledged, resources cleaned up *) - return () - with | Lwt_ring.Shutdown 
-> write_already_locked nf page - -let write nf page = - Lwt_mutex.with_lock nf.t.tx_mutex - (fun () -> - write_already_locked nf page - ) +(** Copy from src to dst until src is exhausted or dst is full. + * Returns the number of bytes copied and the remaining data from src, if any. *) +(* TODO: replace this with Cstruct.buffer once that's released. *) +let blitv src dst = + let rec aux dst n = function + | [] -> n, [] + | hd::tl -> + let avail = Cstruct.len dst in + let first = Cstruct.len hd in + if first <= avail then ( + Cstruct.blit hd 0 dst 0 first; + aux (Cstruct.shift dst first) (n + first) tl + ) else ( + Cstruct.blit hd 0 dst 0 avail; + let rest_hd = Cstruct.shift hd avail in (* only [avail] bytes of [hd] were copied, so the remainder starts there; shifting by [first] would yield an empty buffer and silently drop the uncopied tail *) + (n + avail, rest_hd :: tl) + ) in + aux dst 0 src + +(* Push up to one block's worth of data to the ring, but without sending an + * event notification. Once the data has been added to the ring, returns the + * remaining (unsent) data and a thread which will return when the data has + * been ack'd by netback. *) +let write_request ?size ~flags nf datav = + Shared_page_pool.use nf.t.tx_pool (fun gref shared_block -> + let len, datav = blitv datav shared_block in + (* [size] includes extra pages to follow later *) + let size = match size with |None -> len |Some s -> s in + nf.t.stats.tx_pkts <- Int32.succ nf.t.stats.tx_pkts; (* NOTE(review): this runs once per fragment, so a multi-fragment packet is counted several times - confirm intended *) + nf.t.stats.tx_bytes <- Int64.add nf.t.stats.tx_bytes (Int64.of_int size); + lwt replied = Lwt_ring.Front.write nf.t.tx_client + (TX.Proto_64.write ~id:gref ~gref:(Int32.of_int gref) ~offset:shared_block.Cstruct.off ~flags ~size) in + (* request has been written; when replied returns we have a reply *) + let release = replied >>= fun _ -> return () in + return (datav, release) + ) + +(* Transmit a packet from buffer, with offset and length. + * The buffer's data must fit in a single block. 
*) +let write_already_locked nf datav = + lwt remaining, th = write_request ~flags:0 nf datav in + assert (Cstruct.lenv remaining = 0); + Lwt_ring.Front.push nf.t.tx_client (notify nf.t); + return th (* Transmit a packet from a list of pages *) -let writev nf pages = +let writev_no_retry nf datav = (* Queues [datav] as a single packet, copied into as many blocks as needed. Once every fragment is on the ring and the backend has been notified, returns a thread that finishes when all fragments have been acknowledged. *) + let size = Cstruct.lenv datav in + let numneeded = Shared_page_pool.blocks_needed size in Lwt_mutex.with_lock nf.t.tx_mutex (fun () -> - let numneeded = List.length pages in lwt () = Lwt_ring.Front.wait_for_free nf.t.tx_client numneeded in - match pages with - |[] -> return () - |[page] -> - (* If there is only one page, then just write it normally *) - write_already_locked nf page - |first_page::other_pages -> + match numneeded with + | 0 -> return (return ()) (* empty packet: nothing to queue, nothing to wait for *) + | 1 -> + (* If there is only one block, then just write it normally *) + write_already_locked nf datav + | n -> (* For Xen Netfront, the first fragment contains the entire packet - * length, which is the backend will use to consume the remaining + * length, which the backend will use to consume the remaining * fragments until the full length is satisfied *) - let size = Cstruct.lenv pages in - lwt first_th = - write_request ~flags:TX.Proto_64.flag_more_data ~size nf first_page in - let rec xmit = function - | [] -> return [] - | hd :: [] -> - lwt th = write_request ~flags:0 nf hd in - return [ th ] - | hd :: tl -> - lwt next_th = write_request ~flags:TX.Proto_64.flag_more_data nf hd in - lwt rest = xmit tl in - return (next_th :: rest) in - lwt rest_th = xmit other_pages in + lwt datav, first_th = + write_request ~flags:TX.Proto_64.flag_more_data ~size nf datav in + let rec xmit datav = function (* the integer argument is the number of fragments still to send *) + | 0 -> return [] + | 1 -> + lwt datav, th = write_request ~flags:0 nf datav in (* last fragment: no more-data flag *) + assert (Cstruct.lenv datav = 0); + return [ th ] + | n -> + lwt datav, next_th = write_request ~flags:TX.Proto_64.flag_more_data nf datav in + lwt rest = xmit datav (n - 
1) in (* All fragments are now written, we can now notify the backend *) Lwt_ring.Front.push nf.t.tx_client (notify nf.t); - return () + return (Lwt.join (first_th :: rest_th)) ) +let rec writev nf datav = (* Sends [datav] as one packet and returns once it has been queued. If the ring shuts down (Lwt_ring.Shutdown), either while queuing or while waiting for the acknowledgements, the whole packet is sent again in the background. *) + lwt released = + try_lwt writev_no_retry nf datav + with Lwt_ring.Shutdown -> return (fail Lwt_ring.Shutdown) in + Lwt.on_failure released (function + | Lwt_ring.Shutdown -> ignore (writev nf datav) (* fire-and-forget retry; its result is not awaited here *) + | ex -> raise ex + ); + return () + +let write nf data = writev nf [data] (* single-buffer wrapper around [writev] *) + let wait_for_plug nf = Printf.printf "Wait for plug...\n"; Lwt_mutex.with_lock nf.l (fun () ->