Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 228 additions & 60 deletions TM1py/Services/ElementService.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,42 +235,18 @@ def delete_edges_use_blob(
:param kwargs: Additional arguments for the process execution.
:return: None
"""
if not edges:
return

process_service = ProcessService(self._rest)
file_service = FileService(self._rest)

unique_name = self.suggest_unique_object_name()

# Transform cells to format that's consumable for TI
csv_content = StringIO()
csv_writer = csv.writer(csv_content, delimiter=",", quoting=csv.QUOTE_ALL)
csv_writer.writerows(list(edge) for edge in edges)

file_name = f"{unique_name}.csv"
file_service.create(file_name=file_name, file_content=csv_content.getvalue().encode("utf-8"), **kwargs)

try:
# Create and execute unbound TI process to delete edges using blob file
process = self._build_unwind_hierarchy_edges_from_blob_process(
return self._run_blob_process(
rows=[list(edge) for edge in edges] if edges else [],
build_process=lambda process_name, blob_filename: self._build_unwind_hierarchy_edges_from_blob_process(
dimension_name=dimension_name,
hierarchy_name=hierarchy_name,
process_name=unique_name,
blob_filename=file_name,
process_name=process_name,
blob_filename=blob_filename,
skip_invalid_edges=skip_invalid_edges,
)

success, status, log_file = process_service.execute_process_with_return(process=process, **kwargs)
if not success:
if status in ["HasMinorErrors"]:
raise TM1pyWritePartialFailureException([status], [log_file], 1)
else:
raise TM1pyWriteFailureException([status], [log_file])

finally:
if remove_blob:
file_service.delete(file_name=file_name)
),
remove_blob=remove_blob,
**kwargs,
)

def _build_unwind_hierarchy_edges_from_blob_process(
self,
Expand All @@ -280,11 +256,43 @@ def _build_unwind_hierarchy_edges_from_blob_process(
blob_filename: str,
skip_invalid_edges: bool = True,
) -> Process:
parent_variable = "vParent"
child_variable = "vChild"
process = self._build_blob_datasource_process(
process_name=process_name,
blob_filename=blob_filename,
variables=[(parent_variable, "String"), (child_variable, "String")],
)

# v11 automatically adds blb file extensions to documents created via the contents api
# Write the statement for delete component in hierarchy
if skip_invalid_edges:
delete_component = (
f"IF(ElementIsParent('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable})=1);"
f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable});"
f"ENDIF;"
)
else:
delete_component = (
f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}',"
f"{parent_variable},{child_variable});"
)

process.metadata_procedure = delete_component
return process

def _build_blob_datasource_process(
self, process_name: str, blob_filename: str, variables: List[Tuple[str, str]]
) -> Process:
"""Build an unbound Process that reads the given CSV blob as an ASCII data source.

Declares `variables` (a list of (name, type) tuples, each type 'String' or 'Numeric') and
sets UTF-8 input encoding. The caller is responsible for the metadata / data procedure.
"""
# v11 automatically adds a .blb file extension to documents created via the contents api
if not verify_version(required_version="12", version=self.version):
blob_filename += ".blb"
hierarchyupdate_process = Process(

process = Process(
name=process_name,
datasource_type="ASCII",
datasource_ascii_header_records=0,
Expand All @@ -295,31 +303,51 @@ def _build_unwind_hierarchy_edges_from_blob_process(
datasource_ascii_thousand_separator="",
datasource_ascii_quote_character='"',
)
process.prolog_procedure = "SetInputCharacterSet('TM1CS_UTF8');"
for variable_name, variable_type in variables:
process.add_variable(name=variable_name, variable_type=variable_type)
return process

# Define encoding in Prolog section
hierarchyupdate_process.prolog_procedure = """
SetInputCharacterSet('TM1CS_UTF8');
"""
parent_variable = "vParent"
child_variable = "vChild"
hierarchyupdate_process.add_variable(name=parent_variable, variable_type="String")
hierarchyupdate_process.add_variable(name=child_variable, variable_type="String")
def _run_blob_process(self, rows: Iterable[Iterable], build_process, remove_blob: bool = True, **kwargs):
"""Upload `rows` as a CSV blob and run an unbound TI process built from it.

# Write the statement for delete component in hierarchy
if skip_invalid_edges:
delete_component = (
f"\r"
f"IF(ElementIsParent('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable})=1);"
f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable});"
f"ENDIF;"
)
else:
delete_component = f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable});"
Shared plumbing for the blob-based element/edge operations: serialize the rows to a CSV,
stage it as a blob via the FileService, execute the process returned by `build_process`
(a callable taking the unique process name and the blob file name), then clean up the blob.

# Define Metadata section
metadata_statement = delete_component
hierarchyupdate_process.metadata_procedure = metadata_statement
return hierarchyupdate_process
:param rows: An iterable of rows (each an iterable of cell values) written to the CSV blob.
:param build_process: Callable (process_name, blob_filename) -> Process to execute.
:param remove_blob: Whether to delete the staged blob after execution (default: True).
:return: None
"""
rows = list(rows)
if not rows:
return

process_service = ProcessService(self._rest)
file_service = FileService(self._rest)

unique_name = self.suggest_unique_object_name()

# Transform rows into a CSV that's consumable as a TI data source
csv_content = StringIO()
csv_writer = csv.writer(csv_content, delimiter=",", quoting=csv.QUOTE_ALL)
csv_writer.writerows(rows)

file_name = f"{unique_name}.csv"
file_service.create(file_name=file_name, file_content=csv_content.getvalue().encode("utf-8"), **kwargs)

try:
process = build_process(unique_name, file_name)
success, status, log_file = process_service.execute_process_with_return(process=process, **kwargs)
if not success:
if status in ["HasMinorErrors"]:
raise TM1pyWritePartialFailureException([status], [log_file], 1)
else:
raise TM1pyWriteFailureException([status], [log_file])
finally:
if remove_blob:
file_service.delete(file_name=file_name)

def get_elements(self, dimension_name: str, hierarchy_name: str, **kwargs) -> List[Element]:
url = format_url(
Expand Down Expand Up @@ -1332,18 +1360,36 @@ def remove_edge(self, dimension_name: str, hierarchy_name: str, parent: str, com
return self._rest.DELETE(url=url, **kwargs)

def add_edges(
self, dimension_name: str, hierarchy_name: str = None, edges: Dict[Tuple[str, str], int] = None, **kwargs
self,
dimension_name: str,
hierarchy_name: str = None,
edges: Dict[Tuple[str, str], int] = None,
use_blob: bool = False,
remove_blob: bool = True,
**kwargs,
) -> Response:
"""Add Edges to hierarchy. Fails if one edge already exists.

:param dimension_name:
:param hierarchy_name:
:param edges:
:param edges: A dict mapping (parent, component) tuples to the edge weight.
:param use_blob: Add the edges via an uploaded CSV blob + unbound TI process. Requires admin
permissions. Better performance on large edge sets. Returns None instead of a Response.
:param remove_blob: Remove the staged blob file after use (only with use_blob=True, default: True).
:return:
"""
if not hierarchy_name:
hierarchy_name = dimension_name

if use_blob:
return self.add_edges_use_blob(
dimension_name=dimension_name,
hierarchy_name=hierarchy_name,
edges=edges,
remove_blob=remove_blob,
**kwargs,
)

url = format_url("/Dimensions('{}')/Hierarchies('{}')/Edges", dimension_name, hierarchy_name)
body = [
{"ParentName": parent, "ComponentName": component, "Weight": float(weight)}
Expand All @@ -1352,19 +1398,141 @@ def add_edges(

return self._rest.POST(url=url, data=json.dumps(body), **kwargs)

def add_elements(self, dimension_name: str, hierarchy_name: str, elements: Iterable[Element], **kwargs):
@require_data_admin
@require_ops_admin
def add_edges_use_blob(
    self,
    dimension_name: str,
    hierarchy_name: str = None,
    edges: Dict[Tuple[str, str], int] = None,
    remove_blob: bool = True,
    **kwargs,
):
    """Add edges to a hierarchy through an unbound TI process fed by an uploaded CSV blob.

    Blob-based counterpart of `add_edges`, intended for large edge sets. Each edge becomes
    one CSV row of (parent, component, weight). Edges that already exist surface as minor
    errors (raised as TM1pyWritePartialFailureException).

    :param dimension_name: The name of the dimension.
    :param hierarchy_name: The name of the hierarchy. Defaults to the dimension name.
    :param edges: A dict mapping (parent, component) tuples to the edge weight.
    :param remove_blob: Remove the staged blob file after use (default: True).
    :param kwargs: Passed on to the blob upload and process execution.
    :return: None
    """
    target_hierarchy = hierarchy_name if hierarchy_name else dimension_name

    def build_process(process_name, blob_filename):
        # Invoked by _run_blob_process once the unique process / blob names are known
        return self._build_add_edges_from_blob_process(
            dimension_name=dimension_name,
            hierarchy_name=target_hierarchy,
            process_name=process_name,
            blob_filename=blob_filename,
        )

    # A missing or empty edges dict yields no rows
    csv_rows = []
    if edges:
        for (parent, component), weight in edges.items():
            csv_rows.append([parent, component, weight])

    return self._run_blob_process(
        rows=csv_rows,
        build_process=build_process,
        remove_blob=remove_blob,
        **kwargs,
    )

def _build_add_edges_from_blob_process(
    self, dimension_name: str, hierarchy_name: str, process_name: str, blob_filename: str
) -> Process:
    """Build the unbound process that adds one edge per row of the CSV blob.

    The data source variables are declared in CSV column order (parent, child, weight) and the
    metadata procedure calls HierarchyElementComponentAdd with them for the given hierarchy.

    :param dimension_name: The name of the dimension.
    :param hierarchy_name: The name of the hierarchy.
    :param process_name: Name given to the unbound process.
    :param blob_filename: Name of the staged CSV blob used as the data source.
    :return: The configured Process (not yet executed).
    """
    # (name, type) per CSV column, in column order
    columns = [("vParent", "String"), ("vChild", "String"), ("vWeight", "Numeric")]
    ti_process = self._build_blob_datasource_process(
        process_name=process_name,
        blob_filename=blob_filename,
        variables=columns,
    )

    parent_var, child_var, weight_var = [name for name, _ in columns]
    statement = f"HierarchyElementComponentAdd('{dimension_name}','{hierarchy_name}',"
    statement += f"{parent_var},{child_var},{weight_var});"
    ti_process.metadata_procedure = statement
    return ti_process

def add_elements(
    self,
    dimension_name: str,
    hierarchy_name: str,
    elements: Iterable[Element],
    use_blob: bool = False,
    remove_blob: bool = True,
    **kwargs,
):
    """Add elements to hierarchy. Fails if one element already exists.

    :param dimension_name: The name of the dimension.
    :param hierarchy_name: The name of the hierarchy.
    :param elements: An iterable of Element objects to add.
    :param use_blob: Add the elements via an uploaded CSV blob + unbound TI process. Requires admin
    permissions. Better performance on large element sets. Returns None instead of a Response.
    :param remove_blob: Remove the staged blob file after use (only with use_blob=True, default: True).
    :return: The result of the REST POST, or None when use_blob=True.
    """
    if not use_blob:
        # Default path: a single POST carrying all element bodies
        url = format_url("/Dimensions('{}')/Hierarchies('{}')/Elements", dimension_name, hierarchy_name)
        element_bodies = [element.body_as_dict for element in elements]
        payload = json.dumps(element_bodies)
        return self._rest.POST(url=url, data=payload, **kwargs)

    return self.add_elements_use_blob(
        dimension_name=dimension_name,
        hierarchy_name=hierarchy_name,
        elements=elements,
        remove_blob=remove_blob,
        **kwargs,
    )

@require_data_admin
@require_ops_admin
def add_elements_use_blob(
    self,
    dimension_name: str,
    hierarchy_name: str,
    elements: Iterable[Element],
    remove_blob: bool = True,
    **kwargs,
):
    """Add elements to a hierarchy through an unbound TI process fed by an uploaded CSV blob.

    Blob-based counterpart of `add_elements`, intended for large element sets. Each element
    becomes one CSV row of (name, type code). Elements that already exist surface as minor
    errors (raised as TM1pyWritePartialFailureException).

    :param dimension_name: The name of the dimension.
    :param hierarchy_name: The name of the hierarchy.
    :param elements: An iterable of Element objects to add.
    :param remove_blob: Remove the staged blob file after use (default: True).
    :param kwargs: Passed on to the blob upload and process execution.
    :return: None
    """

    def build_process(process_name, blob_filename):
        # Invoked by _run_blob_process once the unique process / blob names are known
        return self._build_add_elements_from_blob_process(
            dimension_name=dimension_name,
            hierarchy_name=hierarchy_name,
            process_name=process_name,
            blob_filename=blob_filename,
        )

    csv_rows = []
    if elements:
        for element in elements:
            element_name = element.name
            # Type code is the first character of the type's string form
            # (presumably 'N' / 'S' / 'C' -- confirm against Element.Types.__str__)
            type_code = str(element.element_type)[0]
            csv_rows.append([element_name, type_code])

    return self._run_blob_process(
        rows=csv_rows,
        build_process=build_process,
        remove_blob=remove_blob,
        **kwargs,
    )

def _build_add_elements_from_blob_process(
    self, dimension_name: str, hierarchy_name: str, process_name: str, blob_filename: str
) -> Process:
    """Build the unbound process that inserts one element per row of the CSV blob.

    The data source variables are declared in CSV column order (element name, element type) and
    the metadata procedure calls HierarchyElementInsert with them for the given hierarchy.

    :param dimension_name: The name of the dimension.
    :param hierarchy_name: The name of the hierarchy.
    :param process_name: Name given to the unbound process.
    :param blob_filename: Name of the staged CSV blob used as the data source.
    :return: The configured Process (not yet executed).
    """
    # (name, type) per CSV column, in column order
    columns = [("vElement", "String"), ("vType", "String")]
    ti_process = self._build_blob_datasource_process(
        process_name=process_name,
        blob_filename=blob_filename,
        variables=columns,
    )

    name_var, type_var = [name for name, _ in columns]
    # empty insertion point -> elements are appended to the hierarchy
    statement = f"HierarchyElementInsert('{dimension_name}','{hierarchy_name}','',{name_var},{type_var});"
    ti_process.metadata_procedure = statement
    return ti_process

def add_element_attributes(
self, dimension_name: str, hierarchy_name: str, element_attributes: List[ElementAttribute], **kwargs
):
Expand Down
10 changes: 8 additions & 2 deletions TM1py/Services/HierarchyService.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,11 +636,12 @@ def update_or_create_hierarchy_from_dataframe(
new_elements[element_name] = Element.Types.CONSOLIDATED

if new_elements:
# add these elements to hierarchy in tm1
# add these elements to hierarchy in tm1 (blob path for admins scales to large sets)
self.elements.add_elements(
dimension_name=dimension_name,
hierarchy_name=hierarchy_name,
elements=(Element(element_name, element_type) for element_name, element_type in new_elements.items()),
use_blob=self.is_admin,
)

# define the attribute columns in df. Applies to all elements in df, not only new ones.
Expand Down Expand Up @@ -782,7 +783,12 @@ def update_or_create_hierarchy_from_dataframe(
(k, v): w for (k, v), w in edges.items() if (k, v) not in current_edges or w != current_edges[(k, v)]
}
if new_edges:
self.elements.add_edges(dimension_name=dimension_name, hierarchy_name=hierarchy_name, edges=new_edges)
self.elements.add_edges(
dimension_name=dimension_name,
hierarchy_name=hierarchy_name,
edges=new_edges,
use_blob=self.is_admin,
)

if hierarchy_sort_order:
self._implement_hierarchy_sort_order(dimension_name, hierarchy_name, hierarchy_sort_order)
Expand Down
Loading
Loading