-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpoller.py
More file actions
331 lines (293 loc) · 10.7 KB
/
poller.py
File metadata and controls
331 lines (293 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
"""Background health poller — SSH every 30s, snapshots, WebSocket updates."""
from __future__ import annotations
import asyncio
import logging
from datetime import UTC, datetime
from agent import run_agent_loop
from anomaly_detection import anomaly_signature, is_metric_above_threshold
from db import store
from health_metrics import (
build_server_health,
count_restart_events,
derive_server_status,
)
from models.config import ServerConfig, load_app_config
from models.incident import AnomalyEvent
from ssh_client import detect_compose_command, is_server_reachable
from tools.infrastructure import collect_health_snapshot
from ws_hub import ws_broadcast
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Handle to the background poller task; assigned by start_poller(), cleared by stop_poller().
_poller_task: asyncio.Task | None = None
# Stop signal; created by run_poller_loop(), set and then cleared by stop_poller().
_stop_event: asyncio.Event | None = None
# Per server id: number of consecutive polls with CPU or memory above the server's threshold.
_consecutive_critical: dict[str, int] = {}
# Per server id: container list from the previous successful poll (passed to count_restart_events).
_last_container_statuses: dict[str, list[dict]] = {}
# Incremented on every _maybe_refresh_baselines() call; baselines are refreshed on every 720th call.
_baseline_refresh_counter = 0
# Per server id: signature of the last anomaly handed to the agent, used to skip repeats.
_last_anomaly_signature: dict[str, str] = {}
def _utc_now_iso() -> str:
return datetime.now(UTC).replace(microsecond=0).isoformat()
async def _is_anomaly(
    server: ServerConfig,
    cpu: float | None,
    mem: float | None,
    disk: float | None,
    restart_events: int,
    *,
    baseline_cpu: float | None = None,
    baseline_mem: float | None = None,
    margin: float = 1.15,
    use_baseline: bool = True,
) -> tuple[bool, str]:
    """Check one poll's metrics against the limits in ``server.thresholds``.

    CPU and memory are judged by ``is_metric_above_threshold``, which receives
    the value, the configured limit, the optional baseline, ``margin`` and
    ``use_baseline``. Disk is a plain ``>`` comparison that is skipped when
    ``disk`` is ``None``. Restarts count as anomalous once ``restart_events``
    reaches ``container_restart_count``.

    Returns:
        ``(True, reasons)`` with the reasons joined by ``"; "`` when at least
        one check fires, otherwise ``(False, "")``.
    """
    limits = server.thresholds
    findings: list[str] = []

    cpu_high = is_metric_above_threshold(
        cpu, limits.cpu_percent, baseline_cpu, margin=margin, use_baseline=use_baseline
    )
    if cpu_high:
        findings.append(f"CPU {cpu}% above threshold")

    mem_high = is_metric_above_threshold(
        mem, limits.memory_percent, baseline_mem, margin=margin, use_baseline=use_baseline
    )
    if mem_high:
        findings.append(f"Memory {mem}% above threshold")

    if disk is not None and disk > limits.disk_percent:
        findings.append(f"Disk {disk}% > {limits.disk_percent}%")

    if restart_events >= limits.container_restart_count:
        findings.append(f"Container restarts ({restart_events}) in window")

    # Joining an empty list yields "", matching the "no anomaly" result.
    return bool(findings), "; ".join(findings)
def _match_service(server: ServerConfig, container_name: str) -> str | None:
    """Return the name of the configured service that matches a container name.

    Matching is case-insensitive and succeeds when either name contains the
    other (so both ``proj_web_1`` and ``we`` match a service called ``web``).
    The first matching entry of ``server.services`` wins.

    Args:
        server: Server whose ``services`` are searched.
        container_name: Container name as reported by the host.

    Returns:
        The service's name exactly as configured, or ``None`` when nothing
        matches or ``container_name`` is empty/``None``.
    """
    needle = (container_name or "").lower()
    if not needle:
        # "" is a substring of every string, so without this guard a blank
        # container name would be attributed to the first configured service.
        return None
    for svc in server.services:
        candidate = (svc.name or "").lower()
        if not candidate:
            # Same reasoning: a blank service name would match every container.
            continue
        if candidate in needle or needle in candidate:
            return svc.name
    return None
def _container_anomaly(
    server: ServerConfig, containers: list[dict]
) -> tuple[bool, str, str | None]:
    """Report the first container whose status looks unhealthy.

    A container is unhealthy when its lower-cased ``status`` contains
    ``exited``, ``restarting`` or ``dead``. Only the first such container is
    reported.

    Returns:
        ``(True, message, service_name)`` for the first unhealthy container,
        where ``service_name`` comes from ``_match_service`` and may be
        ``None``; ``(False, "", None)`` when every container looks fine.
    """
    bad_markers = ("exited", "restarting", "dead")
    for entry in containers:
        state = (entry.get("status") or "").lower()
        if not any(marker in state for marker in bad_markers):
            continue
        container_name = entry.get("name", "unknown")
        service = _match_service(server, container_name)
        message = f"Container {container_name} unhealthy: {entry.get('status')}"
        return True, message, service
    return False, "", None
# Strong references to in-flight agent tasks. The event loop only keeps weak
# references to tasks, so a fire-and-forget task with no other referent can be
# garbage-collected before it finishes.
_agent_tasks: set[asyncio.Task] = set()


def _on_agent_task_done(task: asyncio.Task) -> None:
    """Release a finished agent task and log the error if it failed."""
    _agent_tasks.discard(task)
    if task.cancelled():
        return
    failure = task.exception()
    if failure is not None:
        logger.error("Agent task %s failed", task.get_name(), exc_info=failure)


async def _trigger_agent(
    server: ServerConfig, reason: str, service_name: str | None, metrics: dict
) -> None:
    """Start an agent run for an anomaly unless it is a repeat or suppressed.

    The anomaly is identified by ``anomaly_signature(reason, service_name)``.
    Nothing happens when that signature equals the last one dispatched for
    this server, or when ``store.is_anomaly_suppressed`` reports it as
    suppressed. Otherwise the signature is recorded and ``run_agent_loop`` is
    scheduled as a background task named ``agent-<server.id>``.

    Args:
        server: Server on which the anomaly was observed.
        reason: Human-readable description of the anomaly.
        service_name: Affected service, if one was identified; the event
            severity is ``"high"`` when set and ``"medium"`` otherwise.
        metrics: Metric values attached to the event.
    """
    signature = anomaly_signature(reason, service_name)
    if _last_anomaly_signature.get(server.id) == signature:
        return
    if await store.is_anomaly_suppressed(server.id, signature, service_name):
        logger.info("Anomaly suppressed on %s: %s", server.id, signature[:80])
        return
    _last_anomaly_signature[server.id] = signature
    event = AnomalyEvent(
        server_id=server.id,
        service_name=service_name,
        reason=reason,
        severity="high" if service_name else "medium",
        metrics=metrics,
    )
    task = asyncio.create_task(run_agent_loop(event), name=f"agent-{server.id}")
    # Hold the task until it completes; the callback drops it again.
    _agent_tasks.add(task)
    task.add_done_callback(_on_agent_task_done)
async def _report_unknown(server: ServerConfig, error: object) -> None:
    """Persist and broadcast an ``unknown`` status for *server* together with *error*."""
    await store.upsert_server(server.id, server.label, server.host, status="unknown")
    await ws_broadcast(
        {
            "type": "snapshot_update",
            "server_id": server.id,
            "data": {
                "server_id": server.id,
                "label": server.label,
                "status": "unknown",
                "error": error,
            },
        }
    )


def _anomaly_settings() -> tuple[float, bool]:
    """Return ``(baseline_margin, use_baseline_detection)`` from the app config.

    Falls back to ``(1.15, True)`` when the config file is missing or the
    anomaly section is empty.
    """
    try:
        anomaly_cfg = load_app_config().rules.anomaly
    except FileNotFoundError:
        anomaly_cfg = None
    if not anomaly_cfg:
        return 1.15, True
    return anomaly_cfg.baseline_margin, anomaly_cfg.use_baseline_detection


async def _poll_server(server: ServerConfig) -> None:
    """Poll one server, record the result, and start the agent on anomalies.

    Steps, in order:
      1. Skip servers for which ``is_server_reachable`` is false.
      2. Collect a health snapshot; if it is unsuccessful, mark the server
         ``unknown`` (store + WebSocket) and stop.
      3. Update restart-event and consecutive-critical bookkeeping.
      4. Derive the status, store the snapshot and server row, and broadcast
         the health payload.
      5. Run metric anomaly detection and, only if that finds nothing, the
         container-status check. An anomaly goes to ``_trigger_agent``; a
         clean poll clears the server's remembered anomaly signature.

    Any exception raised during steps 2-5 is logged (first line, at most 200
    characters) and the server is reported as ``unknown``. A failure of that
    report is logged too, so this coroutine does not raise for ordinary
    errors.
    """
    if not is_server_reachable(server):
        logger.debug("Skipping unconfigured server %s", server.id)
        return
    try:
        await detect_compose_command(server)
        snap = await collect_health_snapshot(server)
        if not snap.get("success"):
            logger.error("Poll failed %s: %s", server.id, snap.get("error"))
            await _report_unknown(server, snap.get("error"))
            return

        containers = snap.get("containers") or []
        prev = _last_container_statuses.get(server.id)
        restart_events = count_restart_events(containers, prev)
        _last_container_statuses[server.id] = containers

        cpu = snap.get("cpu_percent")
        mem = snap.get("memory_percent")
        disk = snap.get("disk_percent")

        # Track how many polls in a row had CPU or memory over the threshold.
        crit_key = server.id
        if (cpu is not None and cpu > server.thresholds.cpu_percent) or (
            mem is not None and mem > server.thresholds.memory_percent
        ):
            _consecutive_critical[crit_key] = _consecutive_critical.get(crit_key, 0) + 1
        else:
            _consecutive_critical[crit_key] = 0

        # Kept as a function-local import, as in the original code.
        from health_metrics import ContainerStatus

        container_objs = [
            ContainerStatus(
                name=c["name"],
                status=c["status"],
                image=c.get("image", ""),
                restart_count=c.get("restart_count", 0),
            )
            for c in containers
        ]
        status = derive_server_status(
            metrics={"cpu_percent": cpu, "memory_percent": mem, "disk_percent": disk},
            containers=container_objs,
            thresholds=server.thresholds,
            consecutive_critical=_consecutive_critical.get(crit_key, 0),
        )
        captured_at = _utc_now_iso()

        await store.insert_snapshot(
            server.id,
            cpu_percent=cpu,
            memory_percent=mem,
            disk_percent=disk,
            container_statuses=containers,
            raw_data={"docker_running": snap.get("docker_running")},
        )
        await store.upsert_server(
            server.id,
            server.label,
            server.host,
            last_seen_at=captured_at,
            status=status,
        )
        health = build_server_health(
            server.id,
            server.label,
            status,
            cpu,
            mem,
            disk,
            container_objs,
            captured_at,
        )
        await ws_broadcast(
            {
                "type": "snapshot_update",
                "server_id": server.id,
                "data": health.to_ws_payload(),
            }
        )

        metrics = {
            "cpu_percent": cpu,
            "memory_percent": mem,
            "disk_percent": disk,
            "restart_events": restart_events,
        }
        margin, use_baseline = _anomaly_settings()
        baseline_row = await store.get_baseline(server.id)
        baseline_cpu = baseline_row.get("cpu_p95") if baseline_row else None
        baseline_mem = baseline_row.get("memory_p95") if baseline_row else None

        is_bad, reason = await _is_anomaly(
            server,
            cpu,
            mem,
            disk,
            restart_events,
            baseline_cpu=baseline_cpu,
            baseline_mem=baseline_mem,
            margin=margin,
            use_baseline=use_baseline,
        )
        svc_from_container: str | None = None
        if not is_bad:
            # Container state is only consulted when the metrics look fine.
            is_bad, reason, svc_from_container = _container_anomaly(server, containers)

        if is_bad:
            logger.warning("Anomaly detected on %s: %s", server.id, reason)
            await _trigger_agent(server, reason, svc_from_container, metrics)
        else:
            # A clean poll resets de-duplication so a recurrence triggers again.
            _last_anomaly_signature.pop(server.id, None)
    except Exception as exc:
        # Errors here can be multi-line; keep only a short first line.
        err_msg = str(exc).split("\n")[0][:200]
        logger.error("Poller error for %s: %s", server.id, err_msg)
        try:
            await _report_unknown(server, err_msg)
        except Exception:
            # The store or WebSocket hub may be what failed in the first place;
            # log it instead of letting it escape and be dropped by the caller.
            logger.exception("Could not report poll failure for %s", server.id)
async def _maybe_refresh_baselines() -> None:
    """Recompute per-server p95 baselines on every 720th call.

    Each call increments the module-level counter. On a refresh call, every
    server that passes ``is_server_reachable`` gets its CPU/memory p95
    computed over the last 24 hours and written to the store. A missing
    config file is ignored.
    """
    global _baseline_refresh_counter
    _baseline_refresh_counter += 1
    refresh_due = _baseline_refresh_counter % 720 == 0  # ~6h at 30s interval
    if not refresh_due:
        return
    try:
        app_cfg = load_app_config()
        for srv in app_cfg.servers.servers:
            if is_server_reachable(srv):
                cpu_p95, mem_p95 = await store.compute_baseline_p95(srv.id, hours=24)
                await store.upsert_baseline(srv.id, cpu_p95, mem_p95)
    except FileNotFoundError:
        pass
async def run_poller_loop() -> None:
    """Poll all configured servers repeatedly until the stop event is set.

    Each cycle reloads the app config, polls every server concurrently, then
    gives ``_maybe_refresh_baselines`` a chance to run. The cycle then waits
    ``poll_interval_seconds`` (30 until a config has been loaded) or until
    the stop event fires, whichever comes first.

    Errors inside a cycle are logged and never end the loop; cancellation is
    propagated.
    """
    global _stop_event
    # Use a local reference so the loop keeps working even if stop_poller()
    # resets the module-level name to None while a cycle is in progress.
    stop_event = asyncio.Event()
    _stop_event = stop_event
    logger.info("Poller started")
    interval = 30
    while not stop_event.is_set():
        try:
            cfg = load_app_config()
            interval = cfg.rules.automation.poll_interval_seconds
            servers = list(cfg.servers.servers)
            results = await asyncio.gather(
                *[_poll_server(s) for s in servers],
                return_exceptions=True,
            )
            # return_exceptions=True turns failures into return values; log
            # them so they are not lost.
            for srv, outcome in zip(servers, results):
                if isinstance(outcome, Exception):
                    logger.error(
                        "Unhandled error polling %s", srv.id, exc_info=outcome
                    )
            await _maybe_refresh_baselines()
        except FileNotFoundError as exc:
            logger.warning("Poller: %s", exc)
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("Poller loop error")
        try:
            # Sleep for one interval, waking early if a stop is requested.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
            break
        except TimeoutError:
            # Interval elapsed without a stop request: start the next cycle.
            pass
async def start_poller() -> None:
    """Schedule the poller loop as a background task unless one is still running."""
    global _poller_task
    already_running = _poller_task is not None and not _poller_task.done()
    if already_running:
        return
    _poller_task = asyncio.create_task(run_poller_loop(), name="health-poller")
    logger.info("Poller task scheduled")
async def stop_poller() -> None:
    """Stop the background poller and reset the module-level handles.

    Sets the stop event (if any), cancels the poller task (if any) and waits
    up to 15 seconds for it to finish. Cancellation and timeout are expected
    and ignored. An error the task had already ended with is logged rather
    than raised. ``_poller_task`` and ``_stop_event`` are always reset to
    ``None`` so a later ``start_poller`` begins from a clean state.
    """
    global _poller_task, _stop_event
    if _stop_event is not None:
        _stop_event.set()
    task = _poller_task
    try:
        if task is not None:
            task.cancel()
            try:
                await asyncio.wait_for(task, timeout=15.0)
            except (asyncio.CancelledError, TimeoutError):
                pass
            except Exception:
                # Awaiting a task that already failed re-raises its exception;
                # shutdown should report it, not abort on it.
                logger.exception("Poller task ended with an error")
    finally:
        _poller_task = None
        _stop_event = None
    logger.info("Poller stopped")