diff --git a/apps/server/convex/__tests__/backgroundStream.test.ts b/apps/server/convex/__tests__/backgroundStream.test.ts index 43bf9b7f..9784e4ad 100644 --- a/apps/server/convex/__tests__/backgroundStream.test.ts +++ b/apps/server/convex/__tests__/backgroundStream.test.ts @@ -299,9 +299,9 @@ describe("backgroundStream", () => { // A new message should have been created const message = await t.run(async (ctx) => { - return await (ctx.db as any) + return await ctx.db .query("messages") - .withIndex("by_client_id", (q: any) => + .withIndex("by_client_id", (q) => q.eq("chatId", chatId).eq("clientMessageId", "msg-008"), ) .first(); @@ -353,9 +353,9 @@ describe("backgroundStream", () => { // Only one message should exist for this clientMessageId const allMessages = await t.run(async (ctx) => { - return await (ctx.db as any) + return await ctx.db .query("messages") - .withIndex("by_client_id", (q: any) => + .withIndex("by_client_id", (q) => q.eq("chatId", chatId).eq("clientMessageId", "msg-009"), ) .collect(); diff --git a/apps/server/convex/chatFork.ts b/apps/server/convex/chatFork.ts index 1749b6a6..8289b8e4 100644 --- a/apps/server/convex/chatFork.ts +++ b/apps/server/convex/chatFork.ts @@ -1,4 +1,5 @@ -import { mutation } from "./_generated/server"; +import type { Doc, Id } from "./_generated/dataModel"; +import { mutation, type MutationCtx } from "./_generated/server"; import { v } from "convex/values"; import { incrementStat, STAT_KEYS } from "./lib/dbStats"; import { rateLimiter } from "./lib/rateLimiter"; @@ -7,6 +8,48 @@ import { requireAuthUserId } from "./lib/auth"; import { assertOwnsChat } from "./chats"; const MAX_FORK_MESSAGE_COPY = 200; +const FORK_PAGE_SIZE = 200; + +function matchesForkPoint(message: Doc<"messages">, messageId: string): boolean { + return String(message._id) === messageId || message.clientMessageId === messageId; +} + +/** + * Loads messages from the start of the chat in ascending order until the fork + * point is reached. Avoids scanning the entire chat when the fork is early. + */ +async function collectMessagesThroughFork( + ctx: MutationCtx, + chatId: Id<"chats">, + messageId: string, +): Promise[]> { + const collected: Doc<"messages">[] = []; + let cursor: string | null = null; + + while (true) { + const page = await ctx.db + .query("messages") + .withIndex("by_chat_not_deleted", (q) => + q.eq("chatId", chatId).eq("deletedAt", undefined), + ) + .order("asc") + .paginate({ cursor, numItems: FORK_PAGE_SIZE }); + + for (const message of page.page) { + collected.push(message); + if (matchesForkPoint(message, messageId)) { + return collected; + } + } + + if (page.isDone) { + break; + } + cursor = page.continueCursor; + } + + throw new Error("Fork point message not found"); +} export const fork = mutation({ args: { @@ -32,26 +75,13 @@ export const fork = mutation({ throwRateLimitError("messages forked", retryAfter); } - const allMessages = await ctx.db - .query("messages") - .withIndex("by_chat_not_deleted", (q) => - q.eq("chatId", args.chatId).eq("deletedAt", undefined) - ) - .order("asc") - .collect(); - - const forkIndex = allMessages.findIndex( - (message) => - String(message._id) === args.messageId || - message.clientMessageId === args.messageId, + const prefixThroughFork = await collectMessagesThroughFork( + ctx, + args.chatId, + args.messageId, ); - if (forkIndex === -1) { - throw new Error("Fork point message not found"); - } - const messagesToCopy = allMessages - .slice(0, forkIndex + 1) - .slice(-MAX_FORK_MESSAGE_COPY); + const messagesToCopy = prefixThroughFork.slice(-MAX_FORK_MESSAGE_COPY); const now = Date.now(); const newChatId = await ctx.db.insert("chats", { diff --git a/apps/server/convex/chatShares.ts b/apps/server/convex/chatShares.ts index 1e59ac63..17843e2b 100644 --- a/apps/server/convex/chatShares.ts +++ b/apps/server/convex/chatShares.ts @@ -68,12 +68,13 @@ async function getActiveShareForChat( userId: Id<"users">, chatId: Id<"chats">, ) { - const shares = await ctx.db + return await ctx.db .query("chatShares") - .withIndex("by_user_chat_revoked_updated", (q) => q.eq("userId", userId).eq("chatId", chatId)) + .withIndex("by_user_chat_revoked_updated", (q) => + q.eq("userId", userId).eq("chatId", chatId).eq("revokedAt", undefined), + ) .order("desc") - .collect(); - return shares.find((share) => !share.revokedAt) ?? null; + .first(); } function normalizePreviewText(text: string) { diff --git a/apps/server/convex/files.ts b/apps/server/convex/files.ts index d38c6878..fd502671 100644 --- a/apps/server/convex/files.ts +++ b/apps/server/convex/files.ts @@ -413,8 +413,11 @@ export const getUserQuota = query({ }, }); +/** Cap for list queries to avoid unbounded reads on accounts with many uploads. */ +const MAX_FILES_LIST = 2000; + /** - * Retrieves all non-deleted files for a specific chat. + * Retrieves non-deleted files for a specific chat (most recent first, capped). * * @param chatId - The ID of the chat * @param userId - The ID of the user (for authorization) @@ -440,21 +443,20 @@ export const getFilesByChat = query({ const userId = await requireAuthUserId(ctx, args.userId); await getOwnedChat(ctx, args.chatId, userId); - // Query all non-deleted files for this chat const files = await ctx.db .query("fileUploads") .withIndex("by_chat_not_deleted", (q) => q.eq("chatId", args.chatId).eq("deletedAt", undefined) ) .order("desc") - .collect(); + .take(MAX_FILES_LIST); return files.map(toFileSummary); }, }); /** - * Retrieves all non-deleted files for a specific user. + * Retrieves non-deleted files for a specific user (most recent first, capped). * * @param userId - The ID of the user * @returns Array of file metadata objects with chat information @@ -477,14 +479,13 @@ export const getFilesByUser = query({ ), handler: async (ctx, args) => { const userId = await requireAuthUserId(ctx, args.userId); - // Query all non-deleted files for this user const files = await ctx.db .query("fileUploads") .withIndex("by_user_not_deleted", (q) => q.eq("userId", userId).eq("deletedAt", undefined) ) .order("desc") - .collect(); + .take(MAX_FILES_LIST); return files.map((file) => ({ ...toFileSummary(file), diff --git a/apps/server/convex/message_helpers.ts b/apps/server/convex/message_helpers.ts index 52d7d67e..8bd22016 100644 --- a/apps/server/convex/message_helpers.ts +++ b/apps/server/convex/message_helpers.ts @@ -34,30 +34,32 @@ async function validateAttachmentOwnership( ); } - for (const attachment of attachments) { - const fileUpload = await ctx.db - .query("fileUploads") - .withIndex("by_storage", (q) => q.eq("storageId", attachment.storageId)) - .unique(); - - if (!fileUpload) { - throw new Error( - "Unauthorized: attachment references a file that does not exist in your uploads.", - ); - } - - if (fileUpload.userId !== userId) { - throw new Error( - "Unauthorized: you do not own the referenced attachment file.", - ); - } - - if (fileUpload.deletedAt) { - throw new Error( - "Attachment references a file that has been deleted.", - ); - } - } + await Promise.all( + attachments.map(async (attachment) => { + const fileUpload = await ctx.db + .query("fileUploads") + .withIndex("by_storage", (q) => q.eq("storageId", attachment.storageId)) + .unique(); + + if (!fileUpload) { + throw new Error( + "Unauthorized: attachment references a file that does not exist in your uploads.", + ); + } + + if (fileUpload.userId !== userId) { + throw new Error( + "Unauthorized: you do not own the referenced attachment file.", + ); + } + + if (fileUpload.deletedAt) { + throw new Error( + "Attachment references a file that has been deleted.", + ); + } + }), + ); } export async function getVerifiedStorageIds( diff --git a/apps/server/convex/message_queries.ts b/apps/server/convex/message_queries.ts index 8dfcc746..86c7440d 100644 --- a/apps/server/convex/message_queries.ts +++ b/apps/server/convex/message_queries.ts @@ -120,10 +120,9 @@ export const getFirstUserMessage = query({ const message = await ctx.db .query("messages") - .withIndex("by_chat_not_deleted", (q) => - q.eq("chatId", args.chatId).eq("deletedAt", undefined) + .withIndex("by_chat_not_deleted_role", (q) => + q.eq("chatId", args.chatId).eq("deletedAt", undefined).eq("role", "user"), ) - .filter((q) => q.eq(q.field("role"), "user")) .order("asc") .first(); diff --git a/apps/server/convex/schema.ts b/apps/server/convex/schema.ts index 08d07572..3b859d09 100644 --- a/apps/server/convex/schema.ts +++ b/apps/server/convex/schema.ts @@ -180,6 +180,7 @@ export default defineSchema({ .index("by_user", ["userId"]) .index("by_user_status", ["userId", "status", "createdAt"]) .index("by_chat_not_deleted", ["chatId", "deletedAt", "createdAt"]) + .index("by_chat_not_deleted_role", ["chatId", "deletedAt", "role", "createdAt"]) .index("by_user_created", ["userId", "createdAt"]) .index("by_stream_id", ["streamId"]) .index("by_chat_status", ["chatId", "status", "deletedAt"]), diff --git a/apps/web/src/lib/redis.ts b/apps/web/src/lib/redis.ts index 784fdb5f..5c40fd10 100644 --- a/apps/web/src/lib/redis.ts +++ b/apps/web/src/lib/redis.ts @@ -4,6 +4,8 @@ const STREAM_TTL_SECONDS = 3600; const STREAM_ERROR_TTL_SECONDS = 600; const TYPING_TTL_SECONDS = 3; const PRESENCE_TTL_SECONDS = 60; +/** Per-user unread hash: refresh on write/read so inactive users do not retain keys forever. */ +const UNREAD_HASH_TTL_SECONDS = 60 * 60 * 24 * 90; const ONLINE_WINDOW_MS = 60_000; const keys = { @@ -290,24 +292,32 @@ export async function isUserOnline(userId: string): Promise { export async function incrementUnread(userId: string, chatId: string): Promise { const client = await getConnectedClient(); if (!client) return; - await client.hincrby(keys.unread(userId), chatId, 1); + const key = keys.unread(userId); + await client.hincrby(key, chatId, 1); + await client.expire(key, UNREAD_HASH_TTL_SECONDS); } export async function clearUnread(userId: string, chatId: string): Promise { const client = await getConnectedClient(); if (!client) return; - await client.hdel(keys.unread(userId), chatId); + const key = keys.unread(userId); + await client.hdel(key, chatId); + await client.expire(key, UNREAD_HASH_TTL_SECONDS); } export async function getUnreadCounts(userId: string): Promise> { const client = await getConnectedClient(); if (!client) return {}; - const counts = await client.hgetall>(keys.unread(userId)); + const key = keys.unread(userId); + const counts = await client.hgetall>(key); const result: Record = {}; for (const [chatId, count] of Object.entries(counts ?? {})) { result[chatId] = Number.parseInt(count, 10); } + if (Object.keys(result).length > 0) { + await client.expire(key, UNREAD_HASH_TTL_SECONDS); + } return result; } diff --git a/apps/web/src/lib/upstash.ts b/apps/web/src/lib/upstash.ts index 8bae7401..debc670d 100644 --- a/apps/web/src/lib/upstash.ts +++ b/apps/web/src/lib/upstash.ts @@ -268,6 +268,11 @@ function createSlidingWindowRatelimit( return { limit: async (identifier: string) => { const now = Date.now(); + for (const [key, entry] of memoryRatelimitStore) { + if (entry.resetAt <= now) { + memoryRatelimitStore.delete(key); + } + } const key = `${prefix}:${identifier}`; const existing = memoryRatelimitStore.get(key); if (!existing || existing.resetAt <= now) {