Skip to content

Commit 07beeeb

Browse files
authored
Cross reference ioreg info with system_profiler
- Should help with discovery when some items don't populate properly in ioreg - May lead to addressing issues if the port limit patch is used - but will show a warning
1 parent dd079d4 commit 07beeeb

File tree

1 file changed

+76
-5
lines changed

1 file changed

+76
-5
lines changed

USBMap.py

+76-5
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ def merge_controllers(self,from_cont=None,into_cont=None):
164164

165165
def save_plist(self,controllers=None):
166166
if controllers == None: controllers = self.merged_list
167+
self.sanitize_controllers(controllers)
167168
# Ensure the lists are the same
168169
try:
169170
with open(self.usb_list,"wb") as f:
@@ -173,6 +174,14 @@ def save_plist(self,controllers=None):
173174
print("Could not save to USB.plist! {}".format(e))
174175
return False
175176

177+
def sanitize_controllers(self, controllers=None):
178+
if controllers is None: controllers = self.merged_list
179+
# Walk each controller and strip any red ansi escapes from the names
180+
for controller in controllers:
181+
if not "ports" in controllers[controller]: continue
182+
for port in controllers[controller]["ports"]:
183+
controllers[controller]["ports"][port]["items"] = [x.replace(self.rs,"").replace(self.ce,"") for x in controllers[controller]["ports"][port]["items"]]
184+
176185
def sanitize_ioreg(self,ioreg):
177186
# Walks the passed ioreg and attempts to fix devices with newlines in their names
178187
# by replacing them with spaces
@@ -271,6 +280,36 @@ def get_by_ioreg(self):
271280
last_root = last_root["items"][p["id"]]
272281
return ports
273282

283+
def get_sp_usb(self,indent=" "):
284+
# Gather a top-level array of USB devices plugged in per system_profiler
285+
sp_usb_list = []
286+
try:
287+
if os.path.exists("sp_usb_list.txt"):
288+
with open("sp_usb_list.txt","rb") as f:
289+
sp_usb_xml = plist.load(f)
290+
else:
291+
sp_usb_xml = plist.loads(self.r.run({"args":["system_profiler","-xml","-detaillevel","mini","SPUSBDataType"]})[0])
292+
except:
293+
return sp_usb_list
294+
items_list = []
295+
for top in sp_usb_xml:
296+
if "_items" in top:
297+
items_list.extend(top["_items"])
298+
while items_list:
299+
item = items_list.pop()
300+
if "location_id" in item:
301+
try: item["location_id_adjusted"] = item["location_id"][2:].split()[0]
302+
except: continue # Broken
303+
if not "indent" in item:
304+
item["indent"] = ""
305+
sp_usb_list.append(item)
306+
if "_items" in item:
307+
new_items = item.pop("_items")
308+
for i in new_items:
309+
i["indent"] = item.get("indent","")+indent
310+
items_list.extend(new_items)
311+
return sp_usb_list
312+
274313
def map_inheritance(self,top_level,level=1,indent=" "):
275314
# Iterates through all "items" entries in the top_level dict
276315
# and returns a formatted string showing inheritance
@@ -284,7 +323,7 @@ def map_inheritance(self,top_level,level=1,indent=" "):
284323
addr = "Unknown"
285324
name = check_entry.get("name",check_entry.get("type","Unknown"))
286325
value = (indent * level) + "- {}{}".format(name, " (HUB-{})".format(addr) if is_hub and check_entry.get("map_hub",False) and self.map_hubs else "")
287-
text.append(value)
326+
text.append((value,name))
288327
# Verify if we're on a hub and mapping those
289328
if is_hub and check_entry.get("map_hub",False) and self.map_hubs:
290329
# Got a hub - this will be mapped elsewhere
@@ -307,24 +346,50 @@ def get_items_for_port(self,port_id,indent=" "):
307346
if not port: return [] # No items, or the port wasn't found?
308347
return self.map_inheritance(port)
309348

310-
def get_ports_and_devices_for_controller(self,controller,indent=" "):
349+
def get_ports_and_devices_for_controller(self,controller,sp_usb_list=[],indent=" "):
311350
self.check_controllers()
312351
assert controller in self.controllers # Error if the controller doesn't exist
313352
port_dict = OrderedDict()
353+
# Gather a list of any duplicate port addresses
354+
port_addrs = [self.controllers[controller]["ports"][p]["address"] for p in self.controllers[controller]["ports"]]
355+
dupe_addrs = [x for x in port_addrs if port_addrs.count(x)>1]
356+
# Walk the ports
314357
for port_num in sorted(self.controllers[controller]["ports"]):
315358
port = self.controllers[controller]["ports"][port_num]
316359
# The name of each entry should be "PortName - PortNum (Controller)"
317360
port_num = self.hex_dec(self.hex_swap(port["port"]))
318361
entry_name = "{} | {} | {} | {} | {} | {} | {}".format(port["name"],port["type"],port["port"],port["address"],port.get("connector",-1),controller,self.controllers[controller]["parent"])
319-
port_dict[entry_name] = self.get_items_for_port(port["id"],indent=indent)
362+
inheritance = self.get_items_for_port(port["id"],indent=indent)
363+
port_dict[entry_name] = [x[0] for x in inheritance] # Get the values
364+
names = [x[1] for x in inheritance] # Get the original names for comparing
365+
# Check if we're missing any items from the system_profiler output for that port
366+
for item in sp_usb_list:
367+
try:
368+
l_id = item["location_id_adjusted"]
369+
name = item["_name"]
370+
except: continue # Broken?
371+
if not l_id.startswith(port["address"].rstrip("0")):
372+
continue # Not our port
373+
if name in names:
374+
continue # Already have it
375+
# We found one that we didn't have - check if it's potentially on a duplicate
376+
# address and highlight it
377+
port_dict[entry_name].append("{}* {}{}{}".format(
378+
item.get("indent",indent),
379+
self.rs if port["address"] in dupe_addrs else "",
380+
name,
381+
self.ce if port["address"] in dupe_addrs else ""
382+
))
320383
return port_dict
321384

322385
def get_ports_and_devices(self,indent=" "):
323386
# Returns a dict of all ports and their connected devices
324387
self.check_controllers()
325388
port_dict = OrderedDict()
389+
# Get the system_profiler USB output
390+
sp_usb_list = self.get_sp_usb()
326391
for x in self.controllers:
327-
port_dict.update(self.get_ports_and_devices_for_controller(x,indent=indent))
392+
port_dict.update(self.get_ports_and_devices_for_controller(x,sp_usb_list=sp_usb_list,indent=indent))
328393
return port_dict
329394

330395
def get_populated_count_for_controller(self,controller):
@@ -681,7 +746,6 @@ def discover_ports(self):
681746
total_ports = OrderedDict()
682747
last_ports = OrderedDict()
683748
last_list = []
684-
pad = 11
685749
while True:
686750
extras = 0
687751
last_w = 80
@@ -703,6 +767,8 @@ def discover_ports(self):
703767
# Enumerate the ports
704768
last_cont = None
705769
cont_count = {}
770+
show_red_warning = False
771+
pad = 11
706772
for index,port in enumerate(check_ports):
707773
n,t,p,a,e,c,r = port.split(" | ")
708774
if len(total_ports.get(port,[])): cont_count[c] = cont_count.get(c,0)+1
@@ -740,7 +806,12 @@ def discover_ports(self):
740806
if len(check_ports[port]):
741807
extras += len(check_ports[port])
742808
print("\n".join(check_ports[port]))
809+
if any((self.rs in red_check for red_check in check_ports[port])):
810+
show_red_warning = True
743811
print("")
812+
if show_red_warning:
813+
pad = 13
814+
print("- Items in {}RED{} do not have accurate addressing\n".format(self.rs,self.ce))
744815
# List the controllers and their port counts
745816
print("Populated:")
746817
pop_list = []

0 commit comments

Comments
 (0)