-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_server.py
More file actions
235 lines (203 loc) · 8.39 KB
/
Copy path_server.py
File metadata and controls
235 lines (203 loc) · 8.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""MCP JSON-RPC server core. Transport-agnostic — see ``_mount`` for HTTP."""
from __future__ import annotations
import asyncio
import json
import re
import time
from typing import Any
from hawkapi_mcp._dispatch import dispatch_tool
from hawkapi_mcp._errors import (
INTERNAL_ERROR,
INVALID_PARAMS,
INVALID_REQUEST,
METHOD_NOT_FOUND,
RpcError,
)
from hawkapi_mcp._tools import openapi_to_tools
# Protocol revision reported as ``protocolVersion`` in the ``initialize`` result.
MCP_PROTOCOL_VERSION = "2025-06-18"

# Response headers never surfaced to the agent via ``structuredContent``.
# Anything matching these would leak credentials/session material set by the
# inner route back to the (untrusted) MCP client.
# Entries must be lowercase: header names are lowercased before the lookup.
_SENSITIVE_HEADERS = frozenset({"set-cookie", "authorization"})
# Also withheld: any header whose name starts with ``x-`` and ends with
# ``-token`` or ``-secret`` (matched case-insensitively).
_SENSITIVE_HEADER_PATTERN = re.compile(r"^x-.*-(token|secret)$", re.IGNORECASE)

# JSON-RPC error codes under their historical underscore-prefixed names.
# All but ``_PARSE_ERROR`` are re-exported from ``hawkapi_mcp._errors`` for
# backward compatibility with anyone importing the constants from this module.
# ``_PARSE_ERROR`` (the JSON-RPC 2.0 "Parse error" code) is defined here and is
# not referenced by the code in this module — presumably the transport layer
# uses it for malformed JSON; verify against the callers.
_PARSE_ERROR = -32700
_INVALID_REQUEST = INVALID_REQUEST
_METHOD_NOT_FOUND = METHOD_NOT_FOUND
_INVALID_PARAMS = INVALID_PARAMS
_INTERNAL_ERROR = INTERNAL_ERROR
class MCPServer:
"""Stateless MCP server that exposes a HawkAPI app's routes as tools.
Methods supported (subset of the spec sufficient for tool execution):
* ``initialize``
* ``ping``
* ``tools/list``
* ``tools/call``
"""
def __init__(
self,
app: Any,
*,
server_name: str | None = None,
server_version: str | None = None,
include_only: set[str] | None = None,
exclude: set[str] | None = None,
cache_ttl_seconds: float | None = None,
strip_response_headers: set[str] | None = None,
) -> None:
self._app = app
self._name = server_name or getattr(app, "title", "hawkapi-mcp")
self._version = server_version or getattr(app, "version", "0.0.0")
self._include_only = include_only
self._exclude = exclude
self._cache_ttl = cache_ttl_seconds
self._strip_response_headers = {h.lower() for h in strip_response_headers or set()}
self._tools_cache: list[dict[str, Any]] | None = None
self._tools_cached_at: float = 0.0
self._tools_lock = asyncio.Lock()
async def _tools(self) -> list[dict[str, Any]]:
if self._tools_cache is not None and not self._cache_expired():
return self._tools_cache
async with self._tools_lock:
if self._tools_cache is None or self._cache_expired():
spec = self._app.openapi() if hasattr(self._app, "openapi") else {}
self._tools_cache = openapi_to_tools(
spec, include_only=self._include_only, exclude=self._exclude
)
self._tools_cached_at = time.monotonic()
return self._tools_cache
def _cache_expired(self) -> bool:
if self._cache_ttl is None:
return False
return (time.monotonic() - self._tools_cached_at) >= self._cache_ttl
def invalidate_tools(self) -> None:
self._tools_cache = None
async def handle(
self,
request: dict[str, Any],
*,
extra_headers: list[tuple[bytes, bytes]] | None = None,
client: tuple[str, int] | None = None,
) -> dict[str, Any] | None:
"""Process a single JSON-RPC message. ``None`` for valid notifications.
``extra_headers`` and ``client`` are forwarded to ``tools/call`` so the
synthetic inner request carries the real caller's credentials and
address (see :func:`hawkapi_mcp._dispatch.dispatch_tool`).
"""
if not isinstance(request, dict): # pyright: ignore[reportUnnecessaryIsInstance]
return _error(None, _INVALID_REQUEST, "Request must be a JSON object")
if request.get("jsonrpc") != "2.0":
return _error(request.get("id"), _INVALID_REQUEST, "jsonrpc must be '2.0'")
method = request.get("method")
msg_id = request.get("id")
params = request.get("params") or {}
if not isinstance(method, str):
return _error(msg_id, _INVALID_REQUEST, "method must be a string")
if not isinstance(params, dict):
return _error(msg_id, _INVALID_PARAMS, "params must be an object")
# Notification — no id, no response expected.
is_notification = msg_id is None and "id" not in request
try:
result = await self._dispatch_method(
method, params, extra_headers=extra_headers, client=client
)
except RpcError as exc:
if is_notification:
return None
return _error(msg_id, exc.code, exc.message)
except Exception as exc: # noqa: BLE001
if is_notification:
return None
return _error(msg_id, _INTERNAL_ERROR, f"Internal error: {exc!r}")
if is_notification:
return None
return {"jsonrpc": "2.0", "id": msg_id, "result": result}
async def _dispatch_method(
self,
method: str,
params: dict[str, Any],
*,
extra_headers: list[tuple[bytes, bytes]] | None = None,
client: tuple[str, int] | None = None,
) -> Any:
if method == "initialize":
return {
"protocolVersion": MCP_PROTOCOL_VERSION,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": self._name, "version": self._version},
}
if method == "ping":
return {}
if method == "notifications/initialized":
return {}
if method == "tools/list":
return {"tools": [_external_tool(t) for t in await self._tools()]}
if method == "tools/call":
return await self._call_tool(params, extra_headers=extra_headers, client=client)
raise RpcError(_METHOD_NOT_FOUND, f"Unknown method: {method}")
async def _call_tool(
self,
params: dict[str, Any],
*,
extra_headers: list[tuple[bytes, bytes]] | None = None,
client: tuple[str, int] | None = None,
) -> dict[str, Any]:
name = params.get("name")
arguments = params.get("arguments") or {}
if not isinstance(name, str):
raise RpcError(_INVALID_PARAMS, "'name' is required")
if not isinstance(arguments, dict):
raise RpcError(_INVALID_PARAMS, "'arguments' must be an object")
tools = await self._tools()
tool = next((t for t in tools if t["name"] == name), None)
if tool is None:
raise RpcError(_METHOD_NOT_FOUND, f"Tool '{name}' not found")
meta = tool["_meta"]
status, body, headers = await dispatch_tool(
self._app,
meta["method"],
meta["path"],
arguments,
client=client,
extra_headers=extra_headers,
)
is_error = status >= 400
return {
"content": [
{"type": "text", "text": _decode_body(body, headers)},
],
"isError": is_error,
"structuredContent": {
"status": status,
"headers": self._safe_headers(headers),
},
}
def _safe_headers(self, headers: dict[str, str]) -> dict[str, str]:
"""Strip credential-bearing response headers before exposing to the agent."""
return {
k: v
for k, v in headers.items()
if k.lower() not in _SENSITIVE_HEADERS
and k.lower() not in self._strip_response_headers
and not _SENSITIVE_HEADER_PATTERN.match(k)
}
# Backward-compatible alias — older code may import ``_RpcError`` from this
# module. The canonical name is ``RpcError`` from ``hawkapi_mcp._errors``.
_RpcError = RpcError
def _error(msg_id: Any, code: int, message: str) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": msg_id, "error": {"code": code, "message": message}}
def _external_tool(tool: dict[str, Any]) -> dict[str, Any]:
return {
"name": tool["name"],
"description": tool["description"],
"inputSchema": tool["inputSchema"],
}
def _decode_body(body: bytes, headers: dict[str, str]) -> str:
ctype = headers.get("content-type", "")
if "application/json" in ctype:
try:
return json.dumps(json.loads(body.decode("utf-8")), ensure_ascii=False)
except (UnicodeDecodeError, json.JSONDecodeError):
return body.decode("utf-8", errors="replace")
return body.decode("utf-8", errors="replace")
# Public API of this module. Underscore-prefixed names stay importable by
# explicit name but are excluded from ``from ... import *``.
__all__ = ["MCPServer", "MCP_PROTOCOL_VERSION"]