Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 64 additions & 100 deletions linera-execution/src/execution_state_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,8 @@ where
http_responses_are_oracle_responses,
callback,
} => {
let response = if let Some(response) =
self.txn_tracker.next_replayed_oracle_response()?
{
match response {
OracleResponse::Http(response) => response,
_ => return Err(ExecutionError::OracleResponseMismatch),
}
let maybe_response = if self.txn_tracker.is_replaying() {
Copy link
Copy Markdown
Contributor

@deuszx deuszx Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be a negation?

Suggested change
let maybe_response = if self.txn_tracker.is_replaying() {
let maybe_response = if !self.txn_tracker.is_replaying() {

Previously, the next_replayed_oracle_response would return:

If not in replay mode, None is returned,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the None case, i.e. in non-replay mode, we queried the actual oracle.

That's still the same: In non-replay mode we query the oracle (the else clause), but in replay mode we set this to None because we will obtain the actual value from the replaying responses below.

This PR just merges both calls so you can't forget one, i.e. we don't obtain the Some value in replay mode yet.

Copy link
Copy Markdown
Contributor

@deuszx deuszx Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more confusing than before IMO.

Previously, when next_replayed_oracle_response returned Some(Http(response)) we'd simply add that to add_oracle_response.

Now the add_oracle_response has new logic included that interprets None as "we should use a next entry from the oracle responses.

Maybe it'd be bit clearer if you didn't try to have single let response here and special case in the if { ... } and else { ... } clauses? Maybe

if self.txn_tracker.is_replaying() {
  self.txn_tracker.replay(callback)
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, maybe it's not much of an improvement after all. Let's do the removal part first: #4467

It's not in all call sites as simple as putting the response in the callback, so I think what you're suggesting would basically be the approach from before.

The case I'm worried about is that in one of the many places where we use oracles we forget to add the response in one of the two cases (or both).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe if we mark the callback as must_use and then make the code so that the only thing you can do with it is push via self.transaction_tracker?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, so replay either returns an oracle response and adds it to the outcomes, or it returns a must_use value that can only be used by later adding the response from the actual oracle to the outcomes.

Maybe I'll try that approach tomorrow.

None
} else {
let headers = request
.headers
Expand Down Expand Up @@ -439,13 +434,17 @@ where
.min(committee.policy().maximum_oracle_response_bytes);
}

self.receive_http_response(response, response_size_limit)
.await?
Some(OracleResponse::Http(
self.receive_http_response(response, response_size_limit)
.await?,
))
};

// Record the oracle response
self.txn_tracker
.add_oracle_response(OracleResponse::Http(response.clone()));
let response = match self.txn_tracker.add_oracle_response(maybe_response)? {
OracleResponse::Http(response) => response.clone(),
_ => return Err(ExecutionError::OracleResponseMismatch),
};

callback.respond(response);
}
Expand Down Expand Up @@ -513,25 +512,26 @@ where
}

ReadEvent { event_id, callback } => {
let event = match self.txn_tracker.next_replayed_oracle_response()? {
None => {
let event = self
.state
.context()
.extra()
.get_event(event_id.clone())
.await?;
event.ok_or(ExecutionError::EventsNotFound(vec![event_id.clone()]))?
}
Some(OracleResponse::Event(recorded_event_id, event))
if recorded_event_id == event_id =>
let maybe_response = if self.txn_tracker.is_replaying() {
None
} else {
let event = self
.state
.context()
.extra()
.get_event(event_id.clone())
.await?
.ok_or_else(|| ExecutionError::EventsNotFound(vec![event_id.clone()]))?;
Some(OracleResponse::Event(event_id.clone(), event))
};
let event = match self.txn_tracker.add_oracle_response(maybe_response)? {
OracleResponse::Event(recorded_event_id, event)
if *recorded_event_id == event_id =>
{
event
event.clone()
}
Some(_) => return Err(ExecutionError::OracleResponseMismatch),
_ => return Err(ExecutionError::OracleResponseMismatch),
};
self.txn_tracker
.add_oracle_response(OracleResponse::Event(event_id, event.clone()));
callback.respond(event);
}

Expand Down Expand Up @@ -598,44 +598,38 @@ where
query,
callback,
} => {
let response = match self.txn_tracker.next_replayed_oracle_response()? {
Some(OracleResponse::Service(bytes)) => bytes,
Some(_) => return Err(ExecutionError::OracleResponseMismatch),
None => {
let context = QueryContext {
chain_id: self.state.context().extra().chain_id(),
next_block_height,
local_time: self.txn_tracker.local_time(),
};
let QueryOutcome {
response,
operations,
} = Box::pin(self.state.query_user_application_with_deadline(
application_id,
context,
query,
deadline,
self.txn_tracker.created_blobs().clone(),
))
.await?;
ensure!(
operations.is_empty(),
ExecutionError::ServiceOracleQueryOperations(operations)
);
response
}
let maybe_response = if self.txn_tracker.is_replaying() {
None
} else {
let context = QueryContext {
chain_id: self.state.context().extra().chain_id(),
next_block_height,
local_time: self.txn_tracker.local_time(),
};
let QueryOutcome {
response,
operations,
} = Box::pin(self.state.query_user_application_with_deadline(
application_id,
context,
query,
deadline,
self.txn_tracker.created_blobs().clone(),
))
.await?;
ensure!(
operations.is_empty(),
ExecutionError::ServiceOracleQueryOperations(operations)
);
Some(OracleResponse::Service(response))
};
let response = match self.txn_tracker.add_oracle_response(maybe_response)? {
OracleResponse::Service(bytes) => bytes.clone(),
_ => return Err(ExecutionError::OracleResponseMismatch),
};
self.txn_tracker
.add_oracle_response(OracleResponse::Service(response.clone()));
callback.respond(response);
}

QueryService { response, callback } => {
self.txn_tracker
.add_oracle_response(OracleResponse::Service(response));
callback.respond(());
}

AddOutgoingMessage { message, callback } => {
self.txn_tracker.add_outgoing_message(message);
callback.respond(());
Expand All @@ -649,11 +643,6 @@ where
callback.respond(());
}

GetCreatedBlobs { callback } => {
let blobs = self.txn_tracker.created_blobs().clone();
callback.respond(blobs);
}

AssertBefore {
timestamp,
callback,
Expand Down Expand Up @@ -684,24 +673,17 @@ where
}

ValidationRound { round, callback } => {
let result_round =
if let Some(response) = self.txn_tracker.next_replayed_oracle_response()? {
match response {
OracleResponse::Round(round) => round,
_ => return Err(ExecutionError::OracleResponseMismatch),
}
} else {
round
};
self.txn_tracker
.add_oracle_response(OracleResponse::Round(result_round));
let maybe_response = if self.txn_tracker.is_replaying() {
None
} else {
Some(OracleResponse::Round(round))
};
let result_round = match self.txn_tracker.add_oracle_response(maybe_response)? {
OracleResponse::Round(round) => *round,
_ => return Err(ExecutionError::OracleResponseMismatch),
};
callback.respond(result_round);
}

AddOracleResponse { response, callback } => {
self.txn_tracker.add_oracle_response(response);
callback.respond(());
}
}

Ok(())
Expand Down Expand Up @@ -1221,13 +1203,6 @@ pub enum ExecutionRequest {
callback: Sender<Vec<u8>>,
},

QueryService {
#[debug(with = hex_debug)]
response: Vec<u8>,
#[debug(skip)]
callback: Sender<()>,
},

AddOutgoingMessage {
message: crate::OutgoingMessage,
#[debug(skip)]
Expand All @@ -1240,11 +1215,6 @@ pub enum ExecutionRequest {
callback: Sender<()>,
},

GetCreatedBlobs {
#[debug(skip)]
callback: Sender<std::collections::BTreeMap<BlobId, BlobContent>>,
},

AssertBefore {
timestamp: Timestamp,
#[debug(skip)]
Expand All @@ -1262,10 +1232,4 @@ pub enum ExecutionRequest {
#[debug(skip)]
callback: Sender<Option<u32>>,
},

AddOracleResponse {
response: OracleResponse,
#[debug(skip)]
callback: Sender<()>,
},
}
62 changes: 30 additions & 32 deletions linera-execution/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,17 +492,21 @@ where
stream_id: StreamId::system(EPOCH_STREAM_NAME),
index: epoch.0,
};
let bytes = match txn_tracker.next_replayed_oracle_response()? {
None => self.get_event(event_id.clone()).await?,
Some(OracleResponse::Event(recorded_event_id, bytes))
if recorded_event_id == event_id =>
let maybe_response = if txn_tracker.is_replaying() {
None
} else {
let event = self.get_event(event_id.clone()).await?;
Some(OracleResponse::Event(event_id.clone(), event))
};
let bytes = match txn_tracker.add_oracle_response(maybe_response)? {
OracleResponse::Event(recorded_event_id, bytes)
if *recorded_event_id == event_id =>
{
bytes
bytes.clone()
}
Some(_) => return Err(ExecutionError::OracleResponseMismatch),
_ => return Err(ExecutionError::OracleResponseMismatch),
};
let blob_id = BlobId::new(bcs::from_bytes(&bytes)?, BlobType::Committee);
txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
let committee = bcs::from_bytes(self.read_blob_content(blob_id).await?.bytes())?;
self.blob_used(txn_tracker, blob_id).await?;
self.committees.get_mut().insert(epoch, committee);
Expand All @@ -522,16 +526,17 @@ where
stream_id: StreamId::system(REMOVED_EPOCH_STREAM_NAME),
index: epoch.0,
};
let bytes = match txn_tracker.next_replayed_oracle_response()? {
None => self.get_event(event_id.clone()).await?,
Some(OracleResponse::Event(recorded_event_id, bytes))
if recorded_event_id == event_id =>
{
bytes
}
Some(_) => return Err(ExecutionError::OracleResponseMismatch),
let maybe_response = if txn_tracker.is_replaying() {
None
} else {
let event = self.get_event(event_id.clone()).await?;
Some(OracleResponse::Event(event_id.clone(), event))
};
txn_tracker.add_oracle_response(OracleResponse::Event(event_id, bytes));
match txn_tracker.add_oracle_response(maybe_response)? {
OracleResponse::Event(recorded_event_id, _)
if *recorded_event_id == event_id => {}
_ => return Err(ExecutionError::OracleResponseMismatch),
}
}
UpdateStreams(streams) => {
let mut missing_events = Vec::new();
Expand Down Expand Up @@ -562,23 +567,16 @@ where
stream_id,
index,
};
match txn_tracker.next_replayed_oracle_response()? {
None => {
if !self
.context()
.extra()
.contains_event(event_id.clone())
.await?
{
missing_events.push(event_id);
continue;
}
}
Some(OracleResponse::EventExists(recorded_event_id))
if recorded_event_id == event_id => {}
Some(_) => return Err(ExecutionError::OracleResponseMismatch),
if !txn_tracker
.replay_oracle_response(OracleResponse::EventExists(event_id.clone()))?
&& !self
.context()
.extra()
.contains_event(event_id.clone())
.await?
{
missing_events.push(event_id);
}
txn_tracker.add_oracle_response(OracleResponse::EventExists(event_id));
}
ensure!(
missing_events.is_empty(),
Expand Down
34 changes: 28 additions & 6 deletions linera-execution/src/transaction_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,27 @@ impl TransactionTracker {
&self.blobs
}

pub fn add_oracle_response(&mut self, oracle_response: OracleResponse) {
self.oracle_responses.push(oracle_response);
/// Add an oracle response to the execution results, or take it from the replayed ones and
/// add it. The argument must be `None` if and only if `is_replaying`.
///
/// In either case, a reference to the oracle response is returned.
pub fn add_oracle_response(
&mut self,
oracle_response: Option<OracleResponse>,
) -> Result<&OracleResponse, ExecutionError> {
let response = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
assert!(
oracle_response.is_none(),
"Trying to record oracle response in replay mode; this is a bug"
);
recorded_response
} else {
oracle_response.expect(
"Passed None into add_oracle_response, but not in replay mode; this is a bug",
)
};
self.oracle_responses.push(response);
Ok(self.oracle_responses.last().unwrap())
}

pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
Expand Down Expand Up @@ -230,6 +249,11 @@ impl TransactionTracker {
.collect()
}

/// Returns whether we are executing in replay mode, with recorded oracle responses.
pub fn is_replaying(&self) -> bool {
self.replaying_oracle_responses.is_some()
}

/// Adds the oracle response to the record.
/// If replaying, it also checks that it matches the next replayed one and returns `true`.
pub fn replay_oracle_response(
Expand All @@ -245,7 +269,7 @@ impl TransactionTracker {
} else {
false
};
self.add_oracle_response(oracle_response);
self.oracle_responses.push(oracle_response);
Ok(replaying)
}

Expand All @@ -256,9 +280,7 @@ impl TransactionTracker {
///
/// In both cases, the value (returned or obtained from the oracle) must be recorded using
/// `add_oracle_response`.
pub fn next_replayed_oracle_response(
&mut self,
) -> Result<Option<OracleResponse>, ExecutionError> {
fn next_replayed_oracle_response(&mut self) -> Result<Option<OracleResponse>, ExecutionError> {
let Some(responses) = &mut self.replaying_oracle_responses else {
return Ok(None); // Not in replay mode.
};
Expand Down
Loading