-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
280 lines (245 loc) · 9.25 KB
/
Copy pathdatabase.py
File metadata and controls
280 lines (245 loc) · 9.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
"""
Database management for Telegram Monitor
Using PostgreSQL
"""
import psycopg2
import psycopg2.extras
import json
import os
from datetime import datetime
from pathlib import Path
# PostgreSQL connection settings, resolved once at import time.
# Each entry falls back to a local-development default when the
# corresponding POSTGRES_* environment variable is not set.
DB_CONFIG = dict(
    host=os.environ.get('POSTGRES_HOST', 'localhost'),
    port=os.environ.get('POSTGRES_PORT', '5432'),
    database=os.environ.get('POSTGRES_DB', 'telegram_monitor'),
    user=os.environ.get('POSTGRES_USER', 'postgres'),
    password=os.environ.get('POSTGRES_PASSWORD', ''),
)
def init_db():
    """Create the tables and indexes used by the Telegram monitor.

    Opens a connection with ``DB_CONFIG``, runs the DDL statements below in
    order inside one transaction, commits, and closes the connection.
    Every statement uses ``IF NOT EXISTS``, so calling this again on an
    already-initialized database does not fail on existing objects.

    Raises:
        psycopg2.Error: if connecting or any statement fails. The
            connection is still closed in that case, and nothing is
            committed.
    """
    # Order matters: monitors references accounts, telegram_messages
    # references monitors, and the indexes need telegram_messages.
    statements = (
        # Accounts table
        """
        CREATE TABLE IF NOT EXISTS accounts (
            id SERIAL PRIMARY KEY,
            phone_number VARCHAR(50) UNIQUE NOT NULL,
            session_name VARCHAR(255) NOT NULL,
            api_id INTEGER NOT NULL,
            api_hash VARCHAR(255) NOT NULL,
            is_active BOOLEAN DEFAULT TRUE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # Monitors table (removed webhook_urls)
        """
        CREATE TABLE IF NOT EXISTS monitors (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            account_id INTEGER NOT NULL REFERENCES accounts(id) ON DELETE CASCADE,
            channel_ids JSONB NOT NULL,
            status VARCHAR(50) DEFAULT 'stopped',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # Telegram messages table (comprehensive schema)
        """
        CREATE TABLE IF NOT EXISTS telegram_messages (
            id SERIAL PRIMARY KEY,
            monitor_id INTEGER NOT NULL REFERENCES monitors(id) ON DELETE CASCADE,
            channel_id BIGINT NOT NULL,
            message_id INTEGER NOT NULL,
            date TIMESTAMP NOT NULL,
            edit_date TIMESTAMP,
            -- Message content
            text TEXT,
            caption TEXT,
            link TEXT,
            -- Sender info
            from_user_id BIGINT,
            from_user_username VARCHAR(255),
            from_user_first_name VARCHAR(255),
            from_user_last_name VARCHAR(255),
            -- Channel info
            channel_username VARCHAR(255),
            channel_title VARCHAR(255),
            -- Media flags
            has_photo BOOLEAN DEFAULT FALSE,
            has_video BOOLEAN DEFAULT FALSE,
            has_document BOOLEAN DEFAULT FALSE,
            has_audio BOOLEAN DEFAULT FALSE,
            has_voice BOOLEAN DEFAULT FALSE,
            has_sticker BOOLEAN DEFAULT FALSE,
            has_animation BOOLEAN DEFAULT FALSE,
            has_video_note BOOLEAN DEFAULT FALSE,
            has_poll BOOLEAN DEFAULT FALSE,
            has_location BOOLEAN DEFAULT FALSE,
            has_contact BOOLEAN DEFAULT FALSE,
            has_venue BOOLEAN DEFAULT FALSE,
            has_dice BOOLEAN DEFAULT FALSE,
            has_game BOOLEAN DEFAULT FALSE,
            has_web_page BOOLEAN DEFAULT FALSE,
            -- Media details
            media_type VARCHAR(50),
            file_id TEXT,
            file_unique_id TEXT,
            file_name TEXT,
            file_size BIGINT,
            mime_type VARCHAR(255),
            -- Engagement metrics
            views INTEGER DEFAULT 0,
            forwards INTEGER DEFAULT 0,
            -- Reply info
            is_reply BOOLEAN DEFAULT FALSE,
            reply_to_message_id INTEGER,
            -- Forward info
            is_forwarded BOOLEAN DEFAULT FALSE,
            forward_from_chat_id BIGINT,
            forward_from_message_id INTEGER,
            -- Message attributes
            is_pinned BOOLEAN DEFAULT FALSE,
            is_edited BOOLEAN DEFAULT FALSE,
            -- JSON fields
            reactions JSONB,
            entities JSONB,
            reply_markup JSONB,
            raw_data JSONB,
            -- Processing
            processed BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            -- Unique constraint to prevent duplicate messages
            UNIQUE(monitor_id, channel_id, message_id)
        )
        """,
        # Create indexes
        "CREATE INDEX IF NOT EXISTS idx_messages_monitor_id ON telegram_messages(monitor_id)",
        "CREATE INDEX IF NOT EXISTS idx_messages_channel_id ON telegram_messages(channel_id)",
        "CREATE INDEX IF NOT EXISTS idx_messages_date ON telegram_messages(date DESC)",
        "CREATE INDEX IF NOT EXISTS idx_messages_created_at ON telegram_messages(created_at DESC)",
    )

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # The cursor context manager closes the cursor even when a
        # statement raises.
        with conn.cursor() as cursor:
            for statement in statements:
                cursor.execute(statement)
        conn.commit()
    finally:
        # Always release the connection; closing without a commit
        # discards the partially applied transaction.
        conn.close()
class Database:
    """Thin data-access layer over the Telegram monitor PostgreSQL schema.

    Each public method opens its own connection via ``get_connection``,
    runs a single statement, and closes the connection before returning,
    including when the statement raises.
    """

    def __init__(self):
        """Ensure the schema exists by running ``init_db()``."""
        init_db()

    def get_connection(self):
        """Return a new psycopg2 connection built from ``DB_CONFIG``."""
        return psycopg2.connect(**DB_CONFIG)

    # Internal helpers

    def _run(self, query, params=None, *, fetch=None, dict_rows=False,
             commit=False):
        """Execute one statement on a fresh connection and clean up.

        Args:
            query: SQL text, using ``%s`` placeholders for ``params``.
            params: Sequence of values bound to the placeholders, or None.
            fetch: ``'one'`` to return ``cursor.fetchone()``, ``'all'`` to
                return ``cursor.fetchall()``, anything else returns None.
            dict_rows: When true, rows come back from a
                ``psycopg2.extras.RealDictCursor`` instead of the
                connection's default cursor.
            commit: When true, commit after the statement succeeds.

        Returns:
            The fetched row(s), or None when ``fetch`` is not set.

        Raises:
            Whatever ``execute``/``fetch*``/``commit`` raise; the connection
            is closed first, so a failed statement is never committed.
        """
        conn = self.get_connection()
        try:
            cursor_kwargs = {}
            if dict_rows:
                cursor_kwargs['cursor_factory'] = psycopg2.extras.RealDictCursor
            with conn.cursor(**cursor_kwargs) as cursor:
                cursor.execute(query, params)
                # Rows must be fetched while the cursor is still open.
                if fetch == 'one':
                    result = cursor.fetchone()
                elif fetch == 'all':
                    result = cursor.fetchall()
                else:
                    result = None
            if commit:
                conn.commit()
            return result
        finally:
            conn.close()

    @staticmethod
    def _decode_channel_ids(row):
        """Return ``row`` as a plain dict with ``channel_ids`` JSON-decoded.

        ``channel_ids`` is passed through ``json.loads`` only when it is a
        string; any other value is kept as-is.
        """
        monitor = dict(row)
        channel_ids = monitor['channel_ids']
        if isinstance(channel_ids, str):
            monitor['channel_ids'] = json.loads(channel_ids)
        return monitor

    # Account operations

    def create_account(self, phone_number, session_name, api_id, api_hash):
        """Insert an account row and return its generated ``id``."""
        row = self._run(
            """
            INSERT INTO accounts (phone_number, session_name, api_id, api_hash)
            VALUES (%s, %s, %s, %s)
            RETURNING id
            """,
            (phone_number, session_name, api_id, api_hash),
            fetch='one',
            commit=True,
        )
        return row[0]

    def get_accounts(self):
        """Return all accounts with ``is_active = TRUE`` as dict-like rows."""
        return self._run(
            "SELECT * FROM accounts WHERE is_active = TRUE",
            fetch='all',
            dict_rows=True,
        )

    def get_account(self, account_id):
        """Return the account with the given id, or None if there is none."""
        return self._run(
            "SELECT * FROM accounts WHERE id = %s",
            (account_id,),
            fetch='one',
            dict_rows=True,
        )

    # Monitor operations (removed webhook_urls)

    def create_monitor(self, name, account_id, channel_ids):
        """Insert a monitor and return its generated ``id``.

        ``channel_ids`` is serialized with ``json.dumps`` before storage.
        """
        row = self._run(
            """
            INSERT INTO monitors (name, account_id, channel_ids)
            VALUES (%s, %s, %s)
            RETURNING id
            """,
            (name, account_id, json.dumps(channel_ids)),
            fetch='one',
            commit=True,
        )
        return row[0]

    def get_monitors(self):
        """Return every monitor joined with its account's phone number.

        Each item is a plain dict whose ``channel_ids`` has been decoded
        from JSON text when the driver returned it as a string.
        """
        rows = self._run(
            """
            SELECT m.*, a.phone_number
            FROM monitors m
            JOIN accounts a ON m.account_id = a.id
            """,
            fetch='all',
            dict_rows=True,
        )
        return [self._decode_channel_ids(row) for row in rows]

    def get_monitor(self, monitor_id):
        """Return one monitor as a dict, or None when the id is unknown."""
        row = self._run(
            "SELECT * FROM monitors WHERE id = %s",
            (monitor_id,),
            fetch='one',
            dict_rows=True,
        )
        if not row:
            return row
        return self._decode_channel_ids(row)

    def update_monitor_status(self, monitor_id, status):
        """Set a monitor's ``status`` and refresh its ``updated_at``."""
        self._run(
            """
            UPDATE monitors
            SET status = %s, updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            """,
            (status, monitor_id),
            commit=True,
        )

    def update_monitor(self, monitor_id, name, channel_ids):
        """Replace a monitor's name and channel list; refresh ``updated_at``."""
        self._run(
            """
            UPDATE monitors
            SET name = %s, channel_ids = %s, updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            """,
            (name, json.dumps(channel_ids), monitor_id),
            commit=True,
        )

    def delete_monitor(self, monitor_id):
        """Delete the monitor with the given id."""
        self._run(
            "DELETE FROM monitors WHERE id = %s",
            (monitor_id,),
            commit=True,
        )

    # Message operations

    def log_message(self, monitor_id, channel_id, message_id, message_type, has_media):
        """Legacy method for backward compatibility.

        Intentionally a no-op: it accepts the old arguments, stores nothing
        and returns None.
        """
        return None

    def get_recent_messages(self, monitor_id, limit=50):
        """Return up to ``limit`` messages for a monitor, newest first.

        Rows are ordered by ``created_at`` descending and returned as
        dict-like rows.
        """
        return self._run(
            """
            SELECT * FROM telegram_messages
            WHERE monitor_id = %s
            ORDER BY created_at DESC
            LIMIT %s
            """,
            (monitor_id, limit),
            fetch='all',
            dict_rows=True,
        )