diff --git a/.github/workflows/maturin.yml b/.github/workflows/maturin.yml index ad3ec43..bd517bb 100644 --- a/.github/workflows/maturin.yml +++ b/.github/workflows/maturin.yml @@ -117,8 +117,8 @@ jobs: strategy: matrix: platform: - - runner: macos-12 - target: x86_64 + # - runner: macos-12 + # target: x86_64 - runner: macos-14 target: aarch64 steps: diff --git a/Cargo.toml b/Cargo.toml index 720e6b5..f143f97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,8 +9,8 @@ name = "_lowtime_rs" crate-type = ["cdylib"] [dependencies] -pyo3 = "0.22.0" -pyo3-log = "0.11.0" +pyo3 = { version = "0.23.3", features = ["extension-module"] } +pyo3-log = "0.12.0" log = "0.4" ordered-float = { version = "4.0", default-features = false } pathfinding = "4.11.0" diff --git a/lowtime/_lowtime_rs.pyi b/lowtime/_lowtime_rs.pyi index f5b7270..7b19ab9 100644 --- a/lowtime/_lowtime_rs.pyi +++ b/lowtime/_lowtime_rs.pyi @@ -5,9 +5,15 @@ import networkx as nx class PhillipsDessouky: def __init__( self, + fp_error: float, node_ids: list[int] | nx.classes.reportviews.NodeView, source_node_id: int, sink_node_id: int, - edges_raw: list[tuple[tuple[int, int], float]], + edges_raw: list[ + tuple[ + tuple[int, int], + tuple[float, float, float, float], + ] + ], ) -> None: ... - def max_flow(self) -> list[tuple[tuple[int, int], float]]: ... + def find_min_cut(self) -> tuple[set[int], set[int]]: ... diff --git a/lowtime/solver.py b/lowtime/solver.py index ae034aa..ab7a060 100644 --- a/lowtime/solver.py +++ b/lowtime/solver.py @@ -17,13 +17,13 @@ from __future__ import annotations -import time import sys import logging from collections import deque from collections.abc import Generator import networkx as nx +from networkx.algorithms.flow import edmonds_karp from attrs import define, field from lowtime.operation import Operation @@ -161,6 +161,35 @@ def __init__(self, dag: nx.DiGraph, attr_name: str = "op") -> None: self.aon_dag = dag self.unit_time = unit_time_candidates.pop() + def format_rust_inputs( + self, + dag: nx.DiGraph, + ) -> tuple[ + nx.classes.reportviews.NodeView, + list[ + tuple[ + tuple[int, int], + tuple[float, float, float, float], + ] + ], + ]: + """Convert Python-side nx.DiGraph into format compatible with Rust-side LowtimeGraph.""" + nodes = dag.nodes + edges = [] + for from_, to_, edge_attrs in dag.edges(data=True): + edges.append( + ( + (from_, to_), + ( + edge_attrs.get("capacity", 0), + edge_attrs.get("flow", 0), + edge_attrs.get("ub", 0), + edge_attrs.get("lb", 0), + ), + ) + ) + return nodes, edges + def run(self) -> Generator[IterationResult, None, None]: """Run the algorithm and yield a DAG after each iteration. @@ -172,7 +201,6 @@ def run(self) -> Generator[IterationResult, None, None]: of all operations are up to date w.r.t. the `duration` of each operation. """ logger.info("Starting Phillips-Dessouky solver.") - profiling_setup = time.time() # Convert the original activity-on-node DAG to activity-on-arc DAG form. # AOA DAGs are purely internal. All public input and output of this class @@ -208,15 +236,9 @@ def run(self) -> Generator[IterationResult, None, None]: num_iters = max_time - min_time + 1 logger.info("Expected number of PD iterations: %d", num_iters) - profiling_setup = time.time() - profiling_setup - logger.info( - "PROFILING PhillipsDessouky::run set up time: %.10fs", profiling_setup - ) - # Iteratively reduce the execution time of the DAG. for iteration in range(sys.maxsize): logger.info(">>> Beginning iteration %d/%d", iteration + 1, num_iters) - profiling_iter = time.time() # At this point, `critical_dag` always exists and is what we want. # For the first iteration, the critical DAG is computed before the for @@ -238,13 +260,7 @@ def run(self) -> Generator[IterationResult, None, None]: sum(op.duration for op in non_dummy_ops), ) - profiling_annotate = time.time() self.annotate_capacities(critical_dag) - profiling_annotate = time.time() - profiling_annotate - logger.info( - "PROFILING PhillipsDessouky::annotate_capacities time: %.10fs", - profiling_annotate, - ) if logger.isEnabledFor(logging.DEBUG): logger.debug("Capacity DAG:") @@ -258,25 +274,21 @@ def run(self) -> Generator[IterationResult, None, None]: ) try: - profiling_min_cut = time.time() - s_set, t_set = self.find_min_cut(critical_dag) - profiling_min_cut = time.time() - profiling_min_cut - logger.info( - "PROFILING PhillipsDessouky::find_min_cut time: %.10fs", - profiling_min_cut, + nodes, edges = self.format_rust_inputs(critical_dag) + rust_runner = _lowtime_rs.PhillipsDessouky( + FP_ERROR, + nodes, + critical_dag.graph["source_node"], + critical_dag.graph["sink_node"], + edges, ) + s_set, t_set = rust_runner.find_min_cut() except LowtimeFlowError as e: logger.info("Could not find minimum cut: %s", e.message) logger.info("Terminating PD iteration.") break - profiling_reduce = time.time() cost_change = self.reduce_durations(critical_dag, s_set, t_set) - profiling_reduce = time.time() - profiling_reduce - logger.info( - "PROFILING PhillipsDessouky::reduce_durations time: %.10fs", - profiling_reduce, - ) if cost_change == float("inf") or abs(cost_change) < FP_ERROR: logger.info("No further time reduction possible.") @@ -298,11 +310,6 @@ def run(self) -> Generator[IterationResult, None, None]: unit_time=self.unit_time, ) logger.info("%s", result) - profiling_iter = time.time() - profiling_iter - logger.info( - "PROFILING PhillipsDessouky::run single iteration time: %.10fs", - profiling_iter, - ) yield result def reduce_durations( @@ -361,6 +368,10 @@ def reduce_durations( def find_min_cut(self, dag: nx.DiGraph) -> tuple[set[int], set[int]]: """Find the min cut of the DAG annotated with lower/upper bound flow capacities. + Note: this function is not used and instead accelerated by calling + rust_runner.find_min_cut. It is left here for reference in case someone wants + to modify the algorithm in Python for research. + Assumptions: - The capacity DAG is in AOA form. - The capacity DAG has been annotated with `lb` and `ub` attributes on edges, @@ -374,7 +385,6 @@ def find_min_cut(self, dag: nx.DiGraph) -> tuple[set[int], set[int]]: Raises: LowtimeFlowError: When no feasible flow exists. """ - profiling_min_cut_setup = time.time() source_node = dag.graph["source_node"] sink_node = dag.graph["sink_node"] @@ -428,63 +438,18 @@ def find_min_cut(self, dag: nx.DiGraph) -> tuple[set[int], set[int]]: capacity=float("inf"), ) - profiling_min_cut_setup = time.time() - profiling_min_cut_setup - logger.info( - "PROFILING PhillipsDessouky::find_min_cut setup time: %.10fs", - profiling_min_cut_setup, - ) - - # Helper function for Rust interop - def format_rust_inputs( - dag: nx.DiGraph, - ) -> tuple[ - nx.classes.reportviews.NodeView, list[tuple[tuple[int, int], float]] - ]: - nodes = dag.nodes - edges = [ - ((u, v), cap) - for (u, v), cap in nx.get_edge_attributes(dag, "capacity").items() - ] - return nodes, edges - - # Helper function for Rust interop - # Rust's pathfinding::edmonds_karp does not return edges with 0 flow, - # but nx.max_flow does. So we fill in the 0s and empty nodes. - def reformat_rust_flow_to_dict( - flow_vec: list[tuple[tuple[int, int], float]], dag: nx.DiGraph - ) -> dict[int, dict[int, float]]: - flow_dict = dict() - for u in dag.nodes: - flow_dict[u] = dict() - for v in dag.successors(u): - flow_dict[u][v] = 0.0 - - for (u, v), cap in flow_vec: - flow_dict[u][v] = cap - - return flow_dict - # We're done with constructing the DAG with only flow upper bounds. # Find the maximum flow on this DAG. - profiling_max_flow = time.time() - nodes, edges = format_rust_inputs(unbound_dag) - - profiling_data_transfer = time.time() - rust_dag = _lowtime_rs.PhillipsDessouky(nodes, s_prime_id, t_prime_id, edges) - profiling_data_transfer = time.time() - profiling_data_transfer - logger.info( - "PROFILING PhillipsDessouky::find_min_cut data transfer time: %.10fs", - profiling_data_transfer, - ) - - rust_flow_vec = rust_dag.max_flow() - flow_dict = reformat_rust_flow_to_dict(rust_flow_vec, unbound_dag) - - profiling_max_flow = time.time() - profiling_max_flow - logger.info( - "PROFILING PhillipsDessouky::find_min_cut maximum_flow_1 time: %.10fs", - profiling_max_flow, - ) + try: + _, flow_dict = nx.maximum_flow( + unbound_dag, + s_prime_id, + t_prime_id, + capacity="capacity", + flow_func=edmonds_karp, + ) + except nx.NetworkXUnbounded as e: + raise LowtimeFlowError("ERROR: Infinite flow for unbounded DAG.") from e if logger.isEnabledFor(logging.DEBUG): logger.debug("After first max flow") @@ -494,8 +459,6 @@ def reformat_rust_flow_to_dict( total_flow += flow logger.debug("Sum of all flow values: %f", total_flow) - profiling_min_cut_between_max_flows = time.time() - # Check if residual graph is saturated. If so, we have a feasible flow. for u in unbound_dag.successors(s_prime_id): if ( @@ -537,50 +500,28 @@ def reformat_rust_flow_to_dict( # u -> v capacity `ub - flow` and v -> u capacity `flow - lb`. residual_graph = nx.DiGraph(dag) for u, v in dag.edges: - # Rounding small negative values to 0.0 avoids Rust-side - # pathfinding::edmonds_karp from entering unreachable code. - # This edge case did not exist in Python-side nx.max_flow call. - uv_capacity = residual_graph[u][v]["ub"] - residual_graph[u][v]["flow"] - uv_capacity = 0.0 if abs(uv_capacity) < FP_ERROR else uv_capacity - residual_graph[u][v]["capacity"] = uv_capacity - - vu_capacity = residual_graph[u][v]["flow"] - residual_graph[u][v]["lb"] - vu_capacity = 0.0 if abs(vu_capacity) < FP_ERROR else vu_capacity + residual_graph[u][v]["capacity"] = ( + residual_graph[u][v]["ub"] - residual_graph[u][v]["flow"] + ) + capacity = residual_graph[u][v]["flow"] - residual_graph[u][v]["lb"] if dag.has_edge(v, u): - residual_graph[v][u]["capacity"] = vu_capacity + residual_graph[v][u]["capacity"] = capacity else: - residual_graph.add_edge(v, u, capacity=vu_capacity) - - profiling_min_cut_between_max_flows = ( - time.time() - profiling_min_cut_between_max_flows - ) - logger.info( - "PROFILING PhillipsDessouky::find_min_cut between max flows time: %.10fs", - profiling_min_cut_between_max_flows, - ) + residual_graph.add_edge(v, u, capacity=capacity) # Run max flow on the new residual graph. - profiling_max_flow = time.time() - nodes, edges = format_rust_inputs(residual_graph) - - profiling_data_transfer = time.time() - rust_dag = _lowtime_rs.PhillipsDessouky(nodes, source_node, sink_node, edges) - profiling_data_transfer = time.time() - profiling_data_transfer - logger.info( - "PROFILING PhillipsDessouky::find_min_cut data transfer 2 time: %.10fs", - profiling_data_transfer, - ) - - rust_flow_vec = rust_dag.max_flow() - flow_dict = reformat_rust_flow_to_dict(rust_flow_vec, residual_graph) - - profiling_max_flow = time.time() - profiling_max_flow - logger.info( - "PROFILING PhillipsDessouky::find_min_cut maximum_flow_2 time: %.10fs", - profiling_max_flow, - ) - - profiling_min_cut_after_max_flows = time.time() + try: + _, flow_dict = nx.maximum_flow( + residual_graph, + source_node, + sink_node, + capacity="capacity", + flow_func=edmonds_karp, + ) + except nx.NetworkXUnbounded as e: + raise LowtimeFlowError( + "ERROR: Infinite flow on capacity residual graph." + ) from e # Add additional flow we get to the original graph for u, v in dag.edges: @@ -621,14 +562,6 @@ def reformat_rust_flow_to_dict( q.append(child_id) t_set = set(new_residual.nodes) - s_set - profiling_min_cut_after_max_flows = ( - time.time() - profiling_min_cut_after_max_flows - ) - logger.info( - "PROFILING PhillipsDessouky::find_min_cut after max flows time: %.10fs", - profiling_min_cut_after_max_flows, - ) - return s_set, t_set def annotate_capacities(self, critical_dag: nx.DiGraph) -> None: diff --git a/src/lib.rs b/src/lib.rs index ae23248..30d4fae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,12 @@ use pyo3::prelude::*; +mod lowtime_graph; mod phillips_dessouky; +mod utils; + use phillips_dessouky::PhillipsDessouky; + #[pymodule] fn _lowtime_rs(m: &Bound<'_, PyModule>) -> PyResult<()> { pyo3_log::init(); // send Rust logs to Python logger diff --git a/src/lowtime_graph.rs b/src/lowtime_graph.rs new file mode 100644 index 0000000..bbc89bc --- /dev/null +++ b/src/lowtime_graph.rs @@ -0,0 +1,158 @@ +use std::collections::{HashMap, HashSet}; +use ordered_float::OrderedFloat; +use pathfinding::directed::edmonds_karp::{ + SparseCapacity, + Edge, + EKFlows, + edmonds_karp, +}; + + +#[derive(Clone)] +pub struct LowtimeGraph { + pub node_ids: Vec, + pub source_node_id: u32, + pub sink_node_id: u32, + edges: HashMap>, + preds: HashMap>, + num_edges: usize, +} + +impl LowtimeGraph { + pub fn new(source_node_id: u32, sink_node_id: u32) -> Self { + LowtimeGraph { + node_ids: Vec::new(), + source_node_id, + sink_node_id, + edges: HashMap::new(), + preds: HashMap::new(), + num_edges: 0, + } + } + + pub fn of_python( + mut node_ids: Vec, + source_node_id: u32, + sink_node_id: u32, + edges_raw: Vec<((u32, u32), (f64, f64, f64, f64))>, + ) -> Self { + let mut graph = LowtimeGraph::new(source_node_id, sink_node_id); + node_ids.sort(); + graph.node_ids = node_ids.clone(); + + edges_raw.iter().for_each(|( + (from, to), + (capacity, flow, ub, lb), + )| { + graph.add_edge(*from, *to, LowtimeEdge::new( + OrderedFloat(*capacity), + *flow, + *ub, + *lb, + )) + }); + graph + } + + pub fn max_flow(&self) -> EKFlows> { + let edges_edmonds_karp = self.get_ek_preprocessed_edges(); + edmonds_karp::<_, _, _, SparseCapacity<_>>( + &self.node_ids, + &self.source_node_id, + &self.sink_node_id, + edges_edmonds_karp, + ) + } + + pub fn num_nodes(&self) -> usize { + self.node_ids.len() + } + + pub fn num_edges(&self) -> usize { + self.num_edges + } + + pub fn successors(&self, node_id: u32) -> Option> { + self.edges.get(&node_id).map(|succs| succs.keys()) + } + + pub fn predecessors(&self, node_id: u32) -> Option> { + self.preds.get(&node_id).map(|preds| preds.iter()) + } + + pub fn edges(&self) -> impl Iterator { + self.edges.iter().flat_map(|(from, inner)| { + inner.iter().map(move |(to, edge)| (from, to, edge)) + }) + } + + pub fn edges_mut(&mut self) -> impl Iterator { + self.edges.iter_mut().flat_map(|(from, inner)| { + inner.iter_mut().map(move |(to, edge)| (from, to, edge)) + }) + } + + pub fn add_node_id(&mut self, node_id: u32) -> () { + assert!(self.node_ids.last().unwrap() < &node_id, "New node ids must be larger than all existing node ids"); + self.node_ids.push(node_id) + } + + pub fn has_edge(&self, from: u32, to: u32) -> bool { + match self.edges.get(&from).and_then(|inner| inner.get(&to)) { + Some(_) => true, + None => false, + } + } + + pub fn get_edge(&self, from: u32, to: u32) -> &LowtimeEdge { + self.edges.get(&from) + .and_then(|inner| inner.get(&to)) + .expect(&format!("Edge {} to {} not found", from, to)) + } + + pub fn get_edge_mut(&mut self, from: u32, to: u32) -> &mut LowtimeEdge { + self.edges.get_mut(&from) + .and_then(|inner| inner.get_mut(&to)) + .expect(&format!("Edge {} to {} not found", from, to)) + } + + pub fn add_edge(&mut self, from: u32, to: u32, edge: LowtimeEdge) -> () { + self.edges.entry(from).or_insert_with(HashMap::new).insert(to, edge); + self.preds.entry(to).or_insert_with(HashSet::new).insert(from); + self.num_edges += 1; + } + + fn get_ek_preprocessed_edges(&self, ) -> Vec>> { + let mut processed_edges = Vec::with_capacity(self.num_edges); + processed_edges.extend( + self.edges.iter().flat_map(|(from, inner)| + inner.iter().map(|(to, edge)| + ((*from, *to), edge.capacity) + ))); + processed_edges + } +} + +#[derive(Clone)] +pub struct LowtimeEdge { + pub capacity: OrderedFloat, + pub flow: f64, + pub ub: f64, + pub lb: f64, +} + +impl LowtimeEdge { + pub fn new( + capacity: OrderedFloat, + flow: f64, + ub: f64, + lb: f64, + ) -> Self { + LowtimeEdge { + capacity, + flow, + ub, + lb, + } + } +} diff --git a/src/phillips_dessouky.rs b/src/phillips_dessouky.rs index 896b46f..fde1183 100644 --- a/src/phillips_dessouky.rs +++ b/src/phillips_dessouky.rs @@ -1,80 +1,307 @@ use pyo3::prelude::*; +use std::collections::{HashMap, HashSet, VecDeque}; +use log::{debug, error, Level}; use ordered_float::OrderedFloat; -use pathfinding::directed::edmonds_karp::{ - SparseCapacity, - Edge, - edmonds_karp -}; -use std::time::Instant; -use log::info; +use crate::lowtime_graph::{LowtimeGraph, LowtimeEdge}; #[pyclass] pub struct PhillipsDessouky { - node_ids: Vec, - source_node_id: u32, - sink_node_id: u32, - edges_raw: Vec<((u32, u32), f64)>, + dag: LowtimeGraph, + fp_error: f64, } #[pymethods] impl PhillipsDessouky { #[new] fn new( + fp_error: f64, node_ids: Vec, source_node_id: u32, sink_node_id: u32, - edges_raw: Vec<((u32, u32), f64)>, + edges_raw: Vec<((u32, u32), (f64, f64, f64, f64))>, ) -> PyResult { Ok(PhillipsDessouky { - node_ids, - source_node_id, - sink_node_id, - edges_raw, + dag: LowtimeGraph::of_python( + node_ids, + source_node_id, + sink_node_id, + edges_raw, + ), + fp_error, }) } - fn max_flow(&self) -> PyResult> { - let profiling_start = Instant::now(); - let edges_edmonds_karp: Vec>> = self.edges_raw.iter().map(|((from, to), cap)| { - ((*from, *to), OrderedFloat(*cap)) - }).collect(); - let profiling_end = Instant::now(); - info!("PROFILING Rust_PhillipsDessouky::max_flow scaling to OrderedFloat time: {:.10}s", PhillipsDessouky::profile_duration(profiling_start, profiling_end)); - - let profiling_start = Instant::now(); - let (flows, _max_flow, _min_cut) = edmonds_karp::<_, _, _, SparseCapacity<_>>( - &self.node_ids, - &self.source_node_id, - &self.sink_node_id, - edges_edmonds_karp, + /// Find the min cut of the DAG annotated with lower/upper bound flow capacities. + /// + /// Assumptions: + /// - The capacity DAG is in AOA form. + /// - The capacity DAG has been annotated with `lb` and `ub` attributes on edges, + /// representing the lower and upper bounds of the flow on the edge. + /// + /// Returns: + /// A tuple of (s_set, t_set) where s_set is the set of nodes on the source side + /// of the min cut and t_set is the set of nodes on the sink side of the min cut. + /// + /// Panics if: + /// - Any of the assumptions are not true. + /// - No feasible flow exists. + fn find_min_cut(&mut self) -> (HashSet, HashSet) { + // In order to solve max flow on edges with both lower and upper bounds, + // we first need to convert it to another DAG that only has upper bounds. + let mut unbound_dag: LowtimeGraph = self.dag.clone(); + + // For every edge, capacity = ub - lb. + unbound_dag.edges_mut().for_each(|(_from, _to, edge)| + edge.capacity = OrderedFloat(edge.ub - edge.lb) ); - let profiling_end = Instant::now(); - info!("PROFILING Rust_PhillipsDessouky::max_flow edmonds_karp time: {:.10}s", PhillipsDessouky::profile_duration(profiling_start, profiling_end)); - let profiling_start = Instant::now(); - let flows_f64: Vec<((u32, u32), f64)> = flows.into_iter().map(|(edge, flow)| { - (edge, flow.into_inner()) - }).collect(); - let profiling_end = Instant::now(); - info!("PROFILING Rust_PhillipsDessouky::max_flow reformat OrderedFloat to f64 time: {:.10}s", PhillipsDessouky::profile_duration(profiling_start, profiling_end)); + // Add a new node s', which will become the new source node. + // We constructed the AOA DAG, so we know that node IDs are integers. + let s_prime_id = unbound_dag.node_ids.last().unwrap() + 1; + unbound_dag.add_node_id(s_prime_id); - Ok(flows_f64) - } -} + // For every node u in the original graph, add an edge (s', u) with capacity + // equal to the sum of all lower bounds of u's parents. + let orig_node_ids = &self.dag.node_ids; + for u in orig_node_ids.iter() { + let mut capacity = OrderedFloat(0.0); + if let Some(preds) = unbound_dag.predecessors(*u) { + capacity = OrderedFloat(preds.fold(0.0, |acc, pred_id| { + acc + unbound_dag.get_edge(*pred_id, *u).lb + })); + } + unbound_dag.add_edge(s_prime_id, *u, LowtimeEdge::new( + capacity, + 0.0, // flow + 0.0, // ub + 0.0, // lb + )); + } -// Private to Rust, not exposed to Python -impl PhillipsDessouky { - fn profile_duration(start: Instant, end: Instant) -> f64 { - let duration = end.duration_since(start); - let seconds = duration.as_secs(); - let subsec_nanos = duration.subsec_nanos(); + // Add a new node t', which will become the new sink node. + let t_prime_id = s_prime_id + 1; + unbound_dag.add_node_id(t_prime_id); + + // For every node u in the original graph, add an edge (u, t') with capacity + // equal to the sum of all lower bounds of u's children. + for u in orig_node_ids.iter() { + let mut capacity = OrderedFloat(0.0); + if let Some(succs) = unbound_dag.successors(*u) { + capacity = OrderedFloat(succs.fold(0.0, |acc, succ_id| { + acc + unbound_dag.get_edge(*u, *succ_id).lb + })); + } + unbound_dag.add_edge(*u, t_prime_id, LowtimeEdge::new( + capacity, + 0.0, // flow + 0.0, // ub + 0.0, // lb + )); + } + + if log::log_enabled!(Level::Debug) { + debug!("Unbound DAG"); + debug!("Number of nodes: {}", unbound_dag.num_nodes()); + debug!("Number of edges: {}", unbound_dag.num_edges()); + let total_capacity = unbound_dag.edges() + .fold(OrderedFloat(0.0), |acc, (_from, _to, edge)| acc + edge.capacity); + debug!("Sum of capacities: {}", total_capacity); + } + + // Add an edge from t to s with infinite capacity. + unbound_dag.add_edge( + unbound_dag.sink_node_id, + unbound_dag.source_node_id, + LowtimeEdge::new( + OrderedFloat(f64::INFINITY), // capacity + 0.0, // flow + 0.0, // ub + 0.0, // lb + ), + ); + + // Update source and sink on unbound_dag + unbound_dag.source_node_id = s_prime_id; + unbound_dag.sink_node_id = t_prime_id; + + // We're done with constructing the DAG with only flow upper bounds. + // Find the maximum flow on this DAG. + let (flows, _max_flow, _min_cut): ( + Vec<((u32, u32), OrderedFloat)>, + OrderedFloat, + Vec<((u32, u32), OrderedFloat)>, + ) = unbound_dag.max_flow(); + + if log::log_enabled!(Level::Debug) { + debug!("After first max flow"); + let total_flow = flows.iter() + .fold(OrderedFloat(0.0), |acc, ((_from, _to), flow)| acc + flow); + debug!("Sum of all flow values: {}", total_flow); + } + + // Convert flows to dict for faster lookup + let flow_dict = flows.iter().fold( + HashMap::new(), + |mut acc: HashMap>, ((from, to), flow)| { + acc.entry(*from) + .or_insert_with(HashMap::new) + .insert(*to, flow.into_inner()); + acc + } + ); + + // Check if residual graph is saturated. If so, we have a feasible flow. + if let Some(succs) = unbound_dag.successors(s_prime_id) { + for u in succs { + let flow = flow_dict.get(&s_prime_id) + .and_then(|inner| inner.get(u)) + .unwrap_or(&0.0); + let cap = unbound_dag.get_edge(s_prime_id, *u).capacity; + let diff = (flow - cap.into_inner()).abs(); + if diff > self.fp_error { + error!( + "s' -> {} unsaturated (flow: {}, capacity: {})", + u, + flow_dict[&s_prime_id][u], + unbound_dag.get_edge(s_prime_id, *u).capacity, + ); + panic!("ERROR: Max flow on unbounded DAG didn't saturate."); + } + } + } + if let Some(preds) = unbound_dag.predecessors(t_prime_id) { + for u in preds { + let flow = flow_dict.get(u) + .and_then(|inner| inner.get(&t_prime_id)) + .unwrap_or(&0.0); + let cap = unbound_dag.get_edge(*u, t_prime_id).capacity; + let diff = (flow - cap.into_inner()).abs(); + if diff > self.fp_error { + error!( + "{} -> t' unsaturated (flow: {}, capacity: {})", + u, + flow_dict[u][&t_prime_id], + unbound_dag.get_edge(*u, t_prime_id).capacity, + ); + panic!("ERROR: Max flow on unbounded DAG didn't saturate."); + } + } + } + + // We have a feasible flow. Construct a new residual graph with the same + // shape as the capacity DAG so that we can find the min cut. + // First, retrieve the flow amounts to the original capacity graph, where for + // each edge u -> v, the flow amount is `flow + lb`. + for (u, v, edge) in self.dag.edges_mut() { + let flow = flow_dict.get(u) + .and_then(|inner| inner.get(v)) + .unwrap_or(&0.0); + edge.flow = flow + edge.lb; + } + + // Construct a new residual graph (same shape as capacity DAG) with + // u -> v capacity `ub - flow` and v -> u capacity `flow - lb`. + let mut residual_graph = self.dag.clone(); + for (u, v, _dag_edge) in self.dag.edges() { + // Rounding small negative values to 0.0 avoids pathfinding::edmonds_karp + // from entering unreachable code. Has no impact on correctness in test runs. + let residual_uv_edge = residual_graph.get_edge_mut(*u, *v); + let mut uv_capacity = OrderedFloat(residual_uv_edge.ub - residual_uv_edge.flow); + if uv_capacity.into_inner().abs() < self.fp_error { + uv_capacity = OrderedFloat(0.0); + } + residual_uv_edge.capacity = uv_capacity; + + let mut vu_capacity = OrderedFloat(residual_uv_edge.flow - residual_uv_edge.lb); + if vu_capacity.into_inner().abs() < self.fp_error { + vu_capacity = OrderedFloat(0.0); + } + + match self.dag.has_edge(*v, *u) { + true => residual_graph.get_edge_mut(*v, *u).capacity = vu_capacity, + false => residual_graph.add_edge(*v, *u, LowtimeEdge::new( + vu_capacity, + 0.0, // flow + 0.0, // ub + 0.0, // lb + )), + } + } + + // Run max flow on the new residual graph. + let (flows, _max_flow, _min_cut): ( + Vec<((u32, u32), OrderedFloat)>, + OrderedFloat, + Vec<((u32, u32), OrderedFloat)>, + ) = residual_graph.max_flow(); + + // Convert flows to dict for faster lookup + let flow_dict = flows.iter().fold( + HashMap::new(), + |mut acc: HashMap>, ((from, to), flow)| { + acc.entry(*from) + .or_insert_with(HashMap::new) + .insert(*to, flow.into_inner()); + acc + } + ); + + // Add additional flow we get to the original graph + for (u, v, edge) in self.dag.edges_mut() { + edge.flow += *flow_dict.get(u) + .and_then(|inner| inner.get(v)) + .unwrap_or(&0.0); + edge.flow -= *flow_dict.get(v) + .and_then(|inner| inner.get(u)) + .unwrap_or(&0.0); + } + + // Construct the new residual graph. + let mut new_residual = self.dag.clone(); + for (u, v, edge) in self.dag.edges() { + new_residual.get_edge_mut(*u, *v).flow = edge.ub - edge.flow; + new_residual.add_edge(*v, *u, LowtimeEdge::new( + OrderedFloat(0.0), // capacity + edge.flow - edge.lb, // flow + 0.0, // ub + 0.0, // lb + )); + } - let fractional_seconds = subsec_nanos as f64 / 1_000_000_000.0; - let total_seconds = seconds as f64 + fractional_seconds; + if log::log_enabled!(Level::Debug) { + debug!("New residual graph"); + debug!("Number of nodes: {}", new_residual.num_nodes()); + debug!("Number of edges: {}", new_residual.num_edges()); + let total_flow = unbound_dag.edges() + .fold(0.0, |acc, (_from, _to, edge)| acc + edge.flow); + debug!("Sum of capacities: {}", total_flow); + } - return total_seconds; + // Find the s-t cut induced by the second maximum flow above. + // Only following `flow > 0` edges, find the set of nodes reachable from + // source node. That's the s-set, and the rest is the t-set. + let mut s_set: HashSet = HashSet::new(); + let mut q: VecDeque = VecDeque::new(); + q.push_back(new_residual.source_node_id); + while let Some(cur_id) = q.pop_back() { + s_set.insert(cur_id); + if cur_id == new_residual.sink_node_id { + break; + } + if let Some(succs) = new_residual.successors(cur_id) { + for child_id in succs { + let flow = new_residual.get_edge(cur_id, *child_id).flow; + if !s_set.contains(child_id) && flow.abs() > self.fp_error { + q.push_back(*child_id); + } + } + } + } + let all_nodes: HashSet = new_residual.node_ids.iter().copied().collect(); + let t_set: HashSet = all_nodes.difference(&s_set).copied().collect(); + (s_set, t_set) } } diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..391583f --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,17 @@ +use std::time::{ + Instant, + Duration, +}; + +// This function is not used in the codebase, but it is left here +// to facilitate profiling during development. +pub fn get_duration(start: Instant, end: Instant) -> f64 { + let duration: Duration = end.duration_since(start); + let seconds = duration.as_secs(); + let subsec_nanos = duration.subsec_nanos(); + + let fractional_seconds = subsec_nanos as f64 / 1_000_000_000.0; + let total_seconds = seconds as f64 + fractional_seconds; + + return total_seconds; +}