-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
160 lines (131 loc) · 5.67 KB
/
main.py
File metadata and controls
160 lines (131 loc) · 5.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import HTMLResponse, StreamingResponse
import httpx, requests, re, json
from urllib.parse import urljoin, quote_plus, unquote
import os
import hashlib
import time
app = FastAPI()
CACHE_DIR = "/tmp/image_cache"
# raise the time for expiry if you want
CACHE_EXPIRY_SECONDS = 30 * 60
os.makedirs(CACHE_DIR, exist_ok=True)
VIEWER_CACHE_DIR = "/tmp/viewer_cache"
# raise the time for expiry if you want
VIEWER_CACHE_EXPIRY_SECONDS = 10 * 60
os.makedirs(VIEWER_CACHE_DIR, exist_ok=True)
# cached files cleaner
async def clean_cached_files():
now = time.time()
for directory, expiry in [(CACHE_DIR, CACHE_EXPIRY_SECONDS), (VIEWER_CACHE_DIR, VIEWER_CACHE_EXPIRY_SECONDS)]:
for filename in os.listdir(directory):
path = os.path.join(directory, filename)
try:
if os.path.isfile(path) and now - os.path.getmtime(path) > expiry:
os.remove(path)
except:
pass
def trigger_cache_cleanup():
import asyncio
asyncio.create_task(clean_cached_files())
def cache_file_path(url: str) -> str:
return os.path.join(CACHE_DIR, f"{hashlib.sha256(url.encode()).hexdigest()}.cache")
def cache_meta_path(url: str) -> str:
return os.path.join(CACHE_DIR, f"{hashlib.sha256(url.encode()).hexdigest()}.meta")
def viewer_cache_path(url: str) -> str:
return os.path.join(VIEWER_CACHE_DIR, f"{hashlib.sha256(url.encode()).hexdigest()}.html")
# image scraper
async def scrape_cdns_and_images(chapter_url: str):
headers = {
"Referer": "https://www.mangakakalot.gg/",
"User-Agent": "Mozilla/5.0 Chrome/115.0 Safari/537.36",
}
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(chapter_url, headers=headers)
if resp.status_code != 200:
raise Exception(f"Failed to fetch HTML: {resp.status_code}")
html = resp.text
cdns_match = re.search(r'var cdns = (\[.*?\]);', html, re.DOTALL)
images_match = re.search(r'var chapterImages = (\[.*?\]);', html, re.DOTALL)
if not cdns_match or not images_match:
raise Exception("Failed to find cdns or chapterImages")
cdns = json.loads(cdns_match.group(1).replace('\\/', '/'))
chapter_images = json.loads(images_match.group(1).replace('\\/', '/'))
return [urljoin(cdns[0] + "/", path) for path in chapter_images]
# view the images in a page
@app.get("/viewer", response_class=HTMLResponse)
async def viewer(url: str = Query(...)):
trigger_cache_cleanup()
cache_path = viewer_cache_path(url)
now = time.time()
if os.path.exists(cache_path) and now - os.path.getmtime(cache_path) < VIEWER_CACHE_EXPIRY_SECONDS:
with open(cache_path, "r", encoding="utf-8") as f:
return HTMLResponse(f.read())
try:
image_urls = await scrape_cdns_and_images(url)
except Exception as e:
return HTMLResponse(f"<h1>Error scraping images: {e}</h1>", status_code=500)
html = """
<html><head><title>Manga Viewer</title>
<style>
body { background:#111; color:#fff; text-align:center; }
#spinner { position: fixed; top: 50%; left: 50%; transform: translate(-50%, -50%); }
.loader { border: 8px solid #f3f3f3; border-top: 8px solid #3498db;
border-radius: 50%; width: 60px; height: 60px; animation: spin 1s linear infinite; margin: 0 auto; }
@keyframes spin { 0% { transform: rotate(0deg); } 100% { transform: rotate(360deg); } }
.manga-image { display: none; max-width: 90%; margin: 10px auto; display: block; }
</style></head><body>
<div id="spinner"><div class="loader"></div><p>Loading images...</p></div>
<h1>Manga Viewer for chapter</h1>
"""
for img_url in image_urls:
proxied_url = f"/proxy?img={quote_plus(img_url)}"
html += f'<img src="{proxied_url}" class="manga-image" loading="lazy" />'
html += """
<script>
window.addEventListener('load', () => {
document.getElementById('spinner').style.display = 'none';
document.querySelectorAll('.manga-image').forEach(img => img.style.display = 'block');
});
</script></body></html>
"""
with open(cache_path, "w", encoding="utf-8") as f:
f.write(html)
return HTMLResponse(html)
@app.get("/")
def root():
return {"status": "ok"}
# proxy endpoint
@app.get("/proxy")
async def proxy(img: str = Query(...)):
trigger_cache_cleanup()
img_url = unquote(img)
if not img_url.startswith("http"):
raise HTTPException(status_code=400, detail="Invalid image URL")
cache_path = cache_file_path(img_url)
meta_path = cache_meta_path(img_url)
now = time.time()
if os.path.exists(cache_path) and os.path.exists(meta_path):
if now - os.path.getmtime(cache_path) < CACHE_EXPIRY_SECONDS:
def iterfile():
with open(cache_path, "rb") as f:
yield from f
with open(meta_path, "r") as f:
content_type = f.read().strip() or "image/webp"
return StreamingResponse(iterfile(), media_type=content_type)
headers = {
"Referer": "https://www.mangakakalot.gg/",
"User-Agent": "Mozilla/5.0 Chrome/115.0 Safari/537.36",
}
try:
r = requests.get(img_url, headers=headers, timeout=15)
r.raise_for_status()
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
content = r.content
content_type = r.headers.get("content-type", "image/webp")
with open(cache_path, "wb") as f:
f.write(content)
with open(meta_path, "w") as f:
f.write(content_type)
return StreamingResponse(iter([content]), media_type=content_type)