Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem with connection forwarding #179

Open
Idk121-blip opened this issue Dec 3, 2024 · 8 comments
Open

Problem with connection forwarding #179

Idk121-blip opened this issue Dec 3, 2024 · 8 comments

Comments

@Idk121-blip
Copy link

Idk121-blip commented Dec 3, 2024

Hi, I have some problems related to connection forwarding and since i didn't find anything similar in the examples I'm writing my issue here.

The problem that I'm facing is related to the connection forwarding for one node. To put it simply if I have node A, B and C. If C sends a message to A in which it says to connect to B address, isReady of B will always be Some(false), and it will be impossible to send any message. Is it a normal behaviour? Is there something that I can do to solve this problem? Here is an example:

use crate::Message::{other, Message1};
use message_io::network::Transport::Ws;
use message_io::network::{Endpoint, NetEvent, SendStatus, Transport};
use message_io::node::{self, NodeHandler, NodeListener};
use serde::{Deserialize, Serialize};
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::thread;
use std::thread::sleep;
use std::time::Duration;

#[derive(Serialize, Deserialize)]
pub enum Message {
  Message1(String),
  other,
}

struct test_forwarding {
  handler: NodeHandler<()>,
  listener: NodeListener<()>,
}
impl test_forwarding {
  pub fn new(string: String, string2: String) -> Self {
      let (handler, listener) = node::split();
      let addr = SocketAddr::new(
          IpAddr::V4(string.parse().unwrap()),
          string2.parse().unwrap(),
      );
      handler.network().listen(Ws, addr);
      if string2 == "8002" {
          let (e, s) = handler.network().connect(Ws, "127.0.0.1:8000").unwrap();
          println!("{:?}", handler.network().is_ready(e.resource_id()));
          sleep(Duration::from_secs(1));
          println!("{:?}", handler.network().is_ready(e.resource_id()));
          handler.network().send(
              e,
              &bincode::serialize(&Message1("127.0.0.1:8001".to_string())).unwrap(),
          );
      } else {
      }
      test_forwarding { handler, listener }
  }
  pub fn run(self) {
      let Self { handler, listener } = self;
      listener.for_each(move |event| match event.network() {
          NetEvent::Message(endpoint, serialized) => {
              let message = bincode::deserialize(serialized).unwrap();
              handle_message(&handler, endpoint, message);
          }
          NetEvent::Connected(endpoint, _result) => {
              // self.node_handler.network().connect(Transport::FramedTcp, endpoint.addr());
          }
          NetEvent::Accepted(e, _) => {
              println!("{:?}\n", handler.network().is_ready(e.resource_id()));
          }
          NetEvent::Disconnected(_) => {}
      });
      pub fn handle_message(handler: &NodeHandler<()>, endpoint: Endpoint, message: Message) {
          match message {
              Message1(x) => {
                  let (e, _) = handler.network().connect(Ws, x).unwrap();

                  while handler
                      .network()
                      .send(e, &bincode::serialize(&other).unwrap())
                      == SendStatus::ResourceNotAvailable
                  {
                      println!("Pending");
                      sleep(Duration::from_secs(1));
                  }
              }
              _ => {
                  println!("other message");
              }
          }
      }
  }
}

fn main() {
  let a = test_forwarding::new("127.0.0.1".to_string(), "8000".to_string());
  thread::spawn(|| {
      a.run();
  });
  let b = test_forwarding::new("127.0.0.1".to_string(), "8001".to_string());
  thread::spawn(|| {
      b.run();
  });
  sleep(Duration::from_secs(3));
  let c = test_forwarding::new("127.0.0.1".to_string(), "8002".to_string());
  c.run();
}
@lemunozm
Copy link
Owner

lemunozm commented Dec 3, 2024

Hi,

The message sent Message1 contains the port 9000 instead of 8000, which is the port I would expect to have a. Is that related to the issue?

@Idk121-blip
Copy link
Author

Idk121-blip commented Dec 3, 2024

Hi, thank you for your answer. I made an error while copying (since I tried different ports) but the error is always there it remains in pending. (btw It should be 8001 since i want that a connects to b, I will change it)

@lemunozm
Copy link
Owner

lemunozm commented Dec 3, 2024

The problem is that connect() is asynchronous, it needs to perform more things until the connection is ready to send data. The connect() inside handle_message(), tries to send a message after starting the connection but without finalizing the sync.

You only should send data after receiving the Connected event. So:

  1. You receive the message, you serialize and call connect(),
  2. Later in Connected you send the message.

@lemunozm
Copy link
Owner

lemunozm commented Dec 3, 2024

Also, note that here:

 if string2 == "8002" {
      let (e, s) = handler.network().connect(Ws, "127.0.0.1:8000").unwrap();
      println!("{:?}", handler.network().is_ready(e.resource_id()));
      sleep(Duration::from_secs(1));
      println!("{:?}", handler.network().is_ready(e.resource_id()));

you do not need to wait if you use connect_sync(), which will wait for you the exact time until it's connected. But you should not use that method inside the listener.for_each closure.

@Idk121-blip
Copy link
Author

  • You receive the message, you serialize and call connect(),
  • Later in Connected you send the message.

I don't think this is a good way of implementhing what I want to since it will lose the meaning of the messages (maybe I'm missing something but I'm preatty new to rust). My goal was to forward request to the server that has the resource that I'm looking for, or where I want to put something or connect to share information. To do so in the Connected event I need something like a channel where I'll write stuff and when I'll have more server the situation will be really hard to handle. I don't know if I understood it properly so maybe I'm missing something. How would you implement something like that with what you said?

@lemunozm
Copy link
Owner

lemunozm commented Dec 4, 2024

message-io follows an event driven pattern. Basically, you receive an event that means something for that endpoint. Depending of the connection event received, the connection is in a specific state.

How you architect the application is up to you. The only thing message-io requires is that if you call connect() you should wait to the Connected event before do anything with that connection, in this case, before sending the message.

To share some state, you can use the signals that comes along with messsge-io.

@Idk121-blip
Copy link
Author

I agree with what you said but is it normal that the Connected state will never be achieved inside match of the NetEvent? What I mean is that I can wait event 2 minutes or more for the handshake to finish BUT if I'm still inside the Message Event this connection won't finish the handshake. However if I finish the event whit connection after 1 second the connected state will be triggered. Like is it something wanted the behaviour that I described? The fact that the connection won't be made until the end of the event.

@lemunozm
Copy link
Owner

lemunozm commented Dec 4, 2024

The thread that finalizes the handshake is the event thread. That means that if you block the thread with an active waiting, the handshake can not finalize, and then the connection is not complete, even if you wait an infinite time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants