-
Notifications
You must be signed in to change notification settings - Fork 43
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
- This will need a bunch of tests. Especially around closing listeners, connections, picking the right one, etc. They don't have to be that fancy, but we should test unspecified versus localhost, etc.
- We can leave your dep where it is or adopt it into the libp2p org, I'm happy to do either. I'd also like to try adding windows support before shipping this but that shouldn't actually be too difficult (we can work on that after we finish up with this PR).
The current design (as well as the current code, for that matter) is nice and simple but, as far as I can tell, it doesn't allow us to actually close the underlying udp sockets when we no longer need them (because we can't know if we no longer need them). We may want to reference count on create/close.
If we're going to be wrapping connections anyways, we should consider a manager of the form:
type Manager interface {
// Dial dials the remote addr, reusing an existing socket if possible.
Dial(network string, addr *net.UDPAddr) (net.PacketConn, error)
// Listen listens on the given address.
Listen(network string, addr *net.UDPAddr) (net.PacketConn, error)
}
Where Dial/Listen return connections wrapped in a reference counter.
- Closing any connection (listen/dial) would decrement the reference count. When it hits zero, the underlying connection would be closed.
- Closing a listener would also remove the listener from the list of open listeners.
That way, we no longer need to manually register/unregister connections. The connection manager just does everything for us.
Note: I don't feel strongly about this interface and, IMO, the current interface is just fine. My main concern is reference counting.
If you'd like, I'd be fine punting reference counting given that it's an existing issue.
transport.go
Outdated
@@ -66,12 +129,32 @@ func (c *connManager) createConn(network, host string) (net.PacketConn, error) { | |||
return net.ListenUDP(network, addr) | |||
} | |||
|
|||
func (c *connManagers) AddListener(network string, addr *net.UDPAddr, conn net.PacketConn) error { | |||
if connManager, ok := c.managers[network]; ok { | |||
return connManager.AddListener(network, addr, conn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Feeding the network through here just to be thrown away seems funny. We don't have to use the exact same interface for the connManager and the connManagers.
(although I don't mind much either way)
listener.go
Outdated
@@ -49,10 +54,12 @@ func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID, | |||
} | |||
return &listener{ | |||
quicListener: ln, | |||
transport: transport, | |||
t: t, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's just keep this as transport
.
@@ -39,6 +40,10 @@ func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID, | |||
if err != nil { | |||
return nil, err | |||
} | |||
err = t.connManagers.AddListener(lnet, laddr, conn) | |||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to close the connection?
listener.go
Outdated
privKey ic.PrivKey | ||
localPeer peer.ID | ||
localMultiaddr ma.Multiaddr | ||
udpAddr *net.UDPAddr | ||
network string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd mildly prefer to just re-parse the multiaddr on close (storing duplicate information bugs me).
Test Added.
Adopt it into the libp2p org will be better.
Used referenced connection now. @Stebalien I have updated the code with your review opinoin. Please for a second review. |
"net" | ||
"sync" | ||
|
||
srcs "github.com/lnykww/go-src-select" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've forked this to github.com/libp2p/go-src-select I can also delete that and we can transfer it if you'd like.
Could you add a LICENSE? Ideally Apache 2.0 + MIT. Take a look at the COPYRIGHT, LICENSE-MIT, and LICENSE-APACHE files in ipfs/go-ds-crdt@d51c9f1.
Note: This project is still copyright you (well, you and whoever else ends up contributing to it).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
reuse.go
Outdated
srcs "github.com/lnykww/go-src-select" | ||
) | ||
|
||
type ReuseConn struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd keep all these types internal for now.
reuse.go
Outdated
return reuseConn.Ref() | ||
} | ||
|
||
func (r *Reuse) dial(network string, raddr *net.UDPAddr) (net.PacketConn, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can return a *ReuseConn
here. Then we can drop reuseConn
and just call Ref
directly.
reuse.go
Outdated
} | ||
} | ||
|
||
func (rc *ReuseConn) Ref() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably simpler just to return a boolean true
for success.
reuse.go
Outdated
return nil, err | ||
} | ||
// we are reuse a conn, reference it | ||
if err = r.reuseConn(conn); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I believe this will double-ref if we end up creating a new global connection.
- We probably need a loop (either inside dial out outside). Otherwise, we could try to use the connection but fail because some other thread decided it didn't want it. In that case, we should just open a new connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I believe this will double-ref if we end up creating a new global connection.
I have found this problem before. But we create a listen socket as a global socket,
it means that if we don't want to use it, we need close it ourself in this module,
rather than the user in other module. So when it was created, the reference should be 1.
- We probably need a loop (either inside dial out outside). Otherwise, we could try to use the connection but fail because some other thread decided it didn't want it. In that case, we should just open a new connection.
Ok, i will add a retry for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have found this problem before. But we create a listen socket as a global socket,
it means that if we don't want to use it, we need close it ourself in this module,
rather than the user in other module.
My thinking was that, when the last connection using the global connection closes, we could close the global connection (reopening a new one when necessary).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking was that, when the last connection using the global connection closes, we could close the global connection (reopening a new one when necessary).
This may cause us to create new socket frequently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're repeatedly closing all connections. Also note: this will only happen if we aren't listening on a relevant ip address.
My worry here is that we're leaking a socket. In theory, a user should be able to spin up/down new libp2p instances without leaking anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right. It does not happen very often. I will do that.
So, I realized something annoying. quic-go correctly won't close connections it didn't create. @marten-seemann is this something that could be made configurable? I.e., some way to "give" a connection to QUIC? This patch is introducing connection refcounting but nothing will ever call @lnykww otherwise, we'll have to stash the Let's wait for @marten-seemann to chime in here before we move forward. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some stylistic comments. Will do a full review later.
listener.go
Outdated
@@ -35,7 +34,7 @@ func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID, | |||
if err != nil { | |||
return nil, err | |||
} | |||
conn, err := net.ListenUDP(lnet, laddr) | |||
conn, err := t.connManagers.Listen(lnet, laddr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't access unexported fields of other structs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I should have been more specific. Please don't access any fields of other structs.
listener.go
Outdated
if err != nil { | ||
return err | ||
} | ||
err = l.transport.connManagers.Close(lnet, laddr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
reuse.go
Outdated
|
||
type ReuseConn struct { | ||
net.PacketConn | ||
lock sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Convention: This should be called mutex
.
reuse.go
Outdated
return &ReuseConn{ | ||
PacketConn: conn, | ||
ref: 1, | ||
lock: sync.Mutex{}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to initialize a mutex. The default value works fine.
reuse.go
Outdated
type ReuseConn struct { | ||
net.PacketConn | ||
lock sync.Mutex | ||
ref int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is ref
an int
, if it only ever takes the value 0
or 1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ref indicates how many connection was use it. and its value is not only 0 or 1.
reuse.go
Outdated
} | ||
|
||
type Reuse struct { | ||
lock sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
reuse.go
Outdated
// dialGlobal get the global random port socket, if not exist, create | ||
// it first. | ||
func (r *Reuse) dialGlobal(network string) (*reuseConn, error) { | ||
var err error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This err
is unused.
|
||
func NewReuse() *Reuse { | ||
return &Reuse{ | ||
unicast: make(map[string]map[int]*reuseConn), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume the int
should be the port number, right? This will need documentation here.
reuse.go
Outdated
r.unicast[laddr.IP.String()] = make(map[int]*reuseConn) | ||
} | ||
if _, ok := r.unicast[laddr.IP.String()][laddr.Port]; ok { | ||
conn.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we closing the connection that we just created a few lines above? Can't we avoid creating it in the first place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
emm, I think we need not to check if we has already listening on an address. Because when we call ListenUDP, the kernel will do that.
} | ||
} | ||
|
||
func (rc *reuseConn) Ref() bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Things will become a lot clearer if you introduce a function to tell if rc.ref > 0
and another function to increase the value.
reuse.go
Outdated
} | ||
|
||
rconn := NewReuseConn(conn) | ||
_ = rconn.Ref() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be necessary any more with the refactoring I suggested for the ref counter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is a bug here. it need not to increase the ref. Because of NewReuseConn already set the ref to 1. Will fix it.
reuse.go
Outdated
r.mutex.Lock() | ||
defer r.mutex.Unlock() | ||
|
||
switch { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This switch statement would greatly benefit from comments explaining what's going on here.
reuse.go
Outdated
defer r.mutex.Unlock() | ||
switch { | ||
case addr.IP.IsUnspecified(): | ||
for index, conn := range r.unspecific { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use a map
for r.unspecific
? Looping over a slice and then deleting values from the middle of it doesn't look very clean.
r.unspecific = append(r.unspecific[:index], r.unspecific[index+1:]...) | ||
return nil | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the connection is not found in the slice? We should handle that case and return an error.
The code will probably become more readable if you use an if
instead a switch
.
2. Del duplicate check of the unicast and uspecific map. Let kernel help us to check if it is already listening on. 3. Make code clean.
@marten-seemann can you comment on:
? |
This is behaving exactly as intended. If you're passing a connection to quic-go, either in |
I understand, I'm asking if we could have some way to "give" the connection to quic-go. Alternatively, we can handle the reference counting from the go-libp2p-transport Conn. |
The point is that quic-go doesn't know when it's ok to close the connection. Only the user knows when it's done |
The current patch refcounts connections, incrementing the refcount whenever it hands one to quic. (Note: I'm asking if some kind of "owned" flag would be a reasonable addition to quic-go, I know it won't work at the moment) |
I'm sorry, I'm afraid I don't understand the problem you're trying to solve. When and why do you want to close a UDP connection? |
When we, e.g., close a listener. We should be able to spin up a bunch of libp2p nodes in as single process and then tear them down without leaking anything. |
Then we need to close the underlying connections in the same step as we close the listeners. Note that this will also kill all outgoing connections we might have use the packet conn for. |
That's why this is doing ref-counting. |
@marten-seemann For creating a new libp2p instance, we might create a new global conn. When we close the libp2p instance, we need to close the global conn to prevent the connection from leaking.
@Stebalien Did this means that we need close the connection ourselves? |
Fixes #8
Implementation:#8 (comment)
I am not sure where the source select library should be placed, so I created a repository on my github to put this code. If it is not appropriate, please let me know where to put this code.