From cbbf656e41a4d4d99be153b2d34abb7e9a699e7b Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Mon, 12 Jan 2026 12:18:59 -0500 Subject: [PATCH 01/21] DD functions added The additions consist of several functions to generate a root entities dictionary for a derivative database given a list of target process uuids. The added functions are: 1- get_process_actors 2- get_process_dq_system 3- get_process_location 4- get_process_parameters 5- get_process_flows 6- get_process_flow_properties 7- get_default_providers 8- get_full_dd_root_entities_dict 9- OPTIONAL: get_dd_root_entities_dict The last function doesnt return root entity objects - but is fast to run and might be useful in derivative database validation processes. --- netlolca/NetlOlca.py | 473 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 473 insertions(+) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 012239c..efd55ce 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -12,6 +12,7 @@ import re import shutil import sys +import copy import yaml import olca_ipc as ipc @@ -1673,6 +1674,468 @@ def get_process_sources(self, uuid): return s_list + def get_process_actors(self, uuid): + """ + Return a list of actors for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + list + A list of actor objects for the given process. + """ + a_list = [] + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + if obj.process_documentation.data_documentor: + a_list.append(obj.process_documentation.data_documentor) + elif obj.process_documentation.data_generator and obj.process_documentation.data_generator not in a_list: + a_list.append(obj.process_documentation.data_generator) + elif obj.process_documentation.data_set_owner and obj.process_documentation.data_set_owner not in a_list: + a_list.append(obj.process_documentation.data_set_owner) + + return a_list + + def get_process_dq_system(self, uuid): + """ + Return a list of DQ systems for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + list + A list of DQ system objects used in the given process. + """ + dq_system = [] + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + + if obj.dq_system and obj.dq_system not in dq_system: + dq_system.append(obj.dq_system) + + if obj.exchange_dq_system and obj.exchange_dq_system not in dq_system: + dq_system.append(obj.exchange_dq_system) + + return dq_system + + def get_process_location(self, uuid): + """ + Return a location for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + olca_schema.schema.Location + A Location class object for the given process. + """ + location = None + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + location = obj.location + return location + + def get_process_parameters(self, uuid): + """ + Return a list of parameters uuids for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + list + A list of parameter objects for the given process. + """ + param_list = [] + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + param_list.append(obj.parameters) + return param_list + + def get_process_flows(self, uuid): + """ + Return a list of flows for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + list + A list of flow objects for the given process. + """ + flow_list = [] + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + for exch in obj.exchanges: + if exch.flow: + flow_list.append(exch.flow) + return flow_list + + def get_process_flow_properties(self, uuid): + """ + Return a list of flow properties for a given process. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + list + A list of flow property objects for the given process. + """ + fp_list = [] + if uuid in self.get_spec_ids(o.Process): + obj = self.query(o.Process, uuid) + for exch in obj.exchanges: + if exch.flow_property: + fp_list.append(exch.flow_property) + return fp_list + + def get_default_providers(self, uuid): + """ + Return a list of default providers for a given process. + + This function goes through all exchanges of a process, + identifies the default providers, and returns a list of default provider objects. + Then the function goe through the exchanges of the default providers, + identifies their default providers, and appends them to the list if they are not already in the list. + + + This process is repeated until all default providers are found. + + Parameters + ---------- + uuid : str + The UUID of the process. + + Returns + ------- + list + A list of default provider objects for the given process. + """ + provider_list = [] + seen = set() + to_be_checked = [uuid] + spec_ids = set(self.get_spec_ids(o.Process)) + + while to_be_checked: + uuid_being_checked = to_be_checked.pop() + + if uuid_being_checked in seen: + continue + seen.add(uuid_being_checked) + + if uuid_being_checked not in spec_ids: + continue + + obj = self.query(o.Process, uuid_being_checked) + + for exch in obj.exchanges: + if not exch.default_provider: + continue + + dp = exch.default_provider.id + + if dp not in provider_list: + provider_list.append(dp) + + if dp not in seen: + to_be_checked.append(dp) + + return provider_list + + def get_full_dd_root_entities_dict (self, uuid_list): + """ + This method takes a list of uuids for select processes + and returns a dictionary of root entities that are associated with these processes. + + The main application of this method is to get the root entities dictionary of a + potential derivative database. + + The returned dictionary consists of 17 keys for the different root entities: + 1- Actors, 2- Currency, 3- DQ system, 4- EPD, 5- Flow, 6- Flow property, 7- Impact + categories, 8- Impact methods, 9- Location, 10- Parameter, 11- Process, 12- Product + system, 13- Project, 14- Result, 15- Social indicators, 16- Source, 17- UnitGroup. + + Each root entity key contains a dictionary with the original fields of the root entity: + 1- name, 2- display, 3- info, 4- class, 5- yaml, 6-ids, 7- objs + + Assumptions: + ------------ + 1- The fields that will be updated based on the target processes are: + Actors, DQ System, Flow, Flow property, Process, Location, Parameter, Source + + 2- This method retains the original fields of the source database for the following + root entities: + Currency, EPD, Impact categories, Impact methods, UnitGroup + + 3- This method resets the following root entities data: + Product system, Project, Result, Social indicators + + Parameters + ---------- + uuid_list : list + A list of uuids for select processes. + + Returns + ------- + dict + A dictionary of root entities that are associated with the target processes. + + Example + ------- + >>> uuid_list = ["123e4567-e89b-12d3-a456-426614174000"] + >>> dd_root_entities_dict = self.get_dd_root_entities_dict(uuid_list) + >>> print(dd_root_entities_dict) + + """ + # get the root entities dictionary of the full database and filter it out + full_dict = copy.deepcopy(self._spec_map) + + # get full list of uuids in derivative database + ddb_uuids = [] + for uuid in uuid_list: + ddb_uuids.append(uuid) + ddb_uuids += self.get_default_providers(uuid) + uuid_list = list(set(ddb_uuids)) # remove duplicates + + # create new field to store objs for each root entity + n = list(range(1,18)) + for i in n: + full_dict [i]["objs"] = [] + + # reset the 'ids' field for process-specific root entities + n = [1,3,5,6,9,10,11,16] + for i in n: + full_dict [i]["ids"] = [] + + for uuid in uuid_list: + # actors #1 + actors = self.get_process_actors(uuid) + if actors: + for actor in actors: + if actor.id not in full_dict [1]["ids"]: + full_dict [1]["ids"].append(actor.id) + full_dict [1]["objs"].append(self.query(o.Actor, actor.id)) + # Currency #2 - kept as is + for id in full_dict [2]["ids"]: + if self.query(o.Currency, id) not in full_dict [2]["objs"]: + full_dict [2]["objs"].append(self.query(o.Currency, id)) + # DQ system #3 + dq_systems = self.get_process_dq_system(uuid) + if dq_systems: + for dq_system in dq_systems: + if dq_system.id not in full_dict [3]["ids"]: + full_dict [3]["ids"].append(dq_system.id) + full_dict [3]["objs"].append(self.query(o.DQSystem, dq_system.id)) + # EPD #4 - kept as is + for id in full_dict [4]["ids"]: + if self.query(o.EPD, id) not in full_dict [4]["objs"]: + full_dict [4]["objs"].append(self.query(o.EPD, id)) + # Flow #5 + flows = self.get_process_flows(uuid) + if flows: + for flow in flows: + if flow.id not in full_dict [5]["ids"]: + full_dict [5]["ids"].append(flow.id) + full_dict [5]["objs"].append(self.query(o.Flow, flow.id)) + # Flow property #6 + flow_properties = self.get_process_flow_properties(uuid) + if flow_properties: + for flow_property in flow_properties: + if flow_property.id not in full_dict [6]["ids"]: + full_dict [6]["ids"].append(flow_property.id) + full_dict [6]["objs"].append(self.query(o.FlowProperty, flow_property.id)) + # Location #9 + locations = self.get_process_location(uuid) + if locations: + if isinstance(locations, list): + for location in locations: + if location.id not in full_dict [9]["ids"]: + full_dict [9]["ids"].append(location.id) + full_dict [9]["objs"].append(self.query(o.Location, location.id)) + else: + if locations.id not in full_dict [9]["ids"]: + full_dict [9]["ids"].append(locations.id) + full_dict [9]["objs"].append(self.query(o.Location, locations.id)) + # Parameter #10 + parameters = self.get_process_parameters(uuid) + if parameters: + for parameter in parameters[0]: + if parameter.id not in full_dict [10]["ids"]: + full_dict [10]["ids"].append(parameter.id) + full_dict [10]["objs"].append(self.query(o.Parameter, parameter.id)) + # Process #11 - adjusted based on final list of uuids + if uuid not in full_dict [11]["ids"]: + full_dict [11]["ids"].append(uuid) + full_dict [11]["objs"].append(self.query(o.Process, uuid)) + # Source #16 + sources = self.get_process_sources(uuid) + if sources: + for source in sources: + if source.id not in full_dict [16]["ids"]: + full_dict [16]["ids"].append(source.id) + full_dict [16]["objs"].append(self.query(o.Source, source.id)) + + # Impact categories #7 - kept as is + print ("resolving impact categories") + for id in full_dict [7]["ids"]: + full_dict [7]["objs"].append(self.query(o.ImpactCategory, id)) + # Impact methods #8 - kept as it + print ("resolving impact methods") + for id in full_dict [8]["ids"]: + full_dict [8]["objs"].append(self.query(o.ImpactMethod, id)) + # remove Product system, project, result, and social indicators ids + full_dict [12]["ids"] = [] + full_dict [13]["ids"] = [] + full_dict [14]["ids"] = [] + full_dict [15]["ids"] = [] + # UnitGroup #17 - kept as is + for id in full_dict [17]["ids"]: + full_dict [17]["objs"].append(self.query(o.UnitGroup, id)) + + return full_dict + + # NOTE FOR TYLER - I MADE THIS ONE FIRST - IT DOESNT RETURN OBJS + # BUT I THOUGHT I'D KEEP IT - MIGHT BE USEFUL IN VALIDATION + # I ASSUME ITS FASTER THAN THE FIRST ONE + def get_dd_root_entities_dict (self, uuid_list): + """ + This method takes a list of uuids for select processes + and returns a dictionary of root entities that are associated with these processes. + + The main application of this method is to get the root entities dictionary of a + potential derivative database. + + The returned dictionary consists of 17 keys for the different root entities: + 1- Actors, 2- Currency, 3- DQ system, 4- EPD, 5- Flow, 6- Flow property, 7- Impact + categories, 8- Impact methods, 9- Location, 10- Parameter, 11- Process, 12- Product + system, 13- Project, 14- Result, 15- Social indicators, 16- Source, 17- UnitGroup. + + Each root entity key contains a dictionary with the original fields of the root entity: + 1- name, 2- display, 3- info, 4- class, 5- yaml, 6-ids + + Assumptions: + 1- The fields that will be updated based on the target processes are: + Actors, DQ System, Flow, Flow property, Process, Location, Parameter, Source + + 2- This method retains the original fields of the source database for the following + root entities: + Currency, EPD, Impact categories, Impact methods, UnitGroup + + 3- This method resets the following root entities data: + Product system, Project, Result, Social indicators + + Parameters + ---------- + uuid_list : list + A list of uuids for select processes. + + Returns + ------- + dict + A dictionary of root entities that are associated with the target processes. + + Example + ------- + >>> uuid_list = ["123e4567-e89b-12d3-a456-426614174000"] + >>> dd_root_entities_dict = self.get_dd_root_entities_dict(uuid_list) + >>> print(dd_root_entities_dict) + + """ + # get the root entities dictionary of the full database and filter it out + full_dict = copy.deepcopy(self._spec_map) + + # get full list of uuids in derivative database incl. target processes and their + # default providers + ddb_uuids = [] + for uuid in uuid_list: + ddb_uuids.append(uuid) + ddb_uuids += self.get_default_providers(uuid) + uuid_list = list(set(ddb_uuids)) # remove duplicates + + # reset the 'ids' field for process-specific root entities + n = [1,3,5,6,9,10,11,16] + for i in n: + full_dict [i]["ids"] = [] + + for uuid in uuid_list: + # actors #1 + actors = self.get_process_actors(uuid) + if actors: + for actor in actors: + if actor.id not in full_dict [1]["ids"]: + full_dict [1]["ids"].append(self.query(o.Actor, actor.id)) + # DQ system #3 + dq_systems = self.get_process_dq_system(uuid) + if dq_systems: + for dq_system in dq_systems: + if dq_system.id not in full_dict [3]["ids"]: + full_dict [3]["ids"].append(self.query(o.DQSystem, dq_system.id)) + # Flow #5 + flows = self.get_process_flows(uuid) + if flows: + for flow in flows: + if flow.id not in full_dict [5]["ids"]: + full_dict [5]["ids"].append(self.query(o.Flow, flow.id)) + # Flow property #6 + flow_properties = self.get_process_flow_properties(uuid) + if flow_properties: + for flow_property in flow_properties: + if flow_property.id not in full_dict [6]["ids"]: + full_dict [6]["ids"].append(self.query(o.FlowProperty, flow_property.id)) + # Location #9 + locations = self.get_process_location(uuid) + if locations: + if isinstance(locations, list): + for location in locations: + if location.id not in full_dict [9]["ids"]: + full_dict [9]["ids"].append(self.query(o.Location, location.id)) + else: + if locations.id not in full_dict [9]["ids"]: + full_dict [9]["ids"].append(self.query(o.Location, locations.id)) + # Parameter #10 + parameters = self.get_process_parameters(uuid) + if parameters: + for parameter in parameters[0]: + if parameter.id not in full_dict [10]["ids"]: + full_dict [10]["ids"].append(self.query(o.Parameter, parameter.id)) + # Processes + full_dict [11]["ids"] = [id for id in full_dict [11]["ids"] if id in uuid_list] + # Source #16 + if self.get_process_sources(uuid): + for source in self.get_process_sources(uuid): + if source.id not in full_dict [16]["ids"]: + full_dict [16]["ids"].append(source.id) + # remove Product system, project, and result ids + full_dict [12]["ids"] = [] + full_dict [13]["ids"] = [] + full_dict [14]["ids"] = [] + full_dict [15]["ids"] = [] + + return full_dict + + + def get_sources(self, uuid, all_p=False): # Sources are bound to a process's documentation attribute. # There are two routes: the reference process or all processes. @@ -2953,3 +3416,13 @@ def writeout(fpath, dstring): pass else: OUT.close() + +# +# SANDBOX +# + +if __name__ == "__main__": + netl = NetlOlca() + netl.connect() + netl.read() + From 8df24ba26604438830ee33a73eda510d4a9132ff Mon Sep 17 00:00:00 2001 From: dt-woods Date: Mon, 12 Jan 2026 17:46:24 -0500 Subject: [PATCH 02/21] pep formatting; merge two get dd dict methods --- netlolca/NetlOlca.py | 450 ++++++++++++++++++++----------------------- 1 file changed, 214 insertions(+), 236 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index efd55ce..577618f 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -29,7 +29,7 @@ IPC server) or indirectly (via an exported JSON-LD zip file). Last Edited: - 2025-01-21 + 2026-01-12 Examples -------- @@ -1693,11 +1693,13 @@ def get_process_actors(self, uuid): obj = self.query(o.Process, uuid) if obj.process_documentation.data_documentor: a_list.append(obj.process_documentation.data_documentor) - elif obj.process_documentation.data_generator and obj.process_documentation.data_generator not in a_list: + elif obj.process_documentation.data_generator and ( + obj.process_documentation.data_generator not in a_list): a_list.append(obj.process_documentation.data_generator) - elif obj.process_documentation.data_set_owner and obj.process_documentation.data_set_owner not in a_list: + elif obj.process_documentation.data_set_owner and ( + obj.process_documentation.data_set_owner not in a_list): a_list.append(obj.process_documentation.data_set_owner) - + return a_list def get_process_dq_system(self, uuid): @@ -1717,11 +1719,14 @@ def get_process_dq_system(self, uuid): dq_system = [] if uuid in self.get_spec_ids(o.Process): obj = self.query(o.Process, uuid) - + + # Process schema if obj.dq_system and obj.dq_system not in dq_system: dq_system.append(obj.dq_system) - if obj.exchange_dq_system and obj.exchange_dq_system not in dq_system: + # Flow schema + if obj.exchange_dq_system and ( + obj.exchange_dq_system not in dq_system): dq_system.append(obj.exchange_dq_system) return dq_system @@ -1776,9 +1781,13 @@ def get_process_flows(self, uuid): The UUID of the process. Returns - ------- + ------- list A list of flow objects for the given process. + + Notes + ----- + See also, :func:`get_flows` for an alternative return object. """ flow_list = [] if uuid in self.get_spec_ids(o.Process): @@ -1814,11 +1823,11 @@ def get_default_providers(self, uuid): """ Return a list of default providers for a given process. - This function goes through all exchanges of a process, - identifies the default providers, and returns a list of default provider objects. + This function goes through all exchanges of a process, identifies the + default providers, and returns a list of default provider objects. Then the function goe through the exchanges of the default providers, - identifies their default providers, and appends them to the list if they are not already in the list. - + identifies their default providers, and appends them to the list if + they are not already in the list. This process is repeated until all default providers are found. @@ -1834,308 +1843,262 @@ def get_default_providers(self, uuid): """ provider_list = [] seen = set() - to_be_checked = [uuid] + to_be_checked = [uuid,] spec_ids = set(self.get_spec_ids(o.Process)) - + while to_be_checked: uuid_being_checked = to_be_checked.pop() + # Skip Process UUIDs that were already scanned. if uuid_being_checked in seen: continue seen.add(uuid_being_checked) + # Error handle bad requests if uuid_being_checked not in spec_ids: continue - - obj = self.query(o.Process, uuid_being_checked) + obj = self.query(o.Process, uuid_being_checked) for exch in obj.exchanges: if not exch.default_provider: continue - + + # Check the default provider UUID; add to provider list + # and mark for review (i.e., add to seen set) dp = exch.default_provider.id - if dp not in provider_list: provider_list.append(dp) - if dp not in seen: to_be_checked.append(dp) return provider_list - def get_full_dd_root_entities_dict (self, uuid_list): + def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): """ - This method takes a list of uuids for select processes - and returns a dictionary of root entities that are associated with these processes. + This method takes a list of uuids for select processes and returns a + dictionary of root entities that are associated with these processes. - The main application of this method is to get the root entities dictionary of a - potential derivative database. + The main application of this method is to get the root entities + dictionary of a potential derivative database. - The returned dictionary consists of 17 keys for the different root entities: - 1- Actors, 2- Currency, 3- DQ system, 4- EPD, 5- Flow, 6- Flow property, 7- Impact - categories, 8- Impact methods, 9- Location, 10- Parameter, 11- Process, 12- Product - system, 13- Project, 14- Result, 15- Social indicators, 16- Source, 17- UnitGroup. + The returned dictionary consists of 17 keys for the different root + entities: 1- Actors, 2- Currency, 3- DQ system, 4- EPD, 5- Flow, + 6- Flow property, 7- Impact categories, 8- Impact methods, 9- Location, + 10- Parameter, 11- Process, 12- Product system, 13- Project, + 14- Result, 15- Social indicators, 16- Source, 17- UnitGroup. - Each root entity key contains a dictionary with the original fields of the root entity: - 1- name, 2- display, 3- info, 4- class, 5- yaml, 6-ids, 7- objs + Each root entity key contains a dictionary with the original fields of + the root entity: 1- name, 2- display, 3- info, 4- class, 5- yaml, + 6-ids, 7- objs. - Assumptions: - ------------ - 1- The fields that will be updated based on the target processes are: - Actors, DQ System, Flow, Flow property, Process, Location, Parameter, Source - 2- This method retains the original fields of the source database for the following - root entities: - Currency, EPD, Impact categories, Impact methods, UnitGroup + Notes + ----- + Below are the following assumptions made by this method. - 3- This method resets the following root entities data: - Product system, Project, Result, Social indicators + 1. The fields that will be updated based on the target processes are: + Actors, DQ System, Flow, Flow property, Process, Location, + Parameter, Source. + 2. This method retains the original fields of the source database for + the root entities: Currency, EPD, Impact categories, Impact methods, + and UnitGroup. + 3. This method resets the following root entities data: + Product system, Project, Result, Social indicators. Parameters ---------- uuid_list : list A list of uuids for select processes. + add_obs : bool, optional + Whether to include entity objects in the return dictionary. + If false, will only return UUIDs. Returns ------- dict - A dictionary of root entities that are associated with the target processes. + A dictionary of root entities that are associated with the target + processes. - Example - ------- + Examples + -------- >>> uuid_list = ["123e4567-e89b-12d3-a456-426614174000"] >>> dd_root_entities_dict = self.get_dd_root_entities_dict(uuid_list) >>> print(dd_root_entities_dict) """ - # get the root entities dictionary of the full database and filter it out + # Get root entities dictionary of the full database. full_dict = copy.deepcopy(self._spec_map) - # get full list of uuids in derivative database + # Get full list of Process UUIDs in derivative database. + # This includes processes that are providers to the processes in + # `uuid_list`; remove duplicates (e.g., from similar providers). ddb_uuids = [] for uuid in uuid_list: ddb_uuids.append(uuid) ddb_uuids += self.get_default_providers(uuid) - uuid_list = list(set(ddb_uuids)) # remove duplicates - - # create new field to store objs for each root entity - n = list(range(1,18)) - for i in n: - full_dict [i]["objs"] = [] - - # reset the 'ids' field for process-specific root entities - n = [1,3,5,6,9,10,11,16] - for i in n: - full_dict [i]["ids"] = [] + uuid_list = list(set(ddb_uuids)) + + # Create new field to store objs for each root entity. + if add_objs: + for i in self._spec_map.keys(): + full_dict[i]["objs"] = [] + + # Reset the 'ids' fields for process-specific root entities + # and unwanted entities (i.e., Product system, project, result, and + # social indicators). + entities = [ + o.Actor, + o.DQSystem, + o.Flow, + o.FlowProperty, + o.Location, + o.Parameter, + o.Process, + o.Source, + o.ProductSystem, + o.Project, + o.Result, + o.SocialIndicator, + ] + for name in entities: + i = get_dict_number(self._spec_map, name, 'class') + full_dict[i]["ids"] = [] + # Add back entities. for uuid in uuid_list: - # actors #1 + # Actors #1 actors = self.get_process_actors(uuid) if actors: - for actor in actors: - if actor.id not in full_dict [1]["ids"]: - full_dict [1]["ids"].append(actor.id) - full_dict [1]["objs"].append(self.query(o.Actor, actor.id)) - # Currency #2 - kept as is - for id in full_dict [2]["ids"]: - if self.query(o.Currency, id) not in full_dict [2]["objs"]: - full_dict [2]["objs"].append(self.query(o.Currency, id)) + i = get_dict_number(self._spec_map, o.Actor, "class") + for actor in actors: + if actor.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(actor.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Actor, actor.id) + ) + + # Currency #2 - add all + if add_objs: + i = get_dict_number(self._spec_map, o.Currency, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.Currency, _id)) + # DQ system #3 dq_systems = self.get_process_dq_system(uuid) if dq_systems: + i = get_dict_number(self._spec_map, o.DQSystem, "class") for dq_system in dq_systems: - if dq_system.id not in full_dict [3]["ids"]: - full_dict [3]["ids"].append(dq_system.id) - full_dict [3]["objs"].append(self.query(o.DQSystem, dq_system.id)) - # EPD #4 - kept as is - for id in full_dict [4]["ids"]: - if self.query(o.EPD, id) not in full_dict [4]["objs"]: - full_dict [4]["objs"].append(self.query(o.EPD, id)) + if dq_system.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(dq_system.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.DQSystem, dq_system.id) + ) + + # EPD #4 - keep all + if add_objs: + i = get_dict_number(self._spec_map, o.EPD, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.EPD, _id)) + # Flow #5 flows = self.get_process_flows(uuid) if flows: + i = get_dict_number(self._spec_map, o.Flow, "class") for flow in flows: - if flow.id not in full_dict [5]["ids"]: - full_dict [5]["ids"].append(flow.id) - full_dict [5]["objs"].append(self.query(o.Flow, flow.id)) + if flow.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(flow.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Flow, flow.id) + ) + # Flow property #6 flow_properties = self.get_process_flow_properties(uuid) if flow_properties: + i = get_dict_number(self._spec_map, o.FlowProperty, "class") for flow_property in flow_properties: - if flow_property.id not in full_dict [6]["ids"]: - full_dict [6]["ids"].append(flow_property.id) - full_dict [6]["objs"].append(self.query(o.FlowProperty, flow_property.id)) + if flow_property.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(flow_property.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.FlowProperty, flow_property.id) + ) + # Location #9 locations = self.get_process_location(uuid) if locations: + i = get_dict_number(self._spec_map, o.Location, "class") if isinstance(locations, list): for location in locations: - if location.id not in full_dict [9]["ids"]: - full_dict [9]["ids"].append(location.id) - full_dict [9]["objs"].append(self.query(o.Location, location.id)) + if location.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(location.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Location, location.id) + ) else: - if locations.id not in full_dict [9]["ids"]: - full_dict [9]["ids"].append(locations.id) - full_dict [9]["objs"].append(self.query(o.Location, locations.id)) + if locations.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(locations.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Location, locations.id) + ) + # Parameter #10 parameters = self.get_process_parameters(uuid) if parameters: + i = get_dict_number(self._spec_map, o.Parameter, "class") for parameter in parameters[0]: - if parameter.id not in full_dict [10]["ids"]: - full_dict [10]["ids"].append(parameter.id) - full_dict [10]["objs"].append(self.query(o.Parameter, parameter.id)) + if parameter.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(parameter.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Parameter, parameter.id) + ) + # Process #11 - adjusted based on final list of uuids - if uuid not in full_dict [11]["ids"]: - full_dict [11]["ids"].append(uuid) - full_dict [11]["objs"].append(self.query(o.Process, uuid)) + i = get_dict_number(self._spec_map, o.Process, "class") + if uuid not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(uuid) + if add_objs: + full_dict[i]["objs"].append(self.query(o.Process, uuid)) + # Source #16 sources = self.get_process_sources(uuid) if sources: + i = get_dict_number(self._spec_map, o.Source, "class") for source in sources: - if source.id not in full_dict [16]["ids"]: - full_dict [16]["ids"].append(source.id) - full_dict [16]["objs"].append(self.query(o.Source, source.id)) - - # Impact categories #7 - kept as is - print ("resolving impact categories") - for id in full_dict [7]["ids"]: - full_dict [7]["objs"].append(self.query(o.ImpactCategory, id)) - # Impact methods #8 - kept as it - print ("resolving impact methods") - for id in full_dict [8]["ids"]: - full_dict [8]["objs"].append(self.query(o.ImpactMethod, id)) - # remove Product system, project, result, and social indicators ids - full_dict [12]["ids"] = [] - full_dict [13]["ids"] = [] - full_dict [14]["ids"] = [] - full_dict [15]["ids"] = [] - # UnitGroup #17 - kept as is - for id in full_dict [17]["ids"]: - full_dict [17]["objs"].append(self.query(o.UnitGroup, id)) - - return full_dict - - # NOTE FOR TYLER - I MADE THIS ONE FIRST - IT DOESNT RETURN OBJS - # BUT I THOUGHT I'D KEEP IT - MIGHT BE USEFUL IN VALIDATION - # I ASSUME ITS FASTER THAN THE FIRST ONE - def get_dd_root_entities_dict (self, uuid_list): - """ - This method takes a list of uuids for select processes - and returns a dictionary of root entities that are associated with these processes. - - The main application of this method is to get the root entities dictionary of a - potential derivative database. - - The returned dictionary consists of 17 keys for the different root entities: - 1- Actors, 2- Currency, 3- DQ system, 4- EPD, 5- Flow, 6- Flow property, 7- Impact - categories, 8- Impact methods, 9- Location, 10- Parameter, 11- Process, 12- Product - system, 13- Project, 14- Result, 15- Social indicators, 16- Source, 17- UnitGroup. - - Each root entity key contains a dictionary with the original fields of the root entity: - 1- name, 2- display, 3- info, 4- class, 5- yaml, 6-ids - - Assumptions: - 1- The fields that will be updated based on the target processes are: - Actors, DQ System, Flow, Flow property, Process, Location, Parameter, Source - - 2- This method retains the original fields of the source database for the following - root entities: - Currency, EPD, Impact categories, Impact methods, UnitGroup - - 3- This method resets the following root entities data: - Product system, Project, Result, Social indicators - - Parameters - ---------- - uuid_list : list - A list of uuids for select processes. - - Returns - ------- - dict - A dictionary of root entities that are associated with the target processes. - - Example - ------- - >>> uuid_list = ["123e4567-e89b-12d3-a456-426614174000"] - >>> dd_root_entities_dict = self.get_dd_root_entities_dict(uuid_list) - >>> print(dd_root_entities_dict) - - """ - # get the root entities dictionary of the full database and filter it out - full_dict = copy.deepcopy(self._spec_map) - - # get full list of uuids in derivative database incl. target processes and their - # default providers - ddb_uuids = [] - for uuid in uuid_list: - ddb_uuids.append(uuid) - ddb_uuids += self.get_default_providers(uuid) - uuid_list = list(set(ddb_uuids)) # remove duplicates - - # reset the 'ids' field for process-specific root entities - n = [1,3,5,6,9,10,11,16] - for i in n: - full_dict [i]["ids"] = [] + if source.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(source.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Source, source.id) + ) + + # Impact categories #7 - keep all + if add_objs: + print ("resolving impact categories") + i = get_dict_number(self._spec_map, o.ImpactCategory, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.ImpactCategory, _id)) + + # Impact methods #8 - keep all + if add_objs: + print ("resolving impact methods") + i = get_dict_number(self._spec_map, o.ImpactMethod, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.ImpactMethod, _id)) + + # UnitGroup #17 - keep all + if add_objs: + i = get_dict_number(self._spec_map, o.UnitGroup, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.UnitGroup, _id)) - for uuid in uuid_list: - # actors #1 - actors = self.get_process_actors(uuid) - if actors: - for actor in actors: - if actor.id not in full_dict [1]["ids"]: - full_dict [1]["ids"].append(self.query(o.Actor, actor.id)) - # DQ system #3 - dq_systems = self.get_process_dq_system(uuid) - if dq_systems: - for dq_system in dq_systems: - if dq_system.id not in full_dict [3]["ids"]: - full_dict [3]["ids"].append(self.query(o.DQSystem, dq_system.id)) - # Flow #5 - flows = self.get_process_flows(uuid) - if flows: - for flow in flows: - if flow.id not in full_dict [5]["ids"]: - full_dict [5]["ids"].append(self.query(o.Flow, flow.id)) - # Flow property #6 - flow_properties = self.get_process_flow_properties(uuid) - if flow_properties: - for flow_property in flow_properties: - if flow_property.id not in full_dict [6]["ids"]: - full_dict [6]["ids"].append(self.query(o.FlowProperty, flow_property.id)) - # Location #9 - locations = self.get_process_location(uuid) - if locations: - if isinstance(locations, list): - for location in locations: - if location.id not in full_dict [9]["ids"]: - full_dict [9]["ids"].append(self.query(o.Location, location.id)) - else: - if locations.id not in full_dict [9]["ids"]: - full_dict [9]["ids"].append(self.query(o.Location, locations.id)) - # Parameter #10 - parameters = self.get_process_parameters(uuid) - if parameters: - for parameter in parameters[0]: - if parameter.id not in full_dict [10]["ids"]: - full_dict [10]["ids"].append(self.query(o.Parameter, parameter.id)) - # Processes - full_dict [11]["ids"] = [id for id in full_dict [11]["ids"] if id in uuid_list] - # Source #16 - if self.get_process_sources(uuid): - for source in self.get_process_sources(uuid): - if source.id not in full_dict [16]["ids"]: - full_dict [16]["ids"].append(source.id) - # remove Product system, project, and result ids - full_dict [12]["ids"] = [] - full_dict [13]["ids"] = [] - full_dict [14]["ids"] = [] - full_dict [15]["ids"] = [] - return full_dict - - def get_sources(self, uuid, all_p=False): # Sources are bound to a process's documentation attribute. # There are two routes: the reference process or all processes. @@ -3417,12 +3380,27 @@ def writeout(fpath, dstring): else: OUT.close() -# -# SANDBOX -# +############################################################################### +# SANDBOX +############################################################################### if __name__ == "__main__": - netl = NetlOlca() - netl.connect() - netl.read() - + # Initialize and connect to IPC service + n = NetlOlca() + n.connect() + n.read() + + # Start by searching the database for "Corn grain" processes + search_q = re.compile("^Corn grain,.*$") + search_r = n.match_process_names(search_q) + + # For derivate data methods, create a list of example processes. + # (i.e., "Corn grain, cultivation" and "Corn grain, harvesting") + p_uuids = [ + p[0] for p in search_r if p[1].endswith( + "cultivation") or p[1].endswith("harvesting") + ] + + d = n.get_full_dd_root_entities_dict(p_uuids, False) + i = get_dict_number(d, o.Process, 'class') + print(len(d[i]['ids'])) # 19 From 6d0b4e5d025a41a2c924d890ed7f3bd739dcbdb8 Mon Sep 17 00:00:00 2001 From: dt-woods Date: Tue, 13 Jan 2026 10:51:17 -0500 Subject: [PATCH 03/21] update get_flows to work for get_process_flows --- netlolca/NetlOlca.py | 112 ++++++++++++++++++++++++++++++------------- 1 file changed, 78 insertions(+), 34 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 577618f..70832d7 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -29,7 +29,7 @@ IPC server) or indirectly (via an exported JSON-LD zip file). Last Edited: - 2026-01-12 + 2026-01-13 Examples -------- @@ -1363,7 +1363,7 @@ def get_flow_by_exchange(self, uuid, ex_id): self.logger.info("Failed to find exchange ID, %d" % ex_id) return None - def get_flows(self, uuid=None, inputs=True, outputs=True): + def get_flows(self, uuid=None, inputs=True, outputs=True, as_dict=True): """Return dictionary of flow data associated with a process's exchanges. If no UUID is provided (or a UUID of a product system), then the @@ -1377,18 +1377,33 @@ def get_flows(self, uuid=None, inputs=True, outputs=True): If input flows are requested, by default true outputs : bool, optional If output flows are requested, by default true + as_dict : bool, optional + Whether to return exchange table as a dictionary (e.g., for + easy use with pandas data frames). Returns ------- - dict - A dictionary with 'name', 'amount', 'unit', 'category', 'uuid', - 'tracked', 'description', 'provider', and 'dq' (data quality) + dict, list + If ``as_dict`` is true, the return object is a dictionary with + 'name', 'amount', 'unit', 'category', 'uuid', 'tracked', + 'description', 'provider', and 'dq' (data quality) fields formatted ready for conversion to a pandas data frame. + If ``as_dict`` is false, the return object is a list of reference + objects to flows. Notes ----- The data quality index is a string (e.g., '(1;3;2;5;1)'). + + Examples + -------- + >>> n = NetlOlca() + >>> n.connect() + >>> n.read() + >>> uid = '71b559b7-ac85-498d-80e3-70faa5d9936d' + >>> pd.DataFrame(n.get_flows(uid, False, True, True)) """ + # Initialize the empty dictionary for UP template data frame r_dict = { 'name': [], 'amount': [], @@ -1400,10 +1415,17 @@ def get_flows(self, uuid=None, inputs=True, outputs=True): 'provider': [], 'dq': [], } + + # Initialize empty flow list + f_list = [] + + # Gets either the process or reference process of a product system. uuid = self.get_process_id(uuid) p = self.query(o.Process, uuid) + if p is not None: for e_obj in p.exchanges: + # Extract metadata for data frame. e_tracked = self.flow_is_tracked(e_obj.flow.id) e_name = "%s" % e_obj.flow.name e_amount = e_obj.amount @@ -1414,28 +1436,40 @@ def get_flows(self, uuid=None, inputs=True, outputs=True): e_prov = "%s" % e_obj.to_dict().get( "defaultProvider", {}).get("@id", "") e_dq = "%s" % e_obj.to_dict().get('dq_entry', '') + if inputs and e_obj.is_input: - r_dict['name'].append(e_name) - r_dict['amount'].append(e_amount) - r_dict['unit'].append(e_unit) - r_dict['category'].append(e_cat) - r_dict['uuid'].append(e_uid) - r_dict['tracked'].append(e_tracked) - r_dict['description'].append(e_des) - r_dict['provider'].append(e_prov) - r_dict['dq'].append(e_dq) + if as_dict: + r_dict['name'].append(e_name) + r_dict['amount'].append(e_amount) + r_dict['unit'].append(e_unit) + r_dict['category'].append(e_cat) + r_dict['uuid'].append(e_uid) + r_dict['tracked'].append(e_tracked) + r_dict['description'].append(e_des) + r_dict['provider'].append(e_prov) + r_dict['dq'].append(e_dq) + else: + # Add flow reference object + f_list.append(e_obj.flow) if outputs and not e_obj.is_input: - r_dict['name'].append(e_name) - r_dict['amount'].append(e_amount) - r_dict['unit'].append(e_unit) - r_dict['category'].append(e_cat) - r_dict['uuid'].append(e_uid) - r_dict['tracked'].append(e_tracked) - r_dict['description'].append(e_des) - r_dict['provider'].append(e_prov) - r_dict['dq'].append(e_dq) + if as_dict: + r_dict['name'].append(e_name) + r_dict['amount'].append(e_amount) + r_dict['unit'].append(e_unit) + r_dict['category'].append(e_cat) + r_dict['uuid'].append(e_uid) + r_dict['tracked'].append(e_tracked) + r_dict['description'].append(e_des) + r_dict['provider'].append(e_prov) + r_dict['dq'].append(e_dq) + else: + f_list.append(e_obj.flow) - return r_dict + # The two return objects depending on user preference. + if as_dict: + return r_dict + else: + return f_list def get_from_file(self, d_class, d_uuid): """Return the schema object for a given UUID of a given data type @@ -1787,15 +1821,9 @@ def get_process_flows(self, uuid): Notes ----- - See also, :func:`get_flows` for an alternative return object. + Uses :func:`get_flows` with list return object. """ - flow_list = [] - if uuid in self.get_spec_ids(o.Process): - obj = self.query(o.Process, uuid) - for exch in obj.exchanges: - if exch.flow: - flow_list.append(exch.flow) - return flow_list + return self.get_flows(uuid, True, True, False) def get_process_flow_properties(self, uuid): """ @@ -3385,15 +3413,31 @@ def writeout(fpath, dstring): # SANDBOX ############################################################################### if __name__ == "__main__": + import re + from netlolca.NetlOlca import NetlOlca + # Initialize and connect to IPC service n = NetlOlca() n.connect() n.read() - # Start by searching the database for "Corn grain" processes - search_q = re.compile("^Corn grain,.*$") + # Start by searching the database for specific processes + search_q = re.compile("^.*scenario 1$") search_r = n.match_process_names(search_q) + # For UP template, just scrape the UUIDs: + p_uuids = [p[0] for p in search_r] + + # Goal is to get input and output flows where the flow amounts are + # parameterized. The basic functions are ``get_input_flows`` and + # ``get_output_flows``, which call on ``get_flows``. + # The issue is that currently the numeric values are returned. + + + # \\\\\\\\\\\\\\\\\\\ + # Derivative Database + # /////////////////// + # For derivate data methods, create a list of example processes. # (i.e., "Corn grain, cultivation" and "Corn grain, harvesting") p_uuids = [ From 439c3707c9fe804beba911b5ced4e91c27af9267 Mon Sep 17 00:00:00 2001 From: dt-woods Date: Tue, 13 Jan 2026 12:31:13 -0500 Subject: [PATCH 04/21] parameter updates sped up searches for process parameter lists and processes that reference a given parameter (now searches exchange table formulas). Added amountFormula to exchange table return dictionary in get_flows, which should be parameter names or equations. Hotfix list_parameters to actually return parameters based on scope and type. --- netlolca/NetlOlca.py | 153 ++++++++++++++++++++++++++++++------------- 1 file changed, 106 insertions(+), 47 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 70832d7..3732bb3 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -664,7 +664,7 @@ def disconnect(self): self.logger.info("Disconnecting client from server") self.client = None - def find_parameter(self, p_name, as_dict=False): + def find_parameter(self, p_name, p_list=None, as_dict=False): """Return a list of parameter objects (or dictionaries) matching a given name. @@ -672,6 +672,9 @@ def find_parameter(self, p_name, as_dict=False): ---------- p_name : str Parameter name + p_list : list, optional + A list of Parameter (or Ref) objects to speed up search. + Defaults to none. as_dict : bool, optional Whether return objects should be dictionaries, by default False @@ -691,9 +694,13 @@ def find_parameter(self, p_name, as_dict=False): raise ValueError("Parameter names must be a string!") p_name = p_name.lower() + # Get parameter list if not provided. + if p_list is None: + p_list = self.list_parameters() + # Initialize return list of potential matches. p_matches = [] - for p_obj in self.list_parameters(): + for p_obj in p_list: par_name = p_obj.name.lower() if par_name == p_name: if as_dict: @@ -739,9 +746,11 @@ def find_parameter_process(self, name=None, uuid=None): raise TypeError("UUID must be a string!") proc_list = [] + all_params = self.list_parameters() + for pid in self.get_spec_ids(o.Process): - p_obj = self.query(o.Process, pid) - for par_obj in p_obj.parameters: + param_list = self.find_process_parameters(pid, all_params) + for par_obj in param_list: # Prioritize UUID over parameter name: if uuid and par_obj.id == uuid: proc_list.append(pid) @@ -752,7 +761,7 @@ def find_parameter_process(self, name=None, uuid=None): self.logger.warning("Parameter not found in a process!") return proc_list - def find_process_parameters(self, uuid): + def find_process_parameters(self, uuid, parameters=None): """Return parameter objects associated with a process. Includes parameters found in the parameters table and referenced @@ -765,6 +774,9 @@ def find_process_parameters(self, uuid): ---------- uuid : str A Process universally unique identifier. + parameters : list, optional + A list of Parameter objects to speed up searches. + Defaults to none, which triggers querying all parameters. Returns ------- @@ -786,13 +798,19 @@ def find_process_parameters(self, uuid): try: self.logger.info( "Received non-process UUID, trying to get process ID") - return self.find_process_parameters(self.get_process_id(uuid)) + return self.find_process_parameters( + self.get_process_id(uuid), parameters + ) except: raise ValueError("UUID must be for a process!") # Initialize the return object param_list = [] + # Pre-compiled list of all parameters. + if parameters is None: + parameters = self.list_parameters() + # Process-level self.logger.info("Searching process-level parameters") p_obj = self.query(o.Process, uuid) @@ -804,7 +822,9 @@ def find_process_parameters(self, uuid): if not par_obj.is_input_parameter and par_obj.formula: # Search the formula for global parameters. # NOTE: process-level params are already captured above. - f_params = self.formula_extractor(par_obj.formula, []) + f_params = self.formula_extractor( + par_obj.formula, [], parameters + ) param_list += f_params # Exchange table referenced (global level) @@ -814,19 +834,20 @@ def find_process_parameters(self, uuid): if ex_obj.amount_formula: # Again, only get global params, because process-level is # accounted for. - f_params = self.formula_extractor(ex_obj.amount_formula, []) + f_params = self.formula_extractor( + ex_obj.amount_formula, [], parameters + ) param_list += f_params if len(param_list) == 0: self.logger.warning("No parameters found for process!") else: - # Remove duplicates + # Remove duplicates. NOTE all parameters should have unique UUID. r_list = [] tmp_list = [] for param in param_list: - my_tuple = (param.id, param.parameter_scope.name) - if my_tuple not in tmp_list: - tmp_list.append(my_tuple) + if param.id not in tmp_list: + tmp_list.append(param.id) r_list.append(param) param_list = r_list @@ -885,7 +906,7 @@ def flow_is_tracked(self, uuid): self.logger.warning("Failed to find flow, '%s'" % uuid) return r_str - def formula_extractor(self, f_str, p_list=[]): + def formula_extractor(self, f_str, p_list=[], master_list=None): """Extract global parameters from a formula. The process includes the following steps to recursively search @@ -904,6 +925,8 @@ def formula_extractor(self, f_str, p_list=[]): exchange tables (e.g., amount formula). p_list : list, optional A list of parameters used for recursive searching, by default [] + master_list : list, optional + A list of all parameters to search. Returns ------- @@ -918,9 +941,6 @@ def formula_extractor(self, f_str, p_list=[]): are letters, numbers, and underscores that does not start or end with an underscore. """ - # Essentially, 'GLOBAL_SCOPE' str - g_scope = o.ParameterScope.GLOBAL_SCOPE.name - # Use regular expression to match variable names # (based on the notion that variables are letters, # numbers, and underscores that does not start or end @@ -934,22 +954,25 @@ def formula_extractor(self, f_str, p_list=[]): # entirely of numbers (i.e., coefficients or constants in equations). q_except = re.compile("[0-9]+") - # Search the formula for parameters & filter out exceptions. + # Search the formula for parameters & filter out exceptions and + # if-statements. f_params = re.findall(q, f_str.lower()) f_params = [ x for x in f_params if (not re.match(q_except, x)) and ( - x != "if") + x != "if") and (x != "iff") ] for f_param in f_params: - s_results = self.find_parameter(f_param) + s_results = self.find_parameter(f_param, p_list=master_list) for sr in s_results: # Add all referenced global parameters - if sr.parameter_scope.name == g_scope: + if sr.parameter_scope == o.ParameterScope.GLOBAL_SCOPE: p_list.append(sr) # Recursively search dependent global parameters: if not sr.is_input_parameter and sr.formula: - p_list = self.formula_extractor(sr.formula, p_list) + p_list = self.formula_extractor( + sr.formula, p_list, master_list + ) return p_list def get_actor_yaml(self, f_dir=None): @@ -1394,6 +1417,8 @@ def get_flows(self, uuid=None, inputs=True, outputs=True, as_dict=True): Notes ----- The data quality index is a string (e.g., '(1;3;2;5;1)'). + For exchanges that have parameterized amounts, use the 'amountFormula' + column (if ``as_dict`` is true). Examples -------- @@ -1407,6 +1432,7 @@ def get_flows(self, uuid=None, inputs=True, outputs=True, as_dict=True): r_dict = { 'name': [], 'amount': [], + 'amountFormula': [], 'unit': [], 'category': [], 'uuid': [], @@ -1429,6 +1455,7 @@ def get_flows(self, uuid=None, inputs=True, outputs=True, as_dict=True): e_tracked = self.flow_is_tracked(e_obj.flow.id) e_name = "%s" % e_obj.flow.name e_amount = e_obj.amount + e_form = e_obj.amount_formula e_unit = "%s" % e_obj.unit.name e_cat = "%s" % e_obj.flow.category e_uid = "%s" % e_obj.flow.id @@ -1441,6 +1468,7 @@ def get_flows(self, uuid=None, inputs=True, outputs=True, as_dict=True): if as_dict: r_dict['name'].append(e_name) r_dict['amount'].append(e_amount) + r_dict['amountFormula'].append(e_form) r_dict['unit'].append(e_unit) r_dict['category'].append(e_cat) r_dict['uuid'].append(e_uid) @@ -1455,6 +1483,7 @@ def get_flows(self, uuid=None, inputs=True, outputs=True, as_dict=True): if as_dict: r_dict['name'].append(e_name) r_dict['amount'].append(e_amount) + r_dict['amountFormula'].append(e_form) r_dict['unit'].append(e_unit) r_dict['category'].append(e_cat) r_dict['uuid'].append(e_uid) @@ -2545,16 +2574,40 @@ def list_parameters(self, inc_process=True, input_only=False, as_dict=False): - # IN PROGRESS - # To get a master list of global and/or process parameters filterable - # by input parameter status. + """Return a list of parameters based on scope and classification. + + Parameters + ---------- + inc_global : bool, optional + Whether to include global scope parameters, by default True + inc_process : bool, optional + Whether to include process scope parameters, by default True + input_only : bool, optional + Whether to include only input parameter types. If false, both + input and dependent parameters are returned. + Default is false. + as_dict : bool, optional + Whether to return parameters as dictionary objects; otherwise + returns Parameter class objects, by default False + + Returns + ------- + list + A list of parameter objects or dictionaries (if ``as_dict`` is true) + """ + # Initialize empty return list param_list = [] - # Global scope - if inc_global: - for par_id in self.get_spec_ids(o.Parameter): - par_obj = self.query(o.Parameter, par_id) + # HOTFIX: all parameters (including process and global) are found + # in the Parameter root entity list. + for par_id in self.get_spec_ids(o.Parameter): + par_obj = self.query(o.Parameter, par_id) + + # Global scope + if inc_global and ( + par_obj.parameter_scope == o.ParameterScope.GLOBAL_SCOPE): if input_only and par_obj.is_input_parameter: + # Input global parameters if as_dict: param_list.append(par_obj.to_dict()) else: @@ -2565,21 +2618,20 @@ def list_parameters(self, else: param_list.append(par_obj) - # Process scope - if inc_process: - for p_id in self.get_spec_ids(o.Process): - p_obj = self.query(o.Process, p_id) - for par_obj in p_obj.parameters: - if input_only and par_obj.is_input_parameter: - if as_dict: - param_list.append(par_obj.to_dict()) - else: - param_list.append(par_obj) - elif not input_only: - if as_dict: - param_list.append(par_obj.to_dict()) - else: - param_list.append(par_obj) + # Process scope + if inc_process and ( + par_obj.parameter_scope == o.ParameterScope.PROCESS_SCOPE): + if input_only and par_obj.is_input_parameter: + # Input process parameters + if as_dict: + param_list.append(par_obj.to_dict()) + else: + param_list.append(par_obj) + elif not input_only: + if as_dict: + param_list.append(par_obj.to_dict()) + else: + param_list.append(par_obj) return param_list @@ -3414,6 +3466,7 @@ def writeout(fpath, dstring): ############################################################################### if __name__ == "__main__": import re + import pandas as pd from netlolca.NetlOlca import NetlOlca # Initialize and connect to IPC service @@ -3428,10 +3481,16 @@ def writeout(fpath, dstring): # For UP template, just scrape the UUIDs: p_uuids = [p[0] for p in search_r] - # Goal is to get input and output flows where the flow amounts are - # parameterized. The basic functions are ``get_input_flows`` and - # ``get_output_flows``, which call on ``get_flows``. - # The issue is that currently the numeric values are returned. + # Each output has a formula except for the reference flow. + # One formula is an equation: + # Carbon dioxide = "iff(formic_acid_case = 2;0;CO2)" + # + + # How to speed up process parameter searches? + p_params = n.find_process_parameters(p_uuids[0]) # sped up. + p_procs = n.find_parameter_process( + uuid='9d9afede-d113-4300-8766-9bd1c4ee8e9c') # sped up. + # \\\\\\\\\\\\\\\\\\\ From 8ef9599b00059199410bb619b0c396447d871e79 Mon Sep 17 00:00:00 2001 From: dt-woods Date: Tue, 13 Jan 2026 15:34:28 -0500 Subject: [PATCH 05/21] fix get process parameters --- netlolca/NetlOlca.py | 42 +++++++++++++----------------------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 3732bb3..7a27376 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1827,11 +1827,17 @@ def get_process_parameters(self, uuid): ------- list A list of parameter objects for the given process. + + Notes + ----- + Does not include global parameters that may show up in process + parameter or exchange amount formulas. For that, use + :func:`find_process_parameters`. """ param_list = [] if uuid in self.get_spec_ids(o.Process): obj = self.query(o.Process, uuid) - param_list.append(obj.parameters) + param_list = obj.parameters return param_list def get_process_flows(self, uuid): @@ -3466,7 +3472,6 @@ def writeout(fpath, dstring): ############################################################################### if __name__ == "__main__": import re - import pandas as pd from netlolca.NetlOlca import NetlOlca # Initialize and connect to IPC service @@ -3475,35 +3480,14 @@ def writeout(fpath, dstring): n.read() # Start by searching the database for specific processes - search_q = re.compile("^.*scenario 1$") + search_q = re.compile("^.*scenario (\\d{1,2})$") search_r = n.match_process_names(search_q) - # For UP template, just scrape the UUIDs: + # Scrub the UUIDs from search results. p_uuids = [p[0] for p in search_r] - # Each output has a formula except for the reference flow. - # One formula is an equation: - # Carbon dioxide = "iff(formic_acid_case = 2;0;CO2)" - # - - # How to speed up process parameter searches? - p_params = n.find_process_parameters(p_uuids[0]) # sped up. - p_procs = n.find_parameter_process( - uuid='9d9afede-d113-4300-8766-9bd1c4ee8e9c') # sped up. - - - - # \\\\\\\\\\\\\\\\\\\ - # Derivative Database - # /////////////////// - - # For derivate data methods, create a list of example processes. - # (i.e., "Corn grain, cultivation" and "Corn grain, harvesting") - p_uuids = [ - p[0] for p in search_r if p[1].endswith( - "cultivation") or p[1].endswith("harvesting") - ] + # Get parameters for a given process. Fast. Found 92. + p_params = n.get_process_parameters(p_uuids[0]) - d = n.get_full_dd_root_entities_dict(p_uuids, False) - i = get_dict_number(d, o.Process, 'class') - print(len(d[i]['ids'])) # 19 + # Get all parameters reference within a process. Slower. Found 93. + p_all_params = n.find_process_parameters(p_uuids[0]) From 250f7dbe3d74c51c18f10c3f6d6eba3e70d93b17 Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Tue, 13 Jan 2026 17:00:32 -0500 Subject: [PATCH 06/21] Addresses TWD Comments until line R1919 --- netlolca/NetlOlca.py | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 577618f..764300f 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1686,17 +1686,19 @@ def get_process_actors(self, uuid): Returns ------- list - A list of actor objects for the given process. + A list of reference objects to the actors of the given process. """ a_list = [] if uuid in self.get_spec_ids(o.Process): obj = self.query(o.Process, uuid) if obj.process_documentation.data_documentor: a_list.append(obj.process_documentation.data_documentor) - elif obj.process_documentation.data_generator and ( + + if obj.process_documentation.data_generator and ( obj.process_documentation.data_generator not in a_list): a_list.append(obj.process_documentation.data_generator) - elif obj.process_documentation.data_set_owner and ( + + if obj.process_documentation.data_set_owner and ( obj.process_documentation.data_set_owner not in a_list): a_list.append(obj.process_documentation.data_set_owner) @@ -1721,7 +1723,7 @@ def get_process_dq_system(self, uuid): obj = self.query(o.Process, uuid) # Process schema - if obj.dq_system and obj.dq_system not in dq_system: + if obj.dq_system: dq_system.append(obj.dq_system) # Flow schema @@ -1753,7 +1755,7 @@ def get_process_location(self, uuid): def get_process_parameters(self, uuid): """ - Return a list of parameters uuids for a given process. + Return a list of parameters objects for a given process. Parameters ---------- @@ -1768,7 +1770,7 @@ def get_process_parameters(self, uuid): param_list = [] if uuid in self.get_spec_ids(o.Process): obj = self.query(o.Process, uuid) - param_list.append(obj.parameters) + param_list += obj.parameters return param_list def get_process_flows(self, uuid): @@ -1783,7 +1785,7 @@ def get_process_flows(self, uuid): Returns ------- list - A list of flow objects for the given process. + A list of reference objects to the flows of the given process. Notes ----- @@ -1815,31 +1817,35 @@ def get_process_flow_properties(self, uuid): if uuid in self.get_spec_ids(o.Process): obj = self.query(o.Process, uuid) for exch in obj.exchanges: - if exch.flow_property: + if exch.flow_property and exch.flow_property.id not in [x.id for x in fp_list]: fp_list.append(exch.flow_property) return fp_list - def get_default_providers(self, uuid): + def get_default_providers(self, uuid, all_prov=True): """ Return a list of default providers for a given process. This function goes through all exchanges of a process, identifies the default providers, and returns a list of default provider objects. - Then the function goe through the exchanges of the default providers, + Optionally, the function can go through the exchanges of the default providers, identifies their default providers, and appends them to the list if - they are not already in the list. - - This process is repeated until all default providers are found. + they are not already in the list. Then this process is repeated until all + default providers are found. Parameters ---------- uuid : str The UUID of the process. - + all_prov : bool, optional + Whether to search for all default providers in the supply chain of the + given process. + True: Get all default providers in the supply chain of the given process. + False: Get only the default providers in the exchange table of the given process. + Returns ------- list - A list of default provider objects for the given process. + A list of process UUIDs of the default providers for the given process. """ provider_list = [] seen = set() @@ -1868,7 +1874,7 @@ def get_default_providers(self, uuid): dp = exch.default_provider.id if dp not in provider_list: provider_list.append(dp) - if dp not in seen: + if all_prov and dp not in seen: to_be_checked.append(dp) return provider_list From 4301143d8d54db8f9ab4f02f3afa6044c43756f7 Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Wed, 14 Jan 2026 12:50:49 -0500 Subject: [PATCH 07/21] Addresses TWD Comments --- netlolca/NetlOlca.py | 75 ++++++++++++++++++-------------------------- 1 file changed, 30 insertions(+), 45 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 5160671..9a2e1f6 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1842,26 +1842,6 @@ def get_process_parameters(self, uuid): param_list = obj.parameters return param_list - def get_process_flows(self, uuid): - """ - Return a list of flows for a given process. - - Parameters - ---------- - uuid : str - The UUID of the process. - - Returns - ------- - list - A list of reference objects to the flows of the given process. - - Notes - ----- - Uses :func:`get_flows` with list return object. - """ - return self.get_flows(uuid, True, True, False) - def get_process_flow_properties(self, uuid): """ Return a list of flow properties for a given process. @@ -1991,24 +1971,31 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): Examples -------- >>> uuid_list = ["123e4567-e89b-12d3-a456-426614174000"] - >>> dd_root_entities_dict = self.get_dd_root_entities_dict(uuid_list) + >>> dd_root_entities_dict = self.get_full_dd_root_entities_dict(ddb_uuids, add_objs=True) >>> print(dd_root_entities_dict) """ # Get root entities dictionary of the full database. + print("Getting root entities dictionary of the source database.") full_dict = copy.deepcopy(self._spec_map) # Get full list of Process UUIDs in derivative database. # This includes processes that are providers to the processes in # `uuid_list`; remove duplicates (e.g., from similar providers). + all_prov = True + if all_prov == True: + print("Getting Process UUIDs from the derivative database including default providers across the entire supply chain.") + else: + print("Getting Process UUIDs from the derivative database including default providers for the targeted processes only.") ddb_uuids = [] for uuid in uuid_list: ddb_uuids.append(uuid) - ddb_uuids += self.get_default_providers(uuid) - uuid_list = list(set(ddb_uuids)) + ddb_uuids += self.get_default_providers(uuid, all_prov) + ddb_uuids = list(set(ddb_uuids)) # Create new field to store objs for each root entity. if add_objs: + print("Creating new field to store objects for each root entity.") for i in self._spec_map.keys(): full_dict[i]["objs"] = [] @@ -2030,11 +2017,17 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): o.SocialIndicator, ] for name in entities: + print("Resetting 'ids' field for %s." % name) i = get_dict_number(self._spec_map, name, 'class') full_dict[i]["ids"] = [] + # Get all parameters from the full database. + print("Getting all parameters objects from the full database.") + all_parameters = self.list_parameters() + # Add back entities. - for uuid in uuid_list: + for uuid in ddb_uuids: + print("Processing root entities for process UUID: %s" % uuid) # Actors #1 actors = self.get_process_actors(uuid) if actors: @@ -2067,12 +2060,12 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): # EPD #4 - keep all if add_objs: - i = get_dict_number(self._spec_map, o.EPD, "class") + i = get_dict_number(self._spec_map, o.Epd, "class") for _id in full_dict[i]["ids"]: full_dict[i]["objs"].append(self.query(o.EPD, _id)) # Flow #5 - flows = self.get_process_flows(uuid) + flows = self.get_flows(uuid, True, True, False) if flows: i = get_dict_number(self._spec_map, o.Flow, "class") for flow in flows: @@ -2099,27 +2092,18 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): locations = self.get_process_location(uuid) if locations: i = get_dict_number(self._spec_map, o.Location, "class") - if isinstance(locations, list): - for location in locations: - if location.id not in full_dict[i]["ids"]: - full_dict[i]["ids"].append(location.id) - if add_objs: - full_dict[i]["objs"].append( - self.query(o.Location, location.id) - ) - else: - if locations.id not in full_dict[i]["ids"]: - full_dict[i]["ids"].append(locations.id) - if add_objs: - full_dict[i]["objs"].append( - self.query(o.Location, locations.id) - ) + if locations.id not in full_dict[i]["ids"]: + full_dict[i]["ids"].append(locations.id) + if add_objs: + full_dict[i]["objs"].append( + self.query(o.Location, locations.id) + ) # Parameter #10 - parameters = self.get_process_parameters(uuid) + parameters = self.find_process_parameters(uuid, all_parameters) if parameters: i = get_dict_number(self._spec_map, o.Parameter, "class") - for parameter in parameters[0]: + for parameter in parameters: if parameter.id not in full_dict[i]["ids"]: full_dict[i]["ids"].append(parameter.id) if add_objs: @@ -2148,20 +2132,21 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): # Impact categories #7 - keep all if add_objs: - print ("resolving impact categories") + print("Collecting reference objects for impact categories.") i = get_dict_number(self._spec_map, o.ImpactCategory, "class") for _id in full_dict[i]["ids"]: full_dict[i]["objs"].append(self.query(o.ImpactCategory, _id)) # Impact methods #8 - keep all if add_objs: - print ("resolving impact methods") + print("Collecting reference objects for impact methods.") i = get_dict_number(self._spec_map, o.ImpactMethod, "class") for _id in full_dict[i]["ids"]: full_dict[i]["objs"].append(self.query(o.ImpactMethod, _id)) # UnitGroup #17 - keep all if add_objs: + print("Collecting reference objects for unit groups.") i = get_dict_number(self._spec_map, o.UnitGroup, "class") for _id in full_dict[i]["ids"]: full_dict[i]["objs"].append(self.query(o.UnitGroup, _id)) From 2f052f3f49aa365bbe0ab18095c47ccb215c3059 Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Thu, 15 Jan 2026 10:25:47 -0500 Subject: [PATCH 08/21] minor edit - removing print statements --- netlolca/NetlOlca.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 9a2e1f6..83a3192 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1976,7 +1976,7 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): """ # Get root entities dictionary of the full database. - print("Getting root entities dictionary of the source database.") + logging.info("Getting root entities dictionary of the source database.") full_dict = copy.deepcopy(self._spec_map) # Get full list of Process UUIDs in derivative database. @@ -1984,9 +1984,9 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): # `uuid_list`; remove duplicates (e.g., from similar providers). all_prov = True if all_prov == True: - print("Getting Process UUIDs from the derivative database including default providers across the entire supply chain.") + logging.info("Getting Process UUIDs from the derivative database including default providers across the entire supply chain.") else: - print("Getting Process UUIDs from the derivative database including default providers for the targeted processes only.") + logging.info("Getting Process UUIDs from the derivative database including default providers for the targeted processes only.") ddb_uuids = [] for uuid in uuid_list: ddb_uuids.append(uuid) @@ -1995,7 +1995,7 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): # Create new field to store objs for each root entity. if add_objs: - print("Creating new field to store objects for each root entity.") + logging.info("Creating new field to store objects for each root entity.") for i in self._spec_map.keys(): full_dict[i]["objs"] = [] @@ -2017,17 +2017,17 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): o.SocialIndicator, ] for name in entities: - print("Resetting 'ids' field for %s." % name) + logging.info("Resetting 'ids' field for %s." % name) i = get_dict_number(self._spec_map, name, 'class') full_dict[i]["ids"] = [] # Get all parameters from the full database. - print("Getting all parameters objects from the full database.") + logging.info("Getting all parameters objects from the full database.") all_parameters = self.list_parameters() # Add back entities. for uuid in ddb_uuids: - print("Processing root entities for process UUID: %s" % uuid) + logging.info("Processing root entities for process UUID: %s" % uuid) # Actors #1 actors = self.get_process_actors(uuid) if actors: @@ -2132,21 +2132,21 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): # Impact categories #7 - keep all if add_objs: - print("Collecting reference objects for impact categories.") + logging.info("Collecting reference objects for impact categories.") i = get_dict_number(self._spec_map, o.ImpactCategory, "class") for _id in full_dict[i]["ids"]: full_dict[i]["objs"].append(self.query(o.ImpactCategory, _id)) # Impact methods #8 - keep all if add_objs: - print("Collecting reference objects for impact methods.") + logging.info("Collecting reference objects for impact methods.") i = get_dict_number(self._spec_map, o.ImpactMethod, "class") for _id in full_dict[i]["ids"]: full_dict[i]["objs"].append(self.query(o.ImpactMethod, _id)) # UnitGroup #17 - keep all if add_objs: - print("Collecting reference objects for unit groups.") + logging.info("Collecting reference objects for unit groups.") i = get_dict_number(self._spec_map, o.UnitGroup, "class") for _id in full_dict[i]["ids"]: full_dict[i]["objs"].append(self.query(o.UnitGroup, _id)) From cf056021b8cc0751d6df62bc10205d7e399277ab Mon Sep 17 00:00:00 2001 From: dt-woods Date: Tue, 27 Jan 2026 17:44:59 -0500 Subject: [PATCH 09/21] new method to find qr flow for a given process --- netlolca/NetlOlca.py | 47 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 7 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 83a3192..f6b6e6d 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -29,7 +29,7 @@ IPC server) or indirectly (via an exported JSON-LD zip file). Last Edited: - 2026-01-13 + 2026-01-27 Examples -------- @@ -1756,11 +1756,11 @@ def get_process_actors(self, uuid): obj = self.query(o.Process, uuid) if obj.process_documentation.data_documentor: a_list.append(obj.process_documentation.data_documentor) - + if obj.process_documentation.data_generator and ( obj.process_documentation.data_generator not in a_list): a_list.append(obj.process_documentation.data_generator) - + if obj.process_documentation.data_set_owner and ( obj.process_documentation.data_set_owner not in a_list): a_list.append(obj.process_documentation.data_set_owner) @@ -1872,7 +1872,7 @@ def get_default_providers(self, uuid, all_prov=True): default providers, and returns a list of default provider objects. Optionally, the function can go through the exchanges of the default providers, identifies their default providers, and appends them to the list if - they are not already in the list. Then this process is repeated until all + they are not already in the list. Then this process is repeated until all default providers are found. Parameters @@ -1880,11 +1880,11 @@ def get_default_providers(self, uuid, all_prov=True): uuid : str The UUID of the process. all_prov : bool, optional - Whether to search for all default providers in the supply chain of the + Whether to search for all default providers in the supply chain of the given process. True: Get all default providers in the supply chain of the given process. False: Get only the default providers in the exchange table of the given process. - + Returns ------- list @@ -1995,7 +1995,9 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): # Create new field to store objs for each root entity. if add_objs: - logging.info("Creating new field to store objects for each root entity.") + logging.info( + "Creating new field to store objects for each root entity." + ) for i in self._spec_map.keys(): full_dict[i]["objs"] = [] @@ -2212,6 +2214,37 @@ def get_providers(self): self.logger.warning("No connection!") return r_list + def get_quant_ref_flow(self, p_uuid): + """Helper function to return a process's quantitative reference flow. + + Parameters + ---------- + p_uuid : str + A process (or product system) universally unique identifier. + + Returns + ------- + olca-schema.Exchange + An exchange object associated as quantitative reference flow, + or NoneType (if method fails to find process or no flows are + labeled as quantitative reference). + """ + # Ensure UUID is from a process. + _uuid = self.get_process_id(p_uuid) + _found = False + if _uuid: + p = self.query(o.Process, _uuid) + for ex in p.exchanges: + # A process has at most 1 quantitative reference. + if ex.is_quantitative_reference: + return ex + if not _found: + self.logger.warning( + "Failed to find quantitative reference flow " + "for process, '%s'" % p.name + ) + return None + def get_reviewer(self, uuid=None): """Return tuple of current unit process reviewer. From fc555fa7362d41bb297ba1c452ccdf7ea56d2614 Mon Sep 17 00:00:00 2001 From: Francis Hanna <91334875+frankhanna94@users.noreply.github.com> Date: Tue, 3 Feb 2026 17:25:47 -0500 Subject: [PATCH 10/21] Fix logging for default provider handling Refactor logging and set provider retrieval to True (not tunable). --- netlolca/NetlOlca.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index f6b6e6d..4f2042c 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1982,15 +1982,11 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): # Get full list of Process UUIDs in derivative database. # This includes processes that are providers to the processes in # `uuid_list`; remove duplicates (e.g., from similar providers). - all_prov = True - if all_prov == True: - logging.info("Getting Process UUIDs from the derivative database including default providers across the entire supply chain.") - else: - logging.info("Getting Process UUIDs from the derivative database including default providers for the targeted processes only.") + logging.info("Getting Process UUIDs from the derivative database including default providers across the entire supply chain.") ddb_uuids = [] for uuid in uuid_list: ddb_uuids.append(uuid) - ddb_uuids += self.get_default_providers(uuid, all_prov) + ddb_uuids += self.get_default_providers(uuid, True) ddb_uuids = list(set(ddb_uuids)) # Create new field to store objs for each root entity. From e123ee8e4d80f1b456594788d6757f69fbbff598 Mon Sep 17 00:00:00 2001 From: Francis Hanna <91334875+frankhanna94@users.noreply.github.com> Date: Tue, 3 Feb 2026 17:26:48 -0500 Subject: [PATCH 11/21] Fix typo in query method for Epd --- netlolca/NetlOlca.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 4f2042c..4cc7ace 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -2060,7 +2060,7 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): if add_objs: i = get_dict_number(self._spec_map, o.Epd, "class") for _id in full_dict[i]["ids"]: - full_dict[i]["objs"].append(self.query(o.EPD, _id)) + full_dict[i]["objs"].append(self.query(o.Epd, _id)) # Flow #5 flows = self.get_flows(uuid, True, True, False) From bab86ee840a8728dbe0d603cc6dd727ef6bf08c7 Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Mon, 9 Mar 2026 11:29:46 -0400 Subject: [PATCH 12/21] quick fix to export all locations --- netlolca/NetlOlca.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 4cc7ace..172a09b 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -2005,7 +2005,6 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): o.DQSystem, o.Flow, o.FlowProperty, - o.Location, o.Parameter, o.Process, o.Source, @@ -2086,17 +2085,6 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): self.query(o.FlowProperty, flow_property.id) ) - # Location #9 - locations = self.get_process_location(uuid) - if locations: - i = get_dict_number(self._spec_map, o.Location, "class") - if locations.id not in full_dict[i]["ids"]: - full_dict[i]["ids"].append(locations.id) - if add_objs: - full_dict[i]["objs"].append( - self.query(o.Location, locations.id) - ) - # Parameter #10 parameters = self.find_process_parameters(uuid, all_parameters) if parameters: @@ -2142,6 +2130,13 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): for _id in full_dict[i]["ids"]: full_dict[i]["objs"].append(self.query(o.ImpactMethod, _id)) + # Location #9 - keep all + if add_objs: + logging.info("Collecting reference objects for locations.") + i = get_dict_number(self._spec_map, o.Location, "class") + for _id in full_dict[i]["ids"]: + full_dict[i]["objs"].append(self.query(o.Location, _id)) + # UnitGroup #17 - keep all if add_objs: logging.info("Collecting reference objects for unit groups.") From f0fce440313d46454e6258c101b486f8bf167374 Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Tue, 10 Mar 2026 15:41:16 -0400 Subject: [PATCH 13/21] Minor edit This commit adds a log message that includes the number of extra processes add to the produced root entities dictionary in the get_full_dd_root_entities_dict method. --- netlolca/NetlOlca.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 172a09b..402bcc0 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1987,6 +1987,8 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): for uuid in uuid_list: ddb_uuids.append(uuid) ddb_uuids += self.get_default_providers(uuid, True) + n_extra_processes = len(ddb_uuids) - len(uuid_list) + logging.info(f"The derivative database includes {n_extra_processes} additional processes that are default providers to the selected processes.") ddb_uuids = list(set(ddb_uuids)) # Create new field to store objs for each root entity. From 8908eb62f8b325d81e07ba4c4d1eb55aa91592aa Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Fri, 20 Mar 2026 11:34:46 -0400 Subject: [PATCH 14/21] minor edit to make all_prov tunable --- netlolca/NetlOlca.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 402bcc0..15fa7e6 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1922,7 +1922,7 @@ def get_default_providers(self, uuid, all_prov=True): return provider_list - def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): + def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = True): """ This method takes a list of uuids for select processes and returns a dictionary of root entities that are associated with these processes. @@ -1961,6 +1961,11 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): add_obs : bool, optional Whether to include entity objects in the return dictionary. If false, will only return UUIDs. + all_prov : bool, optional + Whether to include all default providers in the supply chain of the + given processes. + True: Get all default providers in the supply chain of the given processes. + False: Get only the default providers in the exchange table of the given processes. Returns ------- @@ -1986,7 +1991,7 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True): ddb_uuids = [] for uuid in uuid_list: ddb_uuids.append(uuid) - ddb_uuids += self.get_default_providers(uuid, True) + ddb_uuids += self.get_default_providers(uuid, all_prov) n_extra_processes = len(ddb_uuids) - len(uuid_list) logging.info(f"The derivative database includes {n_extra_processes} additional processes that are default providers to the selected processes.") ddb_uuids = list(set(ddb_uuids)) From d4442e52003f846173863b702006465d72679857 Mon Sep 17 00:00:00 2001 From: dt-woods Date: Tue, 24 Mar 2026 14:24:12 -0400 Subject: [PATCH 15/21] speed up list_parameters Replace thousands of query statements with a single get_all statement --- netlolca/NetlOlca.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 15fa7e6..e3fa414 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -29,7 +29,7 @@ IPC server) or indirectly (via an exported JSON-LD zip file). Last Edited: - 2026-01-27 + 2026-03-24 Examples -------- @@ -2628,9 +2628,8 @@ def list_parameters(self, # HOTFIX: all parameters (including process and global) are found # in the Parameter root entity list. - for par_id in self.get_spec_ids(o.Parameter): - par_obj = self.query(o.Parameter, par_id) - + # HOTFIX: use :func:`get_all` to speed up search [26.03.24; TWD] + for par_obj in self.get_all(o.Parameter): # Global scope if inc_global and ( par_obj.parameter_scope == o.ParameterScope.GLOBAL_SCOPE): From 51a61cf9412f0d0bbfd5c6a07c1450b5420aa05a Mon Sep 17 00:00:00 2001 From: dt-woods Date: Tue, 24 Mar 2026 14:52:57 -0400 Subject: [PATCH 16/21] provide logging support Three new methods to support logging for NetlOlca---check_output_dir (creates .netlolca in the user's home directory), get_logger (for streaming and debug-level rotating file handler), rollover_logger (for rotating file handler). Methods are heavily based on ElectricityLCI utils.py --- netlolca/NetlOlca.py | 138 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index e3fa414..c6e317c 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -8,6 +8,7 @@ ############################################################################## import json import logging +from logging.handlers import RotatingFileHandler import os import re import shutil @@ -3237,6 +3238,40 @@ def check_for_docker(): return False +def check_output_dir(out_dir): + """Helper method to ensure a directory exists. + + If a given directory does not exist, this method attempts to create it. + + Parameters + ---------- + out_dir : str + A path to a directory. + + Returns + ------- + bool + Whether the directory exists. + """ + if not os.path.isdir(out_dir): + try: + # Start with super mkdir + os.makedirs(out_dir) + except: + logging.warning("Failed to create folder %s!" % out_dir) + try: + # Revert to simple mkdir + os.mkdir(out_dir) + except: + logging.error("Could not create folder, %s" % out_dir) + else: + logging.info("Created %s" % out_dir) + else: + logging.info("Created %s" % out_dir) + + return os.path.isdir(out_dir) + + def get_as_yaml(my_dict, rm_at=False): """Return a dictionary as a YAML string. @@ -3299,6 +3334,91 @@ def get_dict_number(dobj, val, key): return None +def get_logger(stream=True, rfh=True, str_lv='INFO', rfh_lv='DEBUG'): + """A helper function for creating or retrieving a root logger with + only one instance of stream and/or rotating file handler. + + Parameters + ---------- + stream : bool, optional + Whether to create a stream handler, by default True + rfh : bool, optional + Whether to create a rotating file handler, by default True + str_lv : str, optional + Stream handler logging level, by default 'INFO' + rfh_lv : str, optional + Rotating file handler logging level, by default 'DEBUG' + + Returns + ------- + logging.Logger + The root logger. + + Notes + ----- + This could be expanded to allow the user to set the logging + level of a specific handler (or just overwrite all levels). + """ + # Create/retrieve the root logger + log = logging.getLogger() + log.setLevel("DEBUG") + + # Congrats, you now have a hidden folder in your user's home directory. + output_dir = os.path.join( + os.path.expanduser("~"), + ".netlolca" + ) + + # Define log format + rec_format = ( + "%(asctime)s.%(msecs)03d:%(levelname)s:%(module)s:%(funcName)s:" + "%(message)s") + formatter = logging.Formatter(rec_format, datefmt='%Y-%m-%d %H:%M:%S') + + # Check what handlers the root logger already has + has_stream = False + has_rfh = False + for h in log.handlers: + if h.name == 'nolca_stream': + has_stream = True + h.setLevel(str_lv) # handle level change requests + elif h.name == 'nolca_rfh': + has_rfh = True + h.setLevel(rfh_lv) + + # Create stream handler for info messages + if stream and not has_stream: + s_handler = logging.StreamHandler() + s_handler.setLevel(str_lv) + s_handler.setFormatter(formatter) + s_handler.set_name('nolca_stream') + s_handler.stream = sys.stdout # won't show pink in Jupyter notebook + log.addHandler(s_handler) + + # Create file handler for debug messages + if rfh and not has_rfh: + log_filename = "nolca.log" + check_output_dir(output_dir) + log_path = os.path.join(output_dir, log_filename) + f_handler = RotatingFileHandler( + log_path, backupCount=9, encoding='utf-8') + f_handler.setLevel(rfh_lv) + f_handler.setFormatter(formatter) + f_handler.set_name('nolca_rfh') + log.addHandler(f_handler) + if os.path.isfile(log_path): + rollover_logger(log) + + # Clean-up step; all unnamed log handlers get elevated to critical. + # NOTE: alternatively, we could drop all unnamed loggers. + num_handlers = len(log.handlers) + for i in range(num_handlers): + if not log.handlers[i].name: + log.handlers[i].setLevel("CRITICAL") + + return log + + def make_actor_yaml(yaml_name, yaml_dir="data"): """Create a YAML file with actor template. @@ -3460,6 +3580,24 @@ def read_yaml(fpath): return c_dict +def rollover_logger(logger): + """Helper method to rollover a named Rotating File Handler. + + Parameters + ---------- + logger : logging.Logger + A logger (e.g., root logger) + """ + try: + idx = [x.name for x in logger.handlers].index("nolca_rfh") + except ValueError: + idx = -1 + + # Rollover the rotating file handler (if found) + if idx != -1: + logger.handlers[idx].doRollover() + + def writeout(fpath, dstring): """Write new/overwrite existing file with given data string. From 8e85608b7cfc98a52758649f95fe178c81dce01b Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Sat, 28 Mar 2026 15:05:37 -0400 Subject: [PATCH 17/21] In this commit 03/28/2026: * Updated get_default_providers to accept a list of UUIDs rather than a single UUID * Updated the declaration of get_default_providers in get_full_dd_root_entities_dict * modified get_full_dd_root_entities_dict - added error handling for bad requests - if uuid provided does not exist in source database --- netlolca/NetlOlca.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index c6e317c..98ac0e1 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1865,7 +1865,7 @@ def get_process_flow_properties(self, uuid): fp_list.append(exch.flow_property) return fp_list - def get_default_providers(self, uuid, all_prov=True): + def get_default_providers(self, uuid_list, all_prov=True): """ Return a list of default providers for a given process. @@ -1878,8 +1878,8 @@ def get_default_providers(self, uuid, all_prov=True): Parameters ---------- - uuid : str - The UUID of the process. + uuid_list : list of strings + A list of UUIDs of the processes. all_prov : bool, optional Whether to search for all default providers in the supply chain of the given process. @@ -1893,7 +1893,7 @@ def get_default_providers(self, uuid, all_prov=True): """ provider_list = [] seen = set() - to_be_checked = [uuid,] + to_be_checked = uuid_list spec_ids = set(self.get_spec_ids(o.Process)) while to_be_checked: @@ -1984,18 +1984,15 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr # Get root entities dictionary of the full database. logging.info("Getting root entities dictionary of the source database.") full_dict = copy.deepcopy(self._spec_map) - + all_uuids = self.get_spec_ids(o.Process) # Get full list of Process UUIDs in derivative database. # This includes processes that are providers to the processes in # `uuid_list`; remove duplicates (e.g., from similar providers). logging.info("Getting Process UUIDs from the derivative database including default providers across the entire supply chain.") - ddb_uuids = [] - for uuid in uuid_list: - ddb_uuids.append(uuid) - ddb_uuids += self.get_default_providers(uuid, all_prov) + ddb_uuids = self.get_default_providers(uuid_list, all_prov) + ddb_uuids = list(set(ddb_uuids)) n_extra_processes = len(ddb_uuids) - len(uuid_list) logging.info(f"The derivative database includes {n_extra_processes} additional processes that are default providers to the selected processes.") - ddb_uuids = list(set(ddb_uuids)) # Create new field to store objs for each root entity. if add_objs: @@ -2032,6 +2029,9 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr # Add back entities. for uuid in ddb_uuids: + if uuid not in all_uuids: + logging.info("Process UUID %s is not in the full database. Skipping." % uuid) + continue logging.info("Processing root entities for process UUID: %s" % uuid) # Actors #1 actors = self.get_process_actors(uuid) From 96726865a1da928ea52e92ef64b982b51dc0b8c1 Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Sat, 28 Mar 2026 23:03:19 -0400 Subject: [PATCH 18/21] dd root entity dict - reappend uuid --- netlolca/NetlOlca.py | 1 + 1 file changed, 1 insertion(+) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 98ac0e1..e5d8110 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1990,6 +1990,7 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr # `uuid_list`; remove duplicates (e.g., from similar providers). logging.info("Getting Process UUIDs from the derivative database including default providers across the entire supply chain.") ddb_uuids = self.get_default_providers(uuid_list, all_prov) + ddb_uuids.extend(uuid_list) ddb_uuids = list(set(ddb_uuids)) n_extra_processes = len(ddb_uuids) - len(uuid_list) logging.info(f"The derivative database includes {n_extra_processes} additional processes that are default providers to the selected processes.") From 947f001fe9dce54d7f46c23b8e4434dc430ca971 Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Sat, 28 Mar 2026 23:25:35 -0400 Subject: [PATCH 19/21] quickfix --- netlolca/NetlOlca.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index e5d8110..13b8655 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1989,11 +1989,11 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr # This includes processes that are providers to the processes in # `uuid_list`; remove duplicates (e.g., from similar providers). logging.info("Getting Process UUIDs from the derivative database including default providers across the entire supply chain.") - ddb_uuids = self.get_default_providers(uuid_list, all_prov) - ddb_uuids.extend(uuid_list) + ddb_uuids=uuid_list + ddb_uuids.extend(self.get_default_providers(uuid_list, all_prov)) ddb_uuids = list(set(ddb_uuids)) n_extra_processes = len(ddb_uuids) - len(uuid_list) - logging.info(f"The derivative database includes {n_extra_processes} additional processes that are default providers to the selected processes.") + logging.info(f"The derivative database includes {n_extra_processes} additional processes that are default providers to the selected processes or are included in the selected processes.") # Create new field to store objs for each root entity. if add_objs: From 443b2ff2e4168f74420b21ed6f919aef582a1fd2 Mon Sep 17 00:00:00 2001 From: Francis Hanna Date: Sat, 28 Mar 2026 23:45:06 -0400 Subject: [PATCH 20/21] dd_root_entity_dict error handling --- netlolca/NetlOlca.py | 43 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index 13b8655..be2c27c 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -1989,10 +1989,11 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr # This includes processes that are providers to the processes in # `uuid_list`; remove duplicates (e.g., from similar providers). logging.info("Getting Process UUIDs from the derivative database including default providers across the entire supply chain.") - ddb_uuids=uuid_list - ddb_uuids.extend(self.get_default_providers(uuid_list, all_prov)) + uuid_list_copy = copy.deepcopy(uuid_list) + ddb_uuids = (self.get_default_providers(uuid_list, all_prov)) + ddb_uuids.extend(uuid_list_copy) ddb_uuids = list(set(ddb_uuids)) - n_extra_processes = len(ddb_uuids) - len(uuid_list) + n_extra_processes = len(ddb_uuids) - len(uuid_list_copy) logging.info(f"The derivative database includes {n_extra_processes} additional processes that are default providers to the selected processes or are included in the selected processes.") # Create new field to store objs for each root entity. @@ -2035,7 +2036,11 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr continue logging.info("Processing root entities for process UUID: %s" % uuid) # Actors #1 - actors = self.get_process_actors(uuid) + try: + actors = self.get_process_actors(uuid) + except Exception as e: + logging.error(f"Error getting actors for process {uuid}: {e}") + continue if actors: i = get_dict_number(self._spec_map, o.Actor, "class") for actor in actors: @@ -2053,7 +2058,11 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr full_dict[i]["objs"].append(self.query(o.Currency, _id)) # DQ system #3 - dq_systems = self.get_process_dq_system(uuid) + try: + dq_systems = self.get_process_dq_system(uuid) + except Exception as e: + logging.error(f"Error getting DQ systems for process {uuid}: {e}") + continue if dq_systems: i = get_dict_number(self._spec_map, o.DQSystem, "class") for dq_system in dq_systems: @@ -2071,7 +2080,11 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr full_dict[i]["objs"].append(self.query(o.Epd, _id)) # Flow #5 - flows = self.get_flows(uuid, True, True, False) + try: + flows = self.get_flows(uuid, True, True, False) + except Exception as e: + logging.error(f"Error getting flows for process {uuid}: {e}") + continue if flows: i = get_dict_number(self._spec_map, o.Flow, "class") for flow in flows: @@ -2083,7 +2096,11 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr ) # Flow property #6 - flow_properties = self.get_process_flow_properties(uuid) + try: + flow_properties = self.get_process_flow_properties(uuid) + except Exception as e: + logging.error(f"Error getting flow properties for process {uuid}: {e}") + continue if flow_properties: i = get_dict_number(self._spec_map, o.FlowProperty, "class") for flow_property in flow_properties: @@ -2095,7 +2112,11 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr ) # Parameter #10 - parameters = self.find_process_parameters(uuid, all_parameters) + try: + parameters = self.find_process_parameters(uuid, all_parameters) + except Exception as e: + logging.error(f"Error getting parameters for process {uuid}: {e}") + continue if parameters: i = get_dict_number(self._spec_map, o.Parameter, "class") for parameter in parameters: @@ -2114,7 +2135,11 @@ def get_full_dd_root_entities_dict(self, uuid_list, add_objs=True, all_prov = Tr full_dict[i]["objs"].append(self.query(o.Process, uuid)) # Source #16 - sources = self.get_process_sources(uuid) + try: + sources = self.get_process_sources(uuid) + except Exception as e: + logging.error(f"Error getting sources for process {uuid}: {e}") + continue if sources: i = get_dict_number(self._spec_map, o.Source, "class") for source in sources: From c01daaea641faf6f088fbcfe1e659896dbd96b7a Mon Sep 17 00:00:00 2001 From: dt-woods Date: Mon, 30 Mar 2026 16:32:00 -0400 Subject: [PATCH 21/21] fix module all list --- netlolca/NetlOlca.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/netlolca/NetlOlca.py b/netlolca/NetlOlca.py index be2c27c..3453c5c 100644 --- a/netlolca/NetlOlca.py +++ b/netlolca/NetlOlca.py @@ -30,7 +30,7 @@ IPC server) or indirectly (via an exported JSON-LD zip file). Last Edited: - 2026-03-24 + 2026-03-30 Examples -------- @@ -88,12 +88,16 @@ __all__ = [ "FUEL_CATS", "NetlOlca", + "check_for_docker", + "check_output_dir", "get_as_yaml", "get_dict_number", + "get_logger", "make_actor_yaml", "pretty_print_dict", "print_progress", "read_yaml", + "rollover_logger", "writeout", ]