-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtree_algorithm.py
More file actions
148 lines (108 loc) · 6.12 KB
/
Copy pathtree_algorithm.py
File metadata and controls
148 lines (108 loc) · 6.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# %% Import packages to use
from datatable import fread
import pandas as pd
from IPython.display import display
import networkx as nx
# %% Hierarchical Graph Class
class hierarchical_graph():
    """Directed parent/child hierarchy built from a relationship table.

    Two ``networkx.DiGraph`` views are built from the same edge list:

    * ``graph_descend`` -- edges point from the higher (parent) node to the
      lower (child) node, so graph descendants are hierarchy descendants.
    * ``graph_ascend`` -- the same edges reversed, so graph descendants are
      hierarchy ancestors.

    Parameters
    ----------
    data : pandas.DataFrame
        Relationship table with one row per edge.
    parent : str
        Name of the column holding the higher (parent) node of each edge.
    source : str
        Name of the column holding the lower (child) node of each edge.
    root : hashable, optional
        Node whose maximum depth is printed on construction. Defaults to
        138875005. Nothing is printed when the node is not in the graph.

    Notes
    -----
    The public attributes are named the other way round from the
    constructor arguments: ``self.parent`` stores the *child* column name
    (the ``source`` argument) and ``self.source`` stores the *parent*
    column name (the ``parent`` argument). This is kept for backward
    compatibility with code that reads those attributes.
    """

    def __init__(self, data: pd.DataFrame, parent: str, source: str,
                 root=138875005):
        self.data = data      # relationship DataFrame
        self.parent = source  # column of the lower (child) node
        self.source = parent  # column of the higher (parent) node
        super().__init__()

        # Build both views from the instance's own data rather than from a
        # module-level global, so the class works with any DataFrame.
        self.graph_descend = nx.from_pandas_edgelist(self.data,
                                                     source=self.source,
                                                     target=self.parent,
                                                     create_using=nx.DiGraph())
        self.graph_ascend = nx.from_pandas_edgelist(self.data,
                                                    source=self.parent,
                                                    target=self.source,
                                                    create_using=nx.DiGraph())

        # Print the maximum level between the root and its lowest node,
        # but only when the root actually exists in this data set.
        if root in self.graph_descend:
            print(self._max_depth(self.graph_descend, root))

    @staticmethod
    def _max_depth(graph, node) -> int:
        """Return the largest shortest-path distance from ``node`` in ``graph``."""
        return max(nx.single_source_shortest_path_length(graph, node).values())

    def _rows_at_level(self, graph, node, level: int, parents=None):
        """Return ``(rows, nodes)`` for the nodes at ``level`` steps from ``node``.

        ``rows`` is a copy of the rows of ``self.data`` whose child column is
        one of those nodes, with an added ``'level'`` column. When
        ``parents`` is given, rows are additionally restricted to those
        whose parent column is in ``parents``.
        """
        nodes = nx.descendants_at_distance(G=graph, source=node, distance=level)
        mask = self.data[self.parent].isin(nodes)
        if parents is not None:
            mask &= self.data[self.source].isin(parents)
        rows = self.data[mask].copy()
        rows['level'] = level
        return rows, nodes

    def _combine(self, frames) -> pd.DataFrame:
        """Concatenate per-level frames once, skipping empty ones."""
        non_empty = [frame for frame in frames if not frame.empty]
        if not non_empty:
            # Keep the schema (including 'level') even when nothing matched.
            return self.data.iloc[:0].assign(level=[])
        return pd.concat(non_empty, axis=0)

    @staticmethod
    def _show_and_save(results: pd.DataFrame, node, suffix: str,
                       save: bool) -> pd.DataFrame:
        """Display ``results`` and optionally export them to ``./<node><suffix>.xlsx``."""
        display(results)
        if save:
            # Export everything as text; work on a copy so the returned
            # frame keeps its dtypes. No ``encoding`` argument is passed
            # because pandas removed it from ``to_excel`` in 2.0.
            results.astype(str).to_excel("./" + str(node) + suffix + ".xlsx",
                                         index=False)
        return results

    def get_descendent(self, node, level: int, save: bool = False) -> pd.DataFrame:
        """Show the rows of the descendants exactly ``level`` steps below ``node``.

        Rows are those whose child column is one of these descendants.
        With ``save`` the result is also written to
        ``./<node>_decendent.xlsx``. The displayed frame is returned.
        """
        results, _ = self._rows_at_level(self.graph_descend, node, level)
        return self._show_and_save(results, node, "_decendent", save)

    def get_descendent_all(self, node, max_level: int = 0,
                           relation_all: bool = False,
                           save: bool = False) -> pd.DataFrame:
        """Show the rows of all descendants of ``node`` from level 1 to ``max_level``.

        Parameters
        ----------
        node : hashable
            Starting node.
        max_level : int
            Deepest level to include; ``0`` means the full depth below
            ``node``.
        relation_all : bool
            If true, keep every row whose child is a descendant at the
            level. Otherwise keep only rows whose parent is ``node``
            (level 1) or a descendant found at the previous level.
        save : bool
            Also write the result to ``./<node>_decendent_all.xlsx``.

        Returns
        -------
        pandas.DataFrame
            The displayed rows with an added ``'level'`` column.
        """
        max_depth = self._max_depth(self.graph_descend, node)
        # Levels beyond the real depth are always empty, so do not walk them.
        max_level = max_depth if max_level == 0 else min(max_level, max_depth)

        frames = []
        previous_level = [node]
        for level in range(1, max_level + 1):
            parents = None if relation_all else previous_level
            rows, previous_level = self._rows_at_level(self.graph_descend,
                                                       node, level, parents)
            frames.append(rows)

        results = self._combine(frames)
        return self._show_and_save(results, node, "_decendent_all", save)

    ## Get ascendent from given level
    def get_ascendent(self, node, level: int, save: bool = False) -> pd.DataFrame:
        """Show the rows of the ancestors exactly ``level`` steps above ``node``.

        Rows are those whose child column is one of these ancestors, i.e.
        each ancestor's own relationship rows. With ``save`` the result is
        also written to ``./<node>_ascendent.xlsx``. The displayed frame
        is returned.
        """
        results, _ = self._rows_at_level(self.graph_ascend, node, level)
        return self._show_and_save(results, node, "_ascendent", save)

    ## Get all ascendent from 1 to given levels
    def get_ascendent_all(self, node, max_level: int = 0,
                          save: bool = False) -> pd.DataFrame:
        """Show the rows of all ancestors of ``node`` from level 1 to ``max_level``.

        ``max_level=0`` means the full height above ``node``. With ``save``
        the result is also written to ``./<node>_ascendent_all.xlsx``. The
        displayed frame, with an added ``'level'`` column, is returned.
        """
        max_depth = self._max_depth(self.graph_ascend, node)
        # Levels beyond the real height are always empty, so do not walk them.
        max_level = max_depth if max_level == 0 else min(max_level, max_depth)

        # Walk the ascending graph: the depth above is measured on it, and
        # walking the descending graph would return descendants instead.
        frames = [self._rows_at_level(self.graph_ascend, node, level)[0]
                  for level in range(1, max_level + 1)]

        results = self._combine(frames)
        return self._show_and_save(results, node, "_ascendent_all", save)
# %%
if __name__ == '__main__':
    # Read the relationship table with datatable (treating 'NA' and empty
    # strings as missing), then convert it to a pandas DataFrame.
    relationship_table = fread('./sct_relationship_codes.csv', na_strings=['NA', ''])
    df_orig = relationship_table.to_pandas()

    # Each row links a 'sourceId' node to its 'destinationId' node; the
    # latter is passed as the parent column.
    test = hierarchical_graph(
        data=df_orig,
        parent='destinationId',
        source='sourceId',
    )

    # Example calls; only the full descendant listing is active.
    # test.get_descendent(node=138875005, level=1, save=True)
    test.get_descendent_all(
        node=15497006,
        max_level=0,
        relation_all=False,
        save=False,
    )
    # test.get_ascendent(node=138875005, level=1, save=False)
    # test.get_ascendent_all(node=138875005, max_level=0, save=False)
# %%
# %%