-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_api.py
More file actions
77 lines (68 loc) · 2.6 KB
/
Copy pathtest_api.py
File metadata and controls
77 lines (68 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import requests
import time
import sys
from datetime import datetime
BASE_URL = "http://localhost:8000/api/v1"
def test_api():
print("=== Testing FastAPI Endpoints ===")
# 1. Test Config API
print("\n1. Testing Config API...")
try:
resp = requests.get(f"{BASE_URL}/config/")
if resp.status_code == 200:
config = resp.json().get("config", {})
print(f"[SUCCESS] Config API OK. Got {len(config)} keys.")
else:
print(f"[FAILURE] Config API Failed: {resp.status_code} - {resp.text}")
except Exception as e:
print(f"[FAILURE] Config API Error: {e}")
# 2. Test Task Submission
print("\n2. Testing Task Submission...")
task_id = None
try:
payload = {"query": "Test Query"}
resp = requests.post(f"{BASE_URL}/tasks/submit?query=Test Query", json={}) # Query param as per code
if resp.status_code == 200:
data = resp.json()
task_id = data.get("task_id")
print(f"[SUCCESS] Task Submitted. ID: {task_id}")
else:
print(f"[FAILURE] Task Submission Failed: {resp.status_code} - {resp.text}")
except Exception as e:
print(f"[FAILURE] Task Submission Error: {e}")
if not task_id:
print("Skipping task status check due to submission failure.")
return
# 3. Test Task Status
print(f"\n3. Checking Status for Task {task_id}...")
for _ in range(5):
try:
resp = requests.get(f"{BASE_URL}/tasks/status/{task_id}")
if resp.status_code == 200:
status_data = resp.json()
status = status_data.get("status")
print(f" Current Status: {status}")
if status in ["SUCCESS", "FAILURE"]:
print(f"[SUCCESS] Task Finished with status: {status}")
break
else:
print(f"[FAILURE] Status Check Failed: {resp.status_code}")
except Exception as e:
print(f"[FAILURE] Status Check Error: {e}")
time.sleep(2)
# 4. Test Sync API
print("\n4. Testing Sync API...")
try:
resp = requests.get(f"{BASE_URL}/sync/data")
if resp.status_code == 200:
data = resp.json()
print(f"[SUCCESS] Sync API OK. Returned {len(data)} records.")
else:
print(f"[FAILURE] Sync API Failed: {resp.status_code}")
except Exception as e:
print(f"[FAILURE] Sync API Error: {e}")
if __name__ == "__main__":
# Wait for services to warm up
print("Waiting 5s for services to start...")
time.sleep(5)
test_api()