Files
r.kulbaev 2fe0ece991 fix(upload): scope archive extraction to per-session attachment dir
handle_upload_extract() used Path(s.workspace) as the extraction root,
bypassing HERMES_WEBUI_ATTACHMENT_DIR entirely. Route through
_session_attachment_dir(session_id) so archives land alongside
single-file uploads and session cleanup covers them.

Add tests and CHANGELOG entry.

Ref #2247
2026-05-18 21:22:02 +03:00

313 lines
13 KiB
Python

"""
Hermes Web UI -- File upload: multipart parser and upload handler.
"""
import mimetypes
import os
import re as _re
import email.parser
import tempfile
from pathlib import Path
from api.config import MAX_UPLOAD_BYTES, STATE_DIR
from api.helpers import j, bad
from api.models import get_session
from api.workspace import safe_resolve_ws
_MAX_EXTRACTED_BYTES = 10 * MAX_UPLOAD_BYTES
def parse_multipart(rfile, content_type, content_length) -> tuple:
    """Parse a ``multipart/form-data`` request body.

    Args:
        rfile: Binary file-like object positioned at the start of the body.
        content_type: Value of the request's Content-Type header; it must
            carry a ``boundary=`` parameter.
        content_length: Number of body bytes to read from ``rfile``.

    Returns:
        ``(fields, files)``.  ``fields`` maps a part name to its value decoded
        as UTF-8 (undecodable bytes are replaced).  ``files`` maps a part name
        to ``(filename, raw_bytes)`` for every part that declares a filename.
        Parts without a ``name`` or without a header/body separator are
        skipped.

    Raises:
        ValueError: If no boundary is present in ``content_type`` or
            ``content_length`` is negative.
    """
    import re as _re
    import email.parser as _ep

    m = _re.search(r'boundary=([^;\s]+)', content_type)
    if not m:
        raise ValueError('No boundary in Content-Type')
    if content_length is not None and content_length < 0:
        # read(-1) means "read until EOF": it would ignore the size cap the
        # callers enforce and can block forever on a keep-alive connection.
        raise ValueError('Invalid Content-Length')
    boundary = m.group(1).strip('"').encode()
    raw = rfile.read(content_length)
    fields = {}
    files = {}
    delimiter = b'--' + boundary
    # Element 0 is the preamble before the first delimiter; ignore it.
    for part in raw.split(delimiter)[1:]:
        if part.lstrip(b'\r\n').startswith(b'--'):
            # Remainder of the closing "--boundary--" marker.
            break
        # The header block ends at the FIRST blank line.  Pick whichever
        # separator occurs earliest so that an LF-only part whose body happens
        # to contain CRLFCRLF is not split in the middle of its body.
        crlf_at = part.find(b'\r\n\r\n')
        lf_at = part.find(b'\n\n')
        if crlf_at == -1 and lf_at == -1:
            continue
        if lf_at == -1 or (crlf_at != -1 and crlf_at < lf_at):
            sep = b'\r\n\r\n'
        else:
            sep = b'\n\n'
        header_raw, body = part.split(sep, 1)
        # Drop the single line break that precedes the next delimiter.
        if body.endswith(b'\r\n'):
            body = body[:-2]
        elif body.endswith(b'\n'):
            body = body[:-1]
        header_text = header_raw.lstrip(b'\r\n').decode('utf-8', errors='replace')
        msg = _ep.HeaderParser().parsestr(header_text)
        disp = msg.get('Content-Disposition', '')
        # \b keeps this from matching the tail of filename="..." when the
        # filename parameter is listed before the name parameter.
        name_m = _re.search(r'\bname="([^"]*)"', disp)
        file_m = _re.search(r'filename="([^"]*)"', disp)
        if not name_m:
            continue
        name = name_m.group(1)
        if file_m:
            files[name] = (file_m.group(1), body)
        else:
            fields[name] = body.decode('utf-8', errors='replace')
    return fields, files
def _sanitize_upload_name(filename: str) -> str:
    """Reduce a client-supplied filename to a safe single path component.

    Only the final path component is kept; every character other than a word
    character, ``.`` or ``-`` becomes ``_``; the result is capped at 200
    characters.

    Raises:
        ValueError: If nothing but dots (or nothing at all) remains.
    """
    base_name = Path(filename).name
    cleaned = _re.sub(r'[^\w.\-]', '_', base_name)
    cleaned = cleaned[:200]
    # A name made only of dots ('', '.', '..', ...) is not a usable file name.
    if cleaned.strip('.'):
        return cleaned
    raise ValueError('Invalid filename')
def _attachment_root() -> Path:
    """Return the resolved directory that holds all uploaded attachments.

    Chat attachments are transient context for the agent rather than project
    source files, so by default they live under ``STATE_DIR / 'attachments'``
    instead of the active workspace.  Operators can relocate the inbox by
    setting ``HERMES_WEBUI_ATTACHMENT_DIR``; a blank value is ignored and
    ``~`` in the override is expanded.
    """
    configured = os.getenv('HERMES_WEBUI_ATTACHMENT_DIR', '').strip()
    if not configured:
        return (STATE_DIR / 'attachments').resolve()
    return Path(configured).expanduser().resolve()
def _upload_destination(session_id: str, safe_name: str) -> Path:
    """Return the resolved path where an upload named ``safe_name`` is stored.

    The session's attachment directory is created if it does not exist yet.

    Raises:
        ValueError: If the resolved target is not located under the session's
            attachment directory.
    """
    session_dir = _session_attachment_dir(session_id)
    session_dir.mkdir(parents=True, exist_ok=True)
    target = session_dir.joinpath(safe_name).resolve()
    # resolve() follows symlinks and '..', so re-check containment afterwards.
    if target.is_relative_to(session_dir):
        return target
    raise ValueError('Invalid upload destination')
def _session_attachment_dir(session_id: str, *, root: Path | None = None) -> Path:
root = (root or _attachment_root()).resolve()
dest_dir = (root / _re.sub(r'[^\w.\-]', '_', str(session_id or 'session'))[:120]).resolve()
if not dest_dir.is_relative_to(root):
raise ValueError('Invalid attachment directory')
return dest_dir
def handle_upload(handler):
    """Handle a single-file upload request and store it for the session.

    Reads ``Content-Type`` / ``Content-Length`` from ``handler.headers`` and
    the multipart body from ``handler.rfile``.  Every outcome is reported
    through the ``j`` helper:

    * 413 when Content-Length exceeds ``MAX_UPLOAD_BYTES``
    * 400 when the ``file`` part or its filename is missing, or on any
      ``ValueError`` (bad multipart data, invalid filename or destination)
    * 404 when ``get_session`` raises ``KeyError`` for the ``session_id`` field
    * 500 (after printing the traceback) on any other exception
    * otherwise a payload with ``filename``, ``path``, ``size``, ``mime`` and
      ``is_image`` describing the stored file
    """
    import traceback as _tb
    try:
        headers = handler.headers
        content_type = headers.get('Content-Type', '')
        content_length = int(headers.get('Content-Length', 0) or 0)
        if content_length > MAX_UPLOAD_BYTES:
            return j(handler, {'error': f'File too large (max {MAX_UPLOAD_BYTES//1024//1024}MB)'}, status=413)

        fields, files = parse_multipart(handler.rfile, content_type, content_length)
        session_id = fields.get('session_id', '')
        if 'file' not in files:
            return j(handler, {'error': 'No file field in request'}, status=400)
        filename, file_bytes = files['file']
        if not filename:
            return j(handler, {'error': 'No filename in upload'}, status=400)

        # The lookup is only an existence check; the session object is unused.
        try:
            get_session(session_id)
        except KeyError:
            return j(handler, {'error': 'Session not found'}, status=404)

        safe_name = _sanitize_upload_name(filename)
        dest = _upload_destination(session_id, safe_name)
        dest.write_bytes(file_bytes)

        guessed_type, _encoding = mimetypes.guess_type(safe_name)
        mime = guessed_type or 'application/octet-stream'
        payload = {
            'filename': safe_name,
            'path': str(dest),
            'size': dest.stat().st_size,
            'mime': mime,
            'is_image': mime.startswith('image/'),
        }
        return j(handler, payload)
    except ValueError as e:
        return j(handler, {'error': str(e)}, status=400)
    except Exception:
        print('[webui] upload error: ' + _tb.format_exc(), flush=True)
        return j(handler, {'error': 'Upload failed'}, status=500)
def _copy_capped(src, dst, total_extracted: int) -> int:
    """Copy ``src`` into ``dst`` in 64 KiB chunks under the extraction cap.

    ``total_extracted`` is the number of bytes already written for the whole
    archive; the updated running total is returned.  Bytes are counted as
    they are actually read, not taken from the sizes declared in the archive.

    Raises:
        ValueError: As soon as the running total exceeds
            ``_MAX_EXTRACTED_BYTES``; the offending chunk is not written.
    """
    while True:
        chunk = src.read(65536)
        if not chunk:
            return total_extracted
        total_extracted += len(chunk)
        if total_extracted > _MAX_EXTRACTED_BYTES:
            raise ValueError(
                f'Extraction too large (> '
                f'{_MAX_EXTRACTED_BYTES // (1024*1024)} MB limit). '
                f'Possible zip bomb.'
            )
        dst.write(chunk)
def extract_archive(file_bytes: bytes, filename: str, workspace: Path):
    """Extract a zip or tar archive into a new folder inside ``workspace``.

    The folder is named after the archive with its archive extension removed
    (``data.tar.gz`` -> ``data``).  If that folder already exists, a random
    three-digit suffix is appended until an unused name is found.  Only
    regular file members are written; directory entries and, for tar, links
    and special files are skipped.

    Args:
        file_bytes: Raw archive content.
        filename: Client-supplied archive name; only its final path component
            is used, to detect the format and to name the folder.
        workspace: Directory under which the folder is created.

    Returns:
        dict with ``extracted`` (int), ``files`` (list[str], paths relative
        to the resolved ``workspace``) and ``dest`` (str, the folder).

    Raises:
        ValueError: On an unsupported format, an archive name that does not
            yield a folder strictly inside ``workspace``, a member that would
            land outside the folder (zip-slip / tar-slip), or when more than
            ``_MAX_EXTRACTED_BYTES`` would be written.
        Exception: Anything else raised while reading the archive or writing
            files is re-raised after the partially filled folder is removed.
    """
    import io
    import random
    import shutil
    import string
    import tarfile
    import zipfile

    name = Path(filename).name
    lowered = name.lower()
    zip_exts = ('.zip',)
    tar_exts = ('.tar', '.tar.gz', '.tgz', '.tar.bz2', '.tbz2', '.tar.xz', '.txz')
    matched_ext = next((ext for ext in zip_exts + tar_exts if lowered.endswith(ext)), None)
    if matched_ext is None:
        raise ValueError(f'Unsupported archive format: {filename}')
    _mode = 'zip' if matched_ext in zip_exts else 'tar'
    # Strip the complete archive extension; Path.stem only drops the last
    # suffix and would leave "data.tar" for "data.tar.gz".  A name that is
    # nothing but the extension (".zip") keeps the Path.stem result.
    stem = name[:-len(matched_ext)] or Path(name).stem

    ws_root = workspace.resolve()
    # NOTE(review): safe_resolve_ws is defined elsewhere; it is presumed to
    # return a Path under ``workspace`` -- the explicit check below does not
    # rely on that.
    dest_dir = safe_resolve_ws(workspace, stem)
    # Avoid extracting into an existing folder by appending a random suffix.
    while dest_dir.exists():
        suffix = ''.join(random.choices(string.digits, k=3))
        dest_dir = dest_dir.with_name(stem + '_' + suffix)
    dest_root = dest_dir.resolve()
    # The folder must be a strict descendant of the workspace.  A stem such as
    # "." resolves to the workspace itself, and the with_name() fallback above
    # would then pick a sibling of the workspace.
    if dest_root == ws_root or not dest_root.is_relative_to(ws_root):
        raise ValueError(f'Invalid archive name: {filename}')
    dest_dir.mkdir(parents=True, exist_ok=True)

    extracted_files = []
    total_extracted = 0
    try:
        if _mode == 'zip':
            with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf:
                for member in zf.infolist():
                    if member.is_dir():
                        continue
                    # Zip-slip protection
                    member_path = (dest_dir / member.filename).resolve()
                    if not member_path.is_relative_to(dest_root):
                        raise ValueError(f'Zip-slip blocked: {member.filename}')
                    member_path.parent.mkdir(parents=True, exist_ok=True)
                    with zf.open(member) as src, open(member_path, 'wb') as dst:
                        total_extracted = _copy_capped(src, dst, total_extracted)
                    extracted_files.append(str(member_path.relative_to(ws_root)))
        else:
            with tarfile.open(fileobj=io.BytesIO(file_bytes)) as tf:
                for member in tf.getmembers():
                    if not member.isfile():
                        continue
                    # Tar-slip protection
                    member_path = (dest_dir / member.name).resolve()
                    if not member_path.is_relative_to(dest_root):
                        raise ValueError(f'Tar-slip blocked: {member.name}')
                    member_path.parent.mkdir(parents=True, exist_ok=True)
                    src_obj = tf.extractfile(member)
                    if src_obj is None:
                        continue
                    with src_obj as src, open(member_path, 'wb') as dst:
                        total_extracted = _copy_capped(src, dst, total_extracted)
                    extracted_files.append(str(member_path.relative_to(ws_root)))
    except Exception:
        # Remove the partially extracted folder so failures leave no orphans.
        # Cleanup is best effort: it must never mask the original error.
        try:
            shutil.rmtree(dest_dir, ignore_errors=True)
        except Exception:
            pass
        raise
    return {'extracted': len(extracted_files), 'files': extracted_files, 'dest': str(dest_dir)}
def handle_upload_extract(handler):
    """Handle an archive upload and extract it into the session's attachment dir.

    Request validation mirrors the plain upload path: 413 for an oversized
    Content-Length, 400 for a missing ``file`` part or filename, 404 when
    ``get_session`` raises ``KeyError``.  On success the reply is
    ``{'ok': True}`` merged with the dict returned by ``extract_archive``.
    A ``ValueError`` becomes a 400 carrying its message; any other exception
    is printed with its traceback and reported as a 500.
    """
    import traceback as _tb
    try:
        headers = handler.headers
        content_type = headers.get('Content-Type', '')
        content_length = int(headers.get('Content-Length', 0) or 0)
        if content_length > MAX_UPLOAD_BYTES:
            return j(handler, {'error': f'File too large (max {MAX_UPLOAD_BYTES//1024//1024}MB)'}, status=413)

        fields, files = parse_multipart(handler.rfile, content_type, content_length)
        session_id = fields.get('session_id', '')
        if 'file' not in files:
            return j(handler, {'error': 'No file field in request'}, status=400)
        filename, file_bytes = files['file']
        if not filename:
            return j(handler, {'error': 'No filename in upload'}, status=400)

        # The lookup is only an existence check; the session object is unused.
        try:
            get_session(session_id)
        except KeyError:
            return j(handler, {'error': 'Session not found'}, status=404)

        # Extract next to single-file uploads, under the session's own folder.
        extraction_root = _session_attachment_dir(session_id)
        extraction_root.mkdir(parents=True, exist_ok=True)
        outcome = extract_archive(file_bytes, filename, extraction_root)
        return j(handler, {'ok': True, **outcome})
    except ValueError as e:
        return j(handler, {'error': str(e)}, status=400)
    except Exception:
        print('[webui] upload extract error: ' + _tb.format_exc(), flush=True)
        return j(handler, {'error': 'Archive extraction failed'}, status=500)
def handle_transcribe(handler):
    """Handle an audio upload and reply with its transcript.

    The uploaded bytes are written to a temporary file (prefix
    ``webui-stt-``, suffix taken from the sanitized filename or ``.webm``)
    and passed to ``tools.transcription_tools.transcribe_audio``.  The
    temporary file is removed in all cases.

    Replies through ``j``:

    * 413 for an oversized Content-Length
    * 400 for a missing ``file`` part or filename, or on ``ValueError``
    * 503 when the transcription module cannot be imported, or when the
      reported error mentions "unavailable" / "not configured"
    * 400 for any other unsuccessful transcription result
    * 500 (after printing the traceback) on any other exception
    * ``{'ok': True, 'transcript': ...}`` on success
    """
    import traceback as _tb
    temp_path = None
    try:
        headers = handler.headers
        content_type = headers.get('Content-Type', '')
        content_length = int(headers.get('Content-Length', 0) or 0)
        if content_length > MAX_UPLOAD_BYTES:
            return j(handler, {'error': f'File too large (max {MAX_UPLOAD_BYTES//1024//1024}MB)'}, status=413)

        fields, files = parse_multipart(handler.rfile, content_type, content_length)
        if 'file' not in files:
            return j(handler, {'error': 'No file field in request'}, status=400)
        filename, file_bytes = files['file']
        if not filename:
            return j(handler, {'error': 'No filename in upload'}, status=400)

        safe_name = _sanitize_upload_name(filename)
        extension = Path(safe_name).suffix or '.webm'
        # delete=False: the file must outlive this ``with`` so it can be read
        # by path below; the ``finally`` clause removes it.
        with tempfile.NamedTemporaryFile(prefix='webui-stt-', suffix=extension, delete=False) as tmp:
            temp_path = tmp.name
            tmp.write(file_bytes)

        # Imported lazily so a missing module becomes a 503 for this request.
        try:
            from tools.transcription_tools import transcribe_audio
        except ImportError:
            return j(handler, {'error': 'Speech-to-text is unavailable on this server'}, status=503)

        result = transcribe_audio(temp_path)
        if result.get('success'):
            transcript = str(result.get('transcript') or '').strip()
            return j(handler, {'ok': True, 'transcript': transcript})

        msg = str(result.get('error') or 'Transcription failed')
        lowered = msg.lower()
        if 'unavailable' in lowered or 'not configured' in lowered:
            status = 503
        else:
            status = 400
        return j(handler, {'error': msg}, status=status)
    except ValueError as e:
        return j(handler, {'error': str(e)}, status=400)
    except Exception:
        print('[webui] transcribe error: ' + _tb.format_exc(), flush=True)
        return j(handler, {'error': 'Transcription failed'}, status=500)
    finally:
        if temp_path:
            # Best-effort cleanup; never let it replace the response or error.
            try:
                Path(temp_path).unlink(missing_ok=True)
            except Exception:
                pass