diff --git a/README.md b/README.md index 159e48b..6f0e850 100644 --- a/README.md +++ b/README.md @@ -7,30 +7,30 @@ options: -h, --help show this help message and exit -v, --verbose Sets the logging level to INFO and gives you a better idea of what the script is doing. - --auto_incrementing You can use this if you have auto-incrementing - enabled in your snipe instance to utilize that + --auto_incrementing You can use this if you have auto-incrementing + enabled in your snipe instance to utilize that instead of adding the Jamf ID for the asset tag. - --dryrun This checks your config and tries to contact both - the JAMFPro and Snipe-it instances, but exits before + --dryrun This checks your config and tries to contact both + the JAMFPro and Snipe-it instances, but exits before updating or syncing any assets. -d, --debug Sets logging to include additional DEBUG messages. - --do_not_update_jamf Does not update Jamf with the asset tags stored in + --do_not_update_jamf Does not update Jamf with the asset tags stored in Snipe. --do_not_verify_ssl Skips SSL verification for all requests. Helpful when you use self-signed certificate. - -r, --ratelimited Puts a half second delay between API calls to adhere + -r, --ratelimited Puts a half second delay between API calls to adhere to the standard 120/minute rate limit - -f, --force Updates the Snipe asset with information from Jamf + -f, --force Updates the Snipe asset with information from Jamf every time, despite what the timestamps indicate. --version Prints the version and exits. - -u, --users Checks out the item to the current user in Jamf if + -u, --users Checks out the item to the current user in Jamf if it's not already deployed - -ui, --users_inverse Checks out the item to the current user in Jamf if + -ui, --users_inverse Checks out the item to the current user in Jamf if it's already deployed - -uf, --users_force Checks out the item to the user specified in Jamf no + -uf, --users_force Checks out the item to the user specified in Jamf no matter what -uns, --users_no_search - Doesn't search for any users if the specified fields + Doesn't search for any users if the specified fields in Jamf and Snipe don't match. (case insensitive) -m, --mobiles Runs against the Jamf mobiles endpoint only. -c, --computers Runs against the Jamf computers endpoint only. @@ -135,3 +135,9 @@ Because `jamf2snipe` only ever writes the asset_tag for a matching serial number ## Contributing Thanks to all of the people that have already contributed to this project! If you have something you'd like to add please help by forking this project then creating a pull request to the `devel` branch. When working on new features, please try to keep existing configs running in the same manner with no changes. When possible, open up an issue and reference it when you make your pull request. + +Files are formatted with `black` formatter, please run: + +```bash +pip install black && black jamf2snipe +``` \ No newline at end of file diff --git a/jamf2snipe b/jamf2snipe index 402f8fc..23306ec 100755 --- a/jamf2snipe +++ b/jamf2snipe @@ -33,17 +33,17 @@ version = "1.0.7" validsubset = [ - "general", - "location", - "purchasing", - "peripherals", - "hardware", - "certificates", - "software", - "extension_attributes", - "groups_accounts", - "iphones", - "configuration_profiles" + "general", + "location", + "purchasing", + "peripherals", + "hardware", + "certificates", + "software", + "extension_attributes", + "groups_accounts", + "iphones", + "configuration_profiles", ] @@ -60,24 +60,96 @@ from urllib3 import Retry # Set us up for using runtime arguments by defining them. runtimeargs = argparse.ArgumentParser() -runtimeargs.add_argument("-v", "--verbose", help="Sets the logging level to INFO and gives you a better idea of what the script is doing.", action="store_true") -runtimeargs.add_argument("--auto_incrementing", help="You can use this if you have auto-incrementing enabled in your snipe instance to utilize that instead of adding the Jamf ID for the asset tag.", action="store_true") -runtimeargs.add_argument("--dryrun", help="This checks your config and tries to contact both the JAMFPro and Snipe-it instances, and will generate the assets for debugging, but not update or sync anything but exits before updating or syncing any assets.", action="store_true") -runtimeargs.add_argument("--connection_test", help="This checks your config and tries to contact both the JAMFPro and Snipe-it instances.", action="store_true") -runtimeargs.add_argument("-d", "--debug", help="Sets logging to include additional DEBUG messages.", action="store_true") -runtimeargs.add_argument("--do_not_update_jamf", help="Does not update Jamf with the asset tags stored in Snipe.", action="store_false") -runtimeargs.add_argument('--do_not_verify_ssl', help="Skips SSL verification for all requests. Helpful when you use self-signed certificate.", action="store_false") -runtimeargs.add_argument("-r", "--ratelimited", help="Puts a half second delay between API calls to adhere to the standard 120/minute rate limit", action="store_true") -runtimeargs.add_argument("-f", "--force", help="Updates the Snipe asset with information from Jamf every time, despite what the timestamps indicate.", action="store_true") -runtimeargs.add_argument("--version", help="Prints the version and exits.", action="store_true") +runtimeargs.add_argument( + "-v", + "--verbose", + help="Sets the logging level to INFO and gives you a better idea of what the script is doing.", + action="store_true", +) +runtimeargs.add_argument( + "--auto_incrementing", + help="You can use this if you have auto-incrementing enabled in your snipe instance to utilize that instead of adding the Jamf ID for the asset tag.", + action="store_true", +) +runtimeargs.add_argument( + "--dryrun", + help="This checks your config and tries to contact both the JAMFPro and Snipe-it instances, and will generate the assets for debugging, but not update or sync anything but exits before updating or syncing any assets.", + action="store_true", +) +runtimeargs.add_argument( + "--connection_test", + help="This checks your config and tries to contact both the JAMFPro and Snipe-it instances.", + action="store_true", +) +runtimeargs.add_argument( + "-d", + "--debug", + help="Sets logging to include additional DEBUG messages.", + action="store_true", +) +runtimeargs.add_argument( + "--do_not_update_jamf", + help="Does not update Jamf with the asset tags stored in Snipe.", + action="store_false", +) +runtimeargs.add_argument( + "--do_not_verify_ssl", + help="Skips SSL verification for all requests. Helpful when you use self-signed certificate.", + action="store_false", +) +runtimeargs.add_argument( + "-r", + "--ratelimited", + help="Puts a half second delay between API calls to adhere to the standard 120/minute rate limit", + action="store_true", +) +runtimeargs.add_argument( + "-f", + "--force", + help="Updates the Snipe asset with information from Jamf every time, despite what the timestamps indicate.", + action="store_true", +) +runtimeargs.add_argument( + "--version", help="Prints the version and exits.", action="store_true" +) user_opts = runtimeargs.add_mutually_exclusive_group() -user_opts.add_argument("-u", "--users", help="Checks out the item to the current user in Jamf if it's not already deployed", action="store_true") -user_opts.add_argument("-ui", "--users_inverse", help="Checks out the item to the current user in Jamf if it's already deployed", action="store_true") -user_opts.add_argument("-uf", "--users_force", help="Checks out the item to the user specified in Jamf no matter what", action="store_true") -runtimeargs.add_argument("-uns", "--users_no_search", help="Doesn't search for any users if the specified fields in Jamf and Snipe don't match. (case insensitive)", action="store_true") +user_opts.add_argument( + "-u", + "--users", + help="Checks out the item to the current user in Jamf if it's not already deployed", + action="store_true", +) +user_opts.add_argument( + "-ui", + "--users_inverse", + help="Checks out the item to the current user in Jamf if it's already deployed", + action="store_true", +) +user_opts.add_argument( + "-uf", + "--users_force", + help="Checks out the item to the user specified in Jamf no matter what", + action="store_true", +) +runtimeargs.add_argument( + "-uns", + "--users_no_search", + help="Doesn't search for any users if the specified fields in Jamf and Snipe don't match. (case insensitive)", + action="store_true", +) type_opts = runtimeargs.add_mutually_exclusive_group() -type_opts.add_argument("-m", "--mobiles", help="Runs against the Jamf mobiles endpoint only.", action="store_true") -type_opts.add_argument("-c", "--computers", help="Runs against the Jamf computers endpoint only.", action="store_true") +type_opts.add_argument( + "-m", + "--mobiles", + help="Runs against the Jamf mobiles endpoint only.", + action="store_true", +) +type_opts.add_argument( + "-c", + "--computers", + help="Runs against the Jamf computers endpoint only.", + action="store_true", +) user_args = runtimeargs.parse_args() if user_args.version: @@ -94,17 +166,25 @@ else: # Notify users if we're doing a connection test. if user_args.connection_test and user_args.dryrun: - logging.error("You can't use --connection_test and --dryrun at the same time. Please choose one or the other.") + logging.error( + "You can't use --connection_test and --dryrun at the same time. Please choose one or the other." + ) raise SystemExit("Error: Invalid runtime arguments - Exiting.") if user_args.connection_test and user_args.force: - logging.error("You can't use --connection_test and --force at the same time. Please choose one or the other.") + logging.error( + "You can't use --connection_test and --force at the same time. Please choose one or the other." + ) raise SystemExit("Error: Invalid runtime arguments - Exiting.") if user_args.connection_test: - print("Connection test: Starting jamf2snipe with a connection test where we'll try to contact both the JAMFPro and Snipe-it instances.") + print( + "Connection test: Starting jamf2snipe with a connection test where we'll try to contact both the JAMFPro and Snipe-it instances." + ) # Notify users if we're doing a dry run. if user_args.dryrun and user_args.force: - print("Running a dry run with force enabled. This will generate assets for debugging, but not update or sync anything.") + print( + "Running a dry run with force enabled. This will generate assets for debugging, but not update or sync anything." + ) elif user_args.dryrun: print("Dryrun: Starting jamf2snipe with a dry run where no assets will be updated.") @@ -113,72 +193,102 @@ logging.info("Searching for a valid settings.conf file.") config = configparser.ConfigParser() logging.debug("Checking for a settings.conf in /opt/jamf2snipe ...") config.read("/opt/jamf2snipe/settings.conf") -if 'snipe-it' not in set(config): - logging.debug("No valid config found in: /opt Checking for a settings.conf in /etc/jamf2snipe ...") - config.read('/etc/jamf2snipe/settings.conf') -if 'snipe-it' not in set(config): - logging.debug("No valid config found in /etc Checking for a settings.conf in current directory ...") +if "snipe-it" not in set(config): + logging.debug( + "No valid config found in: /opt Checking for a settings.conf in /etc/jamf2snipe ..." + ) + config.read("/etc/jamf2snipe/settings.conf") +if "snipe-it" not in set(config): + logging.debug( + "No valid config found in /etc Checking for a settings.conf in current directory ..." + ) config.read("settings.conf") -if 'snipe-it' not in set(config): +if "snipe-it" not in set(config): logging.debug("No valid config found in current folder.") - logging.error("No valid settings.conf was found. We'll need to quit while you figure out where the settings are at. You can check the README for valid locations.") + logging.error( + "No valid settings.conf was found. We'll need to quit while you figure out where the settings are at. You can check the README for valid locations." + ) raise SystemExit("Error: No valid settings.conf - Exiting.") -logging.info("Great, we found a settings file. Let's get started by parsing all of the settings.") +logging.info( + "Great, we found a settings file. Let's get started by parsing all of the settings." +) # While setting the variables, use a try loop so we can raise a error if something goes wrong. try: # Set some Variables from the settings.conf: # This is the address, cname, or FQDN for your JamfPro instance. logging.info("Setting the Jamf Pro Base url.") - jamfpro_base = config['jamf']['url'] + jamfpro_base = config["jamf"]["url"] logging.debug("The configured Jamf Pro base url is: {}".format(jamfpro_base)) logging.info("Setting the client_id to request an api key.") - jamf_client_id = config['jamf']['client_id'] + jamf_client_id = config["jamf"]["client_id"] logging.debug("The client_id you provided for Jamf is: {}".format(jamf_client_id)) logging.info("Setting the client_secret to request an api key.") - jamf_client_secret = config['jamf']['client_secret'] - logging.debug("The client_secret you provided for Jamf is: {}".format(jamf_client_secret)) + jamf_client_secret = config["jamf"]["client_secret"] + logging.debug( + "The client_secret you provided for Jamf is: {}".format(jamf_client_secret) + ) # This is the address, cname, or FQDN for your snipe-it instance. logging.info("Setting the base URL for SnipeIT.") - snipe_base = config['snipe-it']['url'] + snipe_base = config["snipe-it"]["url"] logging.debug("The configured Snipe-IT base url is: {}".format(snipe_base)) logging.info("Setting the API key for SnipeIT.") - snipe_apiKey = config['snipe-it']['apikey'] + snipe_apiKey = config["snipe-it"]["apikey"] logging.debug("The API key you provided for Snipe is: {}".format(snipe_apiKey)) logging.info("Setting the default status for SnipeIT assets.") - defaultStatus = config['snipe-it']['defaultStatus'] - logging.debug("The default status we'll be setting updated assets to is: {} (I sure hope this is a number or something is probably wrong)".format(defaultStatus)) + defaultStatus = config["snipe-it"]["defaultStatus"] + logging.debug( + "The default status we'll be setting updated assets to is: {} (I sure hope this is a number or something is probably wrong)".format( + defaultStatus + ) + ) logging.info("Setting the Snipe ID for Apple Manufacturer devices.") - apple_manufacturer_id = config['snipe-it']['manufacturer_id'] - logging.debug("The configured manufacturer ID for Apple computers in snipe is: {} (Pretty sure this needs to be a number too)".format(apple_manufacturer_id)) + apple_manufacturer_id = config["snipe-it"]["manufacturer_id"] + logging.debug( + "The configured manufacturer ID for Apple computers in snipe is: {} (Pretty sure this needs to be a number too)".format( + apple_manufacturer_id + ) + ) except: - logging.error("Some of the required settings from the settings.conf were missing or invalid. Re-run jamf2snipe with the --verbose or --debug flag to get more details on which setting is missing or misconfigured.") + logging.error( + "Some of the required settings from the settings.conf were missing or invalid. Re-run jamf2snipe with the --verbose or --debug flag to get more details on which setting is missing or misconfigured." + ) raise SystemExit("Error: Missing or invalid settings in settings.conf - Exiting.") # Check the config file for correct headers # Do some tests to see if the user has updated their settings.conf file SETTINGS_CORRECT = True -if 'api-mapping' in config: - logging.error("Looks like you're using the old method for api-mapping. Please use computers-api-mapping and mobile_devices-api-mapping.") +if "api-mapping" in config: + logging.error( + "Looks like you're using the old method for api-mapping. Please use computers-api-mapping and mobile_devices-api-mapping." + ) SETTINGS_CORRECT = False -if not 'user-mapping' in config and (user_args.users or user_args.users_force or user_args.users_inverse): - logging.error("""You've chosen to check out assets to users in some capacity using a cmdline switch, but not specified how you want to - search Snipe IT for the users from Jamf. Make sure you have a 'user-mapping' section in your settings.conf file.""") +if not "user-mapping" in config and ( + user_args.users or user_args.users_force or user_args.users_inverse +): + logging.error( + """You've chosen to check out assets to users in some capacity using a cmdline switch, but not specified how you want to + search Snipe IT for the users from Jamf. Make sure you have a 'user-mapping' section in your settings.conf file.""" + ) SETTINGS_CORRECT = False if snipe_base.endswith("/"): - logging.error("""You have a trailing forward slash in the snipe url. Please remove it.""") + logging.error( + """You have a trailing forward slash in the snipe url. Please remove it.""" + ) SETTINGS_CORRECT = False if jamfpro_base.endswith("/"): - logging.error("""You have a trailing forward slash in the JamfPro url. Please remove it.""") + logging.error( + """You have a trailing forward slash in the JamfPro url. Please remove it.""" + ) SETTINGS_CORRECT = False @@ -186,14 +296,20 @@ if not SETTINGS_CORRECT: raise SystemExit # Check the config file for valid jamf subsets. This is based off the JAMF API and if it's not right we can't map fields over to SNIPE properly. -logging.debug("Checking the settings.conf file for valid JAMF subsets of the JAMF API so mapping can occur properly.") -for key in config['computers-api-mapping']: - jamfsplit = config['computers-api-mapping'][key].split() +logging.debug( + "Checking the settings.conf file for valid JAMF subsets of the JAMF API so mapping can occur properly." +) +for key in config["computers-api-mapping"]: + jamfsplit = config["computers-api-mapping"][key].split() if jamfsplit[0] in validsubset: - logging.info('Found subset {}: Acceptable'.format(jamfsplit[0])) + logging.info("Found subset {}: Acceptable".format(jamfsplit[0])) continue else: - logging.error("Found invalid subset: {} in the settings.conf file.\nThis is not in the acceptable list of subsets. Check your settings.conf\n Valid subsets are: {}".format(jamfsplit[0], ', '.join(validsubset))) + logging.error( + "Found invalid subset: {} in the settings.conf file.\nThis is not in the acceptable list of subsets. Check your settings.conf\n Valid subsets are: {}".format( + jamfsplit[0], ", ".join(validsubset) + ) + ) raise SystemExit("Invalid Subset found in settings.conf") ### Setup Some Functions ### @@ -202,16 +318,25 @@ first_api_call = None # Headers for the API call. logging.info("Creating the headers we'll need for API calls") -jamfbasicheaders = {'Accept': 'application/json','Content-Type':'application/json'} -snipeheaders = {'Authorization': 'Bearer {}'.format(snipe_apiKey),'Accept': 'application/json','Content-Type':'application/json'} -logging.debug('Request headers for JamfPro will be: {}\nRequest headers for Snipe will be: {}'.format(jamfbasicheaders, snipeheaders)) +jamfbasicheaders = {"Accept": "application/json", "Content-Type": "application/json"} +snipeheaders = { + "Authorization": "Bearer {}".format(snipe_apiKey), + "Accept": "application/json", + "Content-Type": "application/json", +} +logging.debug( + "Request headers for JamfPro will be: {}\nRequest headers for Snipe will be: {}".format( + jamfbasicheaders, snipeheaders + ) +) session = Session() retries = Retry( total=3, - allowed_methods={'GET'}, + allowed_methods={"GET"}, ) -session.mount('https://', HTTPAdapter(max_retries=retries)) +session.mount("https://", HTTPAdapter(max_retries=retries)) + # Use the client credentials to request a Jamf Token. def request_jamf_token(): @@ -223,16 +348,22 @@ def request_jamf_token(): global expires_time token_request_time = time.time() logging.info("Requesting a new token at {}.".format(token_request_time)) - api_url = '{0}/api/v1/oauth/token'.format(jamfpro_base) + api_url = "{0}/api/v1/oauth/token".format(jamfpro_base) # No hook for this api call. - logging.debug('Calling for a token against: {}\n The username and password can be found earlier in the script.'.format(api_url)) + logging.debug( + "Calling for a token against: {}\n The username and password can be found earlier in the script.".format( + api_url + ) + ) # No hook for this API call. data = { - 'client_id': jamf_client_id, - 'client_secret': jamf_client_secret, - "grant_type": "client_credentials" + "client_id": jamf_client_id, + "client_secret": jamf_client_secret, + "grant_type": "client_credentials", } - logging.debug('The data being sent to JamfPro for token request is: {}'.format(data)) + logging.debug( + "The data being sent to JamfPro for token request is: {}".format(data) + ) response = session.post(api_url, data=data, verify=user_args.do_not_verify_ssl) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") @@ -240,20 +371,42 @@ def request_jamf_token(): logging.debug(jsonresponse) # So we have our token and Expires time. Set the expires time globably so we can reset later. try: - expires_in = int(jsonresponse['expires_in']) - expires_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=expires_in) + expires_in = int(jsonresponse["expires_in"]) + expires_time = datetime.datetime.now( + datetime.timezone.utc + ) + datetime.timedelta(seconds=expires_in) except: - logging.error("Jamf sent a malformed timestamp: {}\n Please feel free to complain to Jamf support.".format(jsonresponse['expires_in'])) + logging.error( + "Jamf sent a malformed timestamp: {}\n Please feel free to complain to Jamf support.".format( + jsonresponse["expires_in"] + ) + ) raise SystemExit("Unable to grok Jamf Timestamp - Exiting") logging.debug("Token expires in: {}".format(expires_in)) # The headers are also global, because they get used elsewhere. logging.info("Setting new jamf headers with bearer token") - jamfheaders = {'Authorization': 'Bearer {}'.format(jsonresponse['access_token']),'Accept': 'application/json','Content-Type':'application/json'} - jamfxmlheaders = {'Authorization': 'Bearer {}'.format(jsonresponse['access_token']),'Accept': 'application/xml','Content-Type':'application/xml'} - logging.debug('Request headers for JamfPro will be: {}\nRequest headers for Snipe will be: {}'.format(jamfheaders, snipeheaders)) + jamfheaders = { + "Authorization": "Bearer {}".format(jsonresponse["access_token"]), + "Accept": "application/json", + "Content-Type": "application/json", + } + jamfxmlheaders = { + "Authorization": "Bearer {}".format(jsonresponse["access_token"]), + "Accept": "application/xml", + "Content-Type": "application/xml", + } + logging.debug( + "Request headers for JamfPro will be: {}\nRequest headers for Snipe will be: {}".format( + jamfheaders, snipeheaders + ) + ) else: - logging.error("Could not obtain a token for use with Jamf's classic API. Please check your client_id and client_secret.") - logging.debug('Response code: {} - {}'.format(response.status_code, response.content)) + logging.error( + "Could not obtain a token for use with Jamf's classic API. Please check your client_id and client_secret." + ) + logging.debug( + "Response code: {} - {}".format(response.status_code, response.content) + ) raise SystemExit("Unable to obtain Jamf Token") @@ -272,7 +425,9 @@ def request_handler(r, *args, **kwargs): # Slow and steady wins the race. Limit all API calls (not just to snipe) to the Rate limit. if user_args.ratelimited: if '"messages":429' in r.text: - logging.warn("Despite respecting the rate limit of Snipe, we've still been limited. Trying again after sleeping for 2 seconds.") + logging.warn( + "Despite respecting the rate limit of Snipe, we've still been limited. Trying again after sleeping for 2 seconds." + ) time.sleep(2) re_req = r.request return session.send(re_req) @@ -280,407 +435,712 @@ def request_handler(r, *args, **kwargs): first_api_call = time.time() time.sleep(0.5) api_count += 1 - time_elapsed = (time.time() - first_api_call) + time_elapsed = time.time() - first_api_call api_rate = api_count / time_elapsed if api_rate > 1.95: sleep_time = 0.5 + (api_rate - 1.95) - logging.debug('Going over snipe rate limit of 120/minute ({}/minute), sleeping for {}'.format(api_rate,sleep_time)) + logging.debug( + "Going over snipe rate limit of 120/minute ({}/minute), sleeping for {}".format( + api_rate, sleep_time + ) + ) time.sleep(sleep_time) - logging.debug("Made {} requests to Snipe IT in {} seconds, with a request being sent every {} seconds".format(api_count, time_elapsed, api_rate)) + logging.debug( + "Made {} requests to Snipe IT in {} seconds, with a request being sent every {} seconds".format( + api_count, time_elapsed, api_rate + ) + ) if '"messages":429' in r.text: logging.error(r.content) - raise SystemExit("We've been rate limited. Use option -r to respect the built in Snipe IT API rate limit of 120/minute.") + raise SystemExit( + "We've been rate limited. Use option -r to respect the built in Snipe IT API rate limit of 120/minute." + ) return r + # Function to make the API call for all JAMF devices def get_jamf_computers(): - api_url = '{0}/JSSResource/computers'.format(jamfpro_base) - logging.debug('Calling for JAMF computers against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.'.format(api_url)) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{0}/JSSResource/computers".format(jamfpro_base) + logging.debug( + "Calling for JAMF computers against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.".format( + api_url + ) + ) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") return response.json() - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of computers Waiting a bit to retry the lookup.'.format(response)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of computers Waiting a bit to retry the lookup.".format( + response + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying lookup...") newresponse = get_jamf_computers() return newresponse else: - logging.warning('Received an invalid status code when trying to retreive JAMF Device list:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Received an invalid status code when trying to retreive JAMF Device list:{} - {}".format( + response.status_code, response.content + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to make the API call for JAMF devices in group def get_jamf_computers_by_group(jamf_id): - api_url = '{0}/JSSResource/computergroups/id/{1}'.format(jamfpro_base, jamf_id) - logging.debug('Calling for JAMF computers against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.'.format(api_url)) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{0}/JSSResource/computergroups/id/{1}".format(jamfpro_base, jamf_id) + logging.debug( + "Calling for JAMF computers against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.".format( + api_url + ) + ) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") jsonresponse = response.json() - logging.debug("Returning: {}".format(jsonresponse['computer_group'])) - return jsonresponse['computer_group'] - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of computers Waiting a bit to retry the lookup.'.format(response)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + logging.debug("Returning: {}".format(jsonresponse["computer_group"])) + return jsonresponse["computer_group"] + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of computers Waiting a bit to retry the lookup.".format( + response + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying lookup...") newresponse = get_jamf_computers_by_group(jamf_id) return newresponse else: - logging.warning('Received an invalid status code when trying to retreive JAMF Device list:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Received an invalid status code when trying to retreive JAMF Device list:{} - {}".format( + response.status_code, response.content + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to make the API call for all JAMF mobile devices def get_jamf_mobiles(): - api_url = '{0}/JSSResource/mobiledevices'.format(jamfpro_base) - logging.debug('Calling for JAMF mobiles against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.'.format(api_url)) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{0}/JSSResource/mobiledevices".format(jamfpro_base) + logging.debug( + "Calling for JAMF mobiles against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.".format( + api_url + ) + ) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") return response.json() - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of mobiles Waiting a bit to retry the lookup.'.format(response)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of mobiles Waiting a bit to retry the lookup.".format( + response + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying lookup...") newresponse = get_jamf_mobiles() return newresponse else: - logging.warning('Received an invalid status code when trying to retreive JAMF Device list:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Received an invalid status code when trying to retreive JAMF Device list:{} - {}".format( + response.status_code, response.content + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to make the API call for all JAMF mobile devices in group def get_jamf_mobiles_by_group(jamf_id): - api_url = '{0}/JSSResource/mobiledevicegroups/id/{1}'.format(jamfpro_base, jamf_id) - logging.debug('Calling for JAMF mobiles against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.'.format(api_url)) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{0}/JSSResource/mobiledevicegroups/id/{1}".format(jamfpro_base, jamf_id) + logging.debug( + "Calling for JAMF mobiles against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.".format( + api_url + ) + ) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") jsonresponse = response.json() - logging.debug("Returning: {}".format(jsonresponse['mobile_device_group'])) - return jsonresponse['mobile_device_group'] - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of mobiles Waiting a bit to retry the lookup.'.format(response)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + logging.debug("Returning: {}".format(jsonresponse["mobile_device_group"])) + return jsonresponse["mobile_device_group"] + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of mobiles Waiting a bit to retry the lookup.".format( + response + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying lookup...") newresponse = get_jamf_mobiles_by_group(jamf_id) return newresponse else: - logging.warning('Received an invalid status code when trying to retreive JAMF Device list:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Received an invalid status code when trying to retreive JAMF Device list:{} - {}".format( + response.status_code, response.content + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to lookup a JAMF asset by id. def search_jamf_asset(jamf_id): api_url = "{}/JSSResource/computers/id/{}".format(jamfpro_base, jamf_id) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") jsonresponse = response.json() - logging.debug("Returning: {}".format(jsonresponse['computer'])) - return jsonresponse['computer'] - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.'.format(response, jamf_id)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + logging.debug("Returning: {}".format(jsonresponse["computer"])) + return jsonresponse["computer"] + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.".format( + response, jamf_id + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying lookup...") newresponse = search_jamf_asset(jamf_id) return newresponse else: - logging.warning('JAMFPro responded with error code:{} when we tried to look up id: {}'.format(response, jamf_id)) + logging.warning( + "JAMFPro responded with error code:{} when we tried to look up id: {}".format( + response, jamf_id + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to lookup a JAMF mobile asset by id. def search_jamf_mobile(jamf_id): api_url = "{}/JSSResource/mobiledevices/id/{}".format(jamfpro_base, jamf_id) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") jsonresponse = response.json() - logging.debug("Returning: {}".format(jsonresponse['mobile_device'])) - return jsonresponse['mobile_device'] - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.'.format(response, jamf_id)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + logging.debug("Returning: {}".format(jsonresponse["mobile_device"])) + return jsonresponse["mobile_device"] + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.".format( + response, jamf_id + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retyring lookup...") newresponse = search_jamf_asset(jamf_id) return newresponse else: - logging.warning('JAMFPro responded with error code:{} when we tried to look up id: {}'.format(response, jamf_id)) + logging.warning( + "JAMFPro responded with error code:{} when we tried to look up id: {}".format( + response, jamf_id + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to update the asset tag of computers in JAMF with an number passed from Snipe. def update_jamf_asset_tag(jamf_id, asset_tag): if user_args.dryrun: - logging.debug("Would have updated JAMF asset id: {} with asset tag: {}".format(jamf_id, asset_tag)) + logging.debug( + "Would have updated JAMF asset id: {} with asset tag: {}".format( + jamf_id, asset_tag + ) + ) return True api_url = "{}/JSSResource/computers/id/{}".format(jamfpro_base, jamf_id) - payload = """{}{}""".format(jamf_id, asset_tag) - logging.debug('Making Get request against: {}\nPayload for the PUT request is: {}\nThe username, password, and headers can be found near the beginning of the output.'.format(api_url, payload)) - response = session.put(api_url, data=payload, headers=jamfxmlheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + payload = """{}{}""".format( + jamf_id, asset_tag + ) + logging.debug( + "Making Get request against: {}\nPayload for the PUT request is: {}\nThe username, password, and headers can be found near the beginning of the output.".format( + api_url, payload + ) + ) + response = session.put( + api_url, + data=payload, + headers=jamfxmlheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 201: logging.debug("Got a 201 response. Returning: True") return True - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.'.format(response, jamf_id)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.".format( + response, jamf_id + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying update...") newresponse = update_jamf_asset_tag(jamf_id, asset_tag) return newresponse if response.status_code == 200: - logging.debug("Got a 200 response code. Returning the response: {}".format(response)) + logging.debug( + "Got a 200 response code. Returning the response: {}".format(response) + ) return response.json() else: - logging.warning('Got back an error response code:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Got back an error response code:{} - {}".format( + response.status_code, response.content + ) + ) return None + # Function to update the asset tag of mobile devices in JAMF with an number passed from Snipe. def update_jamf_mobiledevice_asset_tag(jamf_id, asset_tag): if user_args.dryrun: - logging.debug("Would have updated JAMF asset id: {} with asset tag: {}".format(jamf_id, asset_tag)) + logging.debug( + "Would have updated JAMF asset id: {} with asset tag: {}".format( + jamf_id, asset_tag + ) + ) return True api_url = "{}/JSSResource/mobiledevices/id/{}".format(jamfpro_base, jamf_id) - payload = """{}{}""".format(jamf_id, asset_tag) - logging.debug('Making Get request against: {}\nPayload for the PUT request is: {}\nThe username, password, and headers can be found near the beginning of the output.'.format(api_url, payload)) - response = session.put(api_url, data=payload, headers=jamfxmlheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + payload = """{}{}""".format( + jamf_id, asset_tag + ) + logging.debug( + "Making Get request against: {}\nPayload for the PUT request is: {}\nThe username, password, and headers can be found near the beginning of the output.".format( + api_url, payload + ) + ) + response = session.put( + api_url, + data=payload, + headers=jamfxmlheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 201: logging.debug("Got a 201 response. Returning: True") return True - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.'.format(response, jamf_id)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.".format( + response, jamf_id + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying update...") newresponse = update_jamf_mobiledevice_asset_tag(jamf_id, asset_tag) return newresponse if response.status_code == 200: - logging.debug("Got a 200 response code. Returning the response: {}".format(response)) + logging.debug( + "Got a 200 response code. Returning the response: {}".format(response) + ) return response.json() else: - logging.warning('Got back an error response code:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Got back an error response code:{} - {}".format( + response.status_code, response.content + ) + ) return None + # Function to lookup a snipe asset by serial number. def search_snipe_asset(serial): - api_url = '{}/api/v1/hardware/byserial/{}'.format(snipe_base, serial) - response = session.get(api_url, headers=snipeheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/hardware/byserial/{}".format(snipe_base, serial) + response = session.get( + api_url, + headers=snipeheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: jsonresponse = response.json() # Check to make sure there's actually a result if "total" in jsonresponse: - if jsonresponse['total'] == 1: + if jsonresponse["total"] == 1: return jsonresponse - elif jsonresponse['total'] == 0: + elif jsonresponse["total"] == 0: logging.info("No assets match {}".format(serial)) return "NoMatch" else: - logging.warning('FOUND {} matching assets while searching for: {}'.format(jsonresponse['total'], serial)) + logging.warning( + "FOUND {} matching assets while searching for: {}".format( + jsonresponse["total"], serial + ) + ) return "MultiMatch" else: logging.info("No assets match {}".format(serial)) return "NoMatch" else: - logging.warning('Snipe-IT responded with error code:{} when we tried to look up: {}'.format(response.text, serial)) - logging.debug('{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Snipe-IT responded with error code:{} when we tried to look up: {}".format( + response.text, serial + ) + ) + logging.debug("{} - {}".format(response.status_code, response.content)) return "ERROR" + # Function to get all the asset models def get_snipe_models(): - api_url = '{}/api/v1/models'.format(snipe_base) - logging.debug('Calling against: {}'.format(api_url)) - response = session.get(api_url, headers=snipeheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/models".format(snipe_base) + logging.debug("Calling against: {}".format(api_url)) + response = session.get( + api_url, + headers=snipeheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: jsonresponse = response.json() - logging.info("Got a valid response that should have {} models.".format(jsonresponse['total'])) - if jsonresponse['total'] <= len(jsonresponse['rows']) : + logging.info( + "Got a valid response that should have {} models.".format( + jsonresponse["total"] + ) + ) + if jsonresponse["total"] <= len(jsonresponse["rows"]): return jsonresponse else: logging.info("We didn't get enough results so we need to get them again.") - api_url = '{}/api/v1/models?limit={}'.format(snipe_base, jsonresponse['total']) - newresponse = session.get(api_url, headers=snipeheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/models?limit={}".format( + snipe_base, jsonresponse["total"] + ) + newresponse = session.get( + api_url, + headers=snipeheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: newjsonresponse = newresponse.json() - if newjsonresponse['total'] == len(newjsonresponse['rows']) : + if newjsonresponse["total"] == len(newjsonresponse["rows"]): return newjsonresponse else: logging.error("We couldn't seem to get all of the model numbers") - raise SystemExit("Unable to get all model objects from Snipe-IT instanace") + raise SystemExit( + "Unable to get all model objects from Snipe-IT instanace" + ) else: - logging.error('When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}'.format(response.status_code, response.content)) + logging.error( + "When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}".format( + response.status_code, response.content + ) + ) raise SystemExit("Snipe models API endpoint failed.") else: - logging.error('When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}'.format(response.status_code, response.content)) + logging.error( + "When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}".format( + response.status_code, response.content + ) + ) raise SystemExit("Snipe models API endpoint failed.") + # Recursive function returns all users in a Snipe Instance, 100 at a time. def get_snipe_users(previous=[]): - user_id_url = '{}/api/v1/users'.format(snipe_base) - payload = { - 'limit': 100, - 'offset': len(previous) - } - logging.debug('The payload for the snipe users GET is {}'.format(payload)) - response = session.get(user_id_url, headers=snipeheaders, params=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + user_id_url = "{}/api/v1/users".format(snipe_base) + payload = {"limit": 100, "offset": len(previous)} + logging.debug("The payload for the snipe users GET is {}".format(payload)) + response = session.get( + user_id_url, + headers=snipeheaders, + params=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) response_json = response.json() - current = response_json['rows'] + current = response_json["rows"] if len(previous) != 0: current = previous + current - if response_json['total'] > len(current): - logging.debug('We have more than 100 users, get the next page - total: {} current: {}'.format(response_json['total'], len(current))) + if response_json["total"] > len(current): + logging.debug( + "We have more than 100 users, get the next page - total: {} current: {}".format( + response_json["total"], len(current) + ) + ) return get_snipe_users(current) else: return current + # Function to search snipe for a user def get_snipe_user_id(username): - if username == '': + if username == "": return "NotFound" username = username.lower() for user in snipe_users: for value in user.values(): if str(value).lower() == username: - id = user['id'] + id = user["id"] return id if user_args.users_no_search: - logging.debug("No matches in snipe_users for {}, not querying the API for the next closest match since we've been told not to".format(username)) + logging.debug( + "No matches in snipe_users for {}, not querying the API for the next closest match since we've been told not to".format( + username + ) + ) return "NotFound" - logging.debug('No matches in snipe_users for {}, querying the API for the next closest match'.format(username)) - user_id_url = '{}/api/v1/users'.format(snipe_base) - payload = { - 'search':username, - 'limit':1, - 'sort':'username', - 'order':'asc' - } - logging.debug('The payload for the snipe user search is: {}'.format(payload)) - response = session.get(user_id_url, headers=snipeheaders, params=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + logging.debug( + "No matches in snipe_users for {}, querying the API for the next closest match".format( + username + ) + ) + user_id_url = "{}/api/v1/users".format(snipe_base) + payload = {"search": username, "limit": 1, "sort": "username", "order": "asc"} + logging.debug("The payload for the snipe user search is: {}".format(payload)) + response = session.get( + user_id_url, + headers=snipeheaders, + params=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) try: - return response.json()['rows'][0]['id'] + return response.json()["rows"][0]["id"] except: return "NotFound" + # Function that creates a new Snipe Model - not an asset - with a JSON payload def create_snipe_model(payload): - api_url = '{}/api/v1/models'.format(snipe_base) - logging.debug('Calling to create new snipe model type against: {}\nThe payload for the POST request is:{}\nThe request headers can be found near the start of the output.'.format(api_url, payload)) - response = session.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/models".format(snipe_base) + logging.debug( + "Calling to create new snipe model type against: {}\nThe payload for the POST request is:{}\nThe request headers can be found near the start of the output.".format( + api_url, payload + ) + ) + response = session.post( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: jsonresponse = response.json() - modelnumbers[jsonresponse['payload']['model_number']] = jsonresponse['payload']['id'] + modelnumbers[jsonresponse["payload"]["model_number"]] = jsonresponse["payload"][ + "id" + ] return True else: - logging.warning('Error code: {} while trying to create a new model.'.format(response.status_code)) + logging.warning( + "Error code: {} while trying to create a new model.".format( + response.status_code + ) + ) return False + # Function to create a new asset by passing array def create_snipe_asset(payload): - api_url = '{}/api/v1/hardware'.format(snipe_base) - logging.debug('Calling to create a new asset against: {}\nThe payload for the POST request is:{}\nThe request headers can be found near the start of the output.'.format(api_url, payload)) - response = session.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/hardware".format(snipe_base) + logging.debug( + "Calling to create a new asset against: {}\nThe payload for the POST request is:{}\nThe request headers can be found near the start of the output.".format( + api_url, payload + ) + ) + response = session.post( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) logging.debug(response.text) if response.status_code == 200: logging.debug("Got back status code: 200 - {}".format(response.content)) jsonresponse = response.json() - if jsonresponse['status'] == "error": - logging.error('Asset creation failed for asset {} with error {}'.format(payload['name'],jsonresponse['messages'])) - return 'ERROR', response - return 'AssetCreated', response + if jsonresponse["status"] == "error": + logging.error( + "Asset creation failed for asset {} with error {}".format( + payload["name"], jsonresponse["messages"] + ) + ) + return "ERROR", response + return "AssetCreated", response else: - logging.error('Asset creation failed for asset {} with error {}'.format(payload['name'],response.text)) - return 'ERROR', response + logging.error( + "Asset creation failed for asset {} with error {}".format( + payload["name"], response.text + ) + ) + return "ERROR", response + # Function that updates a snipe asset with a JSON payload def update_snipe_asset(snipe_id, payload): if user_args.dryrun: - logging.debug("Dry run mode is enabled. We would have updated ID: {} with the following payload: {}".format(snipe_id, payload)) + logging.debug( + "Dry run mode is enabled. We would have updated ID: {} with the following payload: {}".format( + snipe_id, payload + ) + ) return True - api_url = '{}/api/v1/hardware/{}'.format(snipe_base, snipe_id) - logging.debug('The payload for the snipe update is: {}'.format(payload)) - response = session.patch(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/hardware/{}".format(snipe_base, snipe_id) + logging.debug("The payload for the snipe update is: {}".format(payload)) + response = session.patch( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) # Verify that the payload updated properly. goodupdate = True if response.status_code == 200: - logging.debug("Got back status code: 200 - Checking the payload updated properly: If you error here it's because you configure the API mapping right.") + logging.debug( + "Got back status code: 200 - Checking the payload updated properly: If you error here it's because you configure the API mapping right." + ) jsonresponse = response.json() # Check if there's an Error and Log it, or parse the payload. - if jsonresponse['status'] == "error": - logging.error('Unable to update ID: {}. Error "{}"'.format(snipe_id, jsonresponse['messages'])) + if jsonresponse["status"] == "error": + logging.error( + 'Unable to update ID: {}. Error "{}"'.format( + snipe_id, jsonresponse["messages"] + ) + ) goodupdate = False else: for key in payload: - if key == 'purchase_date': + if key == "purchase_date": payload[key] = payload[key] + " 00:00:00" - if payload[key] == '': + if payload[key] == "": payload[key] = None - if jsonresponse['payload'][key] != payload[key]: - logging.warning('Unable to update ID: {}. We failed to update the {} field with "{}"'.format(snipe_id, key, payload[key])) + if jsonresponse["payload"][key] != payload[key]: + logging.warning( + 'Unable to update ID: {}. We failed to update the {} field with "{}"'.format( + snipe_id, key, payload[key] + ) + ) goodupdate = False else: - logging.info("Sucessfully updated {} with: {}".format(key, payload[key])) + logging.info( + "Sucessfully updated {} with: {}".format(key, payload[key]) + ) return goodupdate else: - logging.error('Whoops. Got an error status code while updating ID {}: {} - {}'.format(snipe_id, response.status_code, response.content)) + logging.error( + "Whoops. Got an error status code while updating ID {}: {} - {}".format( + snipe_id, response.status_code, response.content + ) + ) return False + # Function that checks in an asset in snipe def checkin_snipe_asset(asset_id): - api_url = '{}/api/v1/hardware/{}/checkin'.format(snipe_base, asset_id) - payload = { - 'note':'checked in by script from Jamf' - } - logging.debug('The payload for the snipe checkin is: {}'.format(payload)) - response = session.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) - logging.debug('The response from Snipe IT is: {}'.format(response.json())) + api_url = "{}/api/v1/hardware/{}/checkin".format(snipe_base, asset_id) + payload = {"note": "checked in by script from Jamf"} + logging.debug("The payload for the snipe checkin is: {}".format(payload)) + response = session.post( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) + logging.debug("The response from Snipe IT is: {}".format(response.json())) if response.status_code == 200: logging.debug("Got back status code: 200 - {}".format(response.content)) return "CheckedOut" else: return response + # Function that checks out an asset in snipe def checkout_snipe_asset(user, asset_id, checked_out_user=None): - logging.debug('Asset {} is being checked out to {}'.format(user, asset_id)) + logging.debug("Asset {} is being checked out to {}".format(user, asset_id)) user_id = get_snipe_user_id(user) - if user_id == 'NotFound': + if user_id == "NotFound": logging.info("User {} not found".format(user)) return "NotFound" if checked_out_user == None: logging.info("Not checked out, checking out to {}".format(user)) elif checked_out_user == "NewAsset": - logging.info("First time this asset will be checked out, checking out to {}".format(user)) - elif checked_out_user['id'] == user_id: + logging.info( + "First time this asset will be checked out, checking out to {}".format(user) + ) + elif checked_out_user["id"] == user_id: logging.info(str(asset_id) + " already checked out to user " + user) - return 'CheckedOut' + return "CheckedOut" else: - logging.info("Checking in {} to check it out to {}".format(asset_id,user)) + logging.info("Checking in {} to check it out to {}".format(asset_id, user)) checkin_snipe_asset(asset_id) - api_url = '{}/api/v1/hardware/{}/checkout'.format(snipe_base, asset_id) - logging.info("Checking out {} to check it out to {}".format(asset_id,user)) + api_url = "{}/api/v1/hardware/{}/checkout".format(snipe_base, asset_id) + logging.info("Checking out {} to check it out to {}".format(asset_id, user)) payload = { - 'checkout_to_type':'user', - 'assigned_user':user_id, - 'note':'Assignment made automatically, via script from Jamf.' + "checkout_to_type": "user", + "assigned_user": user_id, + "note": "Assignment made automatically, via script from Jamf.", } - logging.debug('The payload for the snipe checkin is: {}'.format(payload)) - response = session.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) - logging.debug('The response from Snipe IT is: {}'.format(response.json())) + logging.debug("The payload for the snipe checkin is: {}".format(payload)) + response = session.post( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) + logging.debug("The response from Snipe IT is: {}".format(response.json())) if response.status_code == 200: logging.debug("Got back status code: 200 - {}".format(response.content)) return "CheckedOut" else: - logging.error('Asset checkout failed for asset {} with error {}'.format(asset_id,response.text)) + logging.error( + "Asset checkout failed for asset {} with error {}".format( + asset_id, response.text + ) + ) return response + ### Run Testing ### # Report if we're verifying SSL or not. logging.info("SSL Verification is set to: {}".format(user_args.do_not_verify_ssl)) @@ -688,27 +1148,41 @@ logging.info("SSL Verification is set to: {}".format(user_args.do_not_verify_ssl # Do some tests to see if the hosts are up. Don't use hooks for these as we don't have tokens yet. logging.info("Running tests to see if hosts are up.") try: - SNIPE_UP = True if session.get(snipe_base, verify=user_args.do_not_verify_ssl).status_code == 200 else False + SNIPE_UP = ( + True + if session.get(snipe_base, verify=user_args.do_not_verify_ssl).status_code + == 200 + else False + ) except Exception as e: logging.exception(e) SNIPE_UP = False try: - JAMF_UP = True if session.get(jamfpro_base, verify=user_args.do_not_verify_ssl).status_code in (200, 401) else False + JAMF_UP = ( + True + if session.get(jamfpro_base, verify=user_args.do_not_verify_ssl).status_code + in (200, 401) + else False + ) except Exception as e: logging.exception(e) JAMF_UP = False if not SNIPE_UP: - logging.error('Snipe-IT looks like it is down from here. \nPlease check your config in the settings.conf file, or your instance.') + logging.error( + "Snipe-IT looks like it is down from here. \nPlease check your config in the settings.conf file, or your instance." + ) else: - logging.info('We were able to get a good response from your Snipe-IT instance.') + logging.info("We were able to get a good response from your Snipe-IT instance.") if not JAMF_UP: - logging.error('JAMFPro looks down from here. \nPlease check the your config in the settings.conf file, or your hosted JAMFPro instance.') + logging.error( + "JAMFPro looks down from here. \nPlease check the your config in the settings.conf file, or your hosted JAMFPro instance." + ) else: - logging.info('We were able to get a good response from your JAMFPro instance.') + logging.info("We were able to get a good response from your JAMFPro instance.") # Exit if you can't contact SNIPE -if ( JAMF_UP == False ) or ( SNIPE_UP == False ): +if (JAMF_UP == False) or (SNIPE_UP == False): raise SystemExit("Error: Host could not be contacted.") # Test that we can actually connect with the API keys by getting a bearer token. @@ -721,32 +1195,43 @@ logging.info("Finished running our tests.") # Get a list of known models from Snipe logging.info("Getting a list of computer models that snipe knows about.") snipemodels = get_snipe_models() -logging.debug("Parsing the {} model results for models with model numbers.".format(len(snipemodels['rows']))) +logging.debug( + "Parsing the {} model results for models with model numbers.".format( + len(snipemodels["rows"]) + ) +) modelnumbers = {} -for model in snipemodels['rows']: - if model['model_number'] == "": - logging.debug("The model, {}, did not have a model number. Skipping.".format(model['name'])) +for model in snipemodels["rows"]: + if model["model_number"] == "": + logging.debug( + "The model, {}, did not have a model number. Skipping.".format( + model["name"] + ) + ) continue - modelnumbers[model['model_number']] = model['id'] + modelnumbers[model["model_number"]] = model["id"] logging.info("Our list of models has {} entries.".format(len(modelnumbers))) -logging.debug("Here's the list of the {} models and their id's that we were able to collect:\n{}".format(len(modelnumbers), modelnumbers)) +logging.debug( + "Here's the list of the {} models and their id's that we were able to collect:\n{}".format( + len(modelnumbers), modelnumbers + ) +) # Get the IDS of all active assets. -if 'computer_group_id' in config['jamf'] and config['jamf']['computer_group_id']: +if "computer_group_id" in config["jamf"] and config["jamf"]["computer_group_id"]: logging.info("Getting list of computers from JAMF by computer group id.") - jamf_computer_list = get_jamf_computers_by_group(config['jamf']['computer_group_id']) + jamf_computer_list = get_jamf_computers_by_group( + config["jamf"]["computer_group_id"] + ) else: jamf_computer_list = get_jamf_computers() -if 'mobile_group_id' in config['jamf'] and config['jamf']['mobile_group_id']: +if "mobile_group_id" in config["jamf"] and config["jamf"]["mobile_group_id"]: logging.info("Getting list of mobiles from JAMF by mobile group id.") - jamf_mobile_list = get_jamf_mobiles_by_group(config['jamf']['mobile_group_id']) + jamf_mobile_list = get_jamf_mobiles_by_group(config["jamf"]["mobile_group_id"]) else: jamf_mobile_list = get_jamf_mobiles() -jamf_types = { - 'computers': jamf_computer_list, - 'mobile_devices': jamf_mobile_list -} +jamf_types = {"computers": jamf_computer_list, "mobile_devices": jamf_mobile_list} # Get a list of users from Snipe if the user has specified # they're syncing users @@ -756,18 +1241,22 @@ if user_args.users or user_args.users_force or user_args.users_inverse: TotalNumber = 0 if user_args.computers: - TotalNumber = len(jamf_types['computers']['computers']) + TotalNumber = len(jamf_types["computers"]["computers"]) elif user_args.mobiles: - TotalNumber = len(jamf_types['mobile_devices']['mobile_devices']) + TotalNumber = len(jamf_types["mobile_devices"]["mobile_devices"]) else: for jamf_type in jamf_types: TotalNumber += len(jamf_types[jamf_type][jamf_type]) # Make sure we have a good list. if jamf_computer_list != None: - logging.info('Received a list of JAMF assets that had {} entries.'.format(TotalNumber)) + logging.info( + "Received a list of JAMF assets that had {} entries.".format(TotalNumber) + ) else: - logging.error("We were not able to retreive a list of assets from your JAMF instance. It's likely that your settings, or credentials are incorrect. Check your settings.conf and verify you can make API calls outside of this system with the credentials found in your settings.conf") + logging.error( + "We were not able to retreive a list of assets from your JAMF instance. It's likely that your settings, or credentials are incorrect. Check your settings.conf and verify you can make API calls outside of this system with the credentials found in your settings.conf" + ) raise SystemExit("Unable to get JAMF Computers.") # After this point we start editing data, so quit if this is a dryrun @@ -775,110 +1264,174 @@ if user_args.connection_test: raise SystemExit("Connection Test: Complete.") # From this point on, we're editing data. -logging.info('Starting to Update Inventory') +logging.info("Starting to Update Inventory") CurrentNumber = 0 for jamf_type in jamf_types: if user_args.computers: - if jamf_type != 'computers': + if jamf_type != "computers": continue if user_args.mobiles: - if jamf_type != 'mobile_devices': + if jamf_type != "mobile_devices": continue for jamf_asset in jamf_types[jamf_type][jamf_type]: CurrentNumber += 1 - logging.info("Processing entry {} out of {} - JAMFID: {} - NAME: {}".format(CurrentNumber, TotalNumber, jamf_asset['id'], jamf_asset['name'])) + logging.info( + "Processing entry {} out of {} - JAMFID: {} - NAME: {}".format( + CurrentNumber, TotalNumber, jamf_asset["id"], jamf_asset["name"] + ) + ) # Search through the list by ID for all asset information\ - if jamf_type == 'computers': - jamf = search_jamf_asset(jamf_asset['id']) - elif jamf_type == 'mobile_devices': - jamf = search_jamf_mobile(jamf_asset['id']) + if jamf_type == "computers": + jamf = search_jamf_asset(jamf_asset["id"]) + elif jamf_type == "mobile_devices": + jamf = search_jamf_mobile(jamf_asset["id"]) if jamf == None: continue # If the entry doesn't contain a serial, then we need to skip this entry. - if jamf['general']['serial_number'] == 'Not Available': - logging.warning("The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now.") + if jamf["general"]["serial_number"] == "Not Available": + logging.warning( + "The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now." + ) continue - if jamf['general']['serial_number'] == None: - logging.warning("The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now.") + if jamf["general"]["serial_number"] == None: + logging.warning( + "The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now." + ) continue # Check that the model number exists in snipe, if not create it. - if jamf_type == 'computers': - if jamf['hardware']['model_identifier'] not in modelnumbers and jamf['hardware']['model_identifier']: - logging.info("Could not find a model ID in snipe for: {}".format(jamf['hardware']['model_identifier'])) - newmodel = {"category_id":config['snipe-it']['computer_model_category_id'],"manufacturer_id":apple_manufacturer_id,"name": jamf['hardware']['model'],"model_number":jamf['hardware']['model_identifier']} - if 'computer_custom_fieldset_id' in config['snipe-it']: - fieldset_split = config['snipe-it']['computer_custom_fieldset_id'] - newmodel['fieldset_id'] = fieldset_split + if jamf_type == "computers": + if ( + jamf["hardware"]["model_identifier"] not in modelnumbers + and jamf["hardware"]["model_identifier"] + ): + logging.info( + "Could not find a model ID in snipe for: {}".format( + jamf["hardware"]["model_identifier"] + ) + ) + newmodel = { + "category_id": config["snipe-it"]["computer_model_category_id"], + "manufacturer_id": apple_manufacturer_id, + "name": jamf["hardware"]["model"], + "model_number": jamf["hardware"]["model_identifier"], + } + if "computer_custom_fieldset_id" in config["snipe-it"]: + fieldset_split = config["snipe-it"]["computer_custom_fieldset_id"] + newmodel["fieldset_id"] = fieldset_split create_snipe_model(newmodel) - elif jamf_type == 'mobile_devices': - if jamf['general']['model_identifier'] not in modelnumbers and jamf['general']['model_identifier']: - logging.info("Could not find a model ID in snipe for: {}".format(jamf['general']['model_identifier'])) - newmodel = {"category_id":config['snipe-it']['mobile_model_category_id'],"manufacturer_id":apple_manufacturer_id,"name": jamf['general']['model'],"model_number":jamf['general']['model_identifier']} - if 'mobile_custom_fieldset_id' in config['snipe-it']: - fieldset_split = config['snipe-it']['mobile_custom_fieldset_id'] - newmodel['fieldset_id'] = fieldset_split + elif jamf_type == "mobile_devices": + if ( + jamf["general"]["model_identifier"] not in modelnumbers + and jamf["general"]["model_identifier"] + ): + logging.info( + "Could not find a model ID in snipe for: {}".format( + jamf["general"]["model_identifier"] + ) + ) + newmodel = { + "category_id": config["snipe-it"]["mobile_model_category_id"], + "manufacturer_id": apple_manufacturer_id, + "name": jamf["general"]["model"], + "model_number": jamf["general"]["model_identifier"], + } + if "mobile_custom_fieldset_id" in config["snipe-it"]: + fieldset_split = config["snipe-it"]["mobile_custom_fieldset_id"] + newmodel["fieldset_id"] = fieldset_split create_snipe_model(newmodel) # Pass the SN from JAMF to search for a match in Snipe - snipe = search_snipe_asset(jamf['general']['serial_number']) + snipe = search_snipe_asset(jamf["general"]["serial_number"]) # Create a new asset if there's no match: - if snipe == 'NoMatch': - logging.info("Creating a new asset in snipe for JAMF ID {} - {}".format(jamf['general']['id'], jamf['general']['name'])) + if snipe == "NoMatch": + logging.info( + "Creating a new asset in snipe for JAMF ID {} - {}".format( + jamf["general"]["id"], jamf["general"]["name"] + ) + ) # This section checks to see if the asset tag was already put into JAMF, if not it creates one with with Jamf's ID. - if jamf['general']['asset_tag'] == '': + if jamf["general"]["asset_tag"] == "": jamf_asset_tag = None - logging.debug('No asset tag found in Jamf, checking settings.conf for alternative specified field.') - if 'asset_tag' in config['snipe-it']: - tag_split = config['snipe-it']['asset_tag'].split() + logging.debug( + "No asset tag found in Jamf, checking settings.conf for alternative specified field." + ) + if "asset_tag" in config["snipe-it"]: + tag_split = config["snipe-it"]["asset_tag"].split() try: - jamf_asset_tag = jamf['{}'.format(tag_split[0])]['{}'.format(tag_split[1])] + jamf_asset_tag = jamf["{}".format(tag_split[0])][ + "{}".format(tag_split[1]) + ] except: - if jamf_type == 'mobile_devices': - jamf_asset_tag = 'jamfid-m-{}'.format(jamf['general']['id']) - elif jamf_type == 'computers': - jamf_asset_tag = 'jamfid-{}'.format(jamf['general']['id']) + if jamf_type == "mobile_devices": + jamf_asset_tag = "jamfid-m-{}".format(jamf["general"]["id"]) + elif jamf_type == "computers": + jamf_asset_tag = "jamfid-{}".format(jamf["general"]["id"]) else: - logging.error("Could not generate an asset tag for this device. Skipping") + logging.error( + "Could not generate an asset tag for this device. Skipping" + ) # Dump the object for debugging. logging.verbose(jamf) continue - #raise SystemError('No such attribute {} in the jamf payload. Please check your settings.conf file'.format(tag_split)) - if jamf_asset_tag == None or jamf_asset_tag == '': - logging.debug('No custom configuration found in settings.conf for asset tag name upon asset creation.') - if jamf_type == 'mobile_devices': - jamf_asset_tag = 'jamfid-m-{}'.format(jamf['general']['id']) - elif jamf_type == 'computers': - jamf_asset_tag = 'jamfid-{}'.format(jamf['general']['id']) + # raise SystemError('No such attribute {} in the jamf payload. Please check your settings.conf file'.format(tag_split)) + if jamf_asset_tag == None or jamf_asset_tag == "": + logging.debug( + "No custom configuration found in settings.conf for asset tag name upon asset creation." + ) + if jamf_type == "mobile_devices": + jamf_asset_tag = "jamfid-m-{}".format(jamf["general"]["id"]) + elif jamf_type == "computers": + jamf_asset_tag = "jamfid-{}".format(jamf["general"]["id"]) else: - jamf_asset_tag = jamf['general']['asset_tag'] - logging.info("Asset tag found in Jamf, setting it to: {}".format(jamf_asset_tag)) + jamf_asset_tag = jamf["general"]["asset_tag"] + logging.info( + "Asset tag found in Jamf, setting it to: {}".format(jamf_asset_tag) + ) # Create the payload - if jamf_type == 'mobile_devices': + if jamf_type == "mobile_devices": logging.debug("Payload is being made for a mobile device") - newasset = {'asset_tag': jamf_asset_tag, 'model_id': modelnumbers['{}'.format(jamf['general']['model_identifier'])], 'name': jamf['general']['name'], 'status_id': defaultStatus,'serial': jamf['general']['serial_number']} - elif jamf_type == 'computers': + newasset = { + "asset_tag": jamf_asset_tag, + "model_id": modelnumbers[ + "{}".format(jamf["general"]["model_identifier"]) + ], + "name": jamf["general"]["name"], + "status_id": defaultStatus, + "serial": jamf["general"]["serial_number"], + } + elif jamf_type == "computers": logging.debug("Payload is being made for a computer") - newasset = {'asset_tag': jamf_asset_tag,'model_id': modelnumbers['{}'.format(jamf['hardware']['model_identifier'])], 'name': jamf['general']['name'], 'status_id': defaultStatus,'serial': jamf['general']['serial_number']} + newasset = { + "asset_tag": jamf_asset_tag, + "model_id": modelnumbers[ + "{}".format(jamf["hardware"]["model_identifier"]) + ], + "name": jamf["general"]["name"], + "status_id": defaultStatus, + "serial": jamf["general"]["serial_number"], + } else: - for snipekey in config['{}-api-mapping'.format(jamf_type)]: - jamfsplit = config['{}-api-mapping'.format(jamf_type)][snipekey].split() + for snipekey in config["{}-api-mapping".format(jamf_type)]: + jamfsplit = config["{}-api-mapping".format(jamf_type)][ + snipekey + ].split() try: for i, item in enumerate(jamfsplit): try: item = int(item) except ValueError: - logging.debug('{} is not an integer'.format(item)) + logging.debug("{} is not an integer".format(item)) if i == 0: jamf_value = jamf[item] else: - if jamfsplit[0] == 'extension_attributes': + if jamfsplit[0] == "extension_attributes": for attribute in jamf_value: - if attribute['id'] == item: - jamf_value = attribute['value'] + if attribute["id"] == item: + jamf_value = attribute["value"] else: jamf_value = jamf_value[item] newasset[snipekey] = jamf_value @@ -886,109 +1439,194 @@ for jamf_type in jamf_types: continue # Reset the payload without the asset_tag if auto_incrementing flag is set. if user_args.auto_incrementing: - newasset.pop('asset_tag', None) + newasset.pop("asset_tag", None) new_snipe_asset = create_snipe_asset(newasset) logging.debug(new_snipe_asset) if new_snipe_asset[0] != "AssetCreated": continue if user_args.users or user_args.users_force or user_args.users_inverse: - jamfsplit = config['user-mapping']['jamf_api_field'].split() + jamfsplit = config["user-mapping"]["jamf_api_field"].split() if jamfsplit[1] not in jamf[jamfsplit[0]]: - logging.info("Couldn't find {} for this device in {}, not checking it out.".format(jamfsplit[1], jamfsplit[0])) + logging.info( + "Couldn't find {} for this device in {}, not checking it out.".format( + jamfsplit[1], jamfsplit[0] + ) + ) continue - logging.info('Checking out new item {} to user {}'.format(jamf['general']['name'], jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])])) - checkout_snipe_asset(jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])],new_snipe_asset[1].json()['payload']['id'], "NewAsset") + logging.info( + "Checking out new item {} to user {}".format( + jamf["general"]["name"], + jamf["{}".format(jamfsplit[0])]["{}".format(jamfsplit[1])], + ) + ) + checkout_snipe_asset( + jamf["{}".format(jamfsplit[0])]["{}".format(jamfsplit[1])], + new_snipe_asset[1].json()["payload"]["id"], + "NewAsset", + ) # Log an error if there's an issue, or more than once match. - elif snipe == 'MultiMatch': - logging.warning("WARN: You need to resolve multiple assets with the same serial number in your inventory. If you can't find them in your inventory, you might need to purge your deleted records. You can find that in the Snipe Admin settings. Skipping serial number {} for now.".format(jamf['general']['serial_number'])) - elif snipe == 'ERROR': - logging.error("We got an error when looking up serial number {} in snipe, which shouldn't happen at this point. Check your snipe instance and setup. Skipping for now.".format(jamf['general']['serial_number'])) + elif snipe == "MultiMatch": + logging.warning( + "WARN: You need to resolve multiple assets with the same serial number in your inventory. If you can't find them in your inventory, you might need to purge your deleted records. You can find that in the Snipe Admin settings. Skipping serial number {} for now.".format( + jamf["general"]["serial_number"] + ) + ) + elif snipe == "ERROR": + logging.error( + "We got an error when looking up serial number {} in snipe, which shouldn't happen at this point. Check your snipe instance and setup. Skipping for now.".format( + jamf["general"]["serial_number"] + ) + ) else: # Only update if JAMF has more recent info. - snipe_id = snipe['rows'][0]['id'] - snipe_time = snipe['rows'][0]['updated_at']['datetime'] - if jamf_type == 'computers': - jamf_time = jamf['general']['report_date'] - elif jamf_type == 'mobile_devices': - jamf_time = jamf['general']['last_inventory_update'] + snipe_id = snipe["rows"][0]["id"] + snipe_time = snipe["rows"][0]["updated_at"]["datetime"] + if jamf_type == "computers": + jamf_time = jamf["general"]["report_date"] + elif jamf_type == "mobile_devices": + jamf_time = jamf["general"]["last_inventory_update"] # Check to see that the JAMF record is newer than the previous Snipe update, or if it is a new record in Snipe - if ( jamf_time > snipe_time ) or ( user_args.force ): + if (jamf_time > snipe_time) or (user_args.force): if user_args.force: - logging.debug("Forced the Update regardless of the timestamps below.") - logging.debug("Updating the Snipe asset because JAMF has a more recent timestamp: {} > {} or the Snipe Record is new".format(jamf_time, snipe_time)) + logging.debug( + "Forced the Update regardless of the timestamps below." + ) + logging.debug( + "Updating the Snipe asset because JAMF has a more recent timestamp: {} > {} or the Snipe Record is new".format( + jamf_time, snipe_time + ) + ) updates = {} - for snipekey in config['{}-api-mapping'.format(jamf_type)]: + for snipekey in config["{}-api-mapping".format(jamf_type)]: try: - jamfsplit = config['{}-api-mapping'.format(jamf_type)][snipekey].split() + jamfsplit = config["{}-api-mapping".format(jamf_type)][ + snipekey + ].split() for i, item in enumerate(jamfsplit): try: item = int(item) except ValueError: - logging.debug('{} is not an integer'.format(item)) + logging.debug("{} is not an integer".format(item)) if i == 0: jamf_value = jamf[item] else: - if jamfsplit[0] == 'extension_attributes': + if jamfsplit[0] == "extension_attributes": for attribute in jamf_value: - if attribute['id'] == item: - jamf_value = attribute['value'] + if attribute["id"] == item: + jamf_value = attribute["value"] else: jamf_value = jamf_value[item] payload = {snipekey: jamf_value} latestvalue = jamf_value except (KeyError, TypeError): - logging.debug("Skipping the payload, because the JAMF key we're mapping to doesn't exist") + logging.debug( + "Skipping the payload, because the JAMF key we're mapping to doesn't exist" + ) continue # Need to check that we're not needlessly updating the asset. # If it's a custom value it'll fail the first section and send it to except section that will parse custom sections. try: - if snipe['rows'][0][snipekey] != latestvalue: + if snipe["rows"][0][snipekey] != latestvalue: updates.update(payload) else: - logging.debug("Skipping the payload, because it already exits.") + logging.debug( + "Skipping the payload, because it already exits." + ) except: - logging.debug("The snipekey lookup failed, which means it's a custom field. Parsing those to see if it needs to be updated or not.") + logging.debug( + "The snipekey lookup failed, which means it's a custom field. Parsing those to see if it needs to be updated or not." + ) needsupdate = False - for CustomField in snipe['rows'][0]['custom_fields']: - if snipe['rows'][0]['custom_fields'][CustomField]['field'] == snipekey : - if snipe['rows'][0]['custom_fields'][CustomField]['value'] != str(latestvalue): - logging.debug("Found the field, and the value needs to be updated from {} to {}".format(snipe['rows'][0]['custom_fields'][CustomField]['value'], latestvalue)) + for CustomField in snipe["rows"][0]["custom_fields"]: + if ( + snipe["rows"][0]["custom_fields"][CustomField]["field"] + == snipekey + ): + if snipe["rows"][0]["custom_fields"][CustomField][ + "value" + ] != str(latestvalue): + logging.debug( + "Found the field, and the value needs to be updated from {} to {}".format( + snipe["rows"][0]["custom_fields"][ + CustomField + ]["value"], + latestvalue, + ) + ) needsupdate = True if needsupdate == True: updates.update(payload) else: - logging.debug("Skipping the payload, because it already exists, or the Snipe key we're mapping to doesn't.") + logging.debug( + "Skipping the payload, because it already exists, or the Snipe key we're mapping to doesn't." + ) if updates: update_snipe_asset(snipe_id, updates) - if ((user_args.users or user_args.users_inverse) and (snipe['rows'][0]['assigned_to'] == None) == user_args.users) or user_args.users_force: + if ( + (user_args.users or user_args.users_inverse) + and (snipe["rows"][0]["assigned_to"] == None) == user_args.users + ) or user_args.users_force: - if snipe['rows'][0]['status_label']['status_meta'] in ('deployable', 'deployed'): - jamfsplit = config['user-mapping']['jamf_api_field'].split() + if snipe["rows"][0]["status_label"]["status_meta"] in ( + "deployable", + "deployed", + ): + jamfsplit = config["user-mapping"]["jamf_api_field"].split() if jamfsplit[1] not in jamf[jamfsplit[0]]: - logging.info("Couldn't find {} for this device in {}, not checking it out.".format(jamfsplit[1], jamfsplit[0])) + logging.info( + "Couldn't find {} for this device in {}, not checking it out.".format( + jamfsplit[1], jamfsplit[0] + ) + ) continue - checkout_snipe_asset(jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])], snipe_id, snipe['rows'][0]['assigned_to']) + checkout_snipe_asset( + jamf["{}".format(jamfsplit[0])]["{}".format(jamfsplit[1])], + snipe_id, + snipe["rows"][0]["assigned_to"], + ) else: - logging.info("Can't checkout {} since the status isn't set to deployable".format(jamf['general']['name'])) + logging.info( + "Can't checkout {} since the status isn't set to deployable".format( + jamf["general"]["name"] + ) + ) else: - logging.info("Snipe Record is newer than the JAMF record. Nothing to sync. If this wrong, then force an inventory update in JAMF") - logging.debug("Not updating the Snipe asset because Snipe has a more recent timestamp: {} < {}".format(jamf_time, snipe_time)) + logging.info( + "Snipe Record is newer than the JAMF record. Nothing to sync. If this wrong, then force an inventory update in JAMF" + ) + logging.debug( + "Not updating the Snipe asset because Snipe has a more recent timestamp: {} < {}".format( + jamf_time, snipe_time + ) + ) # Update/Sync the Snipe Asset Tag Number back to JAMF # The user arg below is set to false if it's called, so this would fail if the user called it. - if (jamf['general']['asset_tag'] != snipe['rows'][0]['asset_tag']) and user_args.do_not_update_jamf : - logging.info("JAMF doesn't have the same asset tag as SNIPE so we'll update it because it should be authoritative.") - if snipe['rows'][0]['asset_tag'][0]: - if jamf_type == 'computers': - update_jamf_asset_tag("{}".format(jamf['general']['id']), '{}'.format(snipe['rows'][0]['asset_tag'])) - logging.info("Device is a computer, updating computer record") - elif jamf_type == 'mobile_devices': - update_jamf_mobiledevice_asset_tag("{}".format(jamf['general']['id']), '{}'.format(snipe['rows'][0]['asset_tag'])) - logging.info("Device is a mobile device, updating the mobile device record") - -logging.debug('Total amount of API calls made: {}'.format(api_count)) + if ( + jamf["general"]["asset_tag"] != snipe["rows"][0]["asset_tag"] + ) and user_args.do_not_update_jamf: + logging.info( + "JAMF doesn't have the same asset tag as SNIPE so we'll update it because it should be authoritative." + ) + if snipe["rows"][0]["asset_tag"][0]: + if jamf_type == "computers": + update_jamf_asset_tag( + "{}".format(jamf["general"]["id"]), + "{}".format(snipe["rows"][0]["asset_tag"]), + ) + logging.info("Device is a computer, updating computer record") + elif jamf_type == "mobile_devices": + update_jamf_mobiledevice_asset_tag( + "{}".format(jamf["general"]["id"]), + "{}".format(snipe["rows"][0]["asset_tag"]), + ) + logging.info( + "Device is a mobile device, updating the mobile device record" + ) + +logging.debug("Total amount of API calls made: {}".format(api_count))