Skip to content

Commit 967f9ec

Browse files
committed
net: channel hosts a pointer to Session
solved a bug which warned Session could not be made into an object. documented here: rust-lang/rust#51443 and here: https://stackoverflow.com/questions/72838225/rust-trait-warning-method-references-the-self-type-in-its-where-clause
1 parent ab2a5af commit 967f9ec

9 files changed

+33
-36
lines changed

src/net/acceptor.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@ pub type AcceptorPtr = Arc<Acceptor>;
2222
pub struct Acceptor {
2323
channel_subscriber: SubscriberPtr<Result<ChannelPtr>>,
2424
task: StoppableTaskPtr,
25-
//pub session: Mutex<Option<SessionWeakPtr>>,
25+
pub session: Mutex<Option<SessionWeakPtr>>,
2626
}
2727

2828
impl Acceptor {
2929
/// Create new Acceptor object.
30-
pub fn new() -> Arc<Self> {
30+
pub fn new(session: Mutex<Option<SessionWeakPtr>>) -> Arc<Self> {
3131
Arc::new(Self {
3232
channel_subscriber: Subscriber::new(),
3333
task: StoppableTask::new(),
34-
//session,
34+
session,
3535
})
3636
}
3737
/// Start accepting inbound socket connections. Creates a listener to start
@@ -148,7 +148,8 @@ impl Acceptor {
148148
loop {
149149
match listener.next().await {
150150
Ok((stream, url)) => {
151-
let channel = Channel::new(stream, url).await;
151+
let channel =
152+
Channel::new(stream, url, self.session.lock().await.clone().unwrap()).await;
152153
self.channel_subscriber.notify(Ok(channel)).await;
153154
}
154155
Err(e) => {

src/net/channel.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub struct Channel {
6868
receive_task: StoppableTaskPtr,
6969
stopped: Mutex<bool>,
7070
info: Mutex<ChannelInfo>,
71-
//session: SessionWeakPtr,
71+
session: SessionWeakPtr,
7272
}
7373

7474
impl Channel {
@@ -78,7 +78,7 @@ impl Channel {
7878
pub async fn new(
7979
stream: Box<dyn TransportStream>,
8080
address: Url,
81-
//session: SessionWeakPtr,
81+
session: SessionWeakPtr,
8282
) -> Arc<Self> {
8383
let (reader, writer) = stream.split();
8484
let reader = Mutex::new(reader);
@@ -96,7 +96,7 @@ impl Channel {
9696
receive_task: StoppableTask::new(),
9797
stopped: Mutex::new(false),
9898
info: Mutex::new(ChannelInfo::new()),
99-
//session,
99+
session,
100100
})
101101
}
102102

src/net/connector.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@ use super::{
1414
/// Create outbound socket connections.
1515
pub struct Connector {
1616
settings: SettingsPtr,
17-
//pub session: SessionWeakPtr,
17+
pub session: SessionWeakPtr,
1818
}
1919

2020
impl Connector {
2121
/// Create a new connector with default network settings.
22-
pub fn new(settings: SettingsPtr, // session: SessionWeakPtr
23-
) -> Self {
24-
Self { settings, // session
25-
}
22+
pub fn new(settings: SettingsPtr, session: SessionWeakPtr) -> Self {
23+
Self { settings, session }
2624
}
2725

2826
/// Establish an outbound connection.
@@ -58,10 +56,13 @@ impl Connector {
5856

5957
let channel = match $upgrade {
6058
// session
61-
None => Channel::new(Box::new(stream?), connect_url.clone()).await,
59+
None => {
60+
Channel::new(Box::new(stream?), connect_url.clone(), self.session.clone())
61+
.await
62+
}
6263
Some(u) if u == "tls" => {
6364
let stream = $transport.upgrade_dialer(stream?)?.await;
64-
Channel::new(Box::new(stream?), connect_url).await
65+
Channel::new(Box::new(stream?), connect_url, self.session.clone()).await
6566
}
6667
Some(u) => return Err(Error::UnsupportedTransportUpgrade(u)),
6768
};

src/net/p2p.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl P2p {
9494
let parent = Arc::downgrade(&self_);
9595

9696
*self_.session_manual.lock().await = Some(ManualSession::new(parent.clone()));
97-
*self_.session_inbound.lock().await = Some(InboundSession::new(parent.clone()));
97+
*self_.session_inbound.lock().await = Some(InboundSession::new(parent.clone()).await);
9898
*self_.session_outbound.lock().await = Some(OutboundSession::new(parent));
9999

100100
register_default_protocols(self_.clone()).await;

src/net/session/inbound_session.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,8 @@ pub struct InboundSession {
3737

3838
impl InboundSession {
3939
/// Create a new inbound session.
40-
pub fn new(p2p: Weak<P2p>) -> Arc<Self> {
41-
//let acceptor = Acceptor::new(Mutex::new(None));
42-
let acceptor = Acceptor::new();
40+
pub async fn new(p2p: Weak<P2p>) -> Arc<Self> {
41+
let acceptor = Acceptor::new(Mutex::new(None));
4342

4443
let self_ = Arc::new(Self {
4544
p2p,
@@ -48,9 +47,9 @@ impl InboundSession {
4847
connect_infos: Mutex::new(FxHashMap::default()),
4948
});
5049

51-
//let parent = Arc::downgrade(&self_);
50+
let parent = Arc::downgrade(&self_);
5251

53-
//*self_.acceptor.session.lock().await = Some(Arc::new(parent));
52+
*self_.acceptor.session.lock().await = Some(Arc::new(parent));
5453

5554
self_
5655
}

src/net/session/manual_session.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,7 @@ impl ManualSession {
5757
executor: Arc<Executor<'_>>,
5858
) -> Result<()> {
5959
let parent = Arc::downgrade(&self);
60-
let connector = Connector::new(
61-
self.p2p().settings(),
62-
//Arc::new(parent)
63-
);
60+
let connector = Connector::new(self.p2p().settings(), Arc::new(parent));
6461

6562
let settings = self.p2p().settings();
6663

src/net/session/mod.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,12 @@ async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) {
7676
pub trait Session: Sync {
7777
/// Registers a new channel with the session. Performs a network handshake
7878
/// and starts the channel.
79+
// if we need to pass Self as an Arc we can do so like this:
80+
// pub trait MyTrait: Send + Sync {
81+
// async fn foo(&self, self_: Arc<dyn MyTrait>) {}
82+
// }
7983
async fn register_channel(
80-
self_: Arc<dyn Session>,
84+
&self,
8185
channel: ChannelPtr,
8286
executor: Arc<Executor<'_>>,
8387
) -> Result<()> {
@@ -87,14 +91,14 @@ pub trait Session: Sync {
8791
// We do this so that the protocols can begin receiving and buffering messages
8892
// while the handshake protocol is ongoing.
8993
// They are currently in sleep mode.
90-
let p2p = self_.p2p();
94+
let p2p = self.p2p();
9195
let protocols =
92-
p2p.protocol_registry().attach(self_.type_id(), channel.clone(), p2p.clone()).await;
96+
p2p.protocol_registry().attach(self.type_id(), channel.clone(), p2p.clone()).await;
9397

9498
// Perform the handshake protocol
95-
let protocol_version = ProtocolVersion::new(channel.clone(), self_.p2p().settings()).await;
99+
let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await;
96100
let handshake_task =
97-
self_.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
101+
self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
98102

99103
// Switch on the channel
100104
channel.start(executor.clone());

src/net/session/outbound_session.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,7 @@ impl OutboundSession {
137137
) -> Result<()> {
138138
let parent = Arc::downgrade(&self);
139139

140-
let connector = Connector::new(
141-
self.p2p().settings(),
142-
//Arc::new(parent)
143-
);
140+
let connector = Connector::new(self.p2p().settings(), Arc::new(parent));
144141

145142
loop {
146143
let addr = self.load_address(slot_number).await?;

src/net/session/seed_session.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,7 @@ impl SeedSession {
105105
};
106106

107107
let parent = Arc::downgrade(&self);
108-
let connector = Connector::new(
109-
settings.clone(), //Arc::new(parent)
110-
);
108+
let connector = Connector::new(settings.clone(), Arc::new(parent));
111109
match connector.connect(seed.clone()).await {
112110
Ok(channel) => {
113111
// Blacklist goes here

0 commit comments

Comments
 (0)