diff --git a/dial_sync.go b/dial_sync.go index 4c1230f5..edb6c898 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -96,13 +96,13 @@ func (ad *activeDial) start(ctx context.Context) { ad.cancel() } -func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { +func (ds *DialSync) getActiveDial(ctx context.Context, p peer.ID) *activeDial { ds.dialsLk.Lock() defer ds.dialsLk.Unlock() actd, ok := ds.dials[p] if !ok { - adctx, cancel := context.WithCancel(context.Background()) + adctx, cancel := context.WithCancel(ctx) actd = &activeDial{ id: p, cancel: cancel, @@ -123,7 +123,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { // DialLock initiates a dial to the given peer if there are none in progress // then waits for the dial to that peer to complete. func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) { - return ds.getActiveDial(p).wait(ctx) + return ds.getActiveDial(ctx, p).wait(ctx) } // CancelDial cancels all in-progress dials to the given peer. diff --git a/go.mod b/go.mod index be76cdfa..5dceff46 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-addr-util v0.0.2 github.com/libp2p/go-conn-security-multistream v0.2.1 - github.com/libp2p/go-libp2p-core v0.8.2 + github.com/libp2p/go-libp2p-core v0.8.3 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-peerstore v0.2.6 github.com/libp2p/go-libp2p-quic-transport v0.10.0 diff --git a/go.sum b/go.sum index ce327ec1..34920884 100644 --- a/go.sum +++ b/go.sum @@ -194,6 +194,8 @@ github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.2 h1:/eaSZACWftJZYm07S0nRxdI84v1hSmgnCXrGOvJdpNQ= github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= +github.com/libp2p/go-libp2p-core v0.8.3 h1:BZTReEF6o8g/n4DwxTyeFannOeae35Xy0TD+mES3CNE= +github.com/libp2p/go-libp2p-core v0.8.3/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE= diff --git a/swarm.go b/swarm.go index 272662a0..68f7603f 100644 --- a/swarm.go +++ b/swarm.go @@ -341,6 +341,7 @@ func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error // a non-closed connection. dials := 0 for { + // will prefer direct connections over relayed connections for opening streams c := s.bestConnToPeer(p) if c == nil { if nodial, _ := network.GetNoDial(ctx); nodial { @@ -392,9 +393,10 @@ func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn { // bestConnToPeer returns the best connection to peer. func (s *Swarm) bestConnToPeer(p peer.ID) *Conn { - // Selects the best connection we have to the peer. - // TODO: Prefer some transports over others. Currently, we just select - // the newest non-closed connection with the most streams. + + // TODO: Prefer some transports over others. + // For now, prefers direct connections over Relayed connections. + // For tie-breaking, select the newest non-closed connection with the most streams. s.conns.RLock() defer s.conns.RUnlock() @@ -409,15 +411,25 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn { cLen := len(c.streams.m) c.streams.Unlock() - if cLen >= bestLen { + // We will never prefer a Relayed connection over a direct connection. + if isDirectConn(best) && !isDirectConn(c) { + continue + } + + // 1. Always prefer a direct connection over a relayed connection. + // 2. If both conns are direct or relayed, pick the one with as many or more streams. + if (!isDirectConn(best) && isDirectConn(c)) || (cLen >= bestLen) { best = c bestLen = cLen } - } return best } +func isDirectConn(c *Conn) bool { + return c != nil && !c.conn.Transport().Proxy() +} + // Connectedness returns our "connectedness" state with the given peer. // // To check if we have an open connection, use `s.Connectedness(p) == diff --git a/swarm_dial.go b/swarm_dial.go index 65f123a0..052da67f 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -251,9 +251,14 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done() - // check if we already have an open connection first conn := s.bestConnToPeer(p) - if conn != nil { + forceDirect, _ := network.GetForceDirectDial(ctx) + if forceDirect { + if isDirectConn(conn) { + return conn, nil + } + } else if conn != nil { + // check if we already have an open connection first return conn, nil } @@ -287,8 +292,13 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { // Short circuit. // By the time we take the dial lock, we may already *have* a connection // to the peer. + forceDirect, _ := network.GetForceDirectDial(ctx) c := s.bestConnToPeer(p) - if c != nil { + if forceDirect { + if isDirectConn(c) { + return c, nil + } + } else if c != nil { return c, nil } @@ -301,12 +311,17 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { conn, err := s.dial(ctx, p) if err != nil { conn = s.bestConnToPeer(p) - if conn != nil { + if forceDirect { + if isDirectConn(conn) { + log.Debugf("ignoring dial error because we already have a direct connection: %s", err) + return conn, nil + } + } else if conn != nil { // Hm? What error? // Could have canceled the dial because we received a // connection or some other random reason. // Just ignore the error and return the connection. - log.Debugf("ignoring dial error because we have a connection: %s", err) + log.Debugf("ignoring dial error because we already have a connection: %s", err) return conn, nil } @@ -321,6 +336,11 @@ func (s *Swarm) canDial(addr ma.Multiaddr) bool { return t != nil && t.CanDial(addr) } +func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool { + t := s.TransportForDialing(addr) + return !t.Proxy() +} + // ranks addresses in descending order of preference for dialing // Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { @@ -362,6 +382,7 @@ func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { // dial is the actual swarm's dial logic, gated by Dial. func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { + forceDirect, _ := network.GetForceDirectDial(ctx) var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) if p == s.local { log.Event(ctx, "swarmDialDoDialSelf", logdial) @@ -383,20 +404,25 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return nil, &DialError{Peer: p, Cause: ErrNoAddresses} } goodAddrs := s.filterKnownUndialables(p, peerAddrs) + if forceDirect { + goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr) + } if len(goodAddrs) == 0 { return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses} } - /////// Check backoff andnRank addresses - var nonBackoff bool - for _, a := range goodAddrs { - // skip addresses in back-off - if !s.backf.Backoff(p, a) { - nonBackoff = true + if !forceDirect { + /////// Check backoff andnRank addresses + var nonBackoff bool + for _, a := range goodAddrs { + // skip addresses in back-off + if !s.backf.Backoff(p, a) { + nonBackoff = true + } + } + if !nonBackoff { + return nil, ErrDialBackoff } - } - if !nonBackoff { - return nil, ErrDialBackoff } connC, dialErr := s.dialAddrs(ctx, p, s.rankAddrs(goodAddrs))