@@ -150,20 +150,51 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
150150 queueName = _options . QueueName ;
151151 }
152152
153- Task . Factory . StartNew ( ( ) =>
153+ Task . Factory . StartNew (
154+ ( ) => StartConsumer ( queueName , topic ) ,
155+ TaskCreationOptions . LongRunning ) ;
156+ }
157+
158+
159+ private void StartConsumer ( string queueName , string topic )
160+ {
161+ var model = _subConnection . CreateModel ( ) ;
162+
163+ model . ExchangeDeclare ( _options . TopicExchangeName , ExchangeType . Topic , true , false , null ) ;
164+ model . QueueDeclare ( queueName , false , false , true , null ) ;
165+ // bind the queue with the exchange.
166+ model . QueueBind ( queueName , _options . TopicExchangeName , topic ) ;
167+ var consumer = new EventingBasicConsumer ( model ) ;
168+ consumer . Received += OnMessage ;
169+ consumer . Shutdown += ( sender , e ) =>
154170 {
155- var model = _subConnection . CreateModel ( ) ;
156- model . ExchangeDeclare ( _options . TopicExchangeName , ExchangeType . Topic , true , false , null ) ;
157- model . QueueDeclare ( queueName , false , false , true , null ) ;
158- // bind the queue with the exchange.
159- model . QueueBind ( queueName , _options . TopicExchangeName , topic ) ;
160- var consumer = new EventingBasicConsumer ( model ) ;
161- consumer . Received += OnMessage ;
162- consumer . Shutdown += OnConsumerShutdown ;
163-
164- model . BasicConsume ( queueName , true , consumer ) ;
165-
166- } , TaskCreationOptions . LongRunning ) ;
171+ OnConsumerShutdown ( sender , e ) ;
172+ OnConsumerError ( queueName , topic , model ) ;
173+ } ;
174+
175+ consumer . ConsumerCancelled += ( s , e ) =>
176+ {
177+ OnConsumerError ( queueName , topic , model ) ;
178+ } ;
179+
180+ model . BasicConsume ( queueName , true , consumer ) ;
181+ }
182+
183+ private void OnConsumerError ( string queueName , string topic , IModel model )
184+ {
185+ StartConsumer ( queueName , topic ) ;
186+ BaseOnReconnect ( ) ;
187+ try
188+ {
189+ if ( model ? . IsOpen == true )
190+ {
191+ model ? . Dispose ( ) ;
192+ }
193+ }
194+ catch
195+ {
196+ // nothing to do
197+ }
167198 }
168199
169200 /// <summary>
0 commit comments