-
Notifications
You must be signed in to change notification settings - Fork 15
Copy out-bound data into pre-shared pages #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -131,6 +131,87 @@ module TX = struct | |
| return (rx_gnt, fring, client) | ||
| end | ||
|
|
||
| module Shared_page_pool : sig | ||
| type t | ||
| val make : (Gnt.gntref -> Io_page.t -> unit) -> t | ||
| val block_size : int | ||
|
|
||
| val use : t -> (Gnt.gntref -> Cstruct.t -> ('a * unit Lwt.t) Lwt.t) -> ('a * unit Lwt.t) Lwt.t | ||
| (** [use t fn] calls [fn gref block] with a free shared block of memory. | ||
| * The function should return a thread that indicates when the request has | ||
| * been added to the queue, by returning a result value and a second thread | ||
| * indicating when the block can be returned to the pool. *) | ||
|
|
||
| val blocks_needed : int -> int | ||
|
|
||
| val shutdown : t -> unit | ||
| end = struct | ||
| type block = Gnt.gntref * Cstruct.t | ||
| type t = { | ||
| grant : Gnt.gntref -> Io_page.t -> unit; | ||
| mutable blocks : block list; | ||
| mutable in_use : int; | ||
| mutable shutdown : bool; | ||
| } | ||
|
|
||
| let page_size = Io_page.round_to_page_size 1 | ||
| let block_size = page_size / 2 | ||
|
|
||
| let make grant = { grant; blocks = []; shutdown = false; in_use = 0 } | ||
|
|
||
| let shutdown t = | ||
| t.shutdown <- true; | ||
| if t.in_use = 0 then ( | ||
| t.blocks |> List.iter (fun (gref, block) -> | ||
| if block.Cstruct.off = 0 then ( | ||
| Gnt.Gntshr.end_access gref; | ||
| Gnt.Gntshr.put gref; | ||
| ) | ||
| ); | ||
| t.blocks <- [] | ||
| ) | ||
| (* Otherwise, shutdown gets called again when in_use becomes 0 *) | ||
|
|
||
| let alloc t = | ||
| let page = Io_page.get 1 in | ||
| (* (the Xen version of caml_alloc_pages clears the page, so we don't have to) *) | ||
| lwt gnt = Gnt.Gntshr.get () in | ||
| t.grant gnt page; | ||
| return (gnt, Io_page.to_cstruct page) | ||
|
|
||
| let put t block = | ||
| t.blocks <- block :: t.blocks; | ||
| t.in_use <- t.in_use - 1; | ||
| if t.in_use = 0 && t.shutdown then shutdown t | ||
|
|
||
| let use t fn = | ||
| if t.shutdown then | ||
| failwith "Shared_page_pool.use after shutdown"; | ||
| lwt (gntref, block) as grant = | ||
| match t.blocks with | ||
| | [] -> | ||
| (* Frames normally fit within 2048 bytes, so we split each page in half. *) | ||
| lwt gntref, page = alloc t in | ||
| let b1 = Cstruct.sub page 0 block_size in | ||
| let b2 = Cstruct.shift page block_size in | ||
| t.blocks <- (gntref, b2) :: t.blocks; | ||
| return (gntref, b1) | ||
| | hd :: tl -> | ||
| t.blocks <- tl; | ||
| return hd in | ||
| t.in_use <- t.in_use + 1; | ||
| lwt (_, release) as result = | ||
| try_lwt fn gntref block | ||
| with ex -> | ||
| put t grant; | ||
| raise ex in | ||
| Lwt.on_termination release (fun () -> put t grant); | ||
| return result | ||
|
|
||
| let blocks_needed bytes = | ||
| (bytes + block_size - 1) / block_size | ||
| end | ||
|
|
||
| type features = { | ||
| sg: bool; | ||
| gso_tcpv4: bool; | ||
|
|
@@ -154,6 +235,7 @@ type transport = { | |
| tx_client: (TX.response,int) Lwt_ring.Front.t; | ||
| tx_gnt: Gnt.gntref; | ||
| tx_mutex: Lwt_mutex.t; (* Held to avoid signalling between fragments *) | ||
| tx_pool: Shared_page_pool.t; | ||
| rx_fring: (RX.response,int) Ring.Rpc.Front.t; | ||
| rx_client: (RX.response,int) Lwt_ring.Front.t; | ||
| rx_map: (int, Gnt.gntref * Io_page.t) Hashtbl.t; | ||
|
|
@@ -191,7 +273,7 @@ let plug_inner id = | |
| Printf.printf "Netfront.create: id=%d domid=%d\n%!" id backend_id; | ||
| (* Allocate a transmit and receive ring, and event channel for them *) | ||
| lwt (rx_gnt, rx_fring, rx_client) = RX.create (id, backend_id) in | ||
| lwt (tx_gnt, tx_fring, tx_client) = TX.create (id, backend_id) in | ||
| lwt (tx_gnt, _tx_fring, tx_client) = TX.create (id, backend_id) in | ||
| let tx_mutex = Lwt_mutex.create () in | ||
| let evtchn = Eventchn.bind_unbound_port h backend_id in | ||
| let evtchn_port = Eventchn.to_int evtchn in | ||
|
|
@@ -237,8 +319,10 @@ let plug_inner id = | |
| features.sg features.gso_tcpv4 features.rx_copy features.rx_flip features.smart_poll; | ||
| Eventchn.unmask h evtchn; | ||
| let stats = { rx_pkts=0l;rx_bytes=0L;tx_pkts=0l;tx_bytes=0L } in | ||
| let grant_tx_page = Gnt.Gntshr.grant_access ~domid:backend_id ~writable:false in | ||
| let tx_pool = Shared_page_pool.make grant_tx_page in | ||
| (* Register callback activation *) | ||
| return { id; backend_id; tx_client; tx_gnt; tx_mutex; | ||
| return { id; backend_id; tx_client; tx_gnt; tx_mutex; tx_pool; | ||
| rx_gnt; rx_fring; rx_client; rx_map; stats; | ||
| evtchn; mac; backend; features; | ||
| } | ||
|
|
@@ -360,97 +444,101 @@ let connect id = | |
| to Xenstore? XXX *) | ||
| let disconnect t = | ||
| printf "Netif: disconnect\n%!"; | ||
| Shared_page_pool.shutdown t.t.tx_pool; | ||
| Hashtbl.remove devices t.t.id; | ||
| return () | ||
|
|
||
| let page_size = Io_page.round_to_page_size 1 | ||
|
|
||
| (* Push a single page to the ring, but no event notification *) | ||
| let write_request ?size ~flags nf page = | ||
| let len = Cstruct.len page in | ||
| if page.Cstruct.off + len > page_size then begin | ||
| (* netback rejects packets that cross page boundaries *) | ||
| let msg = | ||
| Printf.sprintf "Invalid page: offset=%d, length=%d" page.Cstruct.off len in | ||
| print_endline msg; | ||
| Lwt.fail (Failure msg) | ||
| end else | ||
| lwt gref = Gnt.Gntshr.get () in | ||
| (* This grants access to the *base* data pointer of the page *) | ||
| (* XXX: another place where we peek inside the cstruct *) | ||
| Gnt.Gntshr.grant_access ~domid:nf.t.backend_id ~writable:false gref page.Cstruct.buffer; | ||
| let size = match size with |None -> len |Some s -> s in | ||
| (* XXX: another place where we peek inside the cstruct *) | ||
| nf.t.stats.tx_pkts <- Int32.succ nf.t.stats.tx_pkts; | ||
| nf.t.stats.tx_bytes <- Int64.add nf.t.stats.tx_bytes (Int64.of_int size); | ||
| let offset = page.Cstruct.off in | ||
| lwt replied = Lwt_ring.Front.write nf.t.tx_client | ||
| (TX.Proto_64.write ~id:gref ~gref:(Int32.of_int gref) ~offset ~flags ~size) in | ||
| (* request has been written; when replied returns we have a reply *) | ||
| let replied = | ||
| try_lwt | ||
| lwt _ = replied in | ||
| Gnt.Gntshr.end_access gref; | ||
| Gnt.Gntshr.put gref; | ||
| return () | ||
| with Lwt_ring.Shutdown -> | ||
| Gnt.Gntshr.put gref; | ||
| fail Lwt_ring.Shutdown | ||
| | e -> | ||
| Gnt.Gntshr.end_access gref; | ||
| Gnt.Gntshr.put gref; | ||
| fail e in | ||
| return replied | ||
|
|
||
| (* Transmit a packet from buffer, with offset and length *) | ||
| let rec write_already_locked nf page = | ||
| try_lwt | ||
| lwt th = write_request ~flags:0 nf page in | ||
| Lwt_ring.Front.push nf.t.tx_client (notify nf.t); | ||
| lwt () = th in | ||
| (* all fragments acknowledged, resources cleaned up *) | ||
| return () | ||
| with | Lwt_ring.Shutdown -> write_already_locked nf page | ||
|
|
||
| let write nf page = | ||
| Lwt_mutex.with_lock nf.t.tx_mutex | ||
| (fun () -> | ||
| write_already_locked nf page | ||
| ) | ||
| (** Copy from src to dst until src is exhausted or dst is full. | ||
| * Returns the number of bytes copied and the remaining data from src, if any. *) | ||
| (* TODO: replace this with Cstruct.buffer once that's released. *) | ||
| let blitv src dst = | ||
| let rec aux dst n = function | ||
| | [] -> n, [] | ||
| | hd::tl -> | ||
| let avail = Cstruct.len dst in | ||
| let first = Cstruct.len hd in | ||
| if first <= avail then ( | ||
| Cstruct.blit hd 0 dst 0 first; | ||
| aux (Cstruct.shift dst first) (n + first) tl | ||
| ) else ( | ||
| Cstruct.blit hd 0 dst 0 avail; | ||
| let rest_hd = Cstruct.shift hd first in | ||
| (n + avail, rest_hd :: tl) | ||
| ) in | ||
| aux dst 0 src | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could be useful to upstream this into Cstruct (more generally we need more Cstruct functions working on list of Cstructs I think)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| (* Push up to one page's worth of data to the ring, but without sending an | ||
| * event notification. Once the data has been added to the ring, returns the | ||
| * remaining (unsent) data and a thread which will return when the data has | ||
| * been ack'd by netback. *) | ||
| let write_request ?size ~flags nf datav = | ||
| Shared_page_pool.use nf.t.tx_pool (fun gref shared_block -> | ||
| let len, datav = blitv datav shared_block in | ||
| (* [size] includes extra pages to follow later *) | ||
| let size = match size with |None -> len |Some s -> s in | ||
| nf.t.stats.tx_pkts <- Int32.succ nf.t.stats.tx_pkts; | ||
| nf.t.stats.tx_bytes <- Int64.add nf.t.stats.tx_bytes (Int64.of_int size); | ||
| lwt replied = Lwt_ring.Front.write nf.t.tx_client | ||
| (TX.Proto_64.write ~id:gref ~gref:(Int32.of_int gref) ~offset:shared_block.Cstruct.off ~flags ~size) in | ||
| (* request has been written; when replied returns we have a reply *) | ||
| let release = replied >>= fun _ -> return () in | ||
| return (datav, release) | ||
| ) | ||
|
|
||
| (* Transmit a packet from buffer, with offset and length. | ||
| * The buffer's data must fit in a single block. *) | ||
| let write_already_locked nf datav = | ||
| lwt remaining, th = write_request ~flags:0 nf datav in | ||
| assert (Cstruct.lenv remaining = 0); | ||
| Lwt_ring.Front.push nf.t.tx_client (notify nf.t); | ||
| return th | ||
|
|
||
| (* Transmit a packet from a list of pages *) | ||
| let writev nf pages = | ||
| let writev_no_retry nf datav = | ||
| let size = Cstruct.lenv datav in | ||
| let numneeded = Shared_page_pool.blocks_needed size in | ||
| Lwt_mutex.with_lock nf.t.tx_mutex | ||
| (fun () -> | ||
| let numneeded = List.length pages in | ||
| lwt () = Lwt_ring.Front.wait_for_free nf.t.tx_client numneeded in | ||
| match pages with | ||
| |[] -> return () | ||
| |[page] -> | ||
| (* If there is only one page, then just write it normally *) | ||
| write_already_locked nf page | ||
| |first_page::other_pages -> | ||
| match numneeded with | ||
| | 0 -> return (return ()) | ||
| | 1 -> | ||
| (* If there is only one block, then just write it normally *) | ||
| write_already_locked nf datav | ||
| | n -> | ||
| (* For Xen Netfront, the first fragment contains the entire packet | ||
| * length, which is the backend will use to consume the remaining | ||
| * length, which the backend will use to consume the remaining | ||
| * fragments until the full length is satisfied *) | ||
| let size = Cstruct.lenv pages in | ||
| lwt first_th = | ||
| write_request ~flags:TX.Proto_64.flag_more_data ~size nf first_page in | ||
| let rec xmit = function | ||
| | [] -> return [] | ||
| | hd :: [] -> | ||
| lwt th = write_request ~flags:0 nf hd in | ||
| return [ th ] | ||
| | hd :: tl -> | ||
| lwt next_th = write_request ~flags:TX.Proto_64.flag_more_data nf hd in | ||
| lwt rest = xmit tl in | ||
| return (next_th :: rest) in | ||
| lwt rest_th = xmit other_pages in | ||
| lwt datav, first_th = | ||
| write_request ~flags:TX.Proto_64.flag_more_data ~size nf datav in | ||
| let rec xmit datav = function | ||
| | 0 -> return [] | ||
| | 1 -> | ||
| lwt datav, th = write_request ~flags:0 nf datav in | ||
| assert (Cstruct.lenv datav = 0); | ||
| return [ th ] | ||
| | n -> | ||
| lwt datav, next_th = write_request ~flags:TX.Proto_64.flag_more_data nf datav in | ||
| lwt rest = xmit datav (n - 1) in | ||
| return (next_th :: rest) in | ||
| lwt rest_th = xmit datav (n - 1) in | ||
| (* All fragments are now written, we can now notify the backend *) | ||
| Lwt_ring.Front.push nf.t.tx_client (notify nf.t); | ||
| return () | ||
| return (Lwt.join (first_th :: rest_th)) | ||
| ) | ||
|
|
||
| let rec writev nf datav = | ||
| lwt released = | ||
| try_lwt writev_no_retry nf datav | ||
| with Lwt_ring.Shutdown -> return (fail Lwt_ring.Shutdown) in | ||
| Lwt.on_failure released (function | ||
| | Lwt_ring.Shutdown -> ignore (writev nf datav) | ||
| | ex -> raise ex | ||
| ); | ||
| return () | ||
|
|
||
| let write nf data = writev nf [data] | ||
|
|
||
| let wait_for_plug nf = | ||
| Printf.printf "Wait for plug...\n"; | ||
| Lwt_mutex.with_lock nf.l (fun () -> | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See: mirage/ocaml-cstruct#40