From 6fc02c7b2e52519216c0466a89f09de8255a4538 Mon Sep 17 00:00:00 2001 From: Evi Vanoost Date: Sun, 8 Oct 2023 14:19:22 -0400 Subject: [PATCH] Fix issues with getting tags that are deeper than 2 levels for user info Fix issues with Snipe-IT installs that have more than 500 models Fix issues with Snipe-IT installs that have many users (20k+), fetching them all is costly. Fix various code cleanliness and logic errors that crashed the program due to the weirdness in return values in large (and old) JAMF installations --- jamf2snipe | 221 ++++++++++++++++++++++++++++------------------------- 1 file changed, 117 insertions(+), 104 deletions(-) diff --git a/jamf2snipe b/jamf2snipe index a363020..b3d8f04 100755 --- a/jamf2snipe +++ b/jamf2snipe @@ -46,9 +46,8 @@ validsubset = [ "configuration_profiles" ] - # Import all the things -import json +from functools import lru_cache import requests import time import configparser @@ -463,80 +462,79 @@ def search_snipe_asset(serial): logging.debug('{} - {}'.format(response.status_code, response.content)) return "ERROR" -# Function to get all the asset models -def get_snipe_models(): - api_url = '{}/api/v1/models'.format(snipe_base) - logging.debug('Calling against: {}'.format(api_url)) - response = requests.get(api_url, headers=snipeheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) - if response.status_code == 200: - jsonresponse = response.json() - logging.info("Got a valid response that should have {} models.".format(jsonresponse['total'])) - if jsonresponse['total'] <= len(jsonresponse['rows']) : - return jsonresponse - else: - logging.info("We didn't get enough results so we need to get them again.") - api_url = '{}/api/v1/models?limit={}'.format(snipe_base, jsonresponse['total']) - newresponse = requests.get(api_url, headers=snipeheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) - if response.status_code == 200: - newjsonresponse = newresponse.json() - if newjsonresponse['total'] == len(newjsonresponse['rows']) : - return newjsonresponse - else: - logging.error("We couldn't seem to get all of the model numbers") - raise SystemExit("Unable to get all model objects from Snipe-IT instanace") - else: - logging.error('When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}'.format(response.status_code, response.content)) - raise SystemExit("Snipe models API endpoint failed.") + +# Helper function to clean up the repeated codes +def api_call(endpoint, payload=None, method="GET"): + logging.debug(f"Calling {endpoint} with method {method} and payload {payload}") + api_url = f"{snipe_base}/api/v1/{endpoint}" + if method == "GET": + logging.debug(f"Calling: {api_url}") + response = requests.get(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, + hooks={'response': request_handler}) + elif method == "POST": + response = requests.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, + hooks={'response': request_handler}) + elif method == "PATCH": + response = requests.patch(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, + hooks={'response': request_handler}) else: - logging.error('When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}'.format(response.status_code, response.content)) - raise SystemExit("Snipe models API endpoint failed.") + logging.error(f"Unknown method {method}") + raise SystemExit("Unknown method") -# Recursive function returns all users in a Snipe Instance, 100 at a time. -def get_snipe_users(previous=[]): - user_id_url = '{}/api/v1/users'.format(snipe_base) - payload = { - 'limit': 100, - 'offset': len(previous) + if response.status_code != 200: + logging.error(f"Snipe-IT responded with error code:{response.text}") + logging.debug(f"{response.status_code} - {response.content}") + raise SystemExit("Snipe-IT API call failed") + logging.debug(f"Got a valid response from Snipe-IT: {response.text}") + return response.json() + + +# Function to get all the asset models +def get_snipe_models(current_models={}): + limits = { + 'limit': 500, + 'offset': len(current_models) } - logging.debug('The payload for the snipe users GET is {}'.format(payload)) - response = requests.get(user_id_url, headers=snipeheaders, params=payload, hooks={'response': request_handler}) - response_json = response.json() - current = response_json['rows'] - if len(previous) != 0: - current = previous + current - if response_json['total'] > len(current): - logging.debug('We have more than 100 users, get the next page - total: {} current: {}'.format(response_json['total'], len(current))) - return get_snipe_users(current) - else: - return current + response = api_call("models", method="GET", payload=limits) + + # This happens if there is an error + if "total" not in response: + logging.error("Fetching models failed, enable debug to see response") + raise SystemExit("Necessary Snipe-IT API call failed") + + # Quickly end if there are no rows + if "rows" not in response: + return current_models + + # Add the models to the dictionary + for row in response['rows']: + if row['model_number']: + current_models[row['model_number']] = row['id'] + + # If we haven't gotten all the models, get more + if response['total'] > len(current_models): + logging.debug(f'Fetching more models - {len(current_models)}/{response["total"]}') + current_models.update(get_snipe_models(current_models)) + + return current_models + # Function to search snipe for a user +@lru_cache(maxsize=2048) def get_snipe_user_id(username): - if username == '': - return "NotFound" + if not username: + return None username = username.lower() - for user in snipe_users: - for value in user.values(): - if str(value).lower() == username: - id = user['id'] - return id - if user_args.users_no_search: - logging.debug("No matches in snipe_users for {}, not querying the API for the next closest match since we've been told not to".format(username)) - return "NotFound" - logging.debug('No matches in snipe_users for {}, querying the API for the next closest match'.format(username)) - user_id_url = '{}/api/v1/users'.format(snipe_base) - payload = { - 'search':username, - 'limit':1, - 'sort':'username', - 'order':'asc' - } - logging.debug('The payload for the snipe user search is: {}'.format(payload)) - response = requests.get(user_id_url, headers=snipeheaders, params=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) - try: - return response.json()['rows'][0]['id'] - except: - return "NotFound" + response = api_call("users", method="GET", payload={'username': username}) + if 'total' not in response or response['total'] == 0: + return None + + if response['total'] > 1: + logging.warning(f"Found {response['total']} users with username {username}, returning none") + return None + + return response['rows'][0]['id'] + # Function that creates a new Snipe Model - not an asset - with a JSON payload def create_snipe_model(payload): @@ -617,10 +615,10 @@ def checkin_snipe_asset(asset_id): def checkout_snipe_asset(user, asset_id, checked_out_user=None): logging.debug('Asset {} is being checked out to {}'.format(user, asset_id)) user_id = get_snipe_user_id(user) - if user_id == 'NotFound': + if not user_id: logging.info("User {} not found".format(user)) return "NotFound" - if checked_out_user == None: + if checked_out_user is None: logging.info("Not checked out, checking out to {}".format(user)) elif checked_out_user == "NewAsset": logging.info("First time this asset will be checked out, checking out to {}".format(user)) @@ -647,6 +645,32 @@ def checkout_snipe_asset(user, asset_id, checked_out_user=None): logging.error('Asset checkout failed for asset {} with error {}'.format(asset_id,response.text)) return response + +# Function to recursively get keys from a dictionary +def get_config_value(config_key, data): + search_keys = config_key.split(" ") + + value = data + for key in search_keys: + try: + key = int(key) + except ValueError: + logging.debug(f"{key} is not an integer") + try: + value = value[key] + except (KeyError, IndexError): + logging.info(f"{key} does not exist") + logging.debug(f"Ansible value: {value}") + value = None + break + except TypeError: + logging.error(f"Type error when fetching data for {key}, check your config") + raise SystemExit + + logging.debug(f"Got value {value} for {config_key}") + return value + + ### Run Testing ### # Report if we're verifying SSL or not. logging.info("SSL Verification is set to: {}".format(user_args.do_not_verify_ssl)) @@ -674,7 +698,7 @@ else: logging.info('We were able to get a good response from your JAMFPro instance.') # Exit if you can't contact SNIPE -if ( JAMF_UP == False ) or ( SNIPE_UP == False ): +if not JAMF_UP or not SNIPE_UP: raise SystemExit("Error: Host could not be contacted.") # Test that we can actually connect with the API keys by getting a bearer token. @@ -686,14 +710,7 @@ logging.info("Finished running our tests.") ### Get Started ### # Get a list of known models from Snipe logging.info("Getting a list of computer models that snipe knows about.") -snipemodels = get_snipe_models() -logging.debug("Parsing the {} model results for models with model numbers.".format(len(snipemodels['rows']))) -modelnumbers = {} -for model in snipemodels['rows']: - if model['model_number'] == "": - logging.debug("The model, {}, did not have a model number. Skipping.".format(model['name'])) - continue - modelnumbers[model['model_number']] = model['id'] +modelnumbers = get_snipe_models() logging.info("Our list of models has {} entries.".format(len(modelnumbers))) logging.debug("Here's the list of the {} models and their id's that we were able to collect:\n{}".format(len(modelnumbers), modelnumbers)) @@ -714,12 +731,6 @@ jamf_types = { 'mobile_devices': jamf_mobile_list } -# Get a list of users from Snipe if the user has specified -# they're syncing users - -if user_args.users or user_args.users_force or user_args.users_inverse: - snipe_users = get_snipe_users() - TotalNumber = 0 if user_args.computers: TotalNumber = len(jamf_types['computers']['computers']) @@ -730,12 +741,13 @@ else: TotalNumber += len(jamf_types[jamf_type][jamf_type]) # Make sure we have a good list. -if jamf_computer_list != None: - logging.info('Received a list of JAMF assets that had {} entries.'.format(TotalNumber)) -else: +if jamf_computer_list is None: logging.error("We were not able to retreive a list of assets from your JAMF instance. It's likely that your settings, or credentials are incorrect. Check your settings.conf and verify you can make API calls outside of this system with the credentials found in your settings.conf") raise SystemExit("Unable to get JAMF Computers.") +logging.info('Received a list of JAMF assets that had {} entries.'.format(TotalNumber)) + + # After this point we start editing data, so quit if this is a dryrun if user_args.dryrun: raise SystemExit("Dryrun: Complete.") @@ -759,15 +771,16 @@ for jamf_type in jamf_types: jamf = search_jamf_asset(jamf_asset['id']) elif jamf_type == 'mobile_devices': jamf = search_jamf_mobile(jamf_asset['id']) - if jamf == None: + if not jamf: continue # If the entry doesn't contain a serial, then we need to skip this entry. - if jamf['general']['serial_number'] == 'Not Available': + if not jamf['general']['serial_number']: logging.warning("The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now.") continue - if jamf['general']['serial_number'] == None: - logging.warning("The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now.") + if jamf['general']['serial_number'] == 'Not Available': + logging.warning( + "The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now.") continue # Check that the model number exists in snipe, if not create it. @@ -813,7 +826,7 @@ for jamf_type in jamf_types: logging.verbose(jamf) continue #raise SystemError('No such attribute {} in the jamf payload. Please check your settings.conf file'.format(tag_split)) - if jamf_asset_tag == None or jamf_asset_tag == '': + if not jamf_asset_tag: logging.debug('No custom configuration found in settings.conf for asset tag name upon asset creation.') if jamf_type == 'mobile_devices': jamf_asset_tag = 'jamfid-m-{}'.format(jamf['general']['id']) @@ -858,12 +871,12 @@ for jamf_type in jamf_types: if new_snipe_asset[0] != "AssetCreated": continue if user_args.users or user_args.users_force or user_args.users_inverse: - jamfsplit = config['user-mapping']['jamf_api_field'].split() - if jamfsplit[1] not in jamf[jamfsplit[0]]: - logging.info("Couldn't find {} for this device in {}, not checking it out.".format(jamfsplit[1], jamfsplit[0])) + user = get_config_value(config['user-mapping']['jamf_api_field'], jamf) + if not user: + logging.info("User not found in JAMF information for this device. Skipping") continue - logging.info('Checking out new item {} to user {}'.format(jamf['general']['name'], jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])])) - checkout_snipe_asset(jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])],new_snipe_asset[1].json()['payload']['id'], "NewAsset") + checkout_snipe_asset(jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])], + new_snipe_asset[1].json()['payload']['id'], "NewAsset") # Log an error if there's an issue, or more than once match. elif snipe == 'MultiMatch': logging.warning("WARN: You need to resolve multiple assets with the same serial number in your inventory. If you can't find them in your inventory, you might need to purge your deleted records. You can find that in the Snipe Admin settings. Skipping serial number {} for now.".format(jamf['general']['serial_number'])) @@ -903,7 +916,7 @@ for jamf_type in jamf_types: jamf_value = jamf_value[item] payload = {snipekey: jamf_value} latestvalue = jamf_value - except (KeyError, TypeError): + except (KeyError, TypeError, IndexError): logging.debug("Skipping the payload, because the JAMF key we're mapping to doesn't exist") continue @@ -933,11 +946,11 @@ for jamf_type in jamf_types: if ((user_args.users or user_args.users_inverse) and (snipe['rows'][0]['assigned_to'] == None) == user_args.users) or user_args.users_force: if snipe['rows'][0]['status_label']['status_meta'] in ('deployable', 'deployed'): - jamfsplit = config['user-mapping']['jamf_api_field'].split() - if jamfsplit[1] not in jamf[jamfsplit[0]]: - logging.info("Couldn't find {} for this device in {}, not checking it out.".format(jamfsplit[1], jamfsplit[0])) + user = get_config_value(config['user-mapping']['jamf_api_field'], jamf) + if not user: + logging.info("User not found in JAMF information for this device. Skipping") continue - checkout_snipe_asset(jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])], snipe_id, snipe['rows'][0]['assigned_to']) + checkout_snipe_asset(user, snipe_id, snipe['rows'][0]['assigned_to']) else: logging.info("Can't checkout {} since the status isn't set to deployable".format(jamf['general']['name']))