Skip to content

Commit

Permalink
Merge pull request #382 from nervosnetwork/enable-tcp-base-protocol-l…
Browse files Browse the repository at this point in the history
…isten-on-same-port

feat: enable tcp base protocol listen on same port
  • Loading branch information
driftluo authored Dec 2, 2024
2 parents d44926e + fbe16e8 commit 16278e5
Show file tree
Hide file tree
Showing 24 changed files with 961 additions and 482 deletions.
3 changes: 2 additions & 1 deletion tentacle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ nohash-hasher = "0.2"

parking_lot = { version = "0.12", optional = true }
tokio-tungstenite = { version = "0.24", optional = true }
httparse = { version = "1.9", optional = true }
futures-timer = { version = "3.0.2", optional = true }
async-std = { version = "1", features = ["unstable"], optional = true }
async-io = { version = "1", optional = true }
Expand Down Expand Up @@ -76,7 +77,7 @@ nix = { version = "0.29", default-features = false, features = ["signal"] }

[features]
default = ["tokio-runtime", "tokio-timer"]
ws = ["tokio-tungstenite"]
ws = ["tokio-tungstenite", "httparse"]
tls = ["tokio-rustls"]
upnp = ["igd"]
secio-async-trait = ["secio/async-trait"]
Expand Down
2 changes: 1 addition & 1 deletion tentacle/examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ fn server() {
.unwrap();
#[cfg(feature = "ws")]
service
.listen("/ip4/127.0.0.1/tcp/1338/ws".parse().unwrap())
.listen("/ip4/127.0.0.1/tcp/1337/ws".parse().unwrap())
.await
.unwrap();
service.run().await
Expand Down
2 changes: 1 addition & 1 deletion tentacle/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

/// If the buffer unused capacity is greater than u8 max, shrink it
const BUF_SHRINK_THRESHOLD: usize = u8::max_value() as usize;
const BUF_SHRINK_THRESHOLD: usize = u8::MAX as usize;

pub enum SendResult {
Ok,
Expand Down
2 changes: 1 addition & 1 deletion tentacle/src/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use sink_impl::QuickSinkExt;
use std::fmt;

// The `is_open` flag is stored in the left-most bit of `Inner::state`
const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);

// When a new channel is created, it is created in the open state with no
// pending messages.
Expand Down
3 changes: 1 addition & 2 deletions tentacle/src/lock/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#![allow(dead_code)]
#![allow(dead_code, unused_imports)]

#[cfg(feature = "parking_lot")]
pub use parking_lot::{const_fair_mutex, const_mutex, const_rwlock, FairMutex, Mutex, RwLock};
#[cfg(not(feature = "parking_lot"))]
pub mod native;

#[allow(unused_imports)]
#[cfg(not(feature = "parking_lot"))]
pub use native::{Mutex, RwLock};
4 changes: 4 additions & 0 deletions tentacle/src/runtime/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ mod os {
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.0.get_ref().peer_addr()
}

pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.0.get_ref().peek(buf).await
}
}

impl AsyncRead for TcpStream {
Expand Down
2 changes: 1 addition & 1 deletion tentacle/src/runtime/budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

thread_local! {
static CURRENT: RefCell<u8> = RefCell::new(128);
static CURRENT: RefCell<u8> = const { RefCell::new(128) };
}

/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
Expand Down
2 changes: 1 addition & 1 deletion tentacle/src/runtime/tokio_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod time {
}

fn size_hint(&self) -> (usize, Option<usize>) {
(std::usize::MAX, None)
(usize::MAX, None)
}
}

Expand Down
12 changes: 8 additions & 4 deletions tentacle/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
},
session::{Session, SessionEvent, SessionMeta},
traits::ServiceHandle,
transports::{MultiIncoming, MultiTransport, Transport},
transports::{MultiIncoming, MultiTransport, TransportDial, TransportListen},
utils::extract_peer_id,
ProtocolId, SessionId,
};
Expand Down Expand Up @@ -228,7 +228,9 @@ where
}
inner.listens.insert(listen_address.clone());

inner.spawn_listener(incoming, listen_address);
if !matches!(incoming, MultiIncoming::TcpUpgrade) {
inner.spawn_listener(incoming, listen_address);
}

Ok(addr)
}
Expand Down Expand Up @@ -1017,7 +1019,7 @@ where
if let Some(ref mut client) = self.igd_client {
client.remove(&address);
}

self.try_update_listens().await;
let _ignore = self
.handle_sender
.send(ServiceEvent::ListenClose { address }.into())
Expand Down Expand Up @@ -1075,7 +1077,9 @@ where
if let Some(client) = self.igd_client.as_mut() {
client.register(&listen_address)
}
self.spawn_listener(incoming, listen_address);
if !matches!(incoming, MultiIncoming::TcpUpgrade) {
self.spawn_listener(incoming, listen_address);
}
}
SessionEvent::ProtocolHandleError { error, proto_id } => {
let _ignore = self
Expand Down
2 changes: 1 addition & 1 deletion tentacle/src/service/future_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ mod test {
let mut manager = FutureTaskManager::new(receiver, shutdown);
let finished_tasks = Arc::new(AtomicUsize::new(0));
let finished_tasks_inner = Arc::clone(&finished_tasks);
let signals_len = Arc::new(AtomicUsize::new(usize::max_value()));
let signals_len = Arc::new(AtomicUsize::new(usize::MAX));
let signals_len_inner = Arc::clone(&signals_len);

let mut send_task = sender.clone();
Expand Down
11 changes: 8 additions & 3 deletions tentacle/src/transports/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::{
error::TransportErrorKind,
multiaddr::{Multiaddr, Protocol},
transports::{find_type, Result, Transport, TransportFuture, TransportType},
transports::{
find_type, Result, TransportDial, TransportFuture, TransportListen, TransportType,
},
utils::multiaddr_to_socketaddr,
};
use futures::FutureExt;
Expand Down Expand Up @@ -113,13 +115,16 @@ impl BrowserTransport {
pub type BrowserDialFuture =
TransportFuture<Pin<Box<dyn Future<Output = Result<(Multiaddr, BrowserStream)>> + Send>>>;

impl Transport for BrowserTransport {
impl TransportListen for BrowserTransport {
type ListenFuture = ();
type DialFuture = BrowserDialFuture;

fn listen(self, address: Multiaddr) -> Result<Self::ListenFuture> {
Err(TransportErrorKind::NotSupported(address))
}
}

impl TransportDial for BrowserTransport {
type DialFuture = BrowserDialFuture;

fn dial(self, address: Multiaddr) -> Result<Self::DialFuture> {
if !matches!(find_type(&address), TransportType::Ws) {
Expand Down
9 changes: 5 additions & 4 deletions tentacle/src/transports/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
error::TransportErrorKind,
lock::Mutex,
multiaddr::{Multiaddr, Protocol},
transports::{Result, Transport, TransportFuture},
transports::{Result, TransportDial, TransportFuture, TransportListen},
};

use bytes::Bytes;
Expand Down Expand Up @@ -120,15 +120,16 @@ pub type MemoryListenFuture =
pub type MemoryDialFuture =
TransportFuture<Pin<Box<dyn Future<Output = Result<(Multiaddr, MemorySocket)>> + Send>>>;

impl Transport for MemoryTransport {
impl TransportListen for MemoryTransport {
type ListenFuture = MemoryListenFuture;
type DialFuture = MemoryDialFuture;

fn listen(self, address: Multiaddr) -> Result<Self::ListenFuture> {
let task = bind(address);
Ok(TransportFuture::new(Box::pin(task)))
}

}
impl TransportDial for MemoryTransport {
type DialFuture = MemoryDialFuture;
fn dial(self, address: Multiaddr) -> Result<Self::DialFuture> {
let task = connect(address);
Ok(TransportFuture::new(Box::pin(task)))
Expand Down
Loading

0 comments on commit 16278e5

Please sign in to comment.