1- # Copyright 2020-2021 Axis Communications AB.
1+ # Copyright 2020-2022 Axis Communications AB.
22#
33# For a full list of individual contributors, please see the commit history.
44#
1717import time
1818import logging
1919import warnings
20+ from threading import RLock , current_thread
2021from copy import deepcopy
2122
2223import pika
@@ -35,6 +36,8 @@ class RabbitMQPublisher(EiffelPublisher, BaseRabbitMQ):
3536 _nacks = 0
3637 _delivered = 0
3738 _last_delivered_tag = 0
39+ # RLock is used so that a thread can acquire a lock multiple times without blocking.
40+ _lock = RLock ()
3841
3942 # pylint:disable=too-many-arguments
4043 def __init__ (self , host , exchange , routing_key = "eiffel" ,
@@ -101,13 +104,38 @@ def _publisher_canceled(self, method_frame):
101104
102105 def _resend_nacked_deliveries (self ):
103106 """Resend all NACKed deliveries. This method loops forever."""
104- deliveries = self ._nacked_deliveries .copy ()
105- if deliveries :
106- _LOG .info ("Resending %i NACKed deliveries" , len (deliveries ))
107- self ._nacked_deliveries .clear ()
108- for event in deliveries :
109- self .send_event (event )
110- self ._connection .ioloop .call_later (1 , self ._resend_nacked_deliveries )
107+ if not self .is_alive () or (self ._channel is None or not self ._channel .is_open ):
108+ _LOG .warning ("Publisher is not ready. Retry resending NACKed deliveries in 1s" )
109+ self ._connection .ioloop .call_later (1 , self ._resend_nacked_deliveries )
110+ return
111+
112+ # No need to acquire the lock if there are no nacked deliveries.
113+ if not len (self ._nacked_deliveries ):
114+ self ._connection .ioloop .call_later (1 , self ._resend_nacked_deliveries )
115+ return
116+
117+ # If we cannot acquire the lock here, retry later otherwise call the send_event method.
118+ _LOG .debug (f"[{ current_thread ().getName ()} ] Attempting to acquire '_resend_nacked_deliveries' lock" )
119+ acquired = self ._lock .acquire (blocking = False )
120+ if not acquired :
121+ _LOG .debug (f"[{ current_thread ().getName ()} ] '_resend_nacked_deliveries' Locked" )
122+ self ._connection .ioloop .call_later (1 , self ._resend_nacked_deliveries )
123+ return
124+ try :
125+ _LOG .debug (f"[{ current_thread ().getName ()} ] '_resend_nacked_deliveries' Lock acquired" )
126+ deliveries = self ._nacked_deliveries .copy ()
127+ if deliveries :
128+ _LOG .info ("Resending %i NACKed deliveries" , len (deliveries ))
129+ for event in deliveries :
130+ # It is safe to remove the event here since if it fails delivery
131+ # in send_event it will be re-added to _nacked_deliveries.
132+ self ._nacked_deliveries .remove (event )
133+ # Never block in an ioloop method
134+ self .send_event (event , block = False )
135+ finally :
136+ self ._lock .release ()
137+ _LOG .debug (f"[{ current_thread ().getName ()} ] '_resend_nacked_deliveries' Lock released" )
138+ self ._connection .ioloop .call_later (1 , self ._resend_nacked_deliveries )
111139
112140 def _confirm_delivery (self , method_frame ):
113141 """Confirm the delivery of events and make sure we resend NACKed events.
@@ -127,25 +155,34 @@ def _confirm_delivery(self, method_frame):
127155 else :
128156 number_of_acks = delivery_tag - self ._last_delivered_tag
129157
130- if confirmation_type == 'ack' :
131- self . _acks += number_of_acks
132- elif confirmation_type == 'nack' :
133- self ._nacks += number_of_acks
134-
135- if delivery_tag == 0 :
136- if confirmation_type == "nack" :
137- self . _nacked_deliveries . extend ( self . _deliveries . values ())
138- self ._deliveries . clear ()
139- else :
140- for tag in range ( self . _last_delivered_tag + 1 , delivery_tag + 1 ) :
158+ # Since _resend_nacked_deliveries runs in a thread we must protect this
159+ # part that modifies class attributes.
160+ _LOG . debug ( f"[ { current_thread (). getName () } ] Attempting to acquire '_confirm_delivery' lock" )
161+ with self ._lock :
162+ _LOG . debug ( f"[ { current_thread (). getName () } ] '_confirm_delivery' Lock acquired" )
163+ if confirmation_type == 'ack' :
164+ self . _acks += number_of_acks
165+ elif confirmation_type == 'nack' :
166+ self ._nacks += number_of_acks
167+
168+ if delivery_tag == 0 :
141169 if confirmation_type == "nack" :
142- self ._nacked_deliveries .append (self ._deliveries [tag ])
143- self ._deliveries .pop (tag )
144- self ._last_delivered_tag = delivery_tag
145-
146- _LOG .debug ('Published %i messages, %i have yet to be confirmed, '
147- '%i were acked and %i were nacked' , self ._acks + self ._nacks ,
148- len (self ._deliveries ), self ._acks , self ._nacks )
170+ self ._nacked_deliveries .extend (self ._deliveries .values ())
171+ self ._deliveries .clear ()
172+ else :
173+ for tag in range (self ._last_delivered_tag + 1 , delivery_tag + 1 ):
174+ if confirmation_type == "nack" :
175+ self ._nacked_deliveries .append (self ._deliveries [tag ])
176+ try :
177+ self ._deliveries .pop (tag )
178+ except KeyError :
179+ _LOG .warning ("KeyError when attempting to pop tag %i" )
180+ self ._last_delivered_tag = delivery_tag
181+
182+ _LOG .debug ('Published %i messages, %i have yet to be confirmed, '
183+ '%i were acked and %i were nacked' , self ._acks + self ._nacks ,
184+ len (self ._deliveries ), self ._acks , self ._nacks )
185+ _LOG .debug (f"[{ current_thread ().getName ()} ] '_confirm_delivery' Lock released" )
149186
150187 def send_event (self , event , block = True ):
151188 """Validate and send an eiffel event to the rabbitmq server.
@@ -186,17 +223,22 @@ def send_event(self, event, block=True):
186223 event .meta .add ("source" , source )
187224 event .validate ()
188225 routing_key = self .routing_key or event .routing_key
189- try :
190- self ._channel .basic_publish (
191- self .exchange ,
192- routing_key ,
193- event .serialized ,
194- properties ,
195- )
196- except :
197- self ._nacked_deliveries .append (event )
198- return
199- self ._delivered += 1
200- self ._deliveries [self ._delivered ] = event
226+
227+ _LOG .debug (f"[{ current_thread ().getName ()} ] Attempting to acquire 'send_event' lock" )
228+ with self ._lock :
229+ _LOG .debug (f"[{ current_thread ().getName ()} ] 'send_event' Lock acquired" )
230+ try :
231+ self ._channel .basic_publish (
232+ self .exchange ,
233+ routing_key ,
234+ event .serialized ,
235+ properties ,
236+ )
237+ except :
238+ self ._nacked_deliveries .append (event )
239+ return
240+ self ._delivered += 1
241+ self ._deliveries [self ._delivered ] = event
242+ _LOG .debug (f"[{ current_thread ().getName ()} ] 'send_event' Lock released" )
201243
202244 send = send_event
0 commit comments