Skip to content

Commit

Permalink
aligning with node changes (#382)
Browse files Browse the repository at this point in the history
* aligning with node changes

---------

Co-authored-by: Jakub Zajkowski <[email protected]>
  • Loading branch information
zajko and Jakub Zajkowski authored Dec 18, 2024
1 parent 3597922 commit b8d19f7
Show file tree
Hide file tree
Showing 19 changed files with 153 additions and 122 deletions.
145 changes: 80 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion event_sidecar/src/event_stream_server/sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ use casper_event_types::{
sse_data::{EventFilter, SseData},
Filter as SseFilter,
};
use casper_types::{ProtocolVersion, Transaction};
use casper_types::ProtocolVersion;
#[cfg(test)]
use casper_types::Transaction;
use futures::{future, Stream, StreamExt};
use http::StatusCode;
use hyper::Body;
#[cfg(test)]
use serde::Serialize;
#[cfg(test)]
use serde_json::Value;
Expand Down Expand Up @@ -97,6 +100,7 @@ type UrlProps = (
IsLegacyFilter,
);

#[cfg(test)]
#[derive(Serialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct TransactionAccepted {
Expand Down
4 changes: 2 additions & 2 deletions event_sidecar/src/event_stream_server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,11 +598,11 @@ async fn fetch_text(
///
/// The expected order is:
/// * data:<JSON-encoded ApiVersion> (note, no ID line follows this first event)
/// then the following three repeated for as many events as are applicable to that stream:
/// then the following three repeated for as many events as are applicable to that stream:
/// * data:<JSON-encoded event>
/// * id:<integer>
/// * empty line
/// then finally, repeated keepalive lines until the server is shut down.
/// then finally, repeated keepalive lines until the server is shut down.
#[allow(clippy::too_many_lines)]
fn parse_response(response_text: String, client_id: &str) -> Vec<ReceivedEvent> {
let mut received_events = Vec::new();
Expand Down
4 changes: 0 additions & 4 deletions event_sidecar/src/testing/simple_sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ pub(crate) mod tests {
pub routes: HashMap<Vec<String>, CacheAndData>,
}

#[derive(Debug)]
struct Nope;
impl warp::reject::Reject for Nope {}

type ShutdownCallbacks = Arc<Mutex<Vec<broadcast::Sender<Option<(Option<String>, String)>>>>>;

impl SimpleSseServer {
Expand Down
26 changes: 23 additions & 3 deletions event_sidecar/src/types/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,29 @@ pub enum DatabaseWriteError {
Unhandled(anyhow::Error),
}

impl ToString for DatabaseWriteError {
fn to_string(&self) -> String {
format!("{:?}", self)
impl Display for DatabaseWriteError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DatabaseWriteError::Serialisation(error) => {
write!(f, "DatabaseWriteError::Serialisation: {}", error)
}
DatabaseWriteError::SqlConstruction(error) => {
write!(f, "DatabaseWriteError::SqlConstruction: {}", error)
}
DatabaseWriteError::UniqueConstraint(unique_constraint_error) => {
write!(
f,
"DatabaseWriteError::UniqueConstraint: table: {}, error: {}",
unique_constraint_error.table, unique_constraint_error.error
)
}
DatabaseWriteError::Database(error) => {
write!(f, "DatabaseWriteError::Database: {}", error)
}
DatabaseWriteError::Unhandled(error) => {
write!(f, "DatabaseWriteError::Unhandled: {}", error)
}
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions event_sidecar/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ pub mod tests {
let infix_str = infix.as_str();
data.iter().any(|x| x.contains(infix_str))
}

#[allow(dead_code)]
pub struct MockNodeTestProperties {
pub testing_config: TestingConfig,
pub temp_storage_dir: TempDir,
Expand Down
1 change: 1 addition & 0 deletions json_rpc/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl Request {
/// * `allow_unknown_fields` is `false` and extra fields exist
///
/// Returns a `Rejection` if the "id" field is `None`.
#[allow(clippy::result_large_err)]
pub(super) fn new(
mut request: Map<String, Value>,
allow_unknown_fields: bool,
Expand Down
1 change: 1 addition & 0 deletions json_rpc/src/request/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum Params {
}

impl Params {
#[allow(clippy::result_large_err)]
pub(super) fn try_from(request_id: &Value, params: Value) -> Result<Self, ErrorOrRejection> {
let err_invalid_request = |additional_info: &str| {
let error = Error::new(ReservedErrorCode::InvalidRequest, additional_info);
Expand Down
1 change: 0 additions & 1 deletion listener/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ impl DefaultConnectionManagerBuilder {
impl DefaultConnectionManager {
/// Start handling traffic from nodes endpoint. This function is blocking, it will return a
/// ConnectionManagerError result if something went wrong while processing.
async fn connect(
&mut self,
) -> Result<Pin<Box<dyn Stream<Item = EventResult> + Send + 'static>>, ConnectionManagerError>
Expand Down
13 changes: 5 additions & 8 deletions listener/src/sse_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use eventsource_stream::{Event, EventStream, EventStreamError, Eventsource};
use futures::StreamExt;
use reqwest::Client;
use std::pin::Pin;
use std::{fmt::Debug, sync::Arc, time::Duration};
use std::{fmt::Debug, time::Duration};
use tokio::select;
use tokio_stream::Stream;
use tracing::debug;
Expand All @@ -17,7 +17,7 @@ use url::Url;
#[derive(Clone, Debug)]
pub enum SseDataStreamingError {
NoDataTimeout(),
ConnectionError(Arc<Error>),
ConnectionError(),
}

pub type EventResult = Result<Event, EventStreamError<SseDataStreamingError>>;
Expand Down Expand Up @@ -84,8 +84,8 @@ impl SseConnection {
monitor.tick().await;
yield Ok(bytes);
},
Err(err) => {
yield Err(SseDataStreamingError::ConnectionError(Arc::new(Error::from(err))));
Err(_) => {
yield Err(SseDataStreamingError::ConnectionError());
break;
}
}
Expand Down Expand Up @@ -177,7 +177,6 @@ pub mod tests {
use std::{
convert::Infallible,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::mpsc::channel;
Expand Down Expand Up @@ -225,9 +224,7 @@ pub mod tests {
}

pub fn build_failing_on_message() -> Self {
let e = SseDataStreamingError::ConnectionError(Arc::new(Error::msg(
"Some error on message",
)));
let e = SseDataStreamingError::ConnectionError();
MockSseConnection {
data: vec![],
failure_on_connection: None,
Expand Down
10 changes: 2 additions & 8 deletions listener/src/version_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,11 @@ fn try_resolve_version(raw_response: &Value) -> Result<ProtocolVersion, Error> {
let raw = build_version_value
.as_str()
.context("build_version_value should be a string")
.map_err(|e| {
count_error("version_value_not_a_string");
e
})?
.inspect_err(|_| count_error("version_value_not_a_string"))?
.split('-')
.next()
.context("splitting build_version_value should always return at least one slice")
.map_err(|e| {
count_error("incomprehensible_build_version_form");
e
})?;
.inspect_err(|_| count_error("incomprehensible_build_version_form"))?;
ProtocolVersion::from_str(raw).map_err(|error| {
count_error("failed_parsing_protocol_version");
anyhow!("failed parsing build version from '{}': {}", raw, error)
Expand Down
8 changes: 5 additions & 3 deletions metrics/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::{Display, Formatter};

use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, Opts, Registry};

Expand Down Expand Up @@ -25,9 +27,9 @@ pub struct MetricCollectionError {
reason: String,
}

impl ToString for MetricCollectionError {
fn to_string(&self) -> String {
format!("MetricCollectionError: {}", self.reason)
impl Display for MetricCollectionError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "MetricCollectionError: {}", self.reason)
}
}

Expand Down
2 changes: 1 addition & 1 deletion metrics/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub fn inc_method_call(method: &str) {
pub fn observe_response_time(method: &str, status: &str, response_time: Duration) {
let response_time = response_time.as_secs_f64() * 1000.0;
RESPONSE_TIMES_MS
.with_label_values(&[method, &status])
.with_label_values(&[method, status])
.observe(response_time);
}

Expand Down
14 changes: 7 additions & 7 deletions resources/test/rpc_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -5511,10 +5511,10 @@
"description": "A version 1 transfer.",
"type": "object",
"required": [
"LegacyTransfer"
"Transfer"
],
"properties": {
"LegacyTransfer": {
"Transfer": {
"$ref": "#/components/schemas/TransferV1"
}
},
Expand Down Expand Up @@ -5683,14 +5683,14 @@
"additionalProperties": false
},
{
"description": "A reservation record.",
"description": "A prepayment record.",
"type": "object",
"required": [
"Prepaid"
"Prepayment"
],
"properties": {
"Prepaid": {
"$ref": "#/components/schemas/PrepaidKind"
"Prepayment": {
"$ref": "#/components/schemas/PrepaymentKind"
}
},
"additionalProperties": false
Expand Down Expand Up @@ -6560,7 +6560,7 @@
}
}
},
"PrepaidKind": {
"PrepaymentKind": {
"description": "Container for bytes recording location, type and data for a gas pre payment",
"type": "object",
"required": [
Expand Down
14 changes: 7 additions & 7 deletions resources/test/speculative_rpc_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1543,10 +1543,10 @@
"description": "A version 1 transfer.",
"type": "object",
"required": [
"LegacyTransfer"
"Transfer"
],
"properties": {
"LegacyTransfer": {
"Transfer": {
"$ref": "#/components/schemas/TransferV1"
}
},
Expand Down Expand Up @@ -1715,14 +1715,14 @@
"additionalProperties": false
},
{
"description": "A reservation record.",
"description": "A prepayment record.",
"type": "object",
"required": [
"Prepaid"
"Prepayment"
],
"properties": {
"Prepaid": {
"$ref": "#/components/schemas/PrepaidKind"
"Prepayment": {
"$ref": "#/components/schemas/PrepaymentKind"
}
},
"additionalProperties": false
Expand Down Expand Up @@ -3529,7 +3529,7 @@
}
}
},
"PrepaidKind": {
"PrepaymentKind": {
"description": "Container for bytes recording location, type and data for a gas pre payment",
"type": "object",
"required": [
Expand Down
14 changes: 7 additions & 7 deletions rpc_sidecar/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,9 +520,9 @@ pub enum InvalidTransactionOrDeploy {
/// Entry point cannot be 'call'
#[error("entry point cannot be 'call'")]
InvalidTransactionEntryPointCannotBeCall,
/// Invalid transaction kind
#[error("invalid transaction kind")]
InvalidTransactionInvalidTransactionKind,
/// Invalid transaction lane
#[error("invalid transaction lane")]
InvalidTransactionInvalidTransactionLane,
}

impl From<ErrorCode> for InvalidTransactionOrDeploy {
Expand Down Expand Up @@ -627,8 +627,8 @@ impl From<ErrorCode> for InvalidTransactionOrDeploy {
ErrorCode::InvalidTransactionEntryPointCannotBeCall => {
Self::InvalidTransactionEntryPointCannotBeCall
}
ErrorCode::InvalidTransactionInvalidTransactionKind => {
Self::InvalidTransactionInvalidTransactionKind
ErrorCode::InvalidTransactionInvalidTransactionLane => {
Self::InvalidTransactionInvalidTransactionLane
}
ErrorCode::InvalidTransactionUnspecified => Self::TransactionUnspecified,
ErrorCode::InvalidTransactionOrDeployUnspecified => {
Expand Down Expand Up @@ -766,7 +766,7 @@ impl Error {
| ErrorCode::DeployMissingTransferTarget
| ErrorCode::DeployMissingModuleBytes
| ErrorCode::InvalidTransactionEntryPointCannotBeCall
| ErrorCode::InvalidTransactionInvalidTransactionKind
| ErrorCode::InvalidTransactionInvalidTransactionLane
| ErrorCode::InvalidTransactionUnspecified
| ErrorCode::InvalidTransactionOrDeployUnspecified),
) => Self::InvalidTransaction(InvalidTransactionOrDeploy::from(err)),
Expand Down Expand Up @@ -1206,7 +1206,7 @@ where
#[derive(Clone, Copy, Debug)]
pub(crate) struct ErrFormatter<'a, T>(pub &'a T);

impl<'a, T> Display for ErrFormatter<'a, T>
impl<T> Display for ErrFormatter<'_, T>
where
T: std::error::Error,
{
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[toolchain]
channel = "1.77.2"
channel = "1.83.0"
components = [ "rustfmt", "clippy" ]
targets = [ "wasm32-unknown-unknown" ]
profile = "minimal"
4 changes: 2 additions & 2 deletions sidecar/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use config::{SidecarConfig, SidecarConfigTarget};
use run::run;
use std::{
env, fmt, io,
panic::{self, PanicInfo},
panic::{self, PanicHookInfo},
process::{self, ExitCode},
};
#[cfg(not(target_env = "msvc"))]
Expand Down Expand Up @@ -68,7 +68,7 @@ pub fn read_config(config_path: &str) -> Result<SidecarConfigTarget, Error> {
toml::from_str(&toml_content).context("Error parsing config into TOML format")
}

fn panic_hook(info: &PanicInfo) {
fn panic_hook(info: &PanicHookInfo) {
let backtrace = Backtrace::new();

eprintln!("{:?}", backtrace);
Expand Down
4 changes: 2 additions & 2 deletions types/src/legacy_sse_data/translate_execution_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ fn maybe_tanslate_stored_value(stored_value: &StoredValue) -> Option<TransformKi
StoredValue::ContractWasm(_) => Some(TransformKindV1::WriteContractWasm),
StoredValue::Contract(_) => Some(TransformKindV1::WriteContract),
StoredValue::ContractPackage(_) => Some(TransformKindV1::WriteContractPackage),
StoredValue::LegacyTransfer(transfer_v1) => {
StoredValue::Transfer(transfer_v1) => {
Some(TransformKindV1::WriteTransfer(transfer_v1.clone()))
}
StoredValue::DeployInfo(deploy_info) => {
Expand Down Expand Up @@ -162,7 +162,7 @@ fn maybe_tanslate_stored_value(stored_value: &StoredValue) -> Option<TransformKi
StoredValue::ByteCode(_) => None,
StoredValue::MessageTopic(_) => None,
StoredValue::Message(_) => None,
StoredValue::Prepaid(_) => None,
StoredValue::Prepayment(_) => None,
StoredValue::EntryPoint(_) => None,
StoredValue::RawBytes(_) => None,
}
Expand Down

0 comments on commit b8d19f7

Please sign in to comment.