@@ -86,58 +86,61 @@ func NewDialer(dest string, dialers []Dialer) Dialer {
86
86
return d
87
87
}
88
88
89
+ func (mpd * mpDialer ) dialOne (d * subflowDialer , cid connectionID , bc * mpConn , ctx context.Context ) (connectionID , bool , * mpConn ) {
90
+ conn , err := d .DialContext (ctx )
91
+ if err != nil {
92
+ log .Errorf ("failed to dial %s: %v" , d .Label (), err )
93
+ return zeroCID , false , bc
94
+ }
95
+ probeStart := time .Now ()
96
+ newCID , err := mpd .handshake (conn , cid )
97
+ if err != nil {
98
+ log .Errorf ("failed to handshake %s, continuing: %v" , d .Label (), err )
99
+ conn .Close ()
100
+ return zeroCID , false , bc
101
+ }
102
+ if cid == zeroCID {
103
+ bc = newMPConn (newCID )
104
+ go func () {
105
+ for {
106
+ time .Sleep (time .Second )
107
+ bc .pendingAckMu .RLock ()
108
+ oldest := time .Duration (0 )
109
+ oldestFN := uint64 (0 )
110
+ for fn , frame := range bc .pendingAckMap {
111
+ if time .Since (frame .sentAt ) > oldest {
112
+ oldest = time .Since (frame .sentAt )
113
+ oldestFN = fn
114
+ }
115
+ }
116
+ bc .pendingAckMu .RUnlock ()
117
+ if oldest > time .Second {
118
+ log .Debugf ("Frame %d has not been acked for %v\n " , oldestFN , oldest )
119
+ }
120
+ }
121
+ }()
122
+ }
123
+ bc .add (fmt .Sprintf ("%x(%s)" , newCID , d .label ), conn , true , probeStart , d )
124
+ return newCID , true , bc
125
+ }
126
+
89
127
// DialContext dials the addr using all dialers and returns a connection
90
128
// contains subflows from whatever dialers available.
91
129
func (mpd * mpDialer ) DialContext (ctx context.Context ) (net.Conn , error ) {
92
130
var bc * mpConn
93
- dialOne := func (d * subflowDialer , cid connectionID ) (connectionID , bool ) {
94
- conn , err := d .DialContext (ctx )
95
- if err != nil {
96
- log .Errorf ("failed to dial %s: %v" , d .Label (), err )
97
- return zeroCID , false
98
- }
99
- probeStart := time .Now ()
100
- newCID , err := mpd .handshake (conn , cid )
101
- if err != nil {
102
- log .Errorf ("failed to handshake %s, continuing: %v" , d .Label (), err )
103
- conn .Close ()
104
- return zeroCID , false
105
- }
106
- if cid == zeroCID {
107
- bc = newMPConn (newCID )
108
- go func () {
109
- for {
110
- time .Sleep (time .Second )
111
- bc .pendingAckMu .RLock ()
112
- oldest := time .Duration (0 )
113
- oldestFN := uint64 (0 )
114
- for fn , frame := range bc .pendingAckMap {
115
- if time .Since (frame .sentAt ) > oldest {
116
- oldest = time .Since (frame .sentAt )
117
- oldestFN = fn
118
- }
119
- }
120
- bc .pendingAckMu .RUnlock ()
121
- if oldest > time .Second {
122
- log .Debugf ("Frame %d has not been acked for %v\n " , oldestFN , oldest )
123
- }
124
- }
125
- }()
126
- }
127
- bc .add (fmt .Sprintf ("%x(%s)" , newCID , d .label ), conn , true , probeStart , d )
128
- return newCID , true
129
- }
130
131
dialers := mpd .sorted ()
131
132
for i , d := range dialers {
132
133
// dial the first connection with zero connection ID
133
- cid , ok := dialOne (d , zeroCID )
134
+ dialctx , dialcancel := context .WithTimeout (ctx , time .Second * 2 )
135
+ defer dialcancel ()
136
+ cid , ok , bc := mpd .dialOne (d , zeroCID , bc , dialctx )
134
137
if ! ok {
135
138
continue
136
139
}
137
140
if i < len (dialers )- 1 {
138
141
// dial the rest in parallel with server assigned connection ID
139
142
for _ , d := range dialers [i + 1 :] {
140
- go dialOne (d , cid )
143
+ go mpd . dialOne (d , cid , bc , ctx )
141
144
}
142
145
}
143
146
return bc , nil
0 commit comments