-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrun_tui.py
More file actions
182 lines (152 loc) · 6.38 KB
/
run_tui.py
File metadata and controls
182 lines (152 loc) · 6.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python3
"DefaultMODE Agent Manager TUI"
import io, logging, threading
import os, sys, asyncio, subprocess
if sys.platform == "win32":
    # Switch both standard streams to UTF-8; characters that cannot be
    # encoded are substituted instead of raising.
    for _std_stream in (sys.stdout, sys.stderr):
        _std_stream.reconfigure(encoding="utf-8", errors="replace")
    # setdefault leaves any value already present in the environment alone.
    # NOTE(review): presumably meant for child Python processes - confirm.
    os.environ.setdefault("PYTHONIOENCODING", "utf-8")

# HTTP/SDK client loggers whose StreamHandler output would corrupt the TUI.
# Raising each one to CRITICAL mutes every record below that level.
for _logger_name in (
    "httpx",
    "httpcore",
    "openai",
    "anthropic",
    "urllib3",
    "google",
    "google.auth",
    "google.genai",
    "requests",
    "urllib3.connectionpool",
):
    logging.getLogger(_logger_name).setLevel(logging.CRITICAL)

# Once the root logger owns a handler, a later logging.basicConfig() call
# (without force=True) is a no-op, so it cannot attach a StreamHandler.
logging.getLogger().addHandler(logging.NullHandler())
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal
from textual.widgets import Footer, Label, TabbedContent, TabPane
from tui import LaunchPage, LogsPage, PromptsPage, MemoryPage, VizPage, ConfigPage
from tui.shared import STATE
class _TextRedirector(io.TextIOBase):
    """Write-only text stream that forwards what it receives to the app.

    Intended as a replacement for ``sys.stdout``/``sys.stderr``: every
    non-blank write is stripped and handed to the app's ``push_console``.
    """

    def __init__(self, app: "AgentManagerApp", original: io.TextIOBase):
        """Bind the redirector to *app* and remember the replaced stream.

        Args:
            app: Object providing ``push_console`` and ``call_from_thread``.
            original: The stream being replaced; only consulted for its
                ``encoding`` attribute.
        """
        self._app = app
        self._original = original
        # The constructing thread is treated as the one that may call the app
        # directly; writes from any other thread go through call_from_thread.
        self._main_thread = threading.current_thread()

    def write(self, text: str) -> int:
        """Forward *text* to the console bar and report it as fully written.

        Empty and whitespace-only writes are dropped. Any exception raised
        while forwarding is suppressed so that printing never fails.

        Returns:
            ``len(text)``, whether or not anything was forwarded.
        """
        message = text.strip() if text else ""
        if not message:
            return len(text)
        try:
            if threading.current_thread() is not self._main_thread:
                self._app.call_from_thread(self._app.push_console, message)
            else:
                self._app.push_console(message)
        except Exception:
            # Best effort: a failing UI update must not break the writer.
            pass
        return len(text)

    def flush(self):
        """Do nothing; writes are forwarded immediately, never buffered."""
        pass

    def writable(self):
        """Report that this stream accepts writes."""
        return True

    @property
    def encoding(self):
        """Encoding of the replaced stream, or ``"utf-8"`` if it has none."""
        return getattr(self._original, "encoding", "utf-8")
class AgentManagerApp(App):
    """Top-level Textual application for the agent manager.

    The screen consists of a status bar, a ``TabbedContent`` with six panes
    (Launch, Logs, Prompts, Memory, Viz, Config), a one-line console bar and
    a footer. From ``on_mount`` until ``action_quit`` the process-wide
    ``sys.stdout`` and ``sys.stderr`` are replaced by ``_TextRedirector``
    objects that forward written text to the console bar.
    """

    TITLE = "defaultMODE Manager"
    CSS_PATH = "tui/run_bot.css"

    # "q" quits; the digit keys jump to the matching tab through the
    # ``action_tab_*`` methods defined at the end of the class.
    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("1", "tab_launch", "1:Launch"),
        Binding("2", "tab_logs", "2:Logs"),
        Binding("3", "tab_prompts", "3:Prompts"),
        Binding("4", "tab_memory", "4:Memory"),
        Binding("5", "tab_viz", "5:Viz"),
        Binding("6", "tab_config", "6:Config"),
    ]

    def __init__(self):
        """Remember the current standard streams so quit can restore them."""
        super().__init__()
        self._original_stdout = sys.stdout
        self._original_stderr = sys.stderr

    def compose(self) -> ComposeResult:
        """Yield the status bar, the tabbed pages, the console bar and footer."""
        yield Horizontal(
            Label("[dim]no synth selected[/dim]", id="status-config"),
            Label("", id="status-indicator"),
            id="status-bar",
        )
        with TabbedContent():
            with TabPane("Launch", id="tab-launch"):
                yield LaunchPage()
            with TabPane("Logs", id="tab-logs"):
                yield LogsPage()
            with TabPane("Prompts", id="tab-prompts"):
                yield PromptsPage()
            with TabPane("Memory", id="tab-memory"):
                yield MemoryPage()
            with TabPane("Viz", id="tab-viz"):
                yield VizPage()
            with TabPane("Config", id="tab-config"):
                yield ConfigPage()
        yield Label("", id="console-bar")
        yield Footer()

    def on_mount(self):
        """Route stdout/stderr writes into the console bar."""
        sys.stdout = _TextRedirector(self, self._original_stdout)
        sys.stderr = _TextRedirector(self, self._original_stderr)

    def push_console(self, text: str):
        """Show *text* in the console bar, replacing the previous message.

        Newlines are flattened to spaces and anything beyond 200 characters
        is cut off and marked with ``...``.
        """
        label = self.query_one("#console-bar", Label)
        # Show last message, truncated to one line
        clean = text.replace("\n", " ").strip()
        if len(clean) > 200:
            clean = clean[:200] + "..."
        # NOTE(review): the text is interpolated into markup unescaped, so
        # bracketed output may be interpreted as markup tags - confirm whether
        # it should be escaped first.
        label.update(f"[dim]{clean}[/dim]")

    def update_global_status(self):
        """Refresh the status bar from the shared ``STATE`` object.

        The left label shows ``bot / api[ / model]`` once both a bot and an
        API are selected, otherwise a dimmed placeholder. The right label
        shows the number of running bots, or nothing when none are running.
        """
        config = self.query_one("#status-config", Label)
        indicator = self.query_one("#status-indicator", Label)

        if STATE.selected_bot and STATE.selected_api:
            parts = [f"[bold]{STATE.selected_bot}[/bold]", STATE.selected_api]
            if STATE.selected_model:
                parts.append(STATE.selected_model)
            config.update(" / ".join(parts))
        else:
            config.update("[dim]no synth selected[/dim]")

        count = STATE.running_count
        if count > 0:
            indicator.update(f"[bold]● {count} BOT{'S' if count > 1 else ''} RUNNING[/bold]")
        else:
            indicator.update("")

    async def action_quit(self):
        """Override quit to stop all running instances before exiting."""
        # Restore streams before exit
        sys.stdout = self._original_stdout
        sys.stderr = self._original_stderr

        # Iterate over a snapshot so the mapping may change while we await.
        for _bot_name, instance in list(STATE.instances.items()):
            if not instance.running:
                continue
            instance.running = False
            try:
                await self._stop_instance(instance)
            except Exception:
                # Best effort: one instance that cannot be stopped must not
                # prevent the remaining ones, or the app itself, from exiting.
                pass

        self.exit()

    async def _stop_instance(self, instance) -> None:
        """Interrupt one instance's process, force-killing it after 2 seconds.

        Does nothing when the instance has no process or the process has
        already exited. Errors propagate to the caller.
        """
        # Local import: the module's import block does not provide ``signal``.
        import signal

        process = instance.process
        if not process or process.returncode is not None:
            return

        pid = process.pid
        on_windows = sys.platform == "win32"

        # Polite request first: CTRL_BREAK on Windows, SIGINT to the process
        # group elsewhere.
        if on_windows:
            os.kill(pid, signal.CTRL_BREAK_EVENT)
        else:
            os.killpg(os.getpgid(pid), signal.SIGINT)

        try:
            await asyncio.wait_for(process.wait(), timeout=2)
        except asyncio.TimeoutError:
            self._force_kill(process, pid)

    def _force_kill(self, process, pid: int) -> None:
        """Forcibly terminate a process that ignored the interrupt request."""
        import signal

        if sys.platform == "win32":
            # /T kills the whole process tree. This call is synchronous, so
            # the event loop is blocked for at most the 2 second timeout.
            subprocess.run(
                ["taskkill", "/F", "/T", "/PID", str(pid)],
                capture_output=True,
                timeout=2,
            )
            return

        # Escalate to the same process group that received SIGINT, so group
        # members other than the leader do not survive (parity with /T above).
        # Never SIGKILL our own group: if the child shares it, fall back to
        # killing just the child.
        pgid = os.getpgid(pid)
        if pgid != os.getpgrp():
            os.killpg(pgid, signal.SIGKILL)
        else:
            process.kill()

    def _activate_tab(self, pane_id: str) -> None:
        """Make the tab pane with id *pane_id* the active one."""
        self.query_one(TabbedContent).active = pane_id

    def action_tab_launch(self):
        """Switch to the Launch tab."""
        self._activate_tab("tab-launch")

    def action_tab_logs(self):
        """Switch to the Logs tab."""
        self._activate_tab("tab-logs")

    def action_tab_prompts(self):
        """Switch to the Prompts tab."""
        self._activate_tab("tab-prompts")

    def action_tab_memory(self):
        """Switch to the Memory tab."""
        self._activate_tab("tab-memory")

    def action_tab_viz(self):
        """Switch to the Viz tab."""
        self._activate_tab("tab-viz")

    def action_tab_config(self):
        """Switch to the Config tab."""
        self._activate_tab("tab-config")
def main():
    """Create the manager application and run it."""
    app = AgentManagerApp()
    app.run()


# Launch the TUI only when this file is executed as a script.
if __name__ == "__main__":
    main()