-
Notifications
You must be signed in to change notification settings - Fork 43
implement connection reuse #63
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package libp2pquic | ||
|
||
import ( | ||
"net" | ||
"sync" | ||
"sync/atomic" | ||
|
||
"github.com/vishvananda/netlink" | ||
) | ||
|
||
type reuseConn struct { | ||
net.PacketConn | ||
refCount int32 // to be used as an atomic | ||
} | ||
|
||
func newReuseConn(conn net.PacketConn) *reuseConn { | ||
return &reuseConn{PacketConn: conn} | ||
} | ||
|
||
func (c *reuseConn) IncreaseCount() { atomic.AddInt32(&c.refCount, 1) } | ||
func (c *reuseConn) DecreaseCount() { atomic.AddInt32(&c.refCount, -1) } | ||
func (c *reuseConn) GetCount() int { return int(atomic.LoadInt32(&c.refCount)) } | ||
|
||
type reuse struct { | ||
mutex sync.Mutex | ||
|
||
handle *netlink.Handle // Only set on Linux. nil on other systems. | ||
|
||
unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn | ||
// global contains connections that are listening on 0.0.0.0 / :: | ||
global map[int]*reuseConn | ||
} | ||
|
||
func newReuse() (*reuse, error) { | ||
// On non-Linux systems, this will return ErrNotImplemented. | ||
handle, err := netlink.NewHandle() | ||
if err == netlink.ErrNotImplemented { | ||
handle = nil | ||
} else if err != nil { | ||
return nil, err | ||
} | ||
return &reuse{ | ||
unicast: make(map[string]map[int]*reuseConn), | ||
global: make(map[int]*reuseConn), | ||
handle: handle, | ||
}, nil | ||
} | ||
|
||
// Get the source IP that the kernel would use for dialing. | ||
// This only works on Linux. | ||
// On other systems, this returns an empty slice of IP addresses. | ||
func (r *reuse) getSourceIPs(network string, raddr *net.UDPAddr) ([]net.IP, error) { | ||
if r.handle == nil { | ||
return nil, nil | ||
} | ||
|
||
routes, err := r.handle.RouteGet(raddr.IP) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
ips := make([]net.IP, 0, len(routes)) | ||
for _, route := range routes { | ||
ips = append(ips, route.Src) | ||
} | ||
return ips, nil | ||
} | ||
|
||
func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) { | ||
ips, err := r.getSourceIPs(network, raddr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
r.mutex.Lock() | ||
defer r.mutex.Unlock() | ||
|
||
conn, err := r.dialLocked(network, raddr, ips) | ||
if err != nil { | ||
return nil, err | ||
} | ||
conn.IncreaseCount() | ||
return conn, nil | ||
} | ||
|
||
func (r *reuse) dialLocked(network string, raddr *net.UDPAddr, ips []net.IP) (*reuseConn, error) { | ||
for _, ip := range ips { | ||
// We already have at least one suitable connection... | ||
if conns, ok := r.unicast[ip.String()]; ok { | ||
// ... we don't care which port we're dialing from. Just use the first. | ||
for _, c := range conns { | ||
return c, nil | ||
} | ||
} | ||
} | ||
|
||
// Use a connection listening on 0.0.0.0 (or ::). | ||
// Again, we don't care about the port number. | ||
for _, conn := range r.global { | ||
return conn, nil | ||
} | ||
|
||
// We don't have a connection that we can use for dialing. | ||
// Dial a new connection from a random port. | ||
var addr *net.UDPAddr | ||
switch network { | ||
case "udp4": | ||
addr = &net.UDPAddr{IP: net.IPv4zero, Port: 0} | ||
case "udp6": | ||
addr = &net.UDPAddr{IP: net.IPv6zero, Port: 0} | ||
} | ||
conn, err := net.ListenUDP(network, addr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
rconn := newReuseConn(conn) | ||
r.global[conn.LocalAddr().(*net.UDPAddr).Port] = rconn | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally, we'd mark this as a fallback connection so we can stop using it when we get a real global connection. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I understand. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Events:
At step 3, we may use the connection from step 1 or step 3. Ideally, we'd use the connection from step 2. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess you mean that ideally, we'd use the connection from step 2. I'm going to merge this PR now, so we don't have to go through another round of review on this one. This also touches on reusing dialed connections for listening. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (yes, sorry) |
||
return rconn, nil | ||
} | ||
|
||
func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { | ||
conn, err := net.ListenUDP(network, laddr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
localAddr := conn.LocalAddr().(*net.UDPAddr) | ||
|
||
rconn := newReuseConn(conn) | ||
rconn.IncreaseCount() | ||
|
||
r.mutex.Lock() | ||
defer r.mutex.Unlock() | ||
|
||
// Deal with listen on a global address | ||
if laddr.IP.IsUnspecified() { | ||
// The kernel already checked that the laddr is not already listen | ||
// so we need not check here (when we create ListenUDP). | ||
r.global[laddr.Port] = rconn | ||
return rconn, err | ||
} | ||
|
||
// Deal with listen on a unicast address | ||
if _, ok := r.unicast[localAddr.IP.String()]; !ok { | ||
r.unicast[laddr.IP.String()] = make(map[int]*reuseConn) | ||
} | ||
|
||
// The kernel already checked that the laddr is not already listen | ||
// so we need not check here (when we create ListenUDP). | ||
r.unicast[laddr.IP.String()][localAddr.Port] = rconn | ||
return rconn, err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package libp2pquic | ||
|
||
import ( | ||
"net" | ||
"runtime" | ||
|
||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
) | ||
|
||
var _ = Describe("Reuse", func() { | ||
var reuse *reuse | ||
|
||
BeforeEach(func() { | ||
var err error | ||
reuse, err = newReuse() | ||
Expect(err).ToNot(HaveOccurred()) | ||
}) | ||
|
||
It("creates a new global connection when listening on 0.0.0.0", func() { | ||
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") | ||
Expect(err).ToNot(HaveOccurred()) | ||
conn, err := reuse.Listen("udp4", addr) | ||
Expect(err).ToNot(HaveOccurred()) | ||
Expect(conn.GetCount()).To(Equal(1)) | ||
}) | ||
|
||
It("creates a new global connection when listening on [::]", func() { | ||
addr, err := net.ResolveUDPAddr("udp6", "[::]:1234") | ||
Expect(err).ToNot(HaveOccurred()) | ||
conn, err := reuse.Listen("udp6", addr) | ||
Expect(err).ToNot(HaveOccurred()) | ||
Expect(conn.GetCount()).To(Equal(1)) | ||
}) | ||
|
||
It("creates a new global connection when dialing", func() { | ||
addr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") | ||
Expect(err).ToNot(HaveOccurred()) | ||
conn, err := reuse.Dial("udp4", addr) | ||
Expect(err).ToNot(HaveOccurred()) | ||
Expect(conn.GetCount()).To(Equal(1)) | ||
laddr := conn.LocalAddr().(*net.UDPAddr) | ||
Expect(laddr.IP.String()).To(Equal("0.0.0.0")) | ||
Expect(laddr.Port).ToNot(BeZero()) | ||
}) | ||
|
||
It("reuses a connection it created for listening when dialing", func() { | ||
// listen | ||
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") | ||
Expect(err).ToNot(HaveOccurred()) | ||
lconn, err := reuse.Listen("udp4", addr) | ||
Expect(err).ToNot(HaveOccurred()) | ||
Expect(lconn.GetCount()).To(Equal(1)) | ||
// dial | ||
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") | ||
Expect(err).ToNot(HaveOccurred()) | ||
conn, err := reuse.Dial("udp4", raddr) | ||
Expect(err).ToNot(HaveOccurred()) | ||
Expect(conn.GetCount()).To(Equal(2)) | ||
}) | ||
|
||
if runtime.GOOS == "linux" { | ||
It("reuses a connection it created for listening on a specific interface", func() { | ||
raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234") | ||
Expect(err).ToNot(HaveOccurred()) | ||
ips, err := reuse.getSourceIPs("udp4", raddr) | ||
Expect(err).ToNot(HaveOccurred()) | ||
Expect(ips).ToNot(BeEmpty()) | ||
// listen | ||
addr, err := net.ResolveUDPAddr("udp4", ips[0].String()+":0") | ||
Expect(err).ToNot(HaveOccurred()) | ||
lconn, err := reuse.Listen("udp4", addr) | ||
Expect(err).ToNot(HaveOccurred()) | ||
Expect(lconn.GetCount()).To(Equal(1)) | ||
// dial | ||
conn, err := reuse.Dial("udp4", raddr) | ||
Expect(err).ToNot(HaveOccurred()) | ||
Expect(conn.GetCount()).To(Equal(2)) | ||
}) | ||
} | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this can just be
string(IP)
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
other nit: do we really want to map port to conn or just
map[*reuseConn]struct{}
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It needs to be a map of port to conn, if we ever want to reuse a connection that we dialed on for listening. I admit, this is a rather rare case, but it doesn't cost us anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand what you mean by
string(IP)
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
That is, we can convert the raw bytes directly to a string rather than formatting as "xxx.xxx.xxx.xxx"). It would make lookups zero-allocation (
mymap[string(someByteArray)]
doesn't allocate).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's great! I wasn't aware that
net.IP
is just a[]byte
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great in theory, that is. It looks like there's no obvious way to normalize IP addresses, and it's failing in our tests.
I managed to reproduce the issue locally, the IP is 192.168.46.226.
The kernel resolves this to:
net.IP{0xc0, 0xa8, 0x2e, 0xe2}
.After listening, the local address is:
net.IP{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0xff, 0xc0, 0xa8, 0x2e, 0xe2}
Normalizing this looks straightforward in this case, however, I'm a bit wary of pitfalls when using IPv6 addresses. Map lookups will hardly be the most expensive operation when starting a new QUIC listener / dialer, so I think we'll be fine for now with leaving this as a
net.IP.String()
.