Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/server/convex/__tests__/backgroundStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,9 @@ describe("backgroundStream", () => {

// A new message should have been created
const message = await t.run(async (ctx) => {
return await (ctx.db as any)
return await ctx.db
.query("messages")
.withIndex("by_client_id", (q: any) =>
.withIndex("by_client_id", (q) =>
q.eq("chatId", chatId).eq("clientMessageId", "msg-008"),
)
.first();
Expand Down Expand Up @@ -353,9 +353,9 @@ describe("backgroundStream", () => {

// Only one message should exist for this clientMessageId
const allMessages = await t.run(async (ctx) => {
return await (ctx.db as any)
return await ctx.db
.query("messages")
.withIndex("by_client_id", (q: any) =>
.withIndex("by_client_id", (q) =>
q.eq("chatId", chatId).eq("clientMessageId", "msg-009"),
)
.collect();
Expand Down
68 changes: 49 additions & 19 deletions apps/server/convex/chatFork.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { mutation } from "./_generated/server";
import type { Doc, Id } from "./_generated/dataModel";
import { mutation, type MutationCtx } from "./_generated/server";
import { v } from "convex/values";
import { incrementStat, STAT_KEYS } from "./lib/dbStats";
import { rateLimiter } from "./lib/rateLimiter";
Expand All @@ -7,6 +8,48 @@ import { requireAuthUserId } from "./lib/auth";
import { assertOwnsChat } from "./chats";

const MAX_FORK_MESSAGE_COPY = 200;
const FORK_PAGE_SIZE = 200;

function matchesForkPoint(message: Doc<"messages">, messageId: string): boolean {
return String(message._id) === messageId || message.clientMessageId === messageId;
}

/**
* Loads messages from the start of the chat in ascending order until the fork
* point is reached. Avoids scanning the entire chat when the fork is early.
*/
async function collectMessagesThroughFork(
ctx: MutationCtx,
chatId: Id<"chats">,
messageId: string,
): Promise<Doc<"messages">[]> {
const collected: Doc<"messages">[] = [];
let cursor: string | null = null;

while (true) {
const page = await ctx.db
.query("messages")
.withIndex("by_chat_not_deleted", (q) =>
q.eq("chatId", chatId).eq("deletedAt", undefined),
)
.order("asc")
.paginate({ cursor, numItems: FORK_PAGE_SIZE });

for (const message of page.page) {
collected.push(message);
if (matchesForkPoint(message, messageId)) {
return collected;
}
}

if (page.isDone) {
break;
}
cursor = page.continueCursor;
}

throw new Error("Fork point message not found");
}

export const fork = mutation({
args: {
Expand All @@ -32,26 +75,13 @@ export const fork = mutation({
throwRateLimitError("messages forked", retryAfter);
}

const allMessages = await ctx.db
.query("messages")
.withIndex("by_chat_not_deleted", (q) =>
q.eq("chatId", args.chatId).eq("deletedAt", undefined)
)
.order("asc")
.collect();

const forkIndex = allMessages.findIndex(
(message) =>
String(message._id) === args.messageId ||
message.clientMessageId === args.messageId,
const prefixThroughFork = await collectMessagesThroughFork(
ctx,
args.chatId,
args.messageId,
);
if (forkIndex === -1) {
throw new Error("Fork point message not found");
}

const messagesToCopy = allMessages
.slice(0, forkIndex + 1)
.slice(-MAX_FORK_MESSAGE_COPY);
const messagesToCopy = prefixThroughFork.slice(-MAX_FORK_MESSAGE_COPY);

const now = Date.now();
const newChatId = await ctx.db.insert("chats", {
Expand Down
9 changes: 5 additions & 4 deletions apps/server/convex/chatShares.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ async function getActiveShareForChat(
userId: Id<"users">,
chatId: Id<"chats">,
) {
const shares = await ctx.db
return await ctx.db
.query("chatShares")
.withIndex("by_user_chat_revoked_updated", (q) => q.eq("userId", userId).eq("chatId", chatId))
.withIndex("by_user_chat_revoked_updated", (q) =>
q.eq("userId", userId).eq("chatId", chatId).eq("revokedAt", undefined),
)
.order("desc")
.collect();
return shares.find((share) => !share.revokedAt) ?? null;
.first();
}

function normalizePreviewText(text: string) {
Expand Down
13 changes: 7 additions & 6 deletions apps/server/convex/files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,11 @@ export const getUserQuota = query({
},
});

/** Cap for list queries to avoid unbounded reads on accounts with many uploads. */
const MAX_FILES_LIST = 2000;

/**
* Retrieves all non-deleted files for a specific chat.
* Retrieves non-deleted files for a specific chat (most recent first, capped).
*
* @param chatId - The ID of the chat
* @param userId - The ID of the user (for authorization)
Expand All @@ -440,21 +443,20 @@ export const getFilesByChat = query({
const userId = await requireAuthUserId(ctx, args.userId);
await getOwnedChat(ctx, args.chatId, userId);

// Query all non-deleted files for this chat
const files = await ctx.db
.query("fileUploads")
.withIndex("by_chat_not_deleted", (q) =>
q.eq("chatId", args.chatId).eq("deletedAt", undefined)
)
.order("desc")
.collect();
.take(MAX_FILES_LIST);

return files.map(toFileSummary);
},
});

/**
* Retrieves all non-deleted files for a specific user.
* Retrieves non-deleted files for a specific user (most recent first, capped).
*
* @param userId - The ID of the user
* @returns Array of file metadata objects with chat information
Expand All @@ -477,14 +479,13 @@ export const getFilesByUser = query({
),
handler: async (ctx, args) => {
const userId = await requireAuthUserId(ctx, args.userId);
// Query all non-deleted files for this user
const files = await ctx.db
.query("fileUploads")
.withIndex("by_user_not_deleted", (q) =>
q.eq("userId", userId).eq("deletedAt", undefined)
)
.order("desc")
.collect();
.take(MAX_FILES_LIST);

return files.map((file) => ({
...toFileSummary(file),
Expand Down
50 changes: 26 additions & 24 deletions apps/server/convex/message_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,32 @@ async function validateAttachmentOwnership(
);
}

for (const attachment of attachments) {
const fileUpload = await ctx.db
.query("fileUploads")
.withIndex("by_storage", (q) => q.eq("storageId", attachment.storageId))
.unique();

if (!fileUpload) {
throw new Error(
"Unauthorized: attachment references a file that does not exist in your uploads.",
);
}

if (fileUpload.userId !== userId) {
throw new Error(
"Unauthorized: you do not own the referenced attachment file.",
);
}

if (fileUpload.deletedAt) {
throw new Error(
"Attachment references a file that has been deleted.",
);
}
}
await Promise.all(
attachments.map(async (attachment) => {
const fileUpload = await ctx.db
.query("fileUploads")
.withIndex("by_storage", (q) => q.eq("storageId", attachment.storageId))
.unique();

if (!fileUpload) {
throw new Error(
"Unauthorized: attachment references a file that does not exist in your uploads.",
);
}

if (fileUpload.userId !== userId) {
throw new Error(
"Unauthorized: you do not own the referenced attachment file.",
);
}

if (fileUpload.deletedAt) {
throw new Error(
"Attachment references a file that has been deleted.",
);
}
}),
);
}

export async function getVerifiedStorageIds(
Expand Down
5 changes: 2 additions & 3 deletions apps/server/convex/message_queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,9 @@ export const getFirstUserMessage = query({

const message = await ctx.db
.query("messages")
.withIndex("by_chat_not_deleted", (q) =>
q.eq("chatId", args.chatId).eq("deletedAt", undefined)
.withIndex("by_chat_not_deleted_role", (q) =>
q.eq("chatId", args.chatId).eq("deletedAt", undefined).eq("role", "user"),
)
.filter((q) => q.eq(q.field("role"), "user"))
.order("asc")
.first();

Expand Down
1 change: 1 addition & 0 deletions apps/server/convex/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ export default defineSchema({
.index("by_user", ["userId"])
.index("by_user_status", ["userId", "status", "createdAt"])
.index("by_chat_not_deleted", ["chatId", "deletedAt", "createdAt"])
.index("by_chat_not_deleted_role", ["chatId", "deletedAt", "role", "createdAt"])
.index("by_user_created", ["userId", "createdAt"])
.index("by_stream_id", ["streamId"])
.index("by_chat_status", ["chatId", "status", "deletedAt"]),
Expand Down
16 changes: 13 additions & 3 deletions apps/web/src/lib/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const STREAM_TTL_SECONDS = 3600;
const STREAM_ERROR_TTL_SECONDS = 600;
const TYPING_TTL_SECONDS = 3;
const PRESENCE_TTL_SECONDS = 60;
/** Per-user unread hash: refresh on write/read so inactive users do not retain keys forever. */
const UNREAD_HASH_TTL_SECONDS = 60 * 60 * 24 * 90;
const ONLINE_WINDOW_MS = 60_000;

const keys = {
Expand Down Expand Up @@ -290,24 +292,32 @@ export async function isUserOnline(userId: string): Promise<boolean> {
export async function incrementUnread(userId: string, chatId: string): Promise<void> {
const client = await getConnectedClient();
if (!client) return;
await client.hincrby(keys.unread(userId), chatId, 1);
const key = keys.unread(userId);
await client.hincrby(key, chatId, 1);
await client.expire(key, UNREAD_HASH_TTL_SECONDS);
}

export async function clearUnread(userId: string, chatId: string): Promise<void> {
const client = await getConnectedClient();
if (!client) return;
await client.hdel(keys.unread(userId), chatId);
const key = keys.unread(userId);
await client.hdel(key, chatId);
await client.expire(key, UNREAD_HASH_TTL_SECONDS);
}

export async function getUnreadCounts(userId: string): Promise<Record<string, number>> {
const client = await getConnectedClient();
if (!client) return {};

const counts = await client.hgetall<Record<string, string>>(keys.unread(userId));
const key = keys.unread(userId);
const counts = await client.hgetall<Record<string, string>>(key);
const result: Record<string, number> = {};
for (const [chatId, count] of Object.entries(counts ?? {})) {
result[chatId] = Number.parseInt(count, 10);
}
if (Object.keys(result).length > 0) {
await client.expire(key, UNREAD_HASH_TTL_SECONDS);
}
return result;
}

Expand Down
5 changes: 5 additions & 0 deletions apps/web/src/lib/upstash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ function createSlidingWindowRatelimit(
return {
limit: async (identifier: string) => {
const now = Date.now();
for (const [key, entry] of memoryRatelimitStore) {
if (entry.resetAt <= now) {
memoryRatelimitStore.delete(key);
}
}
const key = `${prefix}:${identifier}`;
const existing = memoryRatelimitStore.get(key);
if (!existing || existing.resetAt <= now) {
Expand Down
Loading