-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb_ops.py
More file actions
1631 lines (1367 loc) · 54.6 KB
/
Copy pathdb_ops.py
File metadata and controls
1631 lines (1367 loc) · 54.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Database operations: authentication, CRUD for all entities, user management,
model preferences, and schema migrations.
"""
import base64
import json
import logging
import os
from datetime import datetime
import bcrypt
from config import PROVIDERS
from crypto_utils import KeyWrapper, crypto
from db_models import (
Base,
ChatHistory,
CustomPrompt,
DocumentTranslation,
GeneratedImage,
SessionLocal,
Transcription,
User,
UserModelPreference,
UserSettings,
VisionResult,
engine,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared KeyWrapper instance; the functions below use it to create, unlock
# and re-wrap per-user keychains.
key_wrapper = KeyWrapper()
def get_db() -> "SessionLocal":  # type: ignore[valid-type]
    """Open a new database session and hand it to the caller, who must close it."""
    session = SessionLocal()
    return session
def hash_password(password: str) -> str:
    """Return the bcrypt hash of *password* as text, using a freshly generated salt."""
    password_bytes = password.encode("utf-8")
    digest = bcrypt.hashpw(password_bytes, bcrypt.gensalt())
    return digest.decode("utf-8")
def verify_password(password: str, hashed: str) -> bool:
    """Check *password* against a stored bcrypt hash given as text."""
    candidate = password.encode("utf-8")
    stored_hash = hashed.encode("utf-8")
    return bcrypt.checkpw(candidate, stored_hash)
# ── Login throttle (per-username, in-memory) ───────────────────────────────
# Sliding window: max 5 failures in 15 minutes locks the account for 15 min.
# In-memory only — clears on app restart. For multi-process deploys, replace
# with Redis. The lockout uses the *username*, not IP, because the request
# layer doesn't surface IP into this function. Add IP gating at the proxy.
import threading as _login_threading
import time as _login_time
# username -> time.time() stamps of failed logins still inside the window
_LOGIN_FAILS: dict[str, list[float]] = {}
# username -> time.time() stamp until which logins are refused
_LOGIN_LOCKED: dict[str, float] = {}
# Held by the _login_* helpers while they read or modify the two dicts above
_LOGIN_LOCK = _login_threading.Lock()
# Failures older than this many seconds are dropped from the window
_LOGIN_WINDOW = 15 * 60  # 15 minutes
# Number of failures inside the window that triggers a lockout
_LOGIN_MAX_FAILS = 5
# Length of a lockout in seconds
_LOGIN_LOCKOUT = 15 * 60  # 15 minutes
def _login_is_locked(username: str) -> bool:
    """Tell whether *username* is currently locked out.

    An expired lock entry is removed as a side effect, so the lock table
    does not keep stale entries for users who come back after the lockout.
    """
    current = _login_time.time()
    with _LOGIN_LOCK:
        locked_until = _LOGIN_LOCKED.get(username, 0.0)
        if locked_until:
            if locked_until > current:
                return True
            if locked_until <= current:
                # Lockout period is over: forget the entry.
                _LOGIN_LOCKED.pop(username, None)
    return False
def _login_record_failure(username: str) -> None:
    """Record one failed login for *username* and lock the name when the limit is hit.

    Failures older than ``_LOGIN_WINDOW`` seconds are discarded first. When the
    remaining count reaches ``_LOGIN_MAX_FAILS`` the username is locked for
    ``_LOGIN_LOCKOUT`` seconds, its failure list is reset, and a warning is logged.
    """
    stamp = _login_time.time()
    oldest_counted = stamp - _LOGIN_WINDOW
    with _LOGIN_LOCK:
        recent = []
        for earlier in _LOGIN_FAILS.get(username, []):
            if earlier >= oldest_counted:
                recent.append(earlier)
        recent.append(stamp)
        _LOGIN_FAILS[username] = recent
        if len(recent) < _LOGIN_MAX_FAILS:
            return
        # Limit reached: start the lockout and begin counting from zero again.
        _LOGIN_LOCKED[username] = stamp + _LOGIN_LOCKOUT
        _LOGIN_FAILS[username] = []
        logger.warning(
            f"[SECURITY] Account '{username}' locked for {_LOGIN_LOCKOUT // 60} min "
            f"after {_LOGIN_MAX_FAILS} failed login attempts"
        )
def _login_record_success(username: str) -> None:
    """Forget all failure and lockout bookkeeping for *username*."""
    with _LOGIN_LOCK:
        for table in (_LOGIN_FAILS, _LOGIN_LOCKED):
            table.pop(username, None)
def get_user_by_username(username: str) -> User | None:
    """Fetch the first ``User`` row whose username equals *username*, or None.

    The session used for the lookup is closed before returning.
    """
    session = get_db()
    try:
        matching = session.query(User).filter(User.username == username)
        return matching.first()
    finally:
        session.close()
def get_verified_user_id(user_state: dict | None) -> int | None:
    """Confirm that the user id carried in the session state exists in the DB.

    Use this instead of raw ``user_state["id"]`` in security-sensitive
    operations (delete, decrypt, export, exchange creds) so that a tampered
    session state cannot be used for IDOR attacks.

    Returns the id read back from the database when a matching user row
    exists, otherwise None (a warning is logged for an unknown id). Only
    existence is checked here; no "active" flag is consulted.
    """
    claimed_id = user_state.get("id") if user_state else None
    if not claimed_id:
        return None
    session = get_db()
    try:
        record = session.query(User).filter(User.id == claimed_id).first()
        if record is not None:
            return record.id
        logger.warning(
            "[SECURITY] get_verified_user_id: user_id=%s not found in DB",
            claimed_id,
        )
        return None
    finally:
        session.close()
import threading as _threading
# Server-side UMK cache — populated during gradio_auth_check, consumed
# by demo.load() session init. Keyed by username → umk bytes.
# Thread-safe via lock. An entry stays until clear_cached_umk() removes it
# or the process exits (acceptable — same as the previous localStorage
# approach, but server-side instead of client-side).
_UMK_CACHE: dict[str, bytes] = {}
# Held by the cache helpers below whenever they touch _UMK_CACHE.
_UMK_LOCK = _threading.Lock()
def gradio_auth_check(username: str, password: str) -> bool:
    """Credential check in the shape expected by Gradio's ``auth=`` parameter.

    Delegates to ``authenticate_user``. On success the derived UMK (when one
    was returned) is stored in the server-side cache under *username* so the
    session initialisation can pick it up later.

    Returns True for valid credentials, False otherwise.
    """
    account, master_key = authenticate_user(username, password)
    if account is None:
        return False
    if master_key:
        with _UMK_LOCK:
            _UMK_CACHE[username] = master_key
    return True
def get_cached_umk(username: str) -> bytes | None:
    """Return the UMK stored for *username* at login, or None when there is none."""
    with _UMK_LOCK:
        cached = _UMK_CACHE.get(username)
    return cached
def clear_cached_umk(username: str) -> None:
    """Drop the cached UMK of *username* (called on logout); a missing entry is ignored."""
    with _UMK_LOCK:
        if username in _UMK_CACHE:
            del _UMK_CACHE[username]
def authenticate_user(username: str, password: str) -> tuple:
    """
    Verify credentials and unlock (or, for legacy accounts, create) the keychain.

    Steps, in order:
      1. Usernames rejected by ``validate_username`` and usernames that are
         currently locked out are refused; neither counts as a failed attempt.
      2. The user is looked up and the password checked. An unknown user or
         a wrong password is recorded for the per-username lockout counter.
      3. Accounts without a stored salt / wrapped master key are migrated:
         the global key is wrapped under a password-derived key and stored.
         Otherwise the stored master key is unwrapped with the password.
      4. On success the failure counter is cleared and the needed user
         fields are copied onto a plain object that stays usable after the
         database session has been closed.

    Returns:
        ``(safe_user, umk)`` on success; ``(None, None)`` on every failure,
        including unexpected exceptions, which are logged.
    """
    # Refuse obviously invalid usernames before touching the database.
    username_ok, _ = validate_username(username)
    if not username_ok:
        logger.warning("[SECURITY] Login refused — invalid username format: %r", username)
        return None, None
    if _login_is_locked(username):
        logger.warning(f"[SECURITY] Login refused — '{username}' is locked out")
        return None, None

    session = get_db()
    try:
        logger.info(f"🔐 AUTH ATTEMPT: User '{username}'")
        account = session.query(User).filter(User.username == username).first()
        if not account:
            logger.error(f"❌ AUTH FAIL: User '{username}' not found in DB")
            _login_record_failure(username)
            return None, None
        if not verify_password(password, account.password_hash):
            logger.error(f"❌ AUTH FAIL: Password hash mismatch for '{username}'")
            _login_record_failure(username)
            return None, None
        logger.info(f"✅ Password verified for '{username}'. Unlocking keychain...")

        if account.salt and account.encrypted_master_key:
            # Regular path: unwrap the stored master key with the password.
            try:
                master_key = key_wrapper.unlock_user_keychain(
                    password, account.salt, account.encrypted_master_key
                )
            except Exception as exc:
                logger.error(f"❌ KEYCHAIN ERROR: {exc}")
                master_key = None
        else:
            # Legacy path: the account has no keys yet, so create them now.
            logger.info(f"🔄 Migrating user {username} to per-user encryption...")
            try:
                fresh_keychain = key_wrapper.create_user_keychain(password)
                # Wrap the GLOBAL key (legacy support) rather than a new one.
                from cryptography.fernet import Fernet

                salt_raw = base64.b64decode(fresh_keychain["salt"])
                wrapping_key = key_wrapper.derive_wrapper_key(password, salt_raw)
                wrapped_global = Fernet(wrapping_key).encrypt(crypto.global_key)
                account.salt = fresh_keychain["salt"]
                account.encrypted_master_key = wrapped_global.decode("utf-8")
                session.commit()
                master_key = crypto.global_key
            except Exception as exc:
                logger.error(f"Migration failed: {exc}")
                return None, None

        if not master_key:
            logger.error(f"❌ AUTH FAIL: Could not decrypt Master Key for '{username}'")
            return None, None
        logger.info(f"🔓 Login successful for '{username}'")
        _login_record_success(username)

        # Plain attribute holder, independent of the ORM session.
        class SafeUser:
            pass

        detached = SafeUser()
        detached.id = account.id
        detached.username = account.username
        detached.is_admin = account.is_admin
        detached.is_media_manager = getattr(account, "is_media_manager", False)
        detached.has_sandbox_access = getattr(account, "has_sandbox_access", False)
        return detached, master_key
    except Exception as exc:
        logger.exception(f"🔥 Auth Critical Error: {exc}")
        return None, None
    finally:
        session.close()
def create_default_users() -> None:
    """
    Bootstrap a single admin account on a fresh database.

    Security model:
    - No hardcoded credentials in source.
    - Admin username can be supplied via $BOOTSTRAP_ADMIN_USERNAME. When the
      variable is unset, empty or only whitespace, 'admin' is used.
    - Password is read from $BOOTSTRAP_ADMIN_PASSWORD if set, otherwise a 24-char
      random password is generated and printed to stdout exactly once.
    - The bootstrap user is flagged `must_change_password=True` (when the model
      has that attribute) so the first login is forced to rotate. (App must
      enforce that flag.)
    - Idempotent: if any users already exist, this function does nothing.

    Errors are logged and rolled back; nothing is raised to the caller.
    """
    db = SessionLocal()
    try:
        if db.query(User).count() > 0:
            logger.info("Users already exist, skipping bootstrap")
            return
        import secrets as _secrets
        import string as _string

        # A variable that is set but blank must not produce an account with
        # an empty username, so fall back to the default in that case too.
        admin_username = (os.environ.get("BOOTSTRAP_ADMIN_USERNAME") or "").strip() or "admin"
        admin_password = os.environ.get("BOOTSTRAP_ADMIN_PASSWORD")
        generated = False
        if not admin_password:
            alphabet = _string.ascii_letters + _string.digits + "!@#$%^&*-_=+"
            admin_password = "".join(_secrets.choice(alphabet) for _ in range(24))
            generated = True
        admin = User(
            username=admin_username,
            password_hash=hash_password(admin_password),
            email=os.environ.get("BOOTSTRAP_ADMIN_EMAIL") or None,
            is_admin=True,
        )
        # Force password rotation on first login if the column exists.
        if hasattr(admin, "must_change_password"):
            admin.must_change_password = True
        db.add(admin)
        db.commit()
        if generated:
            # Only a generated password is echoed; an operator-supplied one is
            # already known to the operator and is never printed.
            print(
                "\n" + "=" * 64 + f"\n[BOOTSTRAP] Created admin user '{admin_username}'."
                f"\n[BOOTSTRAP] Initial password (rotate immediately): {admin_password}"
                "\n[BOOTSTRAP] This password will NOT be shown again."
                "\n" + "=" * 64 + "\n",
                flush=True,
            )
        logger.info(f"✅ Bootstrap admin user '{admin_username}' created")
    except Exception as e:
        db.rollback()
        logger.exception(f"Error creating bootstrap admin: {e!s}")
    finally:
        db.close()
# ── Transition migrations (run idempotently at every startup) ────────────────
# These exist so a deployment that ran the OLD insecure code can be upgraded
# in place without operator surgery. Each step is idempotent and safe to run
# repeatedly.
# Usernames that the OLD `create_default_users` shipped with publicly known
# passwords. ANY user matching one of these gets `must_change_password=True`
# set automatically on startup so the next login forces a rotation.
# Matching is an exact comparison against the stored username.
LEGACY_DEFAULT_USERNAMES = ("admin123", "user123")
def run_security_transition_migrations():
    """
    Idempotent post-deploy hardening for an existing database.

    Four independent steps; a failure in one is logged (or, for step 4,
    ignored) and does not stop the following ones:

    1. Add the `must_change_password` column to `users` when it is missing.
       (`migrate_db.py` also handles this; it is duplicated defensively
       because this function may run before the migration script.)
    2. Set `must_change_password` on every account whose username is one of
       the historic defaults (`admin123`, `user123`) so the operator cannot
       keep running with the publicly-known credentials by accident.
    3. Warn when `crypto_utils._APP_DIR/.master_key` exists on disk while the
       env var `MASTER_ENCRYPTION_KEY` is *not* set, and explain how to
       migrate. The on-disk key keeps working until the operator does so.
    4. Warn when legacy `yt_token_*.pickle` files still sit next to this module.
    """
    # Step 1: make sure users.must_change_password exists.
    try:
        from sqlalchemy import inspect as _inspect
        from sqlalchemy import text as _text
        from db_models import engine as _engine

        inspector = _inspect(_engine)
        if "users" in inspector.get_table_names():
            column_names = [column["name"] for column in inspector.get_columns("users")]
            if "must_change_password" not in column_names:
                with _engine.begin() as connection:
                    connection.execute(
                        _text("ALTER TABLE users ADD COLUMN must_change_password BOOLEAN DEFAULT 0")
                    )
                logger.info("[TRANSITION] added users.must_change_password column")
    except Exception as e:
        logger.warning(f"[TRANSITION] could not ensure must_change_password column: {e}")

    # Step 2: force a password rotation for the legacy default accounts.
    session = SessionLocal()
    try:
        newly_flagged = 0
        for legacy_name in LEGACY_DEFAULT_USERNAMES:
            legacy_user = session.query(User).filter(User.username == legacy_name).first()
            if legacy_user and not getattr(legacy_user, "must_change_password", False):
                legacy_user.must_change_password = True
                newly_flagged += 1
                logger.warning(
                    f"[TRANSITION] Legacy default user '{legacy_name}' detected — "
                    f"flagged must_change_password=True. ROTATE THIS PASSWORD NOW."
                )
        # Commit only when something actually changed.
        if newly_flagged:
            session.commit()
    except Exception as e:
        session.rollback()
        logger.warning(f"[TRANSITION] could not flag legacy users: {e}")
    finally:
        session.close()

    # Step 3: master key still on disk -> recommend the env-var path.
    try:
        from crypto_utils import _APP_DIR as _CRYPTO_DIR

        key_path = os.path.join(_CRYPTO_DIR, ".master_key")
        if os.path.exists(key_path):
            if not os.environ.get("MASTER_ENCRYPTION_KEY"):
                logger.warning(
                    "[TRANSITION] .master_key file is present at %s. The new code "
                    "still reads it for backward compatibility, but you should "
                    "migrate to the MASTER_ENCRYPTION_KEY env var. "
                    "Run `python migrate_master_key.py` to print the env-var "
                    "value, add it to the systemd EnvironmentFile, then "
                    "delete the .master_key file.",
                    key_path,
                )
    except Exception as e:
        logger.debug(f"[TRANSITION] master-key check skipped: {e}")

    # Step 4: leftover pickle token files (best effort, errors ignored).
    try:
        import glob as _glob
        from pathlib import Path as _Path

        module_dir = _Path(__file__).parent
        _stale = _glob.glob(str(module_dir / "yt_token_*.pickle"))
        if _stale:
            logger.warning(
                f"[TRANSITION] {len(_stale)} legacy yt_token_*.pickle file(s) "
                f"found. The new YouTube toolkit will NOT load them (RCE risk "
                f"via pickle). Re-authenticate and delete these files: {_stale}"
            )
    except Exception:  # nosec B110
        pass
def get_decrypted_transcription(trans_id: int, user_id: int, user_state: dict | None = None) -> dict | None:
    """Load one transcription owned by *user_id* and decrypt its text fields.

    The key is ``user_state["umk"]`` when a session state is given, otherwise
    the global key. If decryption with that key yields the
    ``"[Decryption Failed]"`` marker, the global key is tried as well, which
    covers rows written before per-user keys existed.

    Returns the ``Transcription`` object with ``original_text`` (and
    ``translated_text``, when present) replaced by plaintext in memory, or
    None when no matching row exists. If decryption raises, ``original_text``
    is replaced by an error placeholder instead. Nothing is committed.
    """
    session = SessionLocal()
    try:
        record = (
            session.query(Transcription)
            .filter(Transcription.id == trans_id, Transcription.user_id == user_id)
            .first()
        )
        if not record:
            return None

        session_key = user_state.get("umk") if user_state else crypto.global_key

        def _decrypt_with_fallback(ciphertext: str) -> str:
            # Session key first; global key second for legacy rows.
            plain = crypto.decrypt_text(ciphertext, key=session_key)
            if plain == "[Decryption Failed]" and session_key != crypto.global_key:
                plain = crypto.decrypt_text(ciphertext, key=crypto.global_key)
            return plain

        if not record.is_encrypted:
            return record
        try:
            record.original_text = _decrypt_with_fallback(str(record.original_text))  # type: ignore[assignment]
            if record.translated_text:
                record.translated_text = _decrypt_with_fallback(str(record.translated_text))  # type: ignore[assignment]
        except Exception as e:
            logger.error(f"Decryption failed: {e}")
            record.original_text = "[Fehler: Daten konnten nicht entschlüsselt werden]"  # type: ignore[assignment]
        return record
    finally:
        session.close()
def get_decrypted_chat(chat_id: int, user_id: int, user_state: dict | None = None) -> dict | None:
    """Load one chat owned by *user_id* and decrypt its message payload.

    The key is ``user_state["umk"]`` when a session state is given, otherwise
    None is passed to the crypto layer. If that attempt yields the
    ``"[Decryption Failed]"`` marker, the global key is tried as well, which
    handles chats stored before the per-user key migration.

    Returns the ``ChatHistory`` object with ``messages`` replaced by the
    decrypted JSON text in memory, or None when no matching row exists. If
    decryption raises, ``messages`` becomes ``"[]"``. Nothing is committed.
    """
    session = SessionLocal()
    try:
        record = (
            session.query(ChatHistory)
            .filter(ChatHistory.id == chat_id, ChatHistory.user_id == user_id)
            .first()
        )
        if not record:
            return None

        session_key = user_state.get("umk") if user_state else None
        if not record.is_encrypted:
            return record
        try:
            ciphertext = str(record.messages)
            plain_json = crypto.decrypt_text(ciphertext, key=session_key)
            needs_global_retry = (
                plain_json == "[Decryption Failed]" and session_key != crypto.global_key
            )
            if needs_global_retry:
                # Legacy data: written with the global key.
                plain_json = crypto.decrypt_text(str(record.messages), key=crypto.global_key)
            record.messages = plain_json  # type: ignore[assignment]
        except Exception:
            record.messages = "[]"  # type: ignore[assignment]
        return record
    finally:
        session.close()
def change_password_secure(user_id: int, old_password: str, new_password: str) -> tuple[bool, str]:
    """Change a user's password and re-wrap the stored master key under it.

    The old password is verified first. When the account has a keychain
    (salt plus wrapped master key) it is re-wrapped with the new password, so
    stored data does not have to be re-encrypted. Then the password hash is
    replaced and ``must_change_password`` (if the model has it) is cleared.

    Returns ``(True, message)`` on success and ``(False, message)`` on any
    failure; unexpected errors are rolled back and reported in the message.
    """
    session = SessionLocal()
    try:
        account = session.query(User).filter(User.id == user_id).first()
        if not account:
            return False, "User missing"
        if not verify_password(old_password, account.password_hash):
            return False, "Altes Passwort falsch"

        if account.salt and account.encrypted_master_key:
            try:
                rewrapped = key_wrapper.rewrap_keychain(
                    old_password, new_password, account.salt, account.encrypted_master_key
                )
                account.salt = rewrapped["salt"]
                account.encrypted_master_key = rewrapped["encrypted_master_key"]
            except ValueError:
                return False, "Kryptographischer Fehler beim Umschlüsseln."
        # Accounts without a keychain have nothing to re-wrap; for them only
        # the password hash below changes and no keychain is created here.

        account.password_hash = hash_password(new_password)
        # A successful rotation satisfies any pending forced-change flag.
        if hasattr(account, "must_change_password"):
            account.must_change_password = False
        session.commit()
        return True, "✅ Passwort geändert & Key neu verschlüsselt (Daten bleiben sicher)"
    except Exception as e:
        session.rollback()
        return False, f"Fehler: {e}"
    finally:
        session.close()
def save_chat_history(
    user_id: int,
    provider: str,
    model: str,
    messages: list,
    title: str | None = None,
    user_state: dict | None = None,
) -> int | None:
    """Encrypt a chat conversation and store it; return the new row's id.

    *messages* is serialised to JSON and encrypted with ``user_state["umk"]``.
    When no session state is given, or the session state carries no key, the
    global key is used instead, matching ``save_transcription``; the
    missing-key case is logged as a warning. When *title* is empty a
    timestamped default title is generated.

    Raises:
        Exception: any error from serialisation, encryption or the database
        is logged, the transaction is rolled back, and the error is re-raised.
    """
    db = SessionLocal()
    try:
        # Grab the key from the session state.
        umk = user_state.get("umk") if user_state else None
        if not umk:
            if user_state:
                # A session without a key is unexpected; never encrypt with None.
                logger.warning("No user session key found, falling back to global")
            umk = crypto.global_key
        encrypted_messages = crypto.encrypt_text(json.dumps(messages), key=umk)
        chat = ChatHistory(
            user_id=user_id,
            provider=provider,
            model=model,
            messages=encrypted_messages,
            title=title or f"Chat {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            is_encrypted=True,
            encryption_metadata=json.dumps({"algorithm": "AES-256-GCM-UserKey", "version": 2}),
        )
        db.add(chat)
        db.commit()
        db.refresh(chat)  # load the generated id while the session is open
        return chat.id
    except Exception as e:
        db.rollback()
        logger.exception(f"Error saving chat history: {e!s}")
        raise
    finally:
        db.close()
def generate_ai_label(provider: str, model: str) -> str:
    """Build the standard Markdown footer that marks content as AI-generated.

    The footer names the provider together with its ``badge`` entry from
    ``PROVIDERS`` ("Unbekannt" when the provider or badge is missing), the
    model, and the current local time to the minute.
    """
    provider_info = PROVIDERS.get(provider) or {}
    compliance = provider_info.get("badge", "Unbekannt")  # type: ignore[attr-defined]
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    return f"""
---
**🤖 KI-Generiert**
- **Provider:** {provider} ({compliance})
- **Modell:** {model}
- **Datum:** {timestamp}
*Dieser Inhalt wurde mit KI-Unterstützung erstellt und durch Menschen geprüft.*
---
"""
def save_transcription(
    user_id: int,
    provider: str,
    model: str,
    original: str,
    translated: str | None = None,
    language: str | None = None,
    filename: str | None = None,
    title: str | None = None,
    user_state: dict | None = None,
):
    """Encrypt a transcription (and optional translation) and store it.

    The key is ``user_state["umk"]``; when it is absent a warning is logged
    and the global key is used. Empty *original* / *translated* values are
    stored as None rather than encrypted. A missing *model* is stored as
    "N/A" and a missing *title* becomes a timestamped default.

    Returns the id of the new row. Errors are logged, rolled back and re-raised.
    """
    session = SessionLocal()
    try:
        # Session key, with the global key as a (risky) last resort.
        key = (user_state or {}).get("umk")
        if not key:
            logger.warning("No user session key found, falling back to global")
            key = crypto.global_key

        def _seal(text):
            # Empty or missing text is stored as NULL, not as ciphertext.
            return crypto.encrypt_text(text, key=key) if text else None

        sealed_original = _seal(original)
        sealed_translated = _seal(translated)
        record = Transcription(
            user_id=user_id,
            provider=provider,
            model=model or "N/A",
            original_text=sealed_original,
            translated_text=sealed_translated,
            language=language,
            filename=filename,
            title=title or f"Transkript {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            is_encrypted=True,
            encryption_metadata=json.dumps({"algorithm": "AES-256-GCM", "version": 2}),
        )
        session.add(record)
        session.commit()
        return record.id
    except Exception as e:
        session.rollback()
        logger.exception(f"Error saving transcription: {e!s}")
        raise
    finally:
        session.close()
def get_decrypted_vision(vision_id: int, user_id: int, user_state=None):
    """Load one vision result owned by *user_id* and decrypt result and prompt.

    The key is ``user_state["umk"]`` when a session state is given, otherwise
    the global key. A field that comes back as ``"[Decryption Failed]"`` is
    retried with the global key. If decryption raises, ``result`` is replaced
    by an error placeholder.

    Returns the ``VisionResult`` object with plaintext set in memory, or None
    when no matching row exists. Nothing is committed.
    """
    session = SessionLocal()
    try:
        record = (
            session.query(VisionResult)
            .filter(VisionResult.id == vision_id, VisionResult.user_id == user_id)
            .first()
        )
        if not record:
            return None

        session_key = user_state.get("umk") if user_state else crypto.global_key
        if not record.is_encrypted:
            return record
        try:
            # Result: session key first, then the global key for old rows.
            result_cipher = str(record.result)
            record.result = crypto.decrypt_text(result_cipher, key=session_key)  # type: ignore[assignment]
            if record.result == "[Decryption Failed]" and session_key != crypto.global_key:
                record.result = crypto.decrypt_text(result_cipher, key=crypto.global_key)  # type: ignore[assignment]
            # Prompt is optional; same two-step scheme when present.
            if record.prompt:
                prompt_cipher = str(record.prompt)
                record.prompt = crypto.decrypt_text(prompt_cipher, key=session_key)  # type: ignore[assignment]
                if record.prompt == "[Decryption Failed]" and session_key != crypto.global_key:
                    record.prompt = crypto.decrypt_text(prompt_cipher, key=crypto.global_key)  # type: ignore[assignment]
        except Exception as e:
            logger.error(f"Vision decryption failed: {e}")
            record.result = "[Fehler: Entschlüsselung fehlgeschlagen]"  # type: ignore[assignment]
        return record
    finally:
        session.close()
def save_vision_result(
    user_id: int,
    provider: str,
    model: str,
    prompt: str,
    result: str,
    image_path: str | None = None,
    user_state=None,
):
    """Encrypt a vision analysis (prompt and result) and store it.

    Both texts are encrypted with ``user_state["umk"]``. When no session
    state is given, or the session state carries no key, the global key is
    used instead, matching ``save_transcription``; the missing-key case is
    logged as a warning.

    Returns the id of the new row. Errors are logged, rolled back and re-raised.
    """
    db = SessionLocal()
    try:
        umk = user_state.get("umk") if user_state else None
        if not umk:
            if user_state:
                # A session without a key is unexpected; never encrypt with None.
                logger.warning("No user session key found, falling back to global")
            umk = crypto.global_key
        # Encrypt before constructing the row so plaintext never reaches the model.
        enc_result = crypto.encrypt_text(result, key=umk)
        enc_prompt = crypto.encrypt_text(prompt, key=umk)
        vision = VisionResult(
            user_id=user_id,
            provider=provider,
            model=model,
            prompt=enc_prompt,
            result=enc_result,
            is_encrypted=True,
            image_path=image_path,
            encryption_metadata=json.dumps({"algorithm": "AES-256-GCM-UserKey", "version": 2}),
        )
        db.add(vision)
        db.commit()
        db.refresh(vision)  # load the generated id while the session is open
        return vision.id
    except Exception as e:
        db.rollback()
        logger.exception(f"Error saving vision result: {e!s}")
        raise
    finally:
        db.close()
def save_generated_image(
    user_id: int, provider: str, model: str, prompt: str, image_path: str, user_state=None
):
    """Store a generated image record with its prompt encrypted.

    The prompt is encrypted with ``user_state["umk"]``. When no session state
    is given, or the session state carries no key, the global key is used
    instead, matching ``save_transcription``; the missing-key case is logged
    as a warning. *image_path* is stored as given.

    Returns the id of the new row. Errors are logged, rolled back and re-raised.
    """
    db = SessionLocal()
    try:
        umk = user_state.get("umk") if user_state else None
        if not umk:
            if user_state:
                # A session without a key is unexpected; never encrypt with None.
                logger.warning("No user session key found, falling back to global")
            umk = crypto.global_key
        enc_prompt = crypto.encrypt_text(prompt, key=umk)
        img = GeneratedImage(
            user_id=user_id,
            provider=provider,
            model=model,
            prompt=enc_prompt,
            is_encrypted=True,
            image_path=image_path,
        )
        db.add(img)
        db.commit()
        db.refresh(img)  # load the generated id while the session is open
        return img.id
    except Exception as e:
        db.rollback()
        logger.exception(f"Error saving generated image: {e!s}")
        raise
    finally:
        db.close()
def get_user_transcriptions(user_id: int, limit: int = 50):
    """Return up to *limit* transcriptions of *user_id*, newest first.

    The rows are returned as stored; text fields are not decrypted here.
    The session is closed even when the query raises.
    """
    db = get_db()
    try:
        return (
            db.query(Transcription)
            .filter(Transcription.user_id == user_id)
            .order_by(Transcription.timestamp.desc())
            .limit(limit)
            .all()
        )
    finally:
        db.close()
def get_user_custom_prompts(user_id: int, category: str | None = None):
    """Return the custom prompts of *user_id*, newest first.

    When *category* is given (and non-empty) only prompts of that category
    are returned. The session is closed even when the query raises.
    """
    db = get_db()
    try:
        query = db.query(CustomPrompt).filter(CustomPrompt.user_id == user_id)
        if category:
            query = query.filter(CustomPrompt.category == category)
        return query.order_by(CustomPrompt.timestamp.desc()).all()
    finally:
        db.close()
def save_custom_prompt(
    user_id: int, name: str, prompt_text: str, category: str = "general", is_shared: bool = False
):
    """Store a custom prompt template for *user_id* and return the new row's id.

    Errors are logged, rolled back and re-raised.
    """
    session = SessionLocal()
    try:
        record = CustomPrompt(
            user_id=user_id,
            name=name,
            category=category,
            prompt_text=prompt_text,
            is_shared=is_shared,
        )
        session.add(record)
        session.commit()
        # Refresh so the generated id is loaded while the session is open.
        session.refresh(record)
        return record.id
    except Exception as e:
        session.rollback()
        logger.exception(f"Error saving custom prompt: {e!s}")
        raise
    finally:
        session.close()
# ── Delete helpers: each removes a row only when it belongs to the given user ──
def delete_transcription(trans_id: int, user_id: int):
    """Delete the transcription *trans_id* if it belongs to *user_id*.

    Returns True when a row was deleted, False when no matching row exists
    or a database error occurred (the error is logged and rolled back).
    """
    session = get_db()
    try:
        record = (
            session.query(Transcription)
            .filter(Transcription.id == trans_id, Transcription.user_id == user_id)
            .first()
        )
        if not record:
            return False
        session.delete(record)
        session.commit()
        return True
    except Exception as e:
        session.rollback()
        logger.exception(f"DB Error: {e}")
        return False
    finally:
        session.close()
def delete_chat_history(chat_id: int, user_id: int):
    """Delete the chat *chat_id* if it belongs to *user_id*.

    Returns True when a row was deleted, False when no matching row exists
    or a database error occurred (the error is logged and rolled back).
    """
    session = get_db()
    try:
        record = (
            session.query(ChatHistory)
            .filter(ChatHistory.id == chat_id, ChatHistory.user_id == user_id)
            .first()
        )
        if not record:
            return False
        session.delete(record)
        session.commit()
        return True
    except Exception as e:
        session.rollback()
        logger.exception(f"DB Error: {e}")
        return False
    finally:
        session.close()
def delete_vision_result(vision_id: int, user_id: int):
    """Delete the vision result *vision_id* if it belongs to *user_id*.

    Returns True when a row was deleted, False when no matching row exists
    or a database error occurred (the error is logged and rolled back).
    """
    session = get_db()
    try:
        record = (
            session.query(VisionResult)
            .filter(VisionResult.id == vision_id, VisionResult.user_id == user_id)
            .first()
        )
        if not record:
            return False
        session.delete(record)
        session.commit()
        return True
    except Exception as e:
        session.rollback()
        logger.exception(f"DB Error: {e}")
        return False
    finally:
        session.close()
def get_user_settings(user_id: int):
    """Return the ``UserSettings`` row of *user_id*, creating a default one if missing."""
    session = get_db()
    try:
        existing = session.query(UserSettings).filter(UserSettings.user_id == user_id).first()
        if existing:
            return existing
        # First access for this user: persist a row with the model defaults.
        created = UserSettings(user_id=user_id)
        session.add(created)
        session.commit()
        session.refresh(created)
        return created
    finally:
        session.close()
def update_user_settings(user_id: int, **kwargs):
    """Apply the given keyword values to the settings row of *user_id*.

    A settings row is created when the user has none. Keywords that do not
    correspond to an attribute of the settings object are silently skipped.
    ``updated_at`` is set to the current UTC time.

    Returns ``(True, message)`` on success, ``(False, message)`` when an error
    occurred (it is logged and rolled back).
    """
    session = SessionLocal()
    try:
        record = session.query(UserSettings).filter(UserSettings.user_id == user_id).first()
        if not record:
            record = UserSettings(user_id=user_id)
            session.add(record)
        # Only attributes the settings object actually has are written.
        for field_name, new_value in kwargs.items():
            if not hasattr(record, field_name):
                continue
            setattr(record, field_name, new_value)
        record.updated_at = datetime.utcnow()  # type: ignore[assignment]
        session.commit()
        return True, "✅ Einstellungen gespeichert"
    except Exception as e:
        session.rollback()
        logger.exception(f"Error updating settings: {e}")
        return False, f"🔥 Fehler: {e!s}"
    finally:
        session.close()
def get_tool_preferences(user_id: int) -> dict:
    """Return the tool preferences of *user_id*, with defaults for unset keys.

    Values stored as JSON in the user's ``tool_preferences_json`` setting
    override the defaults. If loading or parsing fails, a warning is logged
    and the defaults (plus anything merged before the failure) are returned.
    """
    from tools import DEFAULT_TOOLS_STANDARD

    prefs = {
        "enabled_tools": DEFAULT_TOOLS_STANDARD,
        "search_backend": "tavily",
        "image_provider": "auto",  # "auto", "Nebius", "BFL"
        "num_results": 5,
        "max_result_chars": 6000,
    }
    try:
        stored_json = getattr(get_user_settings(user_id), "tool_preferences_json", None)
        if stored_json:
            prefs.update(json.loads(stored_json))
    except Exception as e:
        logger.warning(f"get_tool_preferences failed for user {user_id}: {e}")
    return prefs
def save_tool_preferences(user_id: int, prefs: dict) -> tuple[bool, str]:
    """Serialise *prefs* to JSON and store it in the user's settings.

    Returns the ``(ok, message)`` pair from ``update_user_settings``, or
    ``(False, message)`` when serialising or saving raises.
    """
    try:
        payload = json.dumps(prefs)
        return update_user_settings(user_id, tool_preferences_json=payload)
    except Exception as e:
        return False, f"🔥 Fehler: {e}"
def delete_generated_image(img_id: int, user_id: int):
    """Delete the generated image *img_id* if it belongs to *user_id*.

    The database row is deleted and committed first; only then is the image
    file removed. A failed commit therefore never leaves a row that points
    at an already-deleted file. File removal is best effort: a file that is
    already gone is ignored, any other OS error is logged as a warning.

    Returns True when the row was deleted, False when no matching row exists
    or a database error occurred (the error is logged and rolled back).
    """
    image_path = None
    db = get_db()
    try:
        img = (
            db.query(GeneratedImage)
            .filter(GeneratedImage.id == img_id, GeneratedImage.user_id == user_id)
            .first()
        )
        if not img:
            return False
        # Read the path before the commit, while the row is still loaded.
        image_path = img.image_path
        db.delete(img)
        db.commit()
    except Exception as e:
        db.rollback()
        logger.exception(f"DB Error: {e}")
        return False
    finally:
        db.close()

    if image_path:
        try:
            os.remove(image_path)
        except FileNotFoundError:
            pass  # already gone, nothing to clean up
        except OSError as e:
            logger.warning("Could not remove image file %s: %s", image_path, e)
    return True
def delete_custom_prompt(prompt_id: int, user_id: int):
    """Delete the custom prompt *prompt_id* if it belongs to *user_id*.

    Behaves like the other delete helpers in this module: returns True when
    a row was deleted, False when no matching row exists or a database error
    occurred (the error is logged and rolled back). The session is always
    closed.
    """
    db = get_db()
    try:
        prompt = (
            db.query(CustomPrompt)
            .filter(CustomPrompt.id == prompt_id, CustomPrompt.user_id == user_id)
            .first()
        )
        if prompt:
            db.delete(prompt)
            db.commit()
            return True
        return False
    except Exception as e:
        db.rollback()
        logger.exception(f"DB Error: {e}")
        return False
    finally:
        db.close()
def get_user_chat_history(user_id: int, limit: int = 50):
"""Get user's chat history"""