diff --git a/TM1py/Services/ElementService.py b/TM1py/Services/ElementService.py index 7b8b8c77..f9569c6d 100644 --- a/TM1py/Services/ElementService.py +++ b/TM1py/Services/ElementService.py @@ -235,42 +235,18 @@ def delete_edges_use_blob( :param kwargs: Additional arguments for the process execution. :return: None """ - if not edges: - return - - process_service = ProcessService(self._rest) - file_service = FileService(self._rest) - - unique_name = self.suggest_unique_object_name() - - # Transform cells to format that's consumable for TI - csv_content = StringIO() - csv_writer = csv.writer(csv_content, delimiter=",", quoting=csv.QUOTE_ALL) - csv_writer.writerows(list(edge) for edge in edges) - - file_name = f"{unique_name}.csv" - file_service.create(file_name=file_name, file_content=csv_content.getvalue().encode("utf-8"), **kwargs) - - try: - # Create and execute unbound TI process to delete edges using blob file - process = self._build_unwind_hierarchy_edges_from_blob_process( + return self._run_blob_process( + rows=[list(edge) for edge in edges] if edges else [], + build_process=lambda process_name, blob_filename: self._build_unwind_hierarchy_edges_from_blob_process( dimension_name=dimension_name, hierarchy_name=hierarchy_name, - process_name=unique_name, - blob_filename=file_name, + process_name=process_name, + blob_filename=blob_filename, skip_invalid_edges=skip_invalid_edges, - ) - - success, status, log_file = process_service.execute_process_with_return(process=process, **kwargs) - if not success: - if status in ["HasMinorErrors"]: - raise TM1pyWritePartialFailureException([status], [log_file], 1) - else: - raise TM1pyWriteFailureException([status], [log_file]) - - finally: - if remove_blob: - file_service.delete(file_name=file_name) + ), + remove_blob=remove_blob, + **kwargs, + ) def _build_unwind_hierarchy_edges_from_blob_process( self, @@ -280,11 +256,43 @@ def _build_unwind_hierarchy_edges_from_blob_process( blob_filename: str, skip_invalid_edges: bool = True, ) -> Process: + parent_variable = "vParent" + child_variable = "vChild" + process = self._build_blob_datasource_process( + process_name=process_name, + blob_filename=blob_filename, + variables=[(parent_variable, "String"), (child_variable, "String")], + ) - # v11 automatically adds blb file extensions to documents created via the contents api + # Write the statement for delete component in hierarchy + if skip_invalid_edges: + delete_component = ( + f"IF(ElementIsParent('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable})=1);" + f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable});" + f"ENDIF;" + ) + else: + delete_component = ( + f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}'," + f"{parent_variable},{child_variable});" + ) + + process.metadata_procedure = delete_component + return process + + def _build_blob_datasource_process( + self, process_name: str, blob_filename: str, variables: List[Tuple[str, str]] + ) -> Process: + """Build an unbound Process that reads the given CSV blob as an ASCII data source. + + Declares `variables` (a list of (name, type) tuples, each type 'String' or 'Numeric') and + sets UTF-8 input encoding. The caller is responsible for the metadata / data procedure. + """ + # v11 automatically adds a .blb file extension to documents created via the contents api if not verify_version(required_version="12", version=self.version): blob_filename += ".blb" - hierarchyupdate_process = Process( + + process = Process( name=process_name, datasource_type="ASCII", datasource_ascii_header_records=0, @@ -295,31 +303,51 @@ def _build_unwind_hierarchy_edges_from_blob_process( datasource_ascii_thousand_separator="", datasource_ascii_quote_character='"', ) + process.prolog_procedure = "SetInputCharacterSet('TM1CS_UTF8');" + for variable_name, variable_type in variables: + process.add_variable(name=variable_name, variable_type=variable_type) + return process - # Define encoding in Prolog section - hierarchyupdate_process.prolog_procedure = """ - SetInputCharacterSet('TM1CS_UTF8'); - """ - parent_variable = "vParent" - child_variable = "vChild" - hierarchyupdate_process.add_variable(name=parent_variable, variable_type="String") - hierarchyupdate_process.add_variable(name=child_variable, variable_type="String") + def _run_blob_process(self, rows: Iterable[Iterable], build_process, remove_blob: bool = True, **kwargs): + """Upload `rows` as a CSV blob and run an unbound TI process built from it. - # Write the statement for delete component in hierarchy - if skip_invalid_edges: - delete_component = ( - f"\r" - f"IF(ElementIsParent('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable})=1);" - f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable});" - f"ENDIF;" - ) - else: - delete_component = f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable});" + Shared plumbing for the blob-based element/edge operations: serialize the rows to a CSV, + stage it as a blob via the FileService, execute the process returned by `build_process` + (a callable taking the unique process name and the blob file name), then clean up the blob. - # Define Metadata section - metadata_statement = delete_component - hierarchyupdate_process.metadata_procedure = metadata_statement - return hierarchyupdate_process + :param rows: An iterable of rows (each an iterable of cell values) written to the CSV blob. + :param build_process: Callable (process_name, blob_filename) -> Process to execute. + :param remove_blob: Whether to delete the staged blob after execution (default: True). + :return: None + """ + rows = list(rows) + if not rows: + return + + process_service = ProcessService(self._rest) + file_service = FileService(self._rest) + + unique_name = self.suggest_unique_object_name() + + # Transform rows into a CSV that's consumable as a TI data source + csv_content = StringIO() + csv_writer = csv.writer(csv_content, delimiter=",", quoting=csv.QUOTE_ALL) + csv_writer.writerows(rows) + + file_name = f"{unique_name}.csv" + file_service.create(file_name=file_name, file_content=csv_content.getvalue().encode("utf-8"), **kwargs) + + try: + process = build_process(unique_name, file_name) + success, status, log_file = process_service.execute_process_with_return(process=process, **kwargs) + if not success: + if status in ["HasMinorErrors"]: + raise TM1pyWritePartialFailureException([status], [log_file], 1) + else: + raise TM1pyWriteFailureException([status], [log_file]) + finally: + if remove_blob: + file_service.delete(file_name=file_name) def get_elements(self, dimension_name: str, hierarchy_name: str, **kwargs) -> List[Element]: url = format_url( @@ -1332,18 +1360,36 @@ def remove_edge(self, dimension_name: str, hierarchy_name: str, parent: str, com return self._rest.DELETE(url=url, **kwargs) def add_edges( - self, dimension_name: str, hierarchy_name: str = None, edges: Dict[Tuple[str, str], int] = None, **kwargs + self, + dimension_name: str, + hierarchy_name: str = None, + edges: Dict[Tuple[str, str], int] = None, + use_blob: bool = False, + remove_blob: bool = True, + **kwargs, ) -> Response: """Add Edges to hierarchy. Fails if one edge already exists. :param dimension_name: :param hierarchy_name: - :param edges: + :param edges: A dict mapping (parent, component) tuples to the edge weight. + :param use_blob: Add the edges via an uploaded CSV blob + unbound TI process. Requires admin + permissions. Better performance on large edge sets. Returns None instead of a Response. + :param remove_blob: Remove the staged blob file after use (only with use_blob=True, default: True). :return: """ if not hierarchy_name: hierarchy_name = dimension_name + if use_blob: + return self.add_edges_use_blob( + dimension_name=dimension_name, + hierarchy_name=hierarchy_name, + edges=edges, + remove_blob=remove_blob, + **kwargs, + ) + url = format_url("/Dimensions('{}')/Hierarchies('{}')/Edges", dimension_name, hierarchy_name) body = [ {"ParentName": parent, "ComponentName": component, "Weight": float(weight)} @@ -1352,19 +1398,141 @@ def add_edges( return self._rest.POST(url=url, data=json.dumps(body), **kwargs) - def add_elements(self, dimension_name: str, hierarchy_name: str, elements: Iterable[Element], **kwargs): + @require_data_admin + @require_ops_admin + def add_edges_use_blob( + self, + dimension_name: str, + hierarchy_name: str = None, + edges: Dict[Tuple[str, str], int] = None, + remove_blob: bool = True, + **kwargs, + ): + """Add edges to a hierarchy via an unbound TI process having an uploaded CSV as the data source. + + Mirrors `add_edges` but scales better to large edge sets. Edges that already exist surface as + minor errors (raised as TM1pyWritePartialFailureException). + + :param dimension_name: The name of the dimension. + :param hierarchy_name: The name of the hierarchy. Defaults to the dimension name. + :param edges: A dict mapping (parent, component) tuples to the edge weight. + :param remove_blob: Remove the staged blob file after use (default: True). + :return: None + """ + if not hierarchy_name: + hierarchy_name = dimension_name + + return self._run_blob_process( + rows=[[parent, component, weight] for (parent, component), weight in edges.items()] if edges else [], + build_process=lambda process_name, blob_filename: self._build_add_edges_from_blob_process( + dimension_name=dimension_name, + hierarchy_name=hierarchy_name, + process_name=process_name, + blob_filename=blob_filename, + ), + remove_blob=remove_blob, + **kwargs, + ) + + def _build_add_edges_from_blob_process( + self, dimension_name: str, hierarchy_name: str, process_name: str, blob_filename: str + ) -> Process: + parent_variable = "vParent" + child_variable = "vChild" + weight_variable = "vWeight" + process = self._build_blob_datasource_process( + process_name=process_name, + blob_filename=blob_filename, + variables=[(parent_variable, "String"), (child_variable, "String"), (weight_variable, "Numeric")], + ) + process.metadata_procedure = ( + f"HierarchyElementComponentAdd('{dimension_name}','{hierarchy_name}'," + f"{parent_variable},{child_variable},{weight_variable});" + ) + return process + + def add_elements( + self, + dimension_name: str, + hierarchy_name: str, + elements: Iterable[Element], + use_blob: bool = False, + remove_blob: bool = True, + **kwargs, + ): """Add elements to hierarchy. Fails if one element already exists. :param dimension_name: :param hierarchy_name: :param elements: + :param use_blob: Add the elements via an uploaded CSV blob + unbound TI process. Requires admin + permissions. Better performance on large element sets. Returns None instead of a Response. + :param remove_blob: Remove the staged blob file after use (only with use_blob=True, default: True). :return: """ + if use_blob: + return self.add_elements_use_blob( + dimension_name=dimension_name, + hierarchy_name=hierarchy_name, + elements=elements, + remove_blob=remove_blob, + **kwargs, + ) + url = format_url("/Dimensions('{}')/Hierarchies('{}')/Elements", dimension_name, hierarchy_name) body = [element.body_as_dict for element in elements] return self._rest.POST(url=url, data=json.dumps(body), **kwargs) + @require_data_admin + @require_ops_admin + def add_elements_use_blob( + self, + dimension_name: str, + hierarchy_name: str, + elements: Iterable[Element], + remove_blob: bool = True, + **kwargs, + ): + """Add elements to a hierarchy via an unbound TI process having an uploaded CSV as the data source. + + Mirrors `add_elements` but scales better to large element sets. Elements that already exist + surface as minor errors (raised as TM1pyWritePartialFailureException). + + :param dimension_name: The name of the dimension. + :param hierarchy_name: The name of the hierarchy. + :param elements: An iterable of Element objects to add. + :param remove_blob: Remove the staged blob file after use (default: True). + :return: None + """ + return self._run_blob_process( + rows=[[element.name, str(element.element_type)[0]] for element in elements] if elements else [], + build_process=lambda process_name, blob_filename: self._build_add_elements_from_blob_process( + dimension_name=dimension_name, + hierarchy_name=hierarchy_name, + process_name=process_name, + blob_filename=blob_filename, + ), + remove_blob=remove_blob, + **kwargs, + ) + + def _build_add_elements_from_blob_process( + self, dimension_name: str, hierarchy_name: str, process_name: str, blob_filename: str + ) -> Process: + element_variable = "vElement" + type_variable = "vType" + process = self._build_blob_datasource_process( + process_name=process_name, + blob_filename=blob_filename, + variables=[(element_variable, "String"), (type_variable, "String")], + ) + # empty insertion point -> elements are appended to the hierarchy + process.metadata_procedure = ( + f"HierarchyElementInsert('{dimension_name}','{hierarchy_name}','',{element_variable},{type_variable});" + ) + return process + def add_element_attributes( self, dimension_name: str, hierarchy_name: str, element_attributes: List[ElementAttribute], **kwargs ): diff --git a/TM1py/Services/HierarchyService.py b/TM1py/Services/HierarchyService.py index b1c22d3b..b6975f2a 100644 --- a/TM1py/Services/HierarchyService.py +++ b/TM1py/Services/HierarchyService.py @@ -636,11 +636,12 @@ def update_or_create_hierarchy_from_dataframe( new_elements[element_name] = Element.Types.CONSOLIDATED if new_elements: - # add these elements to hierarchy in tm1 + # add these elements to hierarchy in tm1 (blob path for admins scales to large sets) self.elements.add_elements( dimension_name=dimension_name, hierarchy_name=hierarchy_name, elements=(Element(element_name, element_type) for element_name, element_type in new_elements.items()), + use_blob=self.is_admin, ) # define the attribute columns in df. Applies to all elements in df, not only new ones. @@ -782,7 +783,12 @@ def update_or_create_hierarchy_from_dataframe( (k, v): w for (k, v), w in edges.items() if (k, v) not in current_edges or w != current_edges[(k, v)] } if new_edges: - self.elements.add_edges(dimension_name=dimension_name, hierarchy_name=hierarchy_name, edges=new_edges) + self.elements.add_edges( + dimension_name=dimension_name, + hierarchy_name=hierarchy_name, + edges=new_edges, + use_blob=self.is_admin, + ) if hierarchy_sort_order: self._implement_hierarchy_sort_order(dimension_name, hierarchy_name, hierarchy_sort_order) diff --git a/Tests/ElementService_test.py b/Tests/ElementService_test.py index 91717f71..4b1e0fcc 100644 --- a/Tests/ElementService_test.py +++ b/Tests/ElementService_test.py @@ -17,6 +17,7 @@ ) from TM1py.Objects import Dimension, Element, ElementAttribute, Hierarchy from TM1py.Services import TM1Service +from TM1py.Services.ElementService import ElementService class TestElementService(unittest.TestCase): @@ -1129,6 +1130,71 @@ def test_add_elements_fail(self): element = Element(self.years[0], "Numeric") self.tm1.elements.add_elements(self.dimension_name, self.dimension_name, [element]) + @skip_if_version_lower_than(version="11.4") + def test_add_elements_use_blob(self): + elements = [Element("Element1", "Numeric"), Element("Element2", "String")] + self.tm1.elements.add_elements(self.dimension_name, self.hierarchy_name, elements, use_blob=True) + + for element in elements: + self.assertEqual(element, self.tm1.elements.get(self.dimension_name, self.hierarchy_name, element.name)) + + @skip_if_version_lower_than(version="11.4") + def test_add_elements_use_blob_with_consolidations(self): + elements = [ + Element("Leaf1", "Numeric"), + Element("Leaf2", "Numeric"), + Element("Cons A", "Consolidated"), + Element("Cons B", "Consolidated"), + ] + self.tm1.elements.add_elements(self.dimension_name, self.hierarchy_name, elements, use_blob=True) + + for element in elements: + self.assertEqual(element, self.tm1.elements.get(self.dimension_name, self.hierarchy_name, element.name)) + + @skip_if_version_lower_than(version="11.4") + def test_add_edges_use_blob(self): + # add new leaves first, then wire them under an existing consolidation via blob + self.tm1.elements.add_elements( + self.dimension_name, + self.hierarchy_name, + [Element("2050", "Numeric"), Element("2051", "Numeric")], + use_blob=True, + ) + self.tm1.elements.add_edges( + dimension_name=self.dimension_name, + hierarchy_name=self.hierarchy_name, + edges={("Total Years", "2050"): 1, ("Total Years", "2051"): 1}, + use_blob=True, + ) + + edges = self.tm1.elements.get_edges(self.dimension_name, self.hierarchy_name) + self.assertEqual(edges[("Total Years", "2050")], 1) + self.assertEqual(edges[("Total Years", "2051")], 1) + + @skip_if_version_lower_than(version="11.4") + def test_add_elements_and_edges_use_blob_build_consolidation(self): + # build a fresh consolidation with leaves entirely via blob (elements first, then edges) + self.tm1.elements.add_elements( + self.dimension_name, + self.hierarchy_name, + [Element("New Cons", "Consolidated"), Element("Child A", "Numeric"), Element("Child B", "Numeric")], + use_blob=True, + ) + self.tm1.elements.add_edges( + dimension_name=self.dimension_name, + hierarchy_name=self.hierarchy_name, + edges={("New Cons", "Child A"): 1, ("New Cons", "Child B"): 2}, + use_blob=True, + ) + + self.assertEqual( + Element.Types.CONSOLIDATED, + self.tm1.elements.get(self.dimension_name, self.hierarchy_name, "New Cons").element_type, + ) + edges = self.tm1.elements.get_edges(self.dimension_name, self.hierarchy_name) + self.assertEqual(edges[("New Cons", "Child A")], 1) + self.assertEqual(edges[("New Cons", "Child B")], 2) + def test_add_element_attributes_single(self): element_attribute = ElementAttribute(name="Attribute1", attribute_type="String") self.tm1.elements.add_element_attributes(self.dimension_name, self.dimension_name, [element_attribute]) @@ -1640,5 +1706,85 @@ def tearDownClass(cls): cls.tm1.logout() +class _FakeRest: + """Minimal stand-in for RestService exposing just the version (no server connection).""" + + def __init__(self, version: str): + self.version = version + + +class TestElementServiceBlobProcessBuilders(unittest.TestCase): + """Unit tests for the blob-based element/edge helpers (no server connection).""" + + @staticmethod + def _element_service(version: str = "12.0.0") -> ElementService: + service = object.__new__(ElementService) + service._rest = _FakeRest(version) + return service + + def test_blob_datasource_process_sets_utf8_and_variables(self): + service = self._element_service("12.0.0") + process = service._build_blob_datasource_process( + process_name="p", blob_filename="f.csv", variables=[("vA", "String"), ("vN", "Numeric")] + ) + self.assertIn("SetInputCharacterSet('TM1CS_UTF8')", process.prolog_procedure) + self.assertEqual(process.datasource_data_source_name_for_server, "f.csv") + names_and_types = {v["Name"]: v["Type"] for v in process.variables} + self.assertEqual(names_and_types, {"vA": "String", "vN": "Numeric"}) + + def test_blob_datasource_process_appends_blb_on_v11(self): + service = self._element_service("11.8.0") + process = service._build_blob_datasource_process(process_name="p", blob_filename="f.csv", variables=[]) + # v11 auto-appends a .blb extension to documents created via the contents api + self.assertEqual(process.datasource_data_source_name_for_server, "f.csv.blb") + + def test_build_add_elements_process(self): + service = self._element_service("12.0.0") + process = service._build_add_elements_from_blob_process("Dim", "Dim", "p", "f.csv") + self.assertEqual([v["Name"] for v in process.variables], ["vElement", "vType"]) + self.assertTrue(all(v["Type"] == "String" for v in process.variables)) + self.assertIn("HierarchyElementInsert('Dim','Dim','',vElement,vType);", process.metadata_procedure) + + def test_build_add_edges_process_has_numeric_weight(self): + service = self._element_service("12.0.0") + process = service._build_add_edges_from_blob_process("Dim", "Dim", "p", "f.csv") + names_and_types = {v["Name"]: v["Type"] for v in process.variables} + self.assertEqual(names_and_types, {"vParent": "String", "vChild": "String", "vWeight": "Numeric"}) + self.assertIn("HierarchyElementComponentAdd('Dim','Dim',vParent,vChild,vWeight);", process.metadata_procedure) + + def test_refactored_delete_edges_process_preserves_skip_invalid_guard(self): + service = self._element_service("12.0.0") + process = service._build_unwind_hierarchy_edges_from_blob_process( + dimension_name="Dim", + hierarchy_name="Dim", + process_name="p", + blob_filename="f.csv", + skip_invalid_edges=True, + ) + self.assertEqual([v["Name"] for v in process.variables], ["vParent", "vChild"]) + self.assertIn("ElementIsParent('Dim','Dim',vParent,vChild)", process.metadata_procedure) + self.assertIn("HierarchyElementComponentDelete('Dim','Dim',vParent,vChild);", process.metadata_procedure) + + def test_add_elements_dispatches_to_blob(self): + service = self._element_service("12.0.0") + captured = {} + service.add_elements_use_blob = lambda **kwargs: captured.update(kwargs) or "BLOB" + result = service.add_elements("Dim", "Dim", [Element("e", "Numeric")], use_blob=True) + self.assertEqual(result, "BLOB") + self.assertEqual(captured["dimension_name"], "Dim") + self.assertEqual(captured["hierarchy_name"], "Dim") + self.assertTrue(captured["remove_blob"]) + + def test_add_edges_dispatches_to_blob(self): + service = self._element_service("12.0.0") + captured = {} + service.add_edges_use_blob = lambda **kwargs: captured.update(kwargs) or "BLOB" + result = service.add_edges("Dim", edges={("Total", "Child1"): 1}, use_blob=True) + self.assertEqual(result, "BLOB") + # hierarchy_name defaults to the dimension name before dispatch + self.assertEqual(captured["hierarchy_name"], "Dim") + self.assertEqual(captured["edges"], {("Total", "Child1"): 1}) + + if __name__ == "__main__": unittest.main() diff --git a/Tests/HierarchyService_test.py b/Tests/HierarchyService_test.py index 880b7e11..3e8faf0c 100644 --- a/Tests/HierarchyService_test.py +++ b/Tests/HierarchyService_test.py @@ -991,6 +991,54 @@ def test_update_or_create_hierarchy_from_dataframe_with_consolidations(self): ) self._verify_region_dimension(hierarchy) + def test_update_or_create_hierarchy_from_dataframe_bulk_with_many_consolidations(self): + # builds many leaves + multiple consolidations; for admins this drives the blob-based + # element/edge add path end to end + columns = [ + self.region_dimension_name, + "ElementType", + "level001", + "level000", + "level001_weight", + "level000_weight", + ] + regions = [f"Region {i:02d}" for i in range(10)] + data = [] + for i in range(100): + region = regions[i % len(regions)] + data.append([f"Country {i:03d}", "Numeric", region, "World", 1, 1]) + for region in regions: + data.append([region, "Consolidated", "", "", 0, 0]) + data.append(["World", "Consolidated", "", "", 0, 0]) + df = DataFrame(data=data, columns=columns) + + self.tm1.hierarchies.update_or_create_hierarchy_from_dataframe( + dimension_name=self.region_dimension_name, + hierarchy_name=self.region_dimension_name, + df=df, + element_column=self.region_dimension_name, + element_type_column="ElementType", + unwind_all=True, + ) + + hierarchy = self.tm1.hierarchies.get( + dimension_name=self.region_dimension_name, hierarchy_name=self.region_dimension_name + ) + + # 100 leaves + 10 region consolidations + World + self.assertEqual(111, len(hierarchy.elements)) + elements_by_name = {element.name: element for element in hierarchy.elements.values()} + self.assertEqual(Element.Types.NUMERIC, elements_by_name["Country 000"].element_type) + self.assertEqual(Element.Types.NUMERIC, elements_by_name["Country 099"].element_type) + for region in regions: + self.assertEqual(Element.Types.CONSOLIDATED, elements_by_name[region].element_type) + self.assertEqual(Element.Types.CONSOLIDATED, elements_by_name["World"].element_type) + + # 100 country->region edges + 10 region->World edges + self.assertEqual(110, len(hierarchy.edges)) + self.assertEqual(1, hierarchy.edges["Region 00", "Country 000"]) + self.assertEqual(1, hierarchy.edges["World", "Region 00"]) + def test_update_or_create_hierarchy_from_dataframe_with_wrong_case_in_level_column(self): # create with correct case first columns = [