From 92e5490dd50ffbba9f6f4da1c18103b4482c29ed Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Tue, 2 Sep 2025 16:40:44 +0200 Subject: [PATCH 1/2] Make transaction tracker oracle methods harder to misuse. --- linera-execution/src/execution_state_actor.rs | 164 +++++++----------- linera-execution/src/system.rs | 62 ++++--- linera-execution/src/transaction_tracker.rs | 35 +++- 3 files changed, 123 insertions(+), 138 deletions(-) diff --git a/linera-execution/src/execution_state_actor.rs b/linera-execution/src/execution_state_actor.rs index 62662e8f9e7d..c6b239d26fec 100644 --- a/linera-execution/src/execution_state_actor.rs +++ b/linera-execution/src/execution_state_actor.rs @@ -387,13 +387,8 @@ where http_responses_are_oracle_responses, callback, } => { - let response = if let Some(response) = - self.txn_tracker.next_replayed_oracle_response()? - { - match response { - OracleResponse::Http(response) => response, - _ => return Err(ExecutionError::OracleResponseMismatch), - } + let maybe_response = if self.txn_tracker.is_replaying() { + None } else { let headers = request .headers @@ -439,13 +434,17 @@ where .min(committee.policy().maximum_oracle_response_bytes); } - self.receive_http_response(response, response_size_limit) - .await? + Some(OracleResponse::Http( + self.receive_http_response(response, response_size_limit) + .await?, + )) }; // Record the oracle response - self.txn_tracker - .add_oracle_response(OracleResponse::Http(response.clone())); + let response = match self.txn_tracker.add_oracle_response(maybe_response)? { + OracleResponse::Http(response) => response.clone(), + _ => return Err(ExecutionError::OracleResponseMismatch), + }; callback.respond(response); } @@ -513,25 +512,26 @@ where } ReadEvent { event_id, callback } => { - let event = match self.txn_tracker.next_replayed_oracle_response()? { - None => { - let event = self - .state - .context() - .extra() - .get_event(event_id.clone()) - .await?; - event.ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))? - } - Some(OracleResponse::Event(recorded_event_id, event)) - if recorded_event_id == event_id => + let maybe_response = if self.txn_tracker.is_replaying() { + None + } else { + let event = self + .state + .context() + .extra() + .get_event(event_id.clone()) + .await? + .ok_or_else(|| ExecutionError::EventsNotFound(vec![event_id.clone()]))?; + Some(OracleResponse::Event(event_id.clone(), event)) + }; + let event = match self.txn_tracker.add_oracle_response(maybe_response)? { + OracleResponse::Event(recorded_event_id, event) + if *recorded_event_id == event_id => { - event + event.clone() } - Some(_) => return Err(ExecutionError::OracleResponseMismatch), + _ => return Err(ExecutionError::OracleResponseMismatch), }; - self.txn_tracker - .add_oracle_response(OracleResponse::Event(event_id, event.clone())); callback.respond(event); } @@ -598,44 +598,38 @@ where query, callback, } => { - let response = match self.txn_tracker.next_replayed_oracle_response()? { - Some(OracleResponse::Service(bytes)) => bytes, - Some(_) => return Err(ExecutionError::OracleResponseMismatch), - None => { - let context = QueryContext { - chain_id: self.state.context().extra().chain_id(), - next_block_height, - local_time: self.txn_tracker.local_time(), - }; - let QueryOutcome { - response, - operations, - } = Box::pin(self.state.query_user_application_with_deadline( - application_id, - context, - query, - deadline, - self.txn_tracker.created_blobs().clone(), - )) - .await?; - ensure!( - operations.is_empty(), - ExecutionError::ServiceOracleQueryOperations(operations) - ); - response - } + let maybe_response = if self.txn_tracker.is_replaying() { + None + } else { + let context = QueryContext { + chain_id: self.state.context().extra().chain_id(), + next_block_height, + local_time: self.txn_tracker.local_time(), + }; + let QueryOutcome { + response, + operations, + } = Box::pin(self.state.query_user_application_with_deadline( + application_id, + context, + query, + deadline, + self.txn_tracker.created_blobs().clone(), + )) + .await?; + ensure!( + operations.is_empty(), + ExecutionError::ServiceOracleQueryOperations(operations) + ); + Some(OracleResponse::Service(response)) + }; + let response = match self.txn_tracker.add_oracle_response(maybe_response)? { + OracleResponse::Service(bytes) => bytes.clone(), + _ => return Err(ExecutionError::OracleResponseMismatch), }; - self.txn_tracker - .add_oracle_response(OracleResponse::Service(response.clone())); callback.respond(response); } - QueryService { response, callback } => { - self.txn_tracker - .add_oracle_response(OracleResponse::Service(response)); - callback.respond(()); - } - AddOutgoingMessage { message, callback } => { self.txn_tracker.add_outgoing_message(message); callback.respond(()); @@ -649,11 +643,6 @@ where callback.respond(()); } - GetCreatedBlobs { callback } => { - let blobs = self.txn_tracker.created_blobs().clone(); - callback.respond(blobs); - } - AssertBefore { timestamp, callback, @@ -684,24 +673,17 @@ where } ValidationRound { round, callback } => { - let result_round = - if let Some(response) = self.txn_tracker.next_replayed_oracle_response()? { - match response { - OracleResponse::Round(round) => round, - _ => return Err(ExecutionError::OracleResponseMismatch), - } - } else { - round - }; - self.txn_tracker - .add_oracle_response(OracleResponse::Round(result_round)); + let maybe_response = if self.txn_tracker.is_replaying() { + None + } else { + Some(OracleResponse::Round(round)) + }; + let result_round = match self.txn_tracker.add_oracle_response(maybe_response)? { + OracleResponse::Round(round) => *round, + _ => return Err(ExecutionError::OracleResponseMismatch), + }; callback.respond(result_round); } - - AddOracleResponse { response, callback } => { - self.txn_tracker.add_oracle_response(response); - callback.respond(()); - } } Ok(()) @@ -1221,13 +1203,6 @@ pub enum ExecutionRequest { callback: Sender>, }, - QueryService { - #[debug(with = hex_debug)] - response: Vec, - #[debug(skip)] - callback: Sender<()>, - }, - AddOutgoingMessage { message: crate::OutgoingMessage, #[debug(skip)] @@ -1240,11 +1215,6 @@ pub enum ExecutionRequest { callback: Sender<()>, }, - GetCreatedBlobs { - #[debug(skip)] - callback: Sender>, - }, - AssertBefore { timestamp: Timestamp, #[debug(skip)] @@ -1262,10 +1232,4 @@ pub enum ExecutionRequest { #[debug(skip)] callback: Sender>, }, - - AddOracleResponse { - response: OracleResponse, - #[debug(skip)] - callback: Sender<()>, - }, } diff --git a/linera-execution/src/system.rs b/linera-execution/src/system.rs index 9b1b82313d8a..f94836f2adc7 100644 --- a/linera-execution/src/system.rs +++ b/linera-execution/src/system.rs @@ -492,17 +492,21 @@ where stream_id: StreamId::system(EPOCH_STREAM_NAME), index: epoch.0, }; - let bytes = match txn_tracker.next_replayed_oracle_response()? { - None => self.get_event(event_id.clone()).await?, - Some(OracleResponse::Event(recorded_event_id, bytes)) - if recorded_event_id == event_id => + let maybe_response = if txn_tracker.is_replaying() { + None + } else { + let event = self.get_event(event_id.clone()).await?; + Some(OracleResponse::Event(event_id.clone(), event)) + }; + let bytes = match txn_tracker.add_oracle_response(maybe_response)? { + OracleResponse::Event(recorded_event_id, bytes) + if *recorded_event_id == event_id => { - bytes + bytes.clone() } - Some(_) => return Err(ExecutionError::OracleResponseMismatch), + _ => return Err(ExecutionError::OracleResponseMismatch), }; let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee); - txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes)); let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?; self.blob_used(txn_tracker, blob_id).await?; self.committees.get_mut().insert(epoch, committee); @@ -522,16 +526,17 @@ where stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME), index: epoch.0, }; - let bytes = match txn_tracker.next_replayed_oracle_response()? { - None => self.get_event(event_id.clone()).await?, - Some(OracleResponse::Event(recorded_event_id, bytes)) - if recorded_event_id == event_id => - { - bytes - } - Some(_) => return Err(ExecutionError::OracleResponseMismatch), + let maybe_response = if txn_tracker.is_replaying() { + None + } else { + let event = self.get_event(event_id.clone()).await?; + Some(OracleResponse::Event(event_id.clone(), event)) }; - txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes)); + match txn_tracker.add_oracle_response(maybe_response)? { + OracleResponse::Event(recorded_event_id, _) + if *recorded_event_id == event_id => {} + _ => return Err(ExecutionError::OracleResponseMismatch), + } } UpdateStreams(streams) => { let mut missing_events = Vec::new(); @@ -562,23 +567,16 @@ where stream_id, index, }; - match txn_tracker.next_replayed_oracle_response()? { - None => { - if !self - .context() - .extra() - .contains_event(event_id.clone()) - .await? - { - missing_events.push(event_id); - continue; - } - } - Some(OracleResponse::EventExists(recorded_event_id)) - if recorded_event_id == event_id => {} - Some(_) => return Err(ExecutionError::OracleResponseMismatch), + if !txn_tracker + .replay_oracle_response(OracleResponse::EventExists(event_id.clone()))? + && !self + .context() + .extra() + .contains_event(event_id.clone()) + .await? + { + missing_events.push(event_id); } - txn_tracker.add_oracle_response(OracleResponse::EventExists(event_id)); } ensure!( missing_events.is_empty(), diff --git a/linera-execution/src/transaction_tracker.rs b/linera-execution/src/transaction_tracker.rs index e3e67ddde704..e029eab9f3c9 100644 --- a/linera-execution/src/transaction_tracker.rs +++ b/linera-execution/src/transaction_tracker.rs @@ -166,8 +166,27 @@ impl TransactionTracker { &self.blobs } - pub fn add_oracle_response(&mut self, oracle_response: OracleResponse) { - self.oracle_responses.push(oracle_response); + /// Add an oracle response to the execution results, or take it from the replayed ones and + /// add it. The argument must be `None` if and only if `is_replaying`. + /// + /// In either case, a reference to the oracle response is returned. + pub fn add_oracle_response( + &mut self, + oracle_response: Option, + ) -> Result<&OracleResponse, ExecutionError> { + let response = if let Some(recorded_response) = self.next_replayed_oracle_response()? { + assert!( + oracle_response.is_none(), + "Trying to record oracle response in replay mode; this is a bug" + ); + recorded_response + } else { + oracle_response.expect( + "Passed None into add_oracle_response, but not in replay mode; this is a bug", + ) + }; + self.oracle_responses.push(response); + Ok(self.oracle_responses.last().unwrap()) } pub fn add_operation_result(&mut self, result: Option>) { @@ -230,6 +249,11 @@ impl TransactionTracker { .collect() } + /// Returns whether we are executing in replay mode, with recorded oracle responses. + pub fn is_replaying(&self) -> bool { + self.replaying_oracle_responses.is_some() + } + /// Adds the oracle response to the record. /// If replaying, it also checks that it matches the next replayed one and returns `true`. pub fn replay_oracle_response( @@ -241,11 +265,12 @@ impl TransactionTracker { recorded_response == oracle_response, ExecutionError::OracleResponseMismatch ); + self.oracle_responses.push(recorded_response); true } else { + self.oracle_responses.push(oracle_response); false }; - self.add_oracle_response(oracle_response); Ok(replaying) } @@ -256,9 +281,7 @@ impl TransactionTracker { /// /// In both cases, the value (returned or obtained from the oracle) must be recorded using /// `add_oracle_response`. - pub fn next_replayed_oracle_response( - &mut self, - ) -> Result, ExecutionError> { + fn next_replayed_oracle_response(&mut self) -> Result, ExecutionError> { let Some(responses) = &mut self.replaying_oracle_responses else { return Ok(None); // Not in replay mode. }; From 160aae7ff5e3f788cd12b45bc96adea2ff1791a6 Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Tue, 2 Sep 2025 17:51:34 +0200 Subject: [PATCH 2/2] Deduplicate push call. --- linera-execution/src/transaction_tracker.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/linera-execution/src/transaction_tracker.rs b/linera-execution/src/transaction_tracker.rs index e029eab9f3c9..ac7804769fe8 100644 --- a/linera-execution/src/transaction_tracker.rs +++ b/linera-execution/src/transaction_tracker.rs @@ -265,12 +265,11 @@ impl TransactionTracker { recorded_response == oracle_response, ExecutionError::OracleResponseMismatch ); - self.oracle_responses.push(recorded_response); true } else { - self.oracle_responses.push(oracle_response); false }; + self.oracle_responses.push(oracle_response); Ok(replaying) }