-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathstatemachine.py
More file actions
103 lines (94 loc) · 3.3 KB
/
Copy pathstatemachine.py
File metadata and controls
103 lines (94 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
## State machine for task control
import importlib.util
import warnings
import socket
import time
import messageDefinitions as md
import Messenger as MR
from ctypes import *
from threading import Thread
import struct
import json
import Globals
import os
from multiprocessing import Process
import random
from State import *
import sys
sys.path.append(Globals.UTILS_PATH)
import haptics
import graphics
import msgUtils
class StateMachine(object):
    """Config-driven state machine for task control.

    The machine is described by a JSON config file.  ``build`` turns the
    config into state objects and a transition table; ``run`` then walks the
    states until an ``"end"`` transition is taken or ``running`` is cleared.

    Attributes set by this class:
        running: loop flag; setting it to a false value makes ``run`` stop
            after the current state finishes.
        stopped: True whenever ``run`` is not executing (before the first
            run, and after ``run`` has returned normally).
        paused: while true, ``run`` idles without entering any state.
        currentState: name of the state that is (or will next be) executed.
        states: mapping of state name -> state object.
        transitionTable: mapping of (state name, transition symbol) ->
            list of candidate next-state names.
        taskVars: the ``"taskVars"`` section of the config.
        config: the remaining config dict (``"taskVars"`` removed).
    """

    # Seconds slept per loop iteration while paused, so that a pause does not
    # spin a CPU core at 100%.
    _PAUSE_POLL_INTERVAL = 0.01

    def __init__(self, configFile, saveFilePrefix):
        """Load the JSON config at ``configFile`` and build the machine.

        Args:
            configFile: path of the JSON task configuration file.
            saveFilePrefix: forwarded to ``build`` (not used there).
        """
        # The context manager closes the file even if JSON parsing fails.
        with open(configFile) as configHandle:
            config = json.load(configHandle)
        self.running = False
        self.stopped = True
        # Defined here as well so the flag can be read before run() is called.
        self.paused = False
        self.build(config, saveFilePrefix)

    def build(self, config, saveFilePrefix):
        """Populate states and the transition table from ``config``.

        The start state is stored in ``self.currentState``; nothing is
        returned.  ``config`` is mutated: its ``"taskVars"`` entry is moved to
        ``self.taskVars``.

        Args:
            config: parsed configuration dict.  Keys read here: ``"name"``,
                ``"states"``, ``"taskVars"``, ``"startState"``,
                ``"setup_msg"`` and one entry per state name holding
                ``"type"``, ``"transitions"``, ``"graphics"``, ``"haptics"``.
            saveFilePrefix: accepted for interface compatibility; unused.

        Raises:
            KeyError: if a required config key is missing.
            AttributeError: if a state's ``"type"`` names no class visible in
                this module.
        """
        self.name = config["name"]
        self.stateNameList = config["states"]
        self.taskVars = config["taskVars"]
        del config["taskVars"]
        self.config = config
        self.transitionTable = {}
        self.states = {}
        self.currentState = self.config["startState"]

        for startupMsg in self.config["setup_msg"]:
            msgUtils.makeMessage(startupMsg)

        for stateName in self.stateNameList:
            stateDict = config[stateName]
            stateTransitions = stateDict["transitions"]
            stateGraphics = stateDict["graphics"]
            stateHaptics = stateDict["haptics"]

            # State classes are resolved by name in this module's namespace
            # (presumably supplied by ``from State import *``).
            StateConstructor = getattr(sys.modules[__name__], stateDict["type"])
            state = StateConstructor(stateName, stateTransitions,
                                     stateGraphics, stateHaptics)

            # Create every graphics object the state declares.
            for objectName, objectDict in stateGraphics.items():
                graphics.makeObject(objectName, objectDict["shape"],
                                    objectDict["size"], objectDict["color"],
                                    objectDict["position"])

            for transitionSymbol, targets in stateTransitions.items():
                self.transitionTable[(stateName, transitionSymbol)] = targets

            self.states[stateName] = state
            # NOTE(review): the original indentation was lost, so it is unclear
            # whether this pause ran once per state or once after the loop.
            # Per-state is kept as the more conservative pacing -- confirm.
            time.sleep(0.5)

    def run(self):
        """Execute states until an ``"end"`` transition or an external stop.

        Each iteration enters the current state, looks up the candidate next
        states for the returned transition symbol, and picks one (at random
        when there are several).  Trial start/end messages are sent around
        states listed in ``config["TRIAL_START"]`` / ``config["TRIAL_END"]``.

        Returns:
            The transition symbol that led to ``"end"``; or ``"done"`` when
            the loop ended because ``running`` was cleared (or the machine
            was already at ``"end"``), in which case the ``"end_msg"``
            messages are sent first.

        Raises:
            KeyError: if the current state is unknown, or no transition is
                defined for the symbol the state returned.
        """
        self.running = True
        self.stopped = False
        self.paused = False

        while self.currentState != "end" and self.running:
            if self.paused:
                # Yield the CPU instead of busy-spinning while paused.
                time.sleep(self._PAUSE_POLL_INTERVAL)
                continue

            stateName = self.currentState
            currentState = self.states[stateName]

            if stateName in self.config["TRIAL_START"]:
                trialStart = md.M_TRIAL_START()
                trialStart.header.msg_type = md.TRIAL_START
                trialStart.trialNum = self.taskVars["trialNum"]
                MR.sendMessage(MR.makeMessage(trialStart))
                # NOTE(review): assumed to belong to this branch (indentation
                # was lost in the source) -- confirm.
                time.sleep(0.5)

            transition = currentState.entry(stateName, self)
            print(stateName, transition)

            if stateName in self.config["TRIAL_END"]:
                trialEnd = md.M_TRIAL_END()
                trialEnd.header.msg_type = md.TRIAL_END
                MR.sendMessage(MR.makeMessage(trialEnd))
                time.sleep(0.5)

            try:
                nextStates = self.transitionTable[(stateName, transition)]
            except KeyError:
                # Same exception type as before, but with a readable message.
                raise KeyError(
                    "no transition defined for symbol {!r} from state {!r}"
                    .format(transition, stateName)
                ) from None

            # Only draw from the RNG when there is a real choice, so the
            # random stream is consumed exactly as before.
            if len(nextStates) > 1:
                nextState = random.choice(nextStates)
            else:
                nextState = nextStates[0]

            if nextState == "end":
                # The machine has finished: reflect that in ``stopped`` so
                # anything polling the flag does not wait forever.
                # NOTE(review): "end_msg" messages are not sent on this path,
                # matching the original behaviour -- confirm that is intended.
                self.stopped = True
                return transition
            self.currentState = nextState

        for endMessage in self.config["end_msg"]:
            msgUtils.makeMessage(endMessage)
        self.stopped = True
        return "done"
if __name__ == "__main__":
    # Script entry point: the first command-line argument is the path of the
    # task config file handed to StateMachine; the save-file prefix is empty.
    if len(sys.argv) < 2:
        # Exit with a usage hint instead of an IndexError traceback.
        sys.exit("usage: python {} <task_config_file>".format(sys.argv[0]))
    taskConfig = sys.argv[1]
    taskSM = StateMachine(taskConfig, "")
    taskSM.run()