diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 012239c..3453c5c 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -8,10 +8,12 @@ ############################################################################## import json import logging +from logging.handlers import RotatingFileHandler import os import re import shutil import sys +import copy import yaml import olca_ipc as ipc @@ -28,7 +30,7 @@ IPC server) or indirectly (via an exported JSON-LD zip file). Last Edited: - 2025-01-21 + 2026-03-30 Examples -------- @@ -86,12 +88,16 @@ __all__ = [ "FUEL_CATS", "NetlOlca", + "check_for_docker", + "check_output_dir", "get_as_yaml", "get_dict_number", + "get_logger", "make_actor_yaml", "pretty_print_dict", "print_progress", "read_yaml", + "rollover_logger", "writeout", ] @@ -663,7 +669,7 @@ def disconnect(self): self.logger.info("Disconnecting client from server") self.client = None - def find_parameter(self, p_name, as_dict=False): + def find_parameter(self, p_name, p_list=None, as_dict=False): """Return a list of parameter objects (or dictionaries) matching a given name. @@ -671,6 +677,9 @@ def find_parameter(self, p_name, as_dict=False): ---------- p_name : str Parameter name + p_list : list, optional + A list of Parameter (or Ref) objects to speed up search. + Defaults to none. as_dict : bool, optional Whether return objects should be dictionaries, by default False @@ -690,9 +699,13 @@ def find_parameter(self, p_name, as_dict=False): raise ValueError("Parameter names must be a string!") p_name = p_name.lower() + # Get parameter list if not provided. + if p_list is None: + p_list = self.list_parameters() + # Initialize return list of potential matches. p_matches = [] - for p_obj in self.list_parameters(): + for p_obj in p_list: par_name = p_obj.name.lower() if par_name == p_name: if as_dict: @@ -738,9 +751,11 @@ def find_parameter_process(self, name=None, uuid=None): raise TypeError("UUID must be a string!") proc_list = [] + all_params = self.list_parameters() + for pid in self.get_spec_ids(o.Process): - p_obj = self.query(o.Process, pid) - for par_obj in p_obj.parameters: + param_list = self.find_process_parameters(pid, all_params) + for par_obj in param_list: # Prioritize UUID over parameter name: if uuid and par_obj.id == uuid: proc_list.append(pid) @@ -751,7 +766,7 @@ def find_parameter_process(self, name=None, uuid=None): self.logger.warning("Parameter not found in a process!") return proc_list - def find_process_parameters(self, uuid): + def find_process_parameters(self, uuid, parameters=None): """Return parameter objects associated with a process. Includes parameters found in the parameters table and referenced @@ -764,6 +779,9 @@ def find_process_parameters(self, uuid): ---------- uuid : str A Process universally unique identifier. + parameters : list, optional + A list of Parameter objects to speed up searches. + Defaults to none, which triggers querying all parameters. Returns ------- @@ -785,13 +803,19 @@ def find_process_parameters(self, uuid): try: self.logger.info( "Received non-process UUID, trying to get process ID") - return self.find_process_parameters(self.get_process_id(uuid)) + return self.find_process_parameters( + self.get_process_id(uuid), parameters + ) except: raise ValueError("UUID must be for a process!") # Initialize the return object param_list = [] + # Pre-compiled list of all parameters. + if parameters is None: + parameters = self.list_parameters() + # Process-level self.logger.info("Searching process-level parameters") p_obj = self.query(o.Process, uuid) @@ -803,7 +827,9 @@ def find_process_parameters(self, uuid): if not par_obj.is_input_parameter and par_obj.formula: # Search the formula for global parameters. # NOTE: process-level params are already captured above. - f_params = self.formula_extractor(par_obj.formula, []) + f_params = self.formula_extractor( + par_obj.formula, [], parameters + ) param_list += f_params # Exchange table referenced (global level) @@ -813,19 +839,20 @@ def find_process_parameters(self, uuid): if ex_obj.amount_formula: # Again, only get global params, because process-level is # accounted for. - f_params = self.formula_extractor(ex_obj.amount_formula, []) + f_params = self.formula_extractor( + ex_obj.amount_formula, [], parameters + ) param_list += f_params if len(param_list) == 0: self.logger.warning("No parameters found for process!") else: - # Remove duplicates + # Remove duplicates. NOTE all parameters should have unique UUID. r_list = [] tmp_list = [] for param in param_list: - my_tuple = (param.id, param.parameter_scope.name) - if my_tuple not in tmp_list: - tmp_list.append(my_tuple) + if param.id not in tmp_list: + tmp_list.append(param.id) r_list.append(param) param_list = r_list @@ -884,7 +911,7 @@ def flow_is_tracked(self, uuid): self.logger.warning("Failed to find flow, '%s'" % uuid) return r_str - def formula_extractor(self, f_str, p_list=[]): + def formula_extractor(self, f_str, p_list=[], master_list=None): """Extract global parameters from a formula. The process includes the following steps to recursively search @@ -903,6 +930,8 @@ def formula_extractor(self, f_str, p_list=[]): exchange tables (e.g., amount formula). p_list : list, optional A list of parameters used for recursive searching, by default [] + master_list : list, optional + A list of all parameters to search. Returns ------- @@ -917,9 +946,6 @@ def formula_extractor(self, f_str, p_list=[]): are letters, numbers, and underscores that does not start or end with an underscore. """ - # Essentially, 'GLOBAL_SCOPE' str - g_scope = o.ParameterScope.GLOBAL_SCOPE.name - # Use regular expression to match variable names # (based on the notion that variables are letters, # numbers, and underscores that does not start or end @@ -933,22 +959,25 @@ def formula_extractor(self, f_str, p_list=[]): # entirely of numbers (i.e., coefficients or constants in equations). q_except = re.compile("[0-9]+") - # Search the formula for parameters & filter out exceptions. + # Search the formula for parameters & filter out exceptions and + # if-statements. f_params = re.findall(q, f_str.lower()) f_params = [ x for x in f_params if (not re.match(q_except, x)) and ( - x != "if") + x != "if") and (x != "iff") ] for f_param in f_params: - s_results = self.find_parameter(f_param) + s_results = self.find_parameter(f_param, p_list=master_list) for sr in s_results: # Add all referenced global parameters - if sr.parameter_scope.name == g_scope: + if sr.parameter_scope == o.ParameterScope.GLOBAL_SCOPE: p_list.append(sr) # Recursively search dependent global parameters: if not sr.is_input_parameter and sr.formula: - p_list = self.formula_extractor(sr.formula, p_list) + p_list = self.formula_extractor( + sr.formula, p_list, master_list + ) return p_list def get_actor_yaml(self, f_dir=None): @@ -1362,7 +1391,7 @@ def get_flow_by_exchange(self, uuid, ex_id): self.logger.info("Failed to find exchange ID, %d" % ex_id) return None - def get_flows(self, uuid=None, inputs=True, outputs=True): + def get_flows(self, uuid=None, inputs=True, outputs=True, as_dict=True): """Return dictionary of flow data associated with a process's exchanges. If no UUID is provided (or a UUID of a product system), then the @@ -1376,21 +1405,39 @@ def get_flows(self, uuid=None, inputs=True, outputs=True): If input flows are requested, by default true outputs : bool, optional If output flows are requested, by default true + as_dict : bool, optional + Whether to return exchange table as a dictionary (e.g., for + easy use with pandas data frames). Returns ------- - dict - A dictionary with 'name', 'amount', 'unit', 'category', 'uuid', - 'tracked', 'description', 'provider', and 'dq' (data quality) + dict, list + If ``as_dict`` is true, the return object is a dictionary with + 'name', 'amount', 'unit', 'category', 'uuid', 'tracked', + 'description', 'provider', and 'dq' (data quality) fields formatted ready for conversion to a pandas data frame. + If ``as_dict`` is false, the return object is a list of reference + objects to flows. Notes ----- The data quality index is a string (e.g., '(1;3;2;5;1)'). + For exchanges that have parameterized amounts, use the 'amountFormula' + column (if ``as_dict`` is true). + + Examples + -------- + >>> n = NetlOlca() + >>> n.connect() + >>> n.read() + >>> uid = '71b559b7-ac85-498d-80e3-70faa5d9936d' + >>> pd.DataFrame(n.get_flows(uid, False, True, True)) """ + # Initialize the empty dictionary for UP template data frame r_dict = { 'name': [], 'amount': [], + 'amountFormula': [], 'unit': [], 'category': [], 'uuid': [], @@ -1399,13 +1446,21 @@ def get_flows(self, uuid=None, inputs=True, outputs=True): 'provider': [], 'dq': [], } + + # Initialize empty flow list + f_list = [] + + # Gets either the process or reference process of a product system. uuid = self.get_process_id(uuid) p = self.query(o.Process, uuid) + if p is not None: for e_obj in p.exchanges: + # Extract metadata for data frame. e_tracked = self.flow_is_tracked(e_obj.flow.id) e_name = "%s" % e_obj.flow.name e_amount = e_obj.amount + e_form = e_obj.amount_formula e_unit = "%s" % e_obj.unit.name e_cat = "%s" % e_obj.flow.category e_uid = "%s" % e_obj.flow.id @@ -1413,28 +1468,42 @@ def get_flows(self, uuid=None, inputs=True, outputs=True): e_prov = "%s" % e_obj.to_dict().get( "defaultProvider", {}).get("@id", "") e_dq = "%s" % e_obj.to_dict().get('dq_entry', '') + if inputs and e_obj.is_input: - r_dict['name'].append(e_name) - r_dict['amount'].append(e_amount) - r_dict['unit'].append(e_unit) - r_dict['category'].append(e_cat) - r_dict['uuid'].append(e_uid) - r_dict['tracked'].append(e_tracked) - r_dict['description'].append(e_des) - r_dict['provider'].append(e_prov) - r_dict['dq'].append(e_dq) + if as_dict: + r_dict['name'].append(e_name) + r_dict['amount'].append(e_amount) + r_dict['amountFormula'].append(e_form) + r_dict['unit'].append(e_unit) + r_dict['category'].append(e_cat) + r_dict['uuid'].append(e_uid) + r_dict['tracked'].append(e_tracked) + r_dict['description'].append(e_des) + r_dict['provider'].append(e_prov) + r_dict['dq'].append(e_dq) + else: + # Add flow reference object + f_list.append(e_obj.flow) if outputs and not e_obj.is_input: - r_dict['name'].append(e_name) - r_dict['amount'].append(e_amount) - r_dict['unit'].append(e_unit) - r_dict['category'].append(e_cat) - r_dict['uuid'].append(e_uid) - r_dict['tracked'].append(e_tracked) - r_dict['description'].append(e_des) - r_dict['provider'].append(e_prov) - r_dict['dq'].append(e_dq) + if as_dict: + r_dict['name'].append(e_name) + r_dict['amount'].append(e_amount) + r_dict['amountFormula'].append(e_form) + r_dict['unit'].append(e_unit) + r_dict['category'].append(e_cat) + r_dict['uuid'].append(e_uid) + r_dict['tracked'].append(e_tracked) + r_dict['description'].append(e_des) + r_dict['provider'].append(e_prov) + r_dict['dq'].append(e_dq) + else: + f_list.append(e_obj.flow) - return r_dict + # The two return objects depending on user preference. + if as_dict: + return r_dict + else: + return f_list def get_from_file(self, d_class, d_uuid): """Return the schema object for a given UUID of a given data type @@ -1673,6 +1742,448 @@ def get_process_sources(self, uuid): return s_list + def get_process_actors(self, uuid): + """ + Return a list of actors for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + list + A list of reference objects to the actors of the given process. + """ + a_list = [] + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + if obj.process_documentation.data_documentor: + a_list.append(obj.process_documentation.data_documentor) + + if obj.process_documentation.data_generator and ( + obj.process_documentation.data_generator not in a_list): + a_list.append(obj.process_documentation.data_generator) + + if obj.process_documentation.data_set_owner and ( + obj.process_documentation.data_set_owner not in a_list): + a_list.append(obj.process_documentation.data_set_owner) + + return a_list + + def get_process_dq_system(self, uuid): + """ + Return a list of DQ systems for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + list + A list of DQ system objects used in the given process. + """ + dq_system = [] + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + + # Process schema + if obj.dq_system: + dq_system.append(obj.dq_system) + + # Flow schema + if obj.exchange_dq_system and ( + obj.exchange_dq_system not in dq_system): + dq_system.append(obj.exchange_dq_system) + + return dq_system + + def get_process_location(self, uuid): + """ + Return a location for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + olca_schema.schema.Location + A Location class object for the given process. + """ + location = None + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + location = obj.location + return location + + def get_process_parameters(self, uuid): + """ + Return a list of parameters objects for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + list + A list of parameter objects for the given process. + + Notes + ----- + Does not include global parameters that may show up in process + parameter or exchange amount formulas. For that, use + :func:`find_process_parameters`. + """ + param_list = [] + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + param_list = obj.parameters + return param_list + + def get_process_flow_properties(self, uuid): + """ + Return a list of flow properties for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + list + A list of flow property objects for the given process. + """ + fp_list = [] + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + for exch in obj.exchanges: + if exch.flow_property and exch.flow_property.id not in [x.id for x in fp_list]: + fp_list.append(exch.flow_property) + return fp_list + + def get_default_providers(self, uuid_list, all_prov=True): + """ + Return a list of default providers for a given process. + + This function goes through all exchanges of a process, identifies the + default providers, and returns a list of default provider objects. + Optionally, the function can go through the exchanges of the default providers, + identifies their default providers, and appends them to the list if + they are not already in the list. Then this process is repeated until all + default providers are found. + + Parameters + ---------- + uuid_list : list of strings + A list of UUIDs of the processes. + all_prov : bool, optional + Whether to search for all default providers in the supply chain of the + given process. + True: Get all default providers in the supply chain of the given process. + False: Get only the default providers in the exchange table of the given process. + + Returns + ------- + list + A list of process UUIDs of the default providers for the given process. + """ + provider_list = [] + seen = set() + to_be_checked = uuid_list + spec_ids = set(self.get_spec_ids(o.Process)) + + while to_be_checked: + uuid_being_checked = to_be_checked.pop() + + # Skip Process UUIDs that were already scanned. + if uuid_being_checked in seen: + continue + seen.add(uuid_being_checked) + + # Error handle bad requests + if uuid_being_checked not in spec_ids: + continue + + obj = self.query(o.Process, uuid_being_checked) + for exch in obj.exchanges: + if not exch.default_provider: + continue + + # Check the default provider UUID; add to provider list + # and mark for review (i.e., add to seen set) + dp = exch.default_provider.id + if dp not in provider_list: + provider_list.append(dp) + if all_prov and dp not in seen: + to_be_checked.append(dp) + + return provider_list + + def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = True): + """ + This method takes a list of uuids for select processes and returns a + dictionary of root entities that are associated with these processes. + + The main application of this method is to get the root entities + dictionary of a potential derivative database. + + The returned dictionary consists of 17 keys for the different root + entities: 1- Actors, 2- Currency, 3- DQ system, 4- EPD, 5- Flow, + 6- Flow property, 7- Impact categories, 8- Impact methods, 9- Location, + 10- Parameter, 11- Process, 12- Product system, 13- Project, + 14- Result, 15- Social indicators, 16- Source, 17- UnitGroup. + + Each root entity key contains a dictionary with the original fields of + the root entity: 1- name, 2- display, 3- info, 4- class, 5- yaml, + 6-ids, 7- objs. + + + Notes + ----- + Below are the following assumptions made by this method. + + 1. The fields that will be updated based on the target processes are: + Actors, DQ System, Flow, Flow property, Process, Location, + Parameter, Source. + 2. This method retains the original fields of the source database for + the root entities: Currency, EPD, Impact categories, Impact methods, + and UnitGroup. + 3. This method resets the following root entities data: + Product system, Project, Result, Social indicators. + + Parameters + ---------- + uuid_list : list + A list of uuids for select processes. + add_obs : bool, optional + Whether to include entity objects in the return dictionary. + If false, will only return UUIDs. + all_prov : bool, optional + Whether to include all default providers in the supply chain of the + given processes. + True: Get all default providers in the supply chain of the given processes. + False: Get only the default providers in the exchange table of the given processes. + + Returns + ------- + dict + A dictionary of root entities that are associated with the target + processes. + + Examples + -------- + >>> uuid_list = ["123e4567-e89b-12d3-a456-426614174000"] + >>> dd_root_entities_dict = self.get_full_dd_root_entities_dict(ddb_uuids, add_objs=True) + >>> print(dd_root_entities_dict) + + """ + # Get root entities dictionary of the full database. + logging.info("Getting root entities dictionary of the source database.") + full_dict = copy.deepcopy(self._spec_map) + all_uuids = self.get_spec_ids(o.Process) + # Get full list of Process UUIDs in derivative database. + # This includes processes that are providers to the processes in + # `uuid_list`; remove duplicates (e.g., from similar providers). + logging.info("Getting Process UUIDs from the derivative database including default providers across the entire supply chain.") + uuid_list_copy = copy.deepcopy(uuid_list) + ddb_uuids = (self.get_default_providers(uuid_list, all_prov)) + ddb_uuids.extend(uuid_list_copy) + ddb_uuids = list(set(ddb_uuids)) + n_extra_processes = len(ddb_uuids) - len(uuid_list_copy) + logging.info(f"The derivative database includes {n_extra_processes} additional processes that are default providers to the selected processes or are included in the selected processes.") + + # Create new field to store objs for each root entity. + if add_objs: + logging.info( + "Creating new field to store objects for each root entity." + ) + for i in self._spec_map.keys(): + full_dict[i]["objs"] = [] + + # Reset the 'ids' fields for process-specific root entities + # and unwanted entities (i.e., Product system, project, result, and + # social indicators). + entities = [ + o.Actor, + o.DQSystem, + o.Flow, + o.FlowProperty, + o.Parameter, + o.Process, + o.Source, + o.ProductSystem, + o.Project, + o.Result, + o.SocialIndicator, + ] + for name in entities: + logging.info("Resetting 'ids' field for %s." % name) + i = get_dict_number(self._spec_map, name, 'class') + full_dict[i]["ids"] = [] + + # Get all parameters from the full database. + logging.info("Getting all parameters objects from the full database.") + all_parameters = self.list_parameters() + + # Add back entities. + for uuid in ddb_uuids: + if uuid not in all_uuids: + logging.info("Process UUID %s is not in the full database. Skipping." % uuid) + continue + logging.info("Processing root entities for process UUID: %s" % uuid) + # Actors #1 + try: + actors = self.get_process_actors(uuid) + except Exception as e: + logging.error(f"Error getting actors for process {uuid}: {e}") + continue + if actors: + i = get_dict_number(self._spec_map, o.Actor, "class") + for actor in actors: + if actor.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(actor.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Actor, actor.id) + ) + + # Currency #2 - add all + if add_objs: + i = get_dict_number(self._spec_map, o.Currency, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.Currency, _id)) + + # DQ system #3 + try: + dq_systems = self.get_process_dq_system(uuid) + except Exception as e: + logging.error(f"Error getting DQ systems for process {uuid}: {e}") + continue + if dq_systems: + i = get_dict_number(self._spec_map, o.DQSystem, "class") + for dq_system in dq_systems: + if dq_system.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(dq_system.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.DQSystem, dq_system.id) + ) + + # EPD #4 - keep all + if add_objs: + i = get_dict_number(self._spec_map, o.Epd, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.Epd, _id)) + + # Flow #5 + try: + flows = self.get_flows(uuid, True, True, False) + except Exception as e: + logging.error(f"Error getting flows for process {uuid}: {e}") + continue + if flows: + i = get_dict_number(self._spec_map, o.Flow, "class") + for flow in flows: + if flow.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(flow.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Flow, flow.id) + ) + + # Flow property #6 + try: + flow_properties = self.get_process_flow_properties(uuid) + except Exception as e: + logging.error(f"Error getting flow properties for process {uuid}: {e}") + continue + if flow_properties: + i = get_dict_number(self._spec_map, o.FlowProperty, "class") + for flow_property in flow_properties: + if flow_property.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(flow_property.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.FlowProperty, flow_property.id) + ) + + # Parameter #10 + try: + parameters = self.find_process_parameters(uuid, all_parameters) + except Exception as e: + logging.error(f"Error getting parameters for process {uuid}: {e}") + continue + if parameters: + i = get_dict_number(self._spec_map, o.Parameter, "class") + for parameter in parameters: + if parameter.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(parameter.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Parameter, parameter.id) + ) + + # Process #11 - adjusted based on final list of uuids + i = get_dict_number(self._spec_map, o.Process, "class") + if uuid not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(uuid) + if add_objs: + full_dict[i]["objs"].append(self.query(o.Process, uuid)) + + # Source #16 + try: + sources = self.get_process_sources(uuid) + except Exception as e: + logging.error(f"Error getting sources for process {uuid}: {e}") + continue + if sources: + i = get_dict_number(self._spec_map, o.Source, "class") + for source in sources: + if source.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(source.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Source, source.id) + ) + + # Impact categories #7 - keep all + if add_objs: + logging.info("Collecting reference objects for impact categories.") + i = get_dict_number(self._spec_map, o.ImpactCategory, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.ImpactCategory, _id)) + + # Impact methods #8 - keep all + if add_objs: + logging.info("Collecting reference objects for impact methods.") + i = get_dict_number(self._spec_map, o.ImpactMethod, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.ImpactMethod, _id)) + + # Location #9 - keep all + if add_objs: + logging.info("Collecting reference objects for locations.") + i = get_dict_number(self._spec_map, o.Location, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.Location, _id)) + + # UnitGroup #17 - keep all + if add_objs: + logging.info("Collecting reference objects for unit groups.") + i = get_dict_number(self._spec_map, o.UnitGroup, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.UnitGroup, _id)) + + return full_dict + def get_sources(self, uuid, all_p=False): # Sources are bound to a process's documentation attribute. # There are two routes: the reference process or all processes. @@ -1732,6 +2243,37 @@ def get_providers(self): self.logger.warning("No connection!") return r_list + def get_quant_ref_flow(self, p_uuid): + """Helper function to return a process's quantitative reference flow. + + Parameters + ---------- + p_uuid : str + A process (or product system) universally unique identifier. + + Returns + ------- + olca-schema.Exchange + An exchange object associated as quantitative reference flow, + or NoneType (if method fails to find process or no flows are + labeled as quantitative reference). + """ + # Ensure UUID is from a process. + _uuid = self.get_process_id(p_uuid) + _found = False + if _uuid: + p = self.query(o.Process, _uuid) + for ex in p.exchanges: + # A process has at most 1 quantitative reference. + if ex.is_quantitative_reference: + return ex + if not _found: + self.logger.warning( + "Failed to find quantitative reference flow " + "for process, '%s'" % p.name + ) + return None + def get_reviewer(self, uuid=None): """Return tuple of current unit process reviewer. @@ -2091,16 +2633,39 @@ def list_parameters(self, inc_process=True, input_only=False, as_dict=False): - # IN PROGRESS - # To get a master list of global and/or process parameters filterable - # by input parameter status. + """Return a list of parameters based on scope and classification. + + Parameters + ---------- + inc_global : bool, optional + Whether to include global scope parameters, by default True + inc_process : bool, optional + Whether to include process scope parameters, by default True + input_only : bool, optional + Whether to include only input parameter types. If false, both + input and dependent parameters are returned. + Default is false. + as_dict : bool, optional + Whether to return parameters as dictionary objects; otherwise + returns Parameter class objects, by default False + + Returns + ------- + list + A list of parameter objects or dictionaries (if ``as_dict`` is true) + """ + # Initialize empty return list param_list = [] - # Global scope - if inc_global: - for par_id in self.get_spec_ids(o.Parameter): - par_obj = self.query(o.Parameter, par_id) + # HOTFIX: all parameters (including process and global) are found + # in the Parameter root entity list. + # HOTFIX: use :func:`get_all` to speed up search [26.03.24; TWD] + for par_obj in self.get_all(o.Parameter): + # Global scope + if inc_global and ( + par_obj.parameter_scope == o.ParameterScope.GLOBAL_SCOPE): if input_only and par_obj.is_input_parameter: + # Input global parameters if as_dict: param_list.append(par_obj.to_dict()) else: @@ -2111,21 +2676,20 @@ def list_parameters(self, else: param_list.append(par_obj) - # Process scope - if inc_process: - for p_id in self.get_spec_ids(o.Process): - p_obj = self.query(o.Process, p_id) - for par_obj in p_obj.parameters: - if input_only and par_obj.is_input_parameter: - if as_dict: - param_list.append(par_obj.to_dict()) - else: - param_list.append(par_obj) - elif not input_only: - if as_dict: - param_list.append(par_obj.to_dict()) - else: - param_list.append(par_obj) + # Process scope + if inc_process and ( + par_obj.parameter_scope == o.ParameterScope.PROCESS_SCOPE): + if input_only and par_obj.is_input_parameter: + # Input process parameters + if as_dict: + param_list.append(par_obj.to_dict()) + else: + param_list.append(par_obj) + elif not input_only: + if as_dict: + param_list.append(par_obj.to_dict()) + else: + param_list.append(par_obj) return param_list @@ -2704,6 +3268,40 @@ def check_for_docker(): return False +def check_output_dir(out_dir): + """Helper method to ensure a directory exists. + + If a given directory does not exist, this method attempts to create it. + + Parameters + ---------- + out_dir : str + A path to a directory. + + Returns + ------- + bool + Whether the directory exists. + """ + if not os.path.isdir(out_dir): + try: + # Start with super mkdir + os.makedirs(out_dir) + except: + logging.warning("Failed to create folder %s!" % out_dir) + try: + # Revert to simple mkdir + os.mkdir(out_dir) + except: + logging.error("Could not create folder, %s" % out_dir) + else: + logging.info("Created %s" % out_dir) + else: + logging.info("Created %s" % out_dir) + + return os.path.isdir(out_dir) + + def get_as_yaml(my_dict, rm_at=False): """Return a dictionary as a YAML string. @@ -2766,6 +3364,91 @@ def get_dict_number(dobj, val, key): return None +def get_logger(stream=True, rfh=True, str_lv='INFO', rfh_lv='DEBUG'): + """A helper function for creating or retrieving a root logger with + only one instance of stream and/or rotating file handler. + + Parameters + ---------- + stream : bool, optional + Whether to create a stream handler, by default True + rfh : bool, optional + Whether to create a rotating file handler, by default True + str_lv : str, optional + Stream handler logging level, by default 'INFO' + rfh_lv : str, optional + Rotating file handler logging level, by default 'DEBUG' + + Returns + ------- + logging.Logger + The root logger. + + Notes + ----- + This could be expanded to allow the user to set the logging + level of a specific handler (or just overwrite all levels). + """ + # Create/retrieve the root logger + log = logging.getLogger() + log.setLevel("DEBUG") + + # Congrats, you now have a hidden folder in your user's home directory. + output_dir = os.path.join( + os.path.expanduser("~"), + ".netlolca" + ) + + # Define log format + rec_format = ( + "%(asctime)s.%(msecs)03d:%(levelname)s:%(module)s:%(funcName)s:" + "%(message)s") + formatter = logging.Formatter(rec_format, datefmt='%Y-%m-%d %H:%M:%S') + + # Check what handlers the root logger already has + has_stream = False + has_rfh = False + for h in log.handlers: + if h.name == 'nolca_stream': + has_stream = True + h.setLevel(str_lv) # handle level change requests + elif h.name == 'nolca_rfh': + has_rfh = True + h.setLevel(rfh_lv) + + # Create stream handler for info messages + if stream and not has_stream: + s_handler = logging.StreamHandler() + s_handler.setLevel(str_lv) + s_handler.setFormatter(formatter) + s_handler.set_name('nolca_stream') + s_handler.stream = sys.stdout # won't show pink in Jupyter notebook + log.addHandler(s_handler) + + # Create file handler for debug messages + if rfh and not has_rfh: + log_filename = "nolca.log" + check_output_dir(output_dir) + log_path = os.path.join(output_dir, log_filename) + f_handler = RotatingFileHandler( + log_path, backupCount=9, encoding='utf-8') + f_handler.setLevel(rfh_lv) + f_handler.setFormatter(formatter) + f_handler.set_name('nolca_rfh') + log.addHandler(f_handler) + if os.path.isfile(log_path): + rollover_logger(log) + + # Clean-up step; all unnamed log handlers get elevated to critical. + # NOTE: alternatively, we could drop all unnamed loggers. + num_handlers = len(log.handlers) + for i in range(num_handlers): + if not log.handlers[i].name: + log.handlers[i].setLevel("CRITICAL") + + return log + + def make_actor_yaml(yaml_name, yaml_dir="data"): """Create a YAML file with actor template. @@ -2927,6 +3610,24 @@ def read_yaml(fpath): return c_dict +def rollover_logger(logger): + """Helper method to rollover a named Rotating File Handler. + + Parameters + ---------- + logger : logging.Logger + A logger (e.g., root logger) + """ + try: + idx = [x.name for x in logger.handlers].index("nolca_rfh") + except ValueError: + idx = -1 + + # Rollover the rotating file handler (if found) + if idx != -1: + logger.handlers[idx].doRollover() + + def writeout(fpath, dstring): """Write new/overwrite existing file with given data string. @@ -2953,3 +3654,29 @@ def writeout(fpath, dstring): pass else: OUT.close() + + +############################################################################### +# SANDBOX +############################################################################### +if __name__ == "__main__": + import re + from netlolca.NetlOlca import NetlOlca + + # Initialize and connect to IPC service + n = NetlOlca() + n.connect() + n.read() + + # Start by searching the database for specific processes + search_q = re.compile("^.*scenario (\\d{1,2})$") + search_r = n.match_process_names(search_q) + + # Scrub the UUIDs from search results. + p_uuids = [p[0] for p in search_r] + + # Get parameters for a given process. Fast. Found 92. + p_params = n.get_process_parameters(p_uuids[0]) + + # Get all parameters reference within a process. Slower. Found 93. + p_all_params = n.find_process_parameters(p_uuids[0])