Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions mm2src/mm2_bitcoin/rpc/src/v1/types/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use primitives::hash::H256 as GlobalH256;
use primitives::hash::H264 as GlobalH264;
use serde;
use serde::de::Unexpected;
use serde::ser::SerializeSeq;
use std::cmp::Ordering;
use std::fmt;
use std::hash::{Hash, Hasher};
Expand All @@ -17,6 +18,60 @@ macro_rules! impl_hash {

impl $name {
pub const fn const_default() -> $name { $name([0; $size]) }

pub fn serialize_to_byte_seq<S>(value: &Self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut seq = serializer.serialize_seq(Some(value.0.len()))?;
for byte in &value.0 {
seq.serialize_element(byte)?;
}
seq.end()
}

pub fn deserialize_from_bytes<'de, D>(deserializer: D) -> Result<$name, D::Error>
where
D: serde::Deserializer<'de>,
{
struct BytesVisitor;
impl<'de> serde::de::Visitor<'de> for BytesVisitor {
type Value = $name;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "a byte array or sequence of length {}", $size)
}

fn visit_bytes<E>(self, v: &[u8]) -> Result<$name, E>
where
E: serde::de::Error,
{
if v.len() != $size {
return Err(E::invalid_length(v.len(), &self));
}
let mut arr = [0u8; $size];
arr.copy_from_slice(v);
Ok($name(arr))
}

fn visit_seq<A>(self, mut seq: A) -> Result<$name, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let mut vec = Vec::with_capacity($size);
while let Some(elem) = seq.next_element()? {
vec.push(elem);
}
if vec.len() != $size {
return Err(serde::de::Error::invalid_length(vec.len(), &self));
}
let mut arr = [0u8; $size];
arr.copy_from_slice(&vec);
Ok($name(arr))
}
}
deserializer.deserialize_any(BytesVisitor)
}
}

impl Default for $name {
Expand Down
8 changes: 6 additions & 2 deletions mm2src/mm2_event_stream/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,16 @@ impl StreamingManager {
/// In this case, we need to remove the streamer and de-list it from all clients.
fn remove_streamer_if_down(&self, streamer_id: &str) {
let mut this = self.write();
let Some(streamer_info) = this.streamers.get(streamer_id) else { return };
let Some(streamer_info) = this.streamers.get(streamer_id) else {
return;
};
if !streamer_info.is_down() {
return;
}
// Remove the streamer from our registry.
let Some(streamer_info) = this.streamers.remove(streamer_id) else { return };
let Some(streamer_info) = this.streamers.remove(streamer_id) else {
return;
};
// And remove the streamer from all clients listening to it.
for client_id in streamer_info.clients {
if let Some(info) = this.clients.get_mut(&client_id) {
Expand Down
94 changes: 94 additions & 0 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,10 @@ pub struct NegotiationDataV1 {
started_at: u64,
payment_locktime: u64,
secret_hash: [u8; 20],
#[serde(
deserialize_with = "H264::deserialize_from_bytes",
serialize_with = "H264::serialize_to_byte_seq"
)]
persistent_pubkey: H264,
}

Expand All @@ -854,6 +858,10 @@ pub struct NegotiationDataV2 {
started_at: u64,
payment_locktime: u64,
secret_hash: Vec<u8>,
#[serde(
deserialize_with = "H264::deserialize_from_bytes",
serialize_with = "H264::serialize_to_byte_seq"
)]
persistent_pubkey: H264,
maker_coin_swap_contract: Vec<u8>,
taker_coin_swap_contract: Vec<u8>,
Expand All @@ -866,7 +874,15 @@ pub struct NegotiationDataV3 {
secret_hash: Vec<u8>,
maker_coin_swap_contract: Vec<u8>,
taker_coin_swap_contract: Vec<u8>,
#[serde(
deserialize_with = "H264::deserialize_from_bytes",
serialize_with = "H264::serialize_to_byte_seq"
)]
maker_coin_htlc_pub: H264,
#[serde(
deserialize_with = "H264::deserialize_from_bytes",
serialize_with = "H264::serialize_to_byte_seq"
)]
taker_coin_htlc_pub: H264,
}

Expand Down Expand Up @@ -2419,4 +2435,82 @@ mod lp_swap_tests {

assert_eq!(testcoin_taker_fee * MmNumber::from("0.90"), mycoin_taker_fee);
}

#[test]
fn test_legacy_new_negotiation_rmp() {
// In legacy messages, persistent_pubkey was represented as Vec<u8> instead of H264.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct LegacyNegotiationDataV2 {
started_at: u64,
payment_locktime: u64,
secret_hash: Vec<u8>,
persistent_pubkey: Vec<u8>,
maker_coin_swap_contract: Vec<u8>,
taker_coin_swap_contract: Vec<u8>,
}

let legacy_instance = LegacyNegotiationDataV2 {
started_at: 1620000000,
payment_locktime: 1620003600,
secret_hash: vec![0u8; 20],
persistent_pubkey: vec![1u8; 33],
maker_coin_swap_contract: vec![1u8; 20],
taker_coin_swap_contract: vec![1u8; 20],
};

// ------------------------------------------
// Step 1: Test Deserialization from Legacy Format
// ------------------------------------------
let legacy_serialized =
rmp_serde::to_vec_named(&legacy_instance).expect("Legacy MessagePack serialization failed");
let new_instance: NegotiationDataV2 =
rmp_serde::from_slice(&legacy_serialized).expect("Deserialization into new NegotiationDataV2 failed");

assert_eq!(new_instance.started_at, legacy_instance.started_at);
assert_eq!(new_instance.payment_locktime, legacy_instance.payment_locktime);
assert_eq!(new_instance.secret_hash, legacy_instance.secret_hash);
assert_eq!(
new_instance.persistent_pubkey.0.to_vec(),
legacy_instance.persistent_pubkey
);
assert_eq!(
new_instance.maker_coin_swap_contract,
legacy_instance.maker_coin_swap_contract
);
assert_eq!(
new_instance.taker_coin_swap_contract,
legacy_instance.taker_coin_swap_contract
);

// ------------------------------------------
// Step 2: Test Serialization from New Format to Legacy Format
// ------------------------------------------
let new_serialized = rmp_serde::to_vec_named(&new_instance).expect("Serialization of new type failed");
let legacy_from_new: LegacyNegotiationDataV2 =
rmp_serde::from_slice(&new_serialized).expect("Legacy deserialization from new serialization failed");

assert_eq!(legacy_from_new.started_at, new_instance.started_at);
assert_eq!(legacy_from_new.payment_locktime, new_instance.payment_locktime);
assert_eq!(legacy_from_new.secret_hash, new_instance.secret_hash);
assert_eq!(
legacy_from_new.persistent_pubkey,
new_instance.persistent_pubkey.0.to_vec()
);
assert_eq!(
legacy_from_new.maker_coin_swap_contract,
new_instance.maker_coin_swap_contract
);
assert_eq!(
legacy_from_new.taker_coin_swap_contract,
new_instance.taker_coin_swap_contract
);

// ------------------------------------------
// Step 3: Round-Trip Test of the New Format
// ------------------------------------------
let rt_serialized = rmp_serde::to_vec_named(&new_instance).expect("Round-trip serialization failed");
let round_trip: NegotiationDataV2 =
rmp_serde::from_slice(&rt_serialized).expect("Round-trip deserialization failed");
assert_eq!(round_trip, new_instance);
}
}
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl TakerNegotiationData {
pub struct MakerSwapData {
pub taker_coin: String,
pub maker_coin: String,
#[serde(alias = "taker")]
#[serde(rename = "taker")]
pub taker_pubkey: H256Json,
pub secret: H256Json,
pub secret_hash: Option<BytesJson>,
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/lp_swap/taker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
pub struct TakerSwapData {
pub taker_coin: String,
pub maker_coin: String,
#[serde(alias = "maker")]
#[serde(rename = "maker")]
pub maker_pubkey: H256Json,
pub my_persistent_pub: H264Json,
pub lock_duration: u64,
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_net/src/event_streaming/sse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub async fn handle_sse(request: Request<Body>, ctx_h: u32) -> Response<Body> {

let event_stream_manager = ctx.event_stream_manager.clone();
let Ok(mut rx) = event_stream_manager.new_client(client_id) else {
return handle_internal_error("ID already in use".to_string()).await
return handle_internal_error("ID already in use".to_string()).await;
};
let body = Body::wrap_stream(async_stream::stream! {
while let Some(event) = rx.recv().await {
Expand Down