-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_file_hash.py
More file actions
531 lines (415 loc) · 18 KB
/
Copy pathtest_file_hash.py
File metadata and controls
531 lines (415 loc) · 18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
"""ssh_file_hash: input validation + POSIX/Windows command shape + output parsing.
Covers:
- invalid algorithm rejected
- POSIX path hits `<algo>sum -- <canonical>` with correct binary per algo
- Windows path hits `powershell.exe -NoProfile -NonInteractive -EncodedCommand <b64>`
  whose payload decodes to `Get-FileHash -Algorithm <ALGO> -LiteralPath '<p>'`
- digest lowercase, path-with-spaces in POSIX output parsed correctly
- non-zero exit -> HashError with stderr
- unparseable digest -> HashError
- path canonicalize + restricted-paths checks invoked (catches missing import)
No live SSH. `monkeypatch.setattr` swaps `resolve_path`; `conn.run` and the
SFTP stat are served by shim objects; we assert on the captured command.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, ClassVar
from unittest.mock import MagicMock
import pytest
from ssh_mcp.config import Settings
from ssh_mcp.models.policy import AuthPolicy, HostPolicy
from ssh_mcp.tools import sftp_read_tools
from ssh_mcp.tools.sftp_read_tools import HashError, ssh_file_hash
@dataclass
class _FakeRun:
    """Canned command result returned by `_FakeConn.run`.

    The defaults describe a silent, successful command; tests override only
    the fields they care about.
    """

    stdout: str = ""  # text the fake command "printed"
    stderr: str = ""  # error text the fake command "printed"
    exit_status: int = 0  # tests set a non-zero value to simulate failure
class _FakeAttrs:
    """Fake file-attributes object returned by `_FakeSFTP.stat`.

    Only `size` is caller-controlled; the remaining fields are constants.
    """

    def __init__(self, size: int) -> None:
        self.size = size
        # 0o100644 == S_IFREG | 0o644 in POSIX mode bits.
        self.permissions = 0o100644
        self.mtime = 0
        # Same fixed numeric id for owner and group.
        self.uid = self.gid = 1000
class _FakeSFTP:
    """Async-context-manager fake of an SFTP client that only supports `stat`.

    `stat` returns a `_FakeAttrs` with the configured size, or raises
    `asyncssh.SFTPError` when the instance was built with `stat_raises=True`.
    """

    def __init__(self, size: int, *, stat_raises: bool = False) -> None:
        self._size = size
        self._stat_raises = stat_raises

    async def __aenter__(self) -> _FakeSFTP:
        # `async with` yields the fake itself.
        return self

    async def __aexit__(self, *exc: object) -> None:
        # Nothing to release; returning None lets any exception propagate.
        return None

    async def stat(self, _path: str) -> _FakeAttrs:
        """Return fake attributes, or raise a "no such file" SFTP error."""
        if not self._stat_raises:
            return _FakeAttrs(self._size)

        # Imported lazily: asyncssh is only needed on the failure path.
        import asyncssh

        raise asyncssh.SFTPError(
            asyncssh.sftp.FX_NO_SUCH_FILE,
            "no such file",
        )
class _FakeConn:
    """Recording fake of an SSH connection.

    `run` never executes anything: it logs the command string and keyword
    arguments it received, then returns the canned `run_result`.
    `start_sftp_client` hands out a `_FakeSFTP` configured from `size` and
    `stat_raises`.
    """

    def __init__(
        self,
        *,
        run_result: _FakeRun,
        size: int = 1234,
        stat_raises: bool = False,
    ) -> None:
        # Recorders first so they exist before any call can be made.
        self.run_calls: list[str] = []
        self.run_kwargs: list[dict[str, Any]] = []
        self._run_result = run_result
        self._size = size
        self._stat_raises = stat_raises

    async def run(self, args: str, *, check: bool = False, timeout: float | None = None) -> _FakeRun:
        """Record the call and return the canned result."""
        self.run_calls.append(args)
        self.run_kwargs.append(dict(check=check, timeout=timeout))
        return self._run_result

    def start_sftp_client(self) -> _FakeSFTP:
        """Create a fresh fake SFTP client mirroring this connection's config."""
        return _FakeSFTP(self._size, stat_raises=self._stat_raises)
def _ctx(policy: HostPolicy) -> Any:
    """Build a stub ctx with just enough `lifespan_context` for the tool body.

    Returns a `(ctx, holder)` pair. Tests place their fake connection under
    `holder["conn"]`; `pool.acquire` and `pool.sftp` look it up at call time,
    so it can be installed after this function returns.
    """
    holder: dict[str, _FakeConn] = {}
    fake_pool = MagicMock()

    async def _acquire(_policy):
        return holder["conn"]

    # INC-pool-sftp: `_stat_size` now goes through pool.sftp(resolved) instead
    # of conn.start_sftp_client(). pool.sftp is wired to the same fake SFTP the
    # underlying conn would produce, so the existing `_FakeSFTP` and its
    # stat_raises flag still drive `_stat_size`'s -1 sentinel branch.
    def _pool_sftp(_resolved: Any) -> Any:
        return holder["conn"].start_sftp_client()

    fake_pool.acquire = _acquire
    fake_pool.sftp = MagicMock(side_effect=_pool_sftp)

    class _C:
        # A fresh class (and dict) is created on every `_ctx` call.
        lifespan_context: ClassVar[dict[str, Any]] = {
            "pool": fake_pool,
            "settings": Settings(SSH_PATH_ALLOWLIST=["/opt/app", "C:\\opt\\app"]),
            "hosts": {"x": policy},
        }

    return _C(), holder
def _posix_policy() -> HostPolicy:
    """Return the host policy used by the POSIX tests (`/opt/app` allowlisted).

    `platform` is left at the model's default.
    """
    agent_auth = AuthPolicy(method="agent")
    return HostPolicy(
        hostname="web01",
        user="deploy",
        auth=agent_auth,
        path_allowlist=["/opt/app"],
    )
def _windows_policy() -> HostPolicy:
    """Return the host policy used by the Windows tests.

    Sets `platform="windows"` and allowlists `C:\\opt\\app`.
    """
    agent_auth = AuthPolicy(method="agent")
    return HostPolicy(
        hostname="winbox",
        user="Administrator",
        auth=agent_auth,
        platform="windows",
        path_allowlist=["C:\\opt\\app"],
    )
# --- input validation ---
class TestInputValidation:
    """Argument validation of `ssh_file_hash`."""

    @pytest.mark.asyncio
    async def test_rejects_unknown_algorithm(self) -> None:
        """An unsupported algorithm name raises `ValueError`.

        No fake connection is installed in the holder for this test.
        """
        ctx, _holder = _ctx(_posix_policy())
        expected_message = "algorithm must be one of"
        with pytest.raises(ValueError, match=expected_message):
            await ssh_file_hash(
                host="x",
                path="/opt/app/f",
                ctx=ctx,
                algorithm="sha3_256",  # type: ignore[arg-type]
            )
# --- POSIX happy path ---
class TestPosixHashing:
    """POSIX targets: `<algo>sum -- <path>` command shape and output parsing."""

    @staticmethod
    async def _echo_resolve(_conn, path, _policy, _settings, **_kw):
        """Stand-in for `resolve_path` that hands the input path straight back."""
        return path

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        ("algorithm", "expected_binary"),
        [
            ("md5", "md5sum"),
            ("sha1", "sha1sum"),
            ("sha256", "sha256sum"),
            ("sha512", "sha512sum"),
        ],
    )
    async def test_invokes_correct_binary(
        self,
        monkeypatch,
        algorithm: str,
        expected_binary: str,
    ) -> None:
        """Each algorithm runs its matching `<algo>sum` binary exactly once."""
        monkeypatch.setattr(sftp_read_tools, "resolve_path", self._echo_resolve)
        ctx, holder = _ctx(_posix_policy())
        fake_conn = _FakeConn(
            run_result=_FakeRun(stdout="deadbeef1234 /opt/app/f\n"),
            size=42,
        )
        holder["conn"] = fake_conn

        outcome = await ssh_file_hash(
            host="x",
            path="/opt/app/f",
            ctx=ctx,
            algorithm=algorithm,  # type: ignore[arg-type]
        )

        # One run call, and it starts with the right binary.
        issued = fake_conn.run_calls
        assert len(issued) == 1
        assert issued[0].startswith(f"{expected_binary} -- ")
        assert outcome.algorithm == algorithm
        assert outcome.digest == "deadbeef1234"
        assert outcome.size == 42

    @pytest.mark.asyncio
    async def test_parses_path_with_spaces(self, monkeypatch) -> None:
        """`<hex> <path>` -- split once on whitespace, take the digest."""
        monkeypatch.setattr(sftp_read_tools, "resolve_path", self._echo_resolve)
        ctx, holder = _ctx(_posix_policy())
        canned = _FakeRun(
            stdout="cafebabe1234 /opt/app/with spaces in name\n",
        )
        holder["conn"] = _FakeConn(run_result=canned)

        outcome = await ssh_file_hash(
            host="x",
            path="/opt/app/with spaces in name",
            ctx=ctx,
        )

        assert outcome.digest == "cafebabe1234"

    @pytest.mark.asyncio
    async def test_non_zero_exit_raises_HashError(self, monkeypatch) -> None:
        """A failing hash command surfaces its stderr inside `HashError`."""
        monkeypatch.setattr(sftp_read_tools, "resolve_path", self._echo_resolve)
        ctx, holder = _ctx(_posix_policy())
        canned = _FakeRun(
            stderr="sha256sum: /opt/app/f: Permission denied\n",
            exit_status=1,
        )
        holder["conn"] = _FakeConn(run_result=canned)

        with pytest.raises(HashError, match="Permission denied"):
            await ssh_file_hash(host="x", path="/opt/app/f", ctx=ctx)

    @pytest.mark.asyncio
    async def test_unparseable_digest_raises_HashError(self, monkeypatch) -> None:
        """Output whose first token is not hex is rejected as unparseable."""
        monkeypatch.setattr(sftp_read_tools, "resolve_path", self._echo_resolve)
        ctx, holder = _ctx(_posix_policy())
        canned = _FakeRun(stdout="NOT-A-HEX-STRING /opt/app/f\n")
        holder["conn"] = _FakeConn(run_result=canned)

        with pytest.raises(HashError, match="unparseable"):
            await ssh_file_hash(host="x", path="/opt/app/f", ctx=ctx)

    @pytest.mark.asyncio
    async def test_digest_lowercased_even_if_upstream_uppercase(self, monkeypatch) -> None:
        """sha256sum is already lowercase but BusyBox variants can differ."""
        monkeypatch.setattr(sftp_read_tools, "resolve_path", self._echo_resolve)
        ctx, holder = _ctx(_posix_policy())
        canned = _FakeRun(stdout="DEADBEEF /opt/app/f\n")
        holder["conn"] = _FakeConn(run_result=canned)

        outcome = await ssh_file_hash(host="x", path="/opt/app/f", ctx=ctx)

        assert outcome.digest == "deadbeef"
# --- Windows (PowerShell -EncodedCommand path, INC-028) ---
class TestWindowsHashing:
    """Windows targets: `ssh_file_hash` dispatches to `_hash_windows`.

    `_hash_windows` base64-UTF16LE-encodes a `Get-FileHash` PowerShell script
    and ships it as a single `-EncodedCommand` argv token. That avoids every
    shell-quoting corner the previous `shlex.join`-based attempt (INC-031)
    hit on cmd.exe / PowerShell.
    """

    @staticmethod
    async def _echo_resolve(_conn, path, _policy, _settings, **_kw):
        """Stand-in for `resolve_path` that hands the input path straight back."""
        return path

    @staticmethod
    def _decode_encoded_cmd(run_call: str) -> str:
        """Recover the PowerShell script from a recorded command string.

        The last space-separated token of
        `powershell.exe ... -EncodedCommand <b64>` is base64-decoded and then
        interpreted as UTF-16-LE text.
        """
        import base64

        _head, _sep, payload = run_call.rpartition(" ")
        raw_script = base64.b64decode(payload)
        return raw_script.decode("utf-16-le")

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        ("algorithm", "expected_ps_algo"),
        [
            ("md5", "MD5"),
            ("sha1", "SHA1"),
            ("sha256", "SHA256"),
            ("sha512", "SHA512"),
        ],
    )
    async def test_invokes_powershell_encoded_command(
        self,
        monkeypatch,
        algorithm: str,
        expected_ps_algo: str,
    ) -> None:
        """Argv must be `powershell.exe -NoProfile -NonInteractive
        -EncodedCommand <b64>`, and the b64 must decode to a Get-FileHash
        call naming the right algorithm + LiteralPath."""
        monkeypatch.setattr(sftp_read_tools, "resolve_path", self._echo_resolve)

        # Digest length per algorithm (matches the shape check in _hash_windows).
        hex_lengths = {"md5": 32, "sha1": 40, "sha256": 64, "sha512": 128}
        # Uppercase hex of exactly the right length for this algorithm.
        fake_digest = ("DEADBEEF" * 16)[: hex_lengths[algorithm]]

        ctx, holder = _ctx(_windows_policy())
        fake_conn = _FakeConn(
            # Get-FileHash returns uppercase hex; parser lowercases.
            run_result=_FakeRun(stdout=f"{fake_digest}\r\n"),
            size=99,
        )
        holder["conn"] = fake_conn

        outcome = await ssh_file_hash(
            host="x",
            path="C:\\opt\\app\\f",
            ctx=ctx,
            algorithm=algorithm,  # type: ignore[arg-type]
        )

        assert len(fake_conn.run_calls) == 1
        command = fake_conn.run_calls[0]
        assert command.startswith("powershell.exe -NoProfile -NonInteractive -EncodedCommand ")

        script = self._decode_encoded_cmd(command)
        assert f"-Algorithm {expected_ps_algo}" in script
        assert "Get-FileHash" in script
        assert "-LiteralPath" in script
        assert "C:\\opt\\app\\f" in script
        # Fix: suppresses PowerShell progress-record CLIXML on stderr and
        # forces an exit-status request over SSH (Windows OpenSSH quirk).
        assert "$ProgressPreference='SilentlyContinue'" in script
        assert script.rstrip(";").endswith("exit 0")

        assert outcome.digest == fake_digest.lower()
        assert outcome.algorithm == algorithm
        assert outcome.size == 99

    @pytest.mark.asyncio
    async def test_windows_path_with_single_quote_is_ps_escaped(
        self,
        monkeypatch,
    ) -> None:
        """PowerShell literal single-quoted strings escape `'` as `''`.

        Catches the "operator has a file named `O'Brien.txt`" class of bug that
        would otherwise inject unquoted script into the -EncodedCommand payload
        and either crash PowerShell or (worse) execute attacker-supplied code
        if the path came through an attacker-reachable channel.
        """
        # `resolve_path` is replaced by an echo, so the real allowlist check
        # is bypassed; we're verifying the script shape.
        monkeypatch.setattr(sftp_read_tools, "resolve_path", self._echo_resolve)
        ctx, holder = _ctx(_windows_policy())

        # Default algorithm is sha256 (64 hex chars).
        fake_sha256 = ("CAFEBABE" * 16)[:64]
        fake_conn = _FakeConn(run_result=_FakeRun(stdout=f"{fake_sha256}\r\n"))
        holder["conn"] = fake_conn

        quoted_path = "C:\\opt\\app\\O'Brien.txt"
        await ssh_file_hash(host="x", path=quoted_path, ctx=ctx)

        script = self._decode_encoded_cmd(fake_conn.run_calls[0])
        # The `'` in `O'Brien` must appear as `''` between the enclosing
        # single quotes of the LiteralPath argument.
        assert "-LiteralPath 'C:\\opt\\app\\O''Brien.txt'" in script

    @pytest.mark.asyncio
    async def test_windows_non_zero_exit_raises_HashError(self, monkeypatch) -> None:
        """A failing PowerShell invocation raises `HashError` mentioning powershell."""
        monkeypatch.setattr(sftp_read_tools, "resolve_path", self._echo_resolve)
        ctx, holder = _ctx(_windows_policy())
        canned = _FakeRun(
            stderr="Get-FileHash : Access is denied.",
            exit_status=1,
        )
        holder["conn"] = _FakeConn(run_result=canned)

        with pytest.raises(HashError, match="powershell"):
            await ssh_file_hash(host="x", path="C:\\opt\\app\\f", ctx=ctx)
# --- Path policy reachability (would catch missing imports) ---
@pytest.mark.asyncio
async def test_file_hash_invokes_resolve_path(monkeypatch) -> None:
    """Drive past validation with stubbed I/O and assert `resolve_path` is
    actually called.

    Catches the NameError class of bugs where the helper is referenced but
    not imported. `resolve_path` bundles `canonicalize_and_check` +
    `check_not_restricted`, so a single patch covers both halves of the
    path-policy chain.
    """
    seen: list[Any] = []

    async def recording_resolve(_conn, path, policy, _settings, *, must_exist=True, **_kw):
        # Record what the tool passed in, then behave like an identity resolve.
        seen.append((path, must_exist, policy.platform))
        return path

    monkeypatch.setattr(sftp_read_tools, "resolve_path", recording_resolve)
    ctx, holder = _ctx(_posix_policy())
    holder["conn"] = _FakeConn(run_result=_FakeRun(stdout="cafebabe /opt/app/f\n"))

    await ssh_file_hash(host="x", path="/opt/app/f", ctx=ctx)

    assert len(seen) == 1
    seen_path, seen_must_exist, seen_platform = seen[0]
    assert seen_path == "/opt/app/f"
    assert seen_must_exist is True
    assert seen_platform == "posix"
# --- Gap coverage: canonical != raw, stat failure, timeout propagation ---
@pytest.mark.asyncio
async def test_hash_uses_canonical_path_not_raw_input(monkeypatch) -> None:
    """`realpath -m` resolves `/opt/app/../app/f` to `/opt/app/f`; the hash
    command must see the canonical form, not the raw input.

    Catches a bug where a future refactor passes `path` through instead of
    `canonical`.
    """
    canonical = "/opt/app/canonical/f"  # deliberately different from input

    async def canonicalizing_resolve(_conn, _path, _policy, _settings, **_kw):
        return canonical

    monkeypatch.setattr(sftp_read_tools, "resolve_path", canonicalizing_resolve)
    ctx, holder = _ctx(_posix_policy())
    fake_conn = _FakeConn(run_result=_FakeRun(stdout="abc123 /opt/app/canonical/f\n"))
    holder["conn"] = fake_conn

    outcome = await ssh_file_hash(host="x", path="/opt/app/../app/raw/f", ctx=ctx)

    # argv must reference the canonical path, NOT the raw input.
    command = fake_conn.run_calls[0]
    assert canonical in command
    assert "raw" not in command
    # Returned `path` field also reflects the canonical form.
    assert outcome.path == canonical
@pytest.mark.asyncio
async def test_stat_failure_yields_negative_size(monkeypatch) -> None:
    """`_stat_size` returns -1 when SFTP stat fails (e.g. after the hash
    succeeds but the file vanished, or the SFTP subsystem barfs).

    The tool must not raise -- the digest is still useful without a size.
    """

    async def echo_resolve(_conn, path, _policy, _settings, **_kw):
        return path

    monkeypatch.setattr(sftp_read_tools, "resolve_path", echo_resolve)
    ctx, holder = _ctx(_posix_policy())
    # Hash command succeeds, but the fake SFTP stat raises SFTPError.
    canned = _FakeRun(stdout="deadbeef /opt/app/f\n")
    holder["conn"] = _FakeConn(run_result=canned, stat_raises=True)

    outcome = await ssh_file_hash(host="x", path="/opt/app/f", ctx=ctx)

    assert outcome.digest == "deadbeef"
    assert outcome.size == -1
@pytest.mark.asyncio
async def test_timeout_propagates_to_conn_run(monkeypatch) -> None:
    """Caller-supplied timeout reaches the conn.run call.

    Without this, the docstring promise of `timeout` control would be a lie.
    """

    async def echo_resolve(_conn, path, _policy, _settings, **_kw):
        return path

    monkeypatch.setattr(sftp_read_tools, "resolve_path", echo_resolve)
    ctx, holder = _ctx(_posix_policy())
    fake_conn = _FakeConn(run_result=_FakeRun(stdout="deadbeef /opt/app/f\n"))
    holder["conn"] = fake_conn

    await ssh_file_hash(host="x", path="/opt/app/f", ctx=ctx, timeout=300)

    recorded = fake_conn.run_kwargs
    assert len(recorded) == 1
    assert recorded[0]["timeout"] == 300.0
@pytest.mark.asyncio
async def test_timeout_defaults_to_settings_value(monkeypatch) -> None:
    """No explicit timeout -> Settings.SSH_COMMAND_TIMEOUT is used.

    The settings entry of the stub ctx is replaced with one carrying a
    distinctive timeout (777) so the assertion cannot pass by coincidence
    with a library default.
    """

    async def fake_resolve(_conn, path, _policy, _settings, **_kw):
        return path

    monkeypatch.setattr(sftp_read_tools, "resolve_path", fake_resolve)
    policy = _posix_policy()
    ctx, holder = _ctx(policy)

    # `Settings` comes from the module-level import; no local re-import needed.
    # The dict is reached through the instance -- it is the same object the
    # tool reads via `ctx.lifespan_context`.
    ctx.lifespan_context["settings"] = Settings(
        SSH_PATH_ALLOWLIST=["/opt/app"],
        SSH_COMMAND_TIMEOUT=777,
    )

    conn = _FakeConn(run_result=_FakeRun(stdout="deadbeef /opt/app/f\n"))
    holder["conn"] = conn

    await ssh_file_hash(host="x", path="/opt/app/f", ctx=ctx)

    # Same guard as the explicit-timeout test: exactly one command was run,
    # so a missing call fails as an assertion instead of an IndexError.
    assert len(conn.run_kwargs) == 1
    assert conn.run_kwargs[0]["timeout"] == 777.0