Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRY handle_request in peer::dispatcher #8

Open
Thomasdezeeuw opened this issue Aug 25, 2020 · 0 comments
Open

DRY handle_request in peer::dispatcher #8

Thomasdezeeuw opened this issue Aug 25, 2020 · 0 comments

Comments

@Thomasdezeeuw
Copy link
Owner

Location:

fn handle_request(
ctx: &mut actor::Context<Response, ThreadSafe>,
remote: &SocketAddr,
db_ref: &ActorRef<db::Message>,
peers: &Peers,
running: &mut HashMap<ConsensusId, ActorRef<VoteResult>, FxBuildHasher>,
request: Request,
) {
debug!("received a request: {:?}", request);
if request.consensus_id == PARTICIPANT_CONSENSUS_ID {
let msg = consensus::Message::Peer {
key: request.key,
op: request.op,
};
peers.send_participant_consensus(msg);
return;
}
// TODO: DRY this.
match request.op {
Operation::AddBlob => {
let consensus_id = request.consensus_id;
if let Some(actor_ref) = running.remove(&consensus_id) {
warn!(
"received conflicting consensus ids, stopping both: consensus_id={}",
consensus_id
);
let msg = VoteResult {
request_id: request.id,
key: request.key, // NOTE: this is the wrong key.
result: ConsensusVote::Abort,
};
// If we fail to send the actor already stopped, so that's
// fine.
let _ = actor_ref.send(msg);
return;
}
let responder = RpcResponder {
id: request.id,
actor_ref: ctx.actor_ref(),
phase: ConsensusPhase::init(),
};
debug!(
"participant dispatcher starting store blob consensus actor: request={:?}, response={:?}",
request, responder
);
let consensus = consensus::store_blob_actor as fn(_, _, _, _, _, _) -> _;
let actor_ref = ctx.spawn(
|err| {
warn!("store blob consensus actor failed: {}", err);
SupervisorStrategy::Stop
},
consensus,
(
db_ref.clone(),
peers.clone(),
*remote,
request.key,
responder,
),
ActorOptions::default().mark_ready(),
);
// Checked above that we don't have duplicates.
let _ = running.insert(consensus_id, actor_ref);
}
Operation::CommitStoreBlob(timestamp) => {
if let Some(actor_ref) = running.get(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Commit(timestamp),
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for commit request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
// In case we fail we send ourself a message to relay to
// the coordinator that the actor failed.
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
} else {
warn!("can't find consensus actor for commit request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
}
Operation::AbortStoreBlob => {
if let Some(actor_ref) = running.remove(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Abort,
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for abort request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
// In case we fail we send ourself a message to relay to
// the coordinator that the actor failed.
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
} else {
warn!("can't find consensus actor for abort request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
}
Operation::StoreCommitted(timestamp) => {
if let Some(actor_ref) = running.remove(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Commit(timestamp),
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for committed request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
}
} else {
warn!("can't find consensus actor for committed request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
}
}
Operation::RemoveBlob => {
let consensus_id = request.consensus_id;
if let Some(actor_ref) = running.get(&consensus_id) {
warn!(
"received conflicting consensus ids, stopping both: consensus_id={}",
consensus_id
);
let msg = VoteResult {
request_id: request.id,
key: request.key, // NOTE: this is the wrong key.
result: ConsensusVote::Abort,
};
// If we fail to send the actor already stopped, so that's
// fine.
let _ = actor_ref.send(msg);
return;
}
let responder = RpcResponder {
id: request.id,
actor_ref: ctx.actor_ref(),
phase: ConsensusPhase::init(),
};
debug!(
"participant dispatcher starting remove blob consensus actor: request={:?}, response={:?}",
request, responder
);
let consensus = consensus::remove_blob_actor as fn(_, _, _, _, _, _) -> _;
let actor_ref = ctx.spawn(
|err| {
warn!("remove blob consensus actor failed: {}", err);
SupervisorStrategy::Stop
},
consensus,
(
db_ref.clone(),
peers.clone(),
*remote,
request.key,
responder,
),
ActorOptions::default().mark_ready(),
);
// Checked above that we don't have duplicates.
let _ = running.insert(consensus_id, actor_ref);
}
Operation::CommitRemoveBlob(timestamp) => {
if let Some(actor_ref) = running.get(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Commit(timestamp),
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for commit request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
// In case we fail we send ourself a message to relay to
// the coordinator that the actor failed.
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
} else {
warn!("can't find consensus actor for commit request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
}
Operation::AbortRemoveBlob => {
if let Some(actor_ref) = running.remove(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Abort,
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for abort request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
// In case we fail we send ourself a message to relay to
// the coordinator that the actor failed.
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
} else {
warn!("can't find consensus actor for abort request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
let response = Response {
request_id: request.id,
vote: ConsensusVote::Fail,
};
// We can always send ourselves a message.
ctx.actor_ref().send(response).unwrap();
}
}
Operation::RemoveCommitted(timestamp) => {
if let Some(actor_ref) = running.remove(&request.consensus_id) {
// Relay the message to the correct actor.
let msg = VoteResult {
request_id: request.id,
key: request.key,
result: ConsensusVote::Commit(timestamp),
};
if let Err(..) = actor_ref.send(msg) {
warn!("failed to send to consensus actor for committed request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
}
} else {
warn!("can't find consensus actor for committed request: request_id={}, consensus_id={}",
request.id, request.consensus_id);
}
}
}
}

Its basically a large match statement turned in to a 300 line monster function.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant