Skip to content

Commit

Permalink
Take clippy's advice on some things (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Dec 8, 2023
1 parent d9d4db6 commit 47ba492
Show file tree
Hide file tree
Showing 36 changed files with 626 additions and 767 deletions.
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ where

inner.join_core(&edges, |_k,l,d| Some((d.clone(), l+1)))
.concat(&nodes)
.reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
.reduce(|_, s, t| t.push((*s[0].0, 1)))
})
}
6 changes: 3 additions & 3 deletions src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ where
let reached =
forward
.join_map(&reverse, |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
.reduce(|_key, s, t| t.push((s[0].0.clone(), 1)))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.semijoin(&goals);

let active =
Expand All @@ -96,7 +96,7 @@ where
.join_core(&forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
.concat(&forward)
.map(|(next, (src, dist))| ((next, src), dist))
.reduce(|_key, s, t| t.push((s[0].0.clone(), 1)))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.map(|((next, src), dist)| (next, (src, dist)));

forward_next.map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x));
Expand All @@ -113,7 +113,7 @@ where
.join_core(&reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
.concat(&reverse)
.map(|(next, (rev, dist))| ((next, rev), dist))
.reduce(|_key, s, t| t.push((s[0].0.clone(), 1)))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.map(|((next,rev), dist)| (next, (rev, dist)));

reverse_next.map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x));
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ where
let labels =
proposals
.concat(&nodes)
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1 as i8))));
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));

let propagate: Collection<_, (N, L), R> =
labels
Expand Down
4 changes: 2 additions & 2 deletions src/algorithms/graphs/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ where
// keep edges from active edge destinations.
let active =
edges.map(|(_src,dst)| dst)
.threshold(|_,c| if c.is_zero() { R::from(0 as i8) } else { R::from(1 as i8) });
.threshold(|_,c| if c.is_zero() { R::from(0_i8) } else { R::from(1_i8) });

graph.enter(&edges.scope())
.semijoin(&active)
Expand Down Expand Up @@ -65,7 +65,7 @@ where

// NOTE: With a node -> int function, can be improved by:
// let labels = propagate_at(&cycle, &nodes, |x| *x as u64);
let labels = propagate(&cycle, &nodes);
let labels = propagate(cycle, &nodes);

edges.join_map(&labels, |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone())))
.join_map(&labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone())))
Expand Down
5 changes: 2 additions & 3 deletions src/algorithms/graphs/sequential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ where
.distinct();

// repeatedly apply color-picking logic.
sequence(&start, &edges, |_node, vals| {
sequence(&start, edges, |_node, vals| {

// look for the first absent positive integer.
// start at 1 in case we ever use NonZero<u32>.

(1u32 ..)
.filter(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
.next()
.find(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
.unwrap()
})
}
Expand Down
22 changes: 10 additions & 12 deletions src/algorithms/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@ pub trait Identifiers<G: Scope, D: ExchangeData, R: ExchangeData+Abelian> {
/// use differential_dataflow::algorithms::identifiers::Identifiers;
/// use differential_dataflow::operators::Threshold;
///
/// fn main() {
/// ::timely::example(|scope| {
/// ::timely::example(|scope| {
///
/// let identifiers =
/// scope.new_collection_from(1 .. 10).1
/// .identifiers()
/// // assert no conflicts
/// .map(|(data, id)| id)
/// .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
/// .assert_empty();
/// });
/// }
/// let identifiers =
/// scope.new_collection_from(1 .. 10).1
/// .identifiers()
/// // assert no conflicts
/// .map(|(data, id)| id)
/// .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
/// .assert_empty();
/// });
/// ```
fn identifiers(&self) -> Collection<G, (D, u64), R>;
}
Expand Down Expand Up @@ -63,7 +61,7 @@ where
.as_collection()
.iterate(|diff|
init.enter(&diff.scope())
.concat(&diff)
.concat(diff)
.map(|pair| (pair.hashed(), pair))
.reduce(|_hash, input, output| {
// keep round-positive records as changes.
Expand Down
6 changes: 2 additions & 4 deletions src/algorithms/prefix_sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ where
let combine1 = ::std::rc::Rc::new(combine);
let combine2 = combine1.clone();

let ranges = aggregate(self.clone(), move |k,x,y| (*combine1)(k,x,y));
let values = broadcast(ranges, locations, zero, move |k,x,y| (*combine2)(k,x,y));

values
let ranges = aggregate(self.clone(), move |k,x,y| (*combine1)(k,x,y));
broadcast(ranges, locations, zero, move |k,x,y| (*combine2)(k,x,y))
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ pub mod source {
let mut progress_session = progress.session(&progress_caps[0]);

// We presume the iterator will yield if appropriate.
while let Some(message) = source.next() {
for message in source.by_ref() {
match message {
Message::Updates(mut updates) => {
updates_session.give_vec(&mut updates);
Expand Down Expand Up @@ -624,7 +624,7 @@ pub mod sink {
// and so any record we see is in fact guaranteed to happen.
let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
let reactivator = stream.scope().activator_for(&builder.operator_info().address);
let mut input = builder.new_input(&stream, Pipeline);
let mut input = builder.new_input(stream, Pipeline);
let (mut updates_out, updates) = builder.new_output();

builder.build_reschedule(
Expand Down Expand Up @@ -655,7 +655,7 @@ pub mod sink {
if let Some(sink) = updates_sink.upgrade() {
let mut sink = sink.borrow_mut();
while let Some(message) = send_queue.front() {
if let Some(duration) = sink.poll(&message) {
if let Some(duration) = sink.poll(message) {
// Reschedule after `duration` and then bail.
reactivator.activate_after(duration);
return true;
Expand Down Expand Up @@ -736,7 +736,7 @@ pub mod sink {
frontier = input.frontier.frontier().to_owned();

while let Some(message) = send_queue.front() {
if let Some(duration) = sink.poll(&message) {
if let Some(duration) = sink.poll(message) {
// Reschedule after `duration` and then bail.
reactivator.activate_after(duration);
// Signal that work remains to be done.
Expand Down
Loading

0 comments on commit 47ba492

Please sign in to comment.