-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnode_node.py
More file actions
623 lines (513 loc) · 24.8 KB
/
Copy pathnode_node.py
File metadata and controls
623 lines (513 loc) · 24.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
# -*- coding: utf-8 -*-
"""
A module containing NodeEditor's class for representing `Node`.
"""
from collections import OrderedDict
from nodeeditor.node_graphics_node import QDMGraphicsNode
from nodeeditor.node_content_widget import QDMNodeContentWidget
from nodeeditor.node_serializable import Serializable
from nodeeditor.node_socket import Socket, LEFT_BOTTOM, LEFT_CENTER, LEFT_TOP, RIGHT_BOTTOM, RIGHT_CENTER, RIGHT_TOP
from nodeeditor.utils_no_qt import dumpException, pp
DEBUG = False
class Node(Serializable):
"""
Class representing `Node` in the `Scene`.
"""
GraphicsNode_class = QDMGraphicsNode
NodeContent_class = QDMNodeContentWidget
Socket_class = Socket
def __init__(self, scene: 'Scene', title: str="Undefined Node", inputs: list=[], outputs: list=[]):
"""
:param scene: reference to the :class:`~nodeeditor.node_scene.Scene`
:type scene: :class:`~nodeeditor.node_scene.Scene`
:param title: Node Title shown in Scene
:type title: str
:param inputs: list of :class:`~nodeeditor.node_socket.Socket` types from which the `Sockets` will be auto created
:param outputs: list of :class:`~nodeeditor.node_socket.Socket` types from which the `Sockets` will be auto created
:Instance Attributes:
- **scene** - reference to the :class:`~nodeeditor.node_scene.Scene`
- **grNode** - Instance of :class:`~nodeeditor.node_graphics_node.QDMGraphicsNode` handling graphical representation in the ``QGraphicsScene``. Automatically created in the constructor
- **content** - Instance of :class:`~nodeeditor.node_graphics_content.QDMGraphicsContent` which is child of ``QWidget`` representing container for all inner widgets inside of the Node. Automatically created in the constructor
- **inputs** - list containin Input :class:`~nodeeditor.node_socket.Socket` instances
- **outputs** - list containin Output :class:`~nodeeditor.node_socket.Socket` instances
"""
super().__init__()
self._title = title
self.scene = scene
# just to be sure, init these variables
self.content = None
self.grNode = None
self.initInnerClasses()
self.initSettings()
self.title = title
self.scene.addNode(self)
self.scene.grScene.addItem(self.grNode)
# create socket for inputs and outputs
self.inputs = []
self.outputs = []
self.initSockets(inputs, outputs)
# dirty and evaluation
self._is_dirty = False
self._is_invalid = False
def __str__(self):
return "<%s:%s %s..%s>" % (self.title, self.__class__.__name__,hex(id(self))[2:5], hex(id(self))[-3:])
@property
def title(self):
"""
Title shown in the scene
:getter: return current Node title
:setter: sets Node title and passes it to Graphics Node class
:type: ``str``
"""
return self._title
@title.setter
def title(self, value):
self._title = value
self.grNode.title = self._title
@property
def pos(self):
"""
Retrieve Node's position in the Scene
:return: Node position
:rtype: ``QPointF``
"""
return self.grNode.pos() # QPointF
def setPos(self, x: float, y: float):
"""
Sets position of the Graphics Node
:param x: X `Scene` position
:param y: Y `Scene` position
"""
self.grNode.setPos(x, y)
for inputs in self.inputs:
for edge in inputs.edges:
edge.grEdge.calcPath()
edge.updatePositions()
for outputs in self.outputs:
for edge in outputs.edges:
edge.grEdge.calcPath()
edge.updatePositions()
def initInnerClasses(self):
"""Sets up graphics Node (PyQt) and Content Widget"""
node_content_class = self.getNodeContentClass()
graphics_node_class = self.getGraphicsNodeClass()
if node_content_class is not None: self.content = node_content_class(self)
if graphics_node_class is not None: self.grNode = graphics_node_class(self)
def getNodeContentClass(self):
"""Returns class representing nodeeditor content"""
return self.__class__.NodeContent_class
def getGraphicsNodeClass(self):
return self.__class__.GraphicsNode_class
def initSettings(self):
"""Initialize properties and socket information"""
self.socket_spacing = 22
self.input_socket_position = LEFT_BOTTOM
self.output_socket_position = RIGHT_TOP
self.input_multi_edged = False
self.output_multi_edged = True
self.socket_offsets = {
LEFT_BOTTOM: -1,
LEFT_CENTER: -1,
LEFT_TOP: -1,
RIGHT_BOTTOM: 1,
RIGHT_CENTER: 1,
RIGHT_TOP: 1,
}
def initSockets(self, inputs: list, outputs: list, reset: bool=True):
"""
Create sockets for inputs and outputs
:param inputs: list of Socket Types (int) or tuples of (socket_type, label)
:type inputs: ``list``
:param outputs: list of Socket Types (int) or tuples of (socket_type, label)
:type outputs: ``list``
:param reset: if ``True`` destroys and removes old `Sockets`
:type reset: ``bool``
"""
if reset:
# clear old sockets
if hasattr(self, 'inputs') and hasattr(self, 'outputs'):
# remove grSockets from scene
for socket in (self.inputs+self.outputs):
self.scene.grScene.removeItem(socket.grSocket)
self.inputs = []
self.outputs = []
# create new sockets
counter = 0
for item in inputs:
# Handle both old format (int) and new format (tuple)
if isinstance(item, tuple):
socket_type, label = item
else:
socket_type, label = item, ""
socket = self.__class__.Socket_class(
node=self, index=counter, position=self.input_socket_position,
socket_type=socket_type, multi_edges=self.input_multi_edged,
count_on_this_node_side=len(inputs), is_input=True, label=label
)
counter += 1
self.inputs.append(socket)
counter = 0
for item in outputs:
# Handle both old format (int) and new format (tuple)
if isinstance(item, tuple):
socket_type, label = item
else:
socket_type, label = item, ""
socket = self.__class__.Socket_class(
node=self, index=counter, position=self.output_socket_position,
socket_type=socket_type, multi_edges=self.output_multi_edged,
count_on_this_node_side=len(outputs), is_input=False, label=label
)
counter += 1
self.outputs.append(socket)
def onEdgeConnectionChanged(self, new_edge: 'Edge'):
"""
Event handling that any connection (`Edge`) has changed. Currently not used...
:param new_edge: reference to the changed :class:`~nodeeditor.node_edge.Edge`
:type new_edge: :class:`~nodeeditor.node_edge.Edge`
"""
pass
def onInputChanged(self, socket: 'Socket'):
"""Event handling when Node's input Edge has changed. We auto-mark this `Node` to be `Dirty` with all it's
descendants
:param socket: reference to the changed :class:`~nodeeditor.node_socket.Socket`
:type socket: :class:`~nodeeditor.node_socket.Socket`
"""
self.markDirty()
self.markDescendantsDirty()
def onDeserialized(self, data: dict):
"""Event manually called when this node was deserialized. Currently called when node is deserialized from scene
Passing `data` containing the data which have been deserialized """
pass
def onDoubleClicked(self, event):
"""Event handling double click on Graphics Node in `Scene`"""
pass
def doSelect(self, new_state: bool=True):
"""Shortcut method for selecting/deselecting the `Node`
:param new_state: ``True`` if you want to select the `Node`. ``False`` if you want to deselect the `Node`
:type new_state: ``bool``
"""
self.grNode.doSelect(new_state)
def isSelected(self):
"""Returns ``True`` if current `Node` is selected"""
return self.grNode.isSelected()
def hasConnectedEdge(self, edge: 'Edge'):
"""Returns ``True`` if edge is connected to any :class:`~nodeeditor.node_socket.Socket` of this `Node`"""
for socket in (self.inputs + self.outputs):
if socket.isConnected(edge):
return True
return False
def getSocketPosition(self, index: int, position: int, num_out_of: int=1) -> '(x, y)':
"""
Get the relative `x, y` position of a :class:`~nodeeditor.node_socket.Socket`. This is used for placing
the `Graphics Sockets` on `Graphics Node`.
:param index: Order number of the Socket. (0, 1, 2, ...)
:type index: ``int``
:param position: `Socket Position Constant` describing where the Socket is located. See :ref:`socket-position-constants`
:type position: ``int``
:param num_out_of: Total number of Sockets on this `Socket Position`
:type num_out_of: ``int``
:return: Position of described Socket on the `Node`
:rtype: ``x, y``
"""
x = self.socket_offsets[position] if (position in (LEFT_TOP, LEFT_CENTER, LEFT_BOTTOM)) else self.grNode.width + self.socket_offsets[position]
if position in (LEFT_BOTTOM, RIGHT_BOTTOM):
# start from bottom
y = self.grNode.height - self.grNode.edge_roundness - self.grNode.title_vertical_padding - index * self.socket_spacing
elif position in (LEFT_CENTER, RIGHT_CENTER):
num_sockets = num_out_of
node_height = self.grNode.height
top_offset = self.grNode.title_height + 2 * self.grNode.title_vertical_padding + self.grNode.edge_padding
available_height = node_height - top_offset
total_height_of_all_sockets = num_sockets * self.socket_spacing
new_top = available_height - total_height_of_all_sockets
# y = top_offset + index * self.socket_spacing + new_top / 2
y = top_offset + available_height/2.0 + (index-0.5)*self.socket_spacing
if num_sockets > 1:
y -= self.socket_spacing * (num_sockets-1)/2
elif position in (LEFT_TOP, RIGHT_TOP):
# start from top
y = self.grNode.title_height + self.grNode.title_vertical_padding + self.grNode.edge_roundness + index * self.socket_spacing
else:
# this should never happen
y = 0
return [x, y]
def getSocketScenePosition(self, socket: 'Socket') -> '(x, y)':
"""
Get absolute Socket position in the Scene
:param socket: `Socket` which position we want to know
:return: (x, y) Socket's scene position
"""
nodepos = self.grNode.pos()
socketpos = self.getSocketPosition(socket.index, socket.position, socket.count_on_this_node_side)
return (nodepos.x() + socketpos[0], nodepos.y() + socketpos[1])
def updateConnectedEdges(self):
"""Recalculate (Refresh) positions of all connected `Edges`. Used for updating Graphics Edges"""
for socket in self.inputs + self.outputs:
# if socket.hasEdge():
for edge in socket.edges:
edge.updatePositions()
def remove(self):
"""
Safely remove this Node
"""
if DEBUG: print("> Removing Node", self)
if DEBUG: print(" - remove all edges from sockets")
for socket in (self.inputs+self.outputs):
# if socket.hasEdge():
for edge in socket.edges.copy():
if DEBUG: print(" - removing from socket:", socket, "edge:", edge)
edge.remove()
if DEBUG: print(" - remove grNode")
self.scene.grScene.removeItem(self.grNode)
self.grNode = None
if DEBUG: print(" - remove node from the scene")
self.scene.removeNode(self)
if DEBUG: print(" - everything was done.")
# node evaluation stuff
def isDirty(self) -> bool:
"""Is this node marked as `Dirty`
:return: ``True`` if `Node` is marked as `Dirty`
:rtype: ``bool``
"""
return self._is_dirty
def markDirty(self, new_value: bool=True):
"""Mark this `Node` as `Dirty`. See :ref:`evaluation` for more
:param new_value: ``True`` if this `Node` should be `Dirty`. ``False`` if you want to un-dirty this `Node`
:type new_value: ``bool``
"""
self._is_dirty = new_value
if self._is_dirty: self.onMarkedDirty()
def onMarkedDirty(self):
"""Called when this `Node` has been marked as `Dirty`. This method is supposed to be overridden"""
pass
def markChildrenDirty(self, new_value: bool=True):
"""Mark all first level children of this `Node` to be `Dirty`. Not this `Node` it self. Not other descendants
:param new_value: ``True`` if children should be `Dirty`. ``False`` if you want to un-dirty children
:type new_value: ``bool``
"""
for other_node in self.getChildrenNodes():
other_node.markDirty(new_value)
def markDescendantsDirty(self, new_value: bool=True):
"""Mark all children and descendants of this `Node` to be `Dirty`. Not this `Node` it self
:param new_value: ``True`` if children and descendants should be `Dirty`. ``False`` if you want to un-dirty children and descendants
:type new_value: ``bool``
"""
for other_node in self.getChildrenNodes():
other_node.markDirty(new_value)
other_node.markDescendantsDirty(new_value)
def isInvalid(self) -> bool:
"""Is this node marked as `Invalid`?
:return: ``True`` if `Node` is marked as `Invalid`
:rtype: ``bool``
"""
return self._is_invalid
def markInvalid(self, new_value: bool=True):
"""Mark this `Node` as `Invalid`. See :ref:`evaluation` for more
:param new_value: ``True`` if this `Node` should be `Invalid`. ``False`` if you want to make this `Node` valid
:type new_value: ``bool``
"""
self._is_invalid = new_value
if self._is_invalid: self.onMarkedInvalid()
def onMarkedInvalid(self):
"""Called when this `Node` has been marked as `Invalid`. This method is supposed to be overridden"""
pass
def markChildrenInvalid(self, new_value: bool=True):
"""Mark all first level children of this `Node` to be `Invalid`. Not this `Node` it self. Not other descendants
:param new_value: ``True`` if children should be `Invalid`. ``False`` if you want to make children valid
:type new_value: ``bool``
"""
for other_node in self.getChildrenNodes():
other_node.markInvalid(new_value)
def markDescendantsInvalid(self, new_value: bool=True):
"""Mark all children and descendants of this `Node` to be `Invalid`. Not this `Node` it self
:param new_value: ``True`` if children and descendants should be `Invalid`. ``False`` if you want to make children and descendants valid
:type new_value: ``bool``
"""
for other_node in self.getChildrenNodes():
other_node.markInvalid(new_value)
other_node.markDescendantsInvalid(new_value)
def eval(self, index=0):
"""Evaluate this `Node`. This is supposed to be overridden. See :ref:`evaluation` for more"""
self.markDirty(False)
self.markInvalid(False)
return 0
def evalChildren(self):
"""Evaluate all children of this `Node`"""
for node in self.getChildrenNodes():
node.eval()
# traversing nodes functions
def getChildrenNodes(self) -> 'List[Node]':
"""
Retreive all first-level children connected to this `Node` `Outputs`
:return: list of `Nodes` connected to this `Node` from all `Outputs`
:rtype: List[:class:`~nodeeditor.node_node.Node`]
"""
if self.outputs == []: return []
other_nodes = []
for ix in range(len(self.outputs)):
for edge in self.outputs[ix].edges:
other_node = edge.getOtherSocket(self.outputs[ix]).node
other_nodes.append(other_node)
return other_nodes
def getInput(self, index: int=0) -> ['Node', None]:
"""
Get the **first** `Node` connected to the Input specified by `index`
:param index: Order number of the `Input Socket`
:type index: ``int``
:return: :class:`~nodeeditor.node_node.Node` which is connected to the specified `Input` or ``None`` if
there is no connection or the index is out of range
:rtype: :class:`~nodeeditor.node_node.Node` or ``None``
"""
if index < 0 or index >= len(self.inputs):
return None
try:
input_socket = self.inputs[index]
if len(input_socket.edges) == 0: return None
connecting_edge = input_socket.edges[0]
other_socket = connecting_edge.getOtherSocket(self.inputs[index])
return other_socket.node
except Exception as e:
dumpException(e)
return None
def getInputWithSocket(self, index: int=0) -> [('Node', 'Socket'), (None, None)]:
"""
Get the **first** `Node` connected to the Input specified by `index` and the connection `Socket`
:param index: Order number of the `Input Socket`
:type index: ``int``
:return: Tuple containing :class:`~nodeeditor.node_node.Node` and :class:`~nodeeditor.node_socket.Socket` which
is connected to the specified `Input` or ``None`` if there is no connection or the index is out of range
:rtype: (:class:`~nodeeditor.node_node.Node`, :class:`~nodeeditor.node_socket.Socket`)
"""
try:
input_socket = self.inputs[index]
if len(input_socket.edges) == 0: return None, None
connecting_edge = input_socket.edges[0]
other_socket = connecting_edge.getOtherSocket(self.inputs[index])
return other_socket.node, other_socket
except Exception as e:
dumpException(e)
return None, None
def getInputWithSocketIndex(self, index: int=0) -> ('Node', int):
"""
Get the **first** `Node` connected to the Input specified by `index` and the connection `Socket`
:param index: Order number of the `Input Socket`
:type index: ``int``
:return: Tuple containing :class:`~nodeeditor.node_node.Node` and :class:`~nodeeditor.node_socket.Socket` which
is connected to the specified `Input` or ``None`` if there is no connection or the index is out of range
:rtype: (:class:`~nodeeditor.node_node.Node`, int)
"""
try:
edge = self.inputs[index].edges[0]
socket = edge.getOtherSocket(self.inputs[index])
return socket.node, socket.index
except IndexError:
# print("EXC: Trying to get input with socket index %d, but none is attached to" % index, self)
return None, None
except Exception as e:
dumpException(e)
return None, None
def getInputs(self, index: int=0) -> 'List[Node]':
"""
Get **all** `Nodes` connected to the Input specified by `index`
:param index: Order number of the `Input Socket`
:type index: ``int``
:return: all :class:`~nodeeditor.node_node.Node` instances which are connected to the
specified `Input` or ``[]`` if there is no connection or the index is out of range
:rtype: List[:class:`~nodeeditor.node_node.Node`]
"""
if index < 0 or index >= len(self.inputs):
return []
ins = []
for edge in self.inputs[index].edges:
other_socket = edge.getOtherSocket(self.inputs[index])
ins.append(other_socket.node)
return ins
def getOutputs(self, index: int=0) -> 'List[Node]':
"""
Get **all** `Nodes` connected to the Output specified by `index`
:param index: Order number of the `Output Socket`
:type index: ``int``
:return: all :class:`~nodeeditor.node_node.Node` instances which are connected to the
specified `Output` or ``[]`` if there is no connection or the index is out of range
:rtype: List[:class:`~nodeeditor.node_node.Node`]
"""
if index < 0 or index >= len(self.outputs):
return []
outs = []
for edge in self.outputs[index].edges:
other_socket = edge.getOtherSocket(self.outputs[index])
outs.append(other_socket.node)
return outs
# serialization functions
def serialize(self) -> OrderedDict:
inputs, outputs = [], []
for socket in self.inputs: inputs.append(socket.serialize())
for socket in self.outputs: outputs.append(socket.serialize())
ser_content = self.content.serialize() if isinstance(self.content, Serializable) else {}
return OrderedDict([
('id', self.id),
('title', self.title),
('pos_x', self.grNode.scenePos().x()),
('pos_y', self.grNode.scenePos().y()),
('inputs', inputs),
('outputs', outputs),
('content', ser_content),
])
def deserialize(self, data: dict, hashmap: dict={}, restore_id: bool=True, *args, **kwargs) -> bool:
try:
if restore_id: self.id = data['id']
hashmap[data['id']] = self
self.setPos(data['pos_x'], data['pos_y'])
self.title = data['title']
data['inputs'].sort(key=lambda socket: socket['index'] + socket['position'] * 10000 )
data['outputs'].sort(key=lambda socket: socket['index'] + socket['position'] * 10000 )
num_inputs = len( data['inputs'] )
num_outputs = len( data['outputs'] )
# print("> deserialize node, num inputs:", num_inputs, "num outputs:", num_outputs)
# pp(data)
# possible way to do it is reuse existing sockets...
# dont create new ones if not necessary
for socket_data in data['inputs']:
found = None
for socket in self.inputs:
# print("\t", socket, socket.index, "=?", socket_data['index'])
if socket.index == socket_data['index']:
found = socket
break
if found is None:
# print("deserialization of socket data has not found input socket with index:", socket_data['index'])
# print("actual socket data:", socket_data)
# we can create new socket for this
found = self.__class__.Socket_class(
node=self, index=socket_data['index'], position=socket_data['position'],
socket_type=socket_data['socket_type'], count_on_this_node_side=num_inputs,
is_input=True, label=socket_data.get('label', '')
)
self.inputs.append(found) # append newly created input to the list
found.deserialize(socket_data, hashmap, restore_id)
for socket_data in data['outputs']:
found = None
for socket in self.outputs:
# print("\t", socket, socket.index, "=?", socket_data['index'])
if socket.index == socket_data['index']:
found = socket
break
if found is None:
# print("deserialization of socket data has not found output socket with index:", socket_data['index'])
# print("actual socket data:", socket_data)
# we can create new socket for this
found = self.__class__.Socket_class(
node=self, index=socket_data['index'], position=socket_data['position'],
socket_type=socket_data['socket_type'], count_on_this_node_side=num_outputs,
is_input=False, label=socket_data.get('label', '')
)
self.outputs.append(found) # append newly created output to the list
found.deserialize(socket_data, hashmap, restore_id)
except Exception as e: dumpException(e)
# also deserialize the content of the node
# so far the rest was ok, now as last step the content...
if isinstance(self.content, Serializable):
res = self.content.deserialize(data['content'], hashmap)
return res
return True