8
8
destination IP to trigger the process of obtaining neighbor information
9
9
"""
10
10
import subprocess
11
+ import sys
11
12
import time
12
13
from datetime import datetime
13
14
from ipaddress import ip_interface
15
+ from queue import Queue
14
16
15
- from swsscommon .swsscommon import ConfigDBConnector , SonicV2Connector
17
+ from swsscommon .swsscommon import ConfigDBConnector , SonicV2Connector , \
18
+ DBConnector , Select , SubscriberStateTable
16
19
from sonic_py_common import logger as log
17
20
18
21
from pyroute2 import IPRoute
25
28
logger = log .Logger ()
26
29
27
30
STATE_DB = 'STATE_DB'
31
+ APPL_DB = 'APPL_DB'
28
32
PORTCHANNEL_INTERFACE_TABLE = 'PORTCHANNEL_INTERFACE'
29
33
TUNNEL_TABLE = 'TUNNEL'
30
34
PEER_SWITCH_TABLE = 'PEER_SWITCH'
31
35
INTF_TABLE_TEMPLATE = 'INTERFACE_TABLE|{}|{}'
36
+ LAG_TABLE = 'LAG_TABLE'
32
37
STATE_KEY = 'state'
33
38
TUNNEL_TYPE_KEY = 'tunnel_type'
34
39
DST_IP_KEY = 'dst_ip'
35
40
ADDRESS_IPV4_KEY = 'address_ipv4'
41
+ OPER_STATUS_KEY = 'oper_status'
36
42
IPINIP_TUNNEL = 'ipinip'
37
-
38
43
RTM_NEWLINK = 'RTM_NEWLINK'
44
+ SELECT_TIMEOUT = 1000
45
+
46
# Queue of netlink messages accepted by add_msg_to_queue()
nl_msgs = Queue()
# Names of the portchannel interfaces whose netlink messages are queued.
# Stays None until TunnelPacketHandler.__init__ assigns the list.
portchannel_intfs = None


def add_msg_to_queue(target, msg):
    """
    Adds a netlink message to a queue

    Only messages whose IFLA_IFNAME attribute names an interface listed in
    the module-level `portchannel_intfs` are queued. Messages arriving
    before that list has been populated are ignored instead of raising.

    Args:
        target: unused, needed by NDB API
        msg: a netlink message
    """
    # `portchannel_intfs` is None until a TunnelPacketHandler is created;
    # a membership test against None would raise TypeError.
    if not portchannel_intfs:
        return

    if msg.get_attr('IFLA_IFNAME') in portchannel_intfs:
        nl_msgs.put(msg)
40
60
41
61
class TunnelPacketHandler (object ):
42
62
"""
@@ -55,7 +75,10 @@ def __init__(self):
55
75
self .sniffer = None
56
76
self .self_ip = ''
57
77
self .packet_filter = ''
58
- self .sniff_intfs = []
78
+ self .sniff_intfs = set ()
79
+
80
+ global portchannel_intfs
81
+ portchannel_intfs = [name for name , _ in self .portchannel_intfs ]
59
82
60
83
@property
61
84
def portchannel_intfs (self ):
@@ -95,17 +118,6 @@ def get_intf_name(self, msg):
95
118
96
119
return ''
97
120
98
- def netlink_msg_is_for_portchannel (self , msg ):
99
- """
100
- Determines if a netlink message is about a PortChannel interface
101
-
102
- Returns:
103
- (list) integers representing kernel indices
104
- """
105
- ifname = self .get_intf_name (msg )
106
-
107
- return ifname in [name for name , _ in self .portchannel_intfs ]
108
-
109
121
def get_up_portchannels (self ):
110
122
"""
111
123
Returns the portchannels which are operationally up
@@ -125,11 +137,11 @@ def get_up_portchannels(self):
125
137
logger .log_notice ("Skipping non-existent interface {}" .format (intf ))
126
138
continue
127
139
link_statuses .append (status [0 ])
128
- up_portchannels = []
140
+ up_portchannels = set ()
129
141
130
142
for status in link_statuses :
131
- if status [ 'state' ] == 'up' :
132
- up_portchannels .append ( self . get_intf_name ( status ))
143
+ if status . get_attr ( 'IFLA_OPERSTATE' ). lower () == 'up' :
144
+ up_portchannels .add ( status . get_attr ( 'IFLA_IFNAME' ))
133
145
134
146
return up_portchannels
135
147
@@ -242,52 +254,47 @@ def get_inner_pkt_type(self, packet):
242
254
return IPv6
243
255
return False
244
256
245
- def wait_for_netlink_msgs (self ):
246
- """
247
- Gathers any RTM_NEWLINK messages
248
-
249
- Returns:
250
- (list) containing any received messages
251
- """
252
- msgs = []
253
- with IPRoute () as ipr :
254
- ipr .bind ()
255
- for msg in ipr .get ():
256
- if msg ['event' ] == RTM_NEWLINK :
257
- msgs .append (msg )
258
-
259
- return msgs
260
-
261
def sniffer_restart_required(self, lag, fvs):
    """
    Determines if the packet sniffer needs to be restarted

    The sniffer needs to be restarted when a portchannel interface transitions
    from down to up. When a portchannel interface goes down, the sniffer is
    able to continue sniffing on other portchannels.

    Args:
        lag: name of the portchannel the update is for
        fvs: field-value pairs of the update; the OPER_STATUS_KEY field
             is the only one consulted

    Returns:
        (bool) True if `lag` reports 'up' and is not currently in
        self.sniff_intfs, False otherwise
    """
    # Tolerate a missing/empty field list
    oper_status = dict(fvs or ()).get(OPER_STATUS_KEY)

    if oper_status == 'up':
        if lag in self.sniff_intfs:
            # Already being sniffed, nothing to do
            return False
        logger.log_info('{} came back up, sniffer restart required'
                        .format(lag))
        # Don't need to modify self.sniff_intfs here since it is repopulated
        # by self.get_up_portchannels()
        return True

    # The portchannel is not up: forget it so we can detect when it comes
    # back up. This also covers updates carrying no oper_status at all
    # (presumably what a deleted LAG_TABLE entry looks like - TODO confirm),
    # which would otherwise leave a stale entry and suppress the restart.
    self.sniff_intfs.discard(lag)
    return False
284
279
285
280
def start_sniffer (self ):
286
281
"""
287
282
Starts an AsyncSniffer and waits for it to initialize fully
288
283
"""
284
+ start = datetime .now ()
285
+
286
+ self .sniff_intfs = self .get_up_portchannels ()
287
+
288
+ while not self .sniff_intfs :
289
+ logger .log_info ('No portchannels are up yet...' )
290
+ if (datetime .now () - start ).seconds > 180 :
291
+ logger .log_error ('All portchannels failed to come up within 3 minutes, exiting...' )
292
+ sys .exit (1 )
293
+ self .sniff_intfs = self .get_up_portchannels ()
294
+ time .sleep (10 )
295
+
289
296
self .sniffer = AsyncSniffer (
290
- iface = self .sniff_intfs ,
297
+ iface = list ( self .sniff_intfs ) ,
291
298
filter = self .packet_filter ,
292
299
prn = self .ping_inner_dst ,
293
300
store = 0
@@ -332,18 +339,35 @@ def listen_for_tunnel_pkts(self):
332
339
logger .log_notice ('Starting tunnel packet handler for {}'
333
340
.format (self .packet_filter ))
334
341
335
- self .sniff_intfs = self .get_up_portchannels ()
336
- logger .log_info ("Listening on interfaces {}" .format (self .sniff_intfs ))
342
+
343
+ app_db = DBConnector (APPL_DB , 0 )
344
+ lag_table = SubscriberStateTable (app_db , LAG_TABLE )
345
+ sel = Select ()
346
+ sel .addSelectable (lag_table )
337
347
338
348
self .start_sniffer ()
349
+ logger .log_info ("Listening on interfaces {}" .format (self .sniff_intfs ))
339
350
while True :
340
- msgs = self .wait_for_netlink_msgs ()
341
- if self .sniffer_restart_required (msgs ):
342
- self .sniffer .stop ()
343
- sniff_intfs = self .get_up_portchannels ()
344
- logger .log_notice ('Restarting tunnel packet handler on '
345
- 'interfaces {}' .format (sniff_intfs ))
346
- self .start_sniffer ()
351
+ rc , _ = sel .select (SELECT_TIMEOUT )
352
+
353
+ if rc == Select .TIMEOUT :
354
+ continue
355
+ elif rc == Select .ERROR :
356
+ raise Exception ("Select() error" )
357
+ else :
358
+ lag , op , fvs = lag_table .pop ()
359
+ if self .sniffer_restart_required (lag , fvs ):
360
+ self .sniffer .stop ()
361
+ start = datetime .now ()
362
+ # wait up to 3 seconds for the kernel interface to be synced with APPL_DB status
363
+ while (datetime .now () - start ).seconds < 3 :
364
+ self .sniff_intfs = self .get_up_portchannels ()
365
+ if lag in self .sniff_intfs :
366
+ break
367
+ time .sleep (0.1 )
368
+ logger .log_notice ('Restarting tunnel packet handler on '
369
+ 'interfaces {}' .format (self .sniff_intfs ))
370
+ self .start_sniffer ()
347
371
348
372
def run (self ):
349
373
"""
@@ -360,4 +384,4 @@ def main():
360
384
361
385
362
386
if __name__ == "__main__" :
363
- main ()
387
+ main ()
0 commit comments