@@ -234,31 +234,10 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {
234
234
235
235
switch err .(type ) {
236
236
case * amqp.Error , * amqp.DetachError :
237
- tab .For (ctx ).Debug ("recovering connection" )
238
- _ , retryErr := common .Retry (10 , 10 * time .Second , func () (interface {}, error ) {
239
- ctx , sp := s .startProducerSpanFromContext (ctx , "sb.Sender.trySend.tryRecover" )
240
- defer sp .End ()
241
-
242
- err := s .Recover (ctx )
243
- if err == nil {
244
- tab .For (ctx ).Debug ("recovered connection" )
245
- return nil , nil
246
- }
247
-
248
- select {
249
- case <- ctx .Done ():
250
- return nil , ctx .Err ()
251
- default :
252
- return nil , common .Retryable (err .Error ())
253
- }
254
- })
255
-
256
- if retryErr != nil {
257
- tab .For (ctx ).Debug ("sender recovering retried, but error was unrecoverable" )
258
- if err := s .Close (ctx ); err != nil {
259
- tab .For (ctx ).Error (err )
260
- }
261
- return retryErr
237
+ err = s .handleAmqpError (ctx , err )
238
+ if err != nil {
239
+ tab .For (ctx ).Error (err )
240
+ return err
262
241
}
263
242
default :
264
243
tab .For (ctx ).Error (err )
@@ -268,6 +247,50 @@ func (s *Sender) trySend(ctx context.Context, evt eventer) error {
268
247
}
269
248
}
270
249
250
+ func (s * Sender ) handleAmqpError (ctx context.Context , err error ) error {
251
+ if amqpError , ok := err .(* amqp.Error ); ok {
252
+ switch amqpError .Condition {
253
+ case ErrorServerBusy :
254
+ return s .retryRetryableAmqpError (ctx , AmqpRetryDefaultTimes , AmqpRetryBusyServerDelay )
255
+ case ErrorTimeout :
256
+ return s .retryRetryableAmqpError (ctx , AmqpRetryDefaultTimes , AmqpRetryDefaultDelay )
257
+ case ErrorOperationCancelled :
258
+ return s .retryRetryableAmqpError (ctx , AmqpRetryDefaultTimes , AmqpRetryDefaultDelay )
259
+ case ErrorContainerClose :
260
+ return s .retryRetryableAmqpError (ctx , AmqpRetryDefaultTimes , AmqpRetryDefaultDelay )
261
+ default :
262
+ return err
263
+ }
264
+ }
265
+ return s .retryRetryableAmqpError (ctx , AmqpRetryDefaultTimes , AmqpRetryDefaultDelay )
266
+ }
267
+
268
+ func (s * Sender ) retryRetryableAmqpError (ctx context.Context , times int , delay time.Duration ) error {
269
+ tab .For (ctx ).Debug ("recovering sender connection" )
270
+ _ , retryErr := common .Retry (times , delay , func () (interface {}, error ) {
271
+ ctx , sp := s .startProducerSpanFromContext (ctx , "sb.Sender.trySend.tryRecover" )
272
+ defer sp .End ()
273
+
274
+ err := s .Recover (ctx )
275
+ if err == nil {
276
+ tab .For (ctx ).Debug ("recovered connection" )
277
+ return nil , nil
278
+ }
279
+
280
+ select {
281
+ case <- ctx .Done ():
282
+ return nil , ctx .Err ()
283
+ default :
284
+ return nil , common .Retryable (err .Error ())
285
+ }
286
+ })
287
+ if retryErr != nil {
288
+ tab .For (ctx ).Debug ("sender recovering retried, but error was unrecoverable" )
289
+ return retryErr
290
+ }
291
+ return nil
292
+ }
293
+
271
294
func (s * Sender ) connClosedError (ctx context.Context ) error {
272
295
name := "Sender"
273
296
if s .Name != "" {
0 commit comments