diff --git a/__tests__/pages/assistant.test.tsx b/__tests__/pages/assistant.test.tsx index 1696725..99c96dc 100644 --- a/__tests__/pages/assistant.test.tsx +++ b/__tests__/pages/assistant.test.tsx @@ -28,6 +28,7 @@ const mockSocket = { emit: jest.fn(), auth: { token: 'mock-token' }, hasListeners: jest.fn(() => false), + connected: true, }; jest.mock('socket.io-client', () => ({ @@ -1323,82 +1324,201 @@ describe('EventAssistantRoom', () => { }); }); - describe('Jargon message routing', () => { - it('displays jargon clarification messages inline in the assistant panel when jargonFilterAgentId is set', async () => { - const conversationWithJargon = { + describe('Multi-agent channel subscription and message fetching', () => { + it('subscribes to direct channels for all agents in the conversation', async () => { + (createConversationFromData as jest.Mock).mockResolvedValue({ agents: [ { id: 'agent-123', agentType: 'eventAssistant' }, { id: 'jargon-agent-456', agentType: 'jargonFilterAgent' }, + { id: 'future-agent-789', agentType: 'someNewAgent' }, ], type: { name: 'eventAssistant' }, + }); + + (RetrieveData as jest.Mock).mockImplementation((path: string) => { + if (path.startsWith('conversations/')) return Promise.resolve({ agents: [] }); + return Promise.resolve([]); + }); + + await act(async () => { + render(); + }); + + await waitFor(() => { + expect(mockSocket.emit).toHaveBeenCalledWith( + 'conversation:join', + expect.objectContaining({ + channels: expect.arrayContaining([ + { name: 'direct-user-123-agent-123', passcode: null, direct: true }, + { name: 'direct-user-123-jargon-agent-456', passcode: null, direct: true }, + { name: 'direct-user-123-future-agent-789', passcode: null, direct: true }, + ]), + }), + ); + }); + }); + + it('fetches messages from all agent direct channels', async () => { + (createConversationFromData as jest.Mock).mockResolvedValue({ + agents: [ + { id: 'agent-123', agentType: 'eventAssistant' }, + { id: 'jargon-agent-456', agentType: 'jargonFilterAgent' }, + ], + type: { name: 'eventAssistant' }, + }); + + (RetrieveData as jest.Mock).mockImplementation((path: string) => { + if (path.startsWith('conversations/')) return Promise.resolve({ agents: [] }); + return Promise.resolve([]); + }); + + await act(async () => { + render(); + }); + + await waitFor(() => { + expect(RetrieveData).toHaveBeenCalledWith( + 'messages/test-conversation-id?channel=direct-user-123-agent-123', + 'mock-access-token', + ); + expect(RetrieveData).toHaveBeenCalledWith( + 'messages/test-conversation-id?channel=direct-user-123-jargon-agent-456', + 'mock-access-token', + ); + }); + }); + + it('displays messages from any agent direct channel in the assistant panel', async () => { + const secondaryAgentMessage = { + id: 'secondary-msg-1', + body: { type: 'jargon_clarification', text: 'An SLO is a reliability target.', sourceText: 'Our SLOs...' }, + bodyType: 'json', + fromAgent: true, + channels: ['direct-user-123-jargon-agent-456'], + pseudonym: 'Jargon Filter', + createdAt: '2024-01-01T10:00:00Z', + pause: false, + visible: true, + upVotes: [], + downVotes: [], }; - (createConversationFromData as jest.Mock).mockResolvedValue(conversationWithJargon); + + (createConversationFromData as jest.Mock).mockResolvedValue({ + agents: [ + { id: 'agent-123', agentType: 'eventAssistant' }, + { id: 'jargon-agent-456', agentType: 'jargonFilterAgent' }, + ], + type: { name: 'eventAssistant' }, + }); (RetrieveData as jest.Mock).mockImplementation((path: string) => { - if (path.startsWith('conversations/')) { - return Promise.resolve(conversationWithJargon); - } else if (path.includes('users/user/') && path.includes('/preferences')) { - return Promise.resolve({ jargonClarification: true }); - } else if (path.startsWith('messages/')) { - return Promise.resolve([]); - } - return Promise.resolve({}); + if (path.startsWith('conversations/')) return Promise.resolve({ agents: [] }); + if (path.includes('jargon-agent-456')) return Promise.resolve([secondaryAgentMessage]); + return Promise.resolve([]); }); + const user = userEvent.setup(); + await act(async () => { render(); }); - // Wait for jargonFilterAgentId to be set from conversation data + await waitFor(() => expect(screen.getAllByLabelText('Berkie').length).toBeGreaterThan(0)); + await user.click(screen.getAllByLabelText('Berkie')[0]); + await waitFor(() => { - expect(mockSocket.on).toHaveBeenCalledWith('message:new', expect.any(Function)); + expect(screen.getByText('An SLO is a reliability target.')).toBeInTheDocument(); + }); + }); + + it('displays real-time messages from any agent direct channel in the assistant panel', async () => { + (createConversationFromData as jest.Mock).mockResolvedValue({ + agents: [ + { id: 'agent-123', agentType: 'eventAssistant' }, + { id: 'jargon-agent-456', agentType: 'jargonFilterAgent' }, + ], + type: { name: 'eventAssistant' }, + }); + + (RetrieveData as jest.Mock).mockImplementation((path: string) => { + if (path.startsWith('conversations/')) return Promise.resolve({ agents: [] }); + return Promise.resolve([]); }); - // Retrieve the most recently registered message:new handler — it has jargonFilterAgentId in its closure + await act(async () => { + render(); + }); + + await waitFor(() => expect(mockSocket.on).toHaveBeenCalledWith('message:new', expect.any(Function))); + const messageHandler: Function = mockSocket.on.mock.calls .filter(([event]: [string]) => event === 'message:new') .map(([, handler]: [string, Function]) => handler) .at(-1)!; - expect(messageHandler).toBeDefined(); - expect(typeof messageHandler).toBe('function'); + const user = userEvent.setup(); + await waitFor(() => expect(screen.getAllByLabelText('Berkie').length).toBeGreaterThan(0)); + await user.click(screen.getAllByLabelText('Berkie')[0]); - // Simulate a jargon clarification message arriving on the jargon filter's direct channel - const jargonMessage = { - id: 'msg-jargon-1', - body: { - type: 'jargon_clarification', - text: 'An SLO is a reliability target.', - sourceText: 'Our SLOs...', - }, - bodyType: 'json', + act(() => { + messageHandler({ + id: 'msg-jargon-1', + body: { type: 'jargon_clarification', text: 'An SLO is a reliability target.', sourceText: 'Our SLOs...' }, + bodyType: 'json', + fromAgent: true, + channels: ['direct-user-123-jargon-agent-456'], + pseudonym: 'Jargon Filter Agent', + pause: false, + visible: true, + upVotes: [], + downVotes: [], + }); + }); + + await waitFor(() => { + expect(screen.getByText('An SLO is a reliability target.')).toBeInTheDocument(); + }); + }); + + it('routes replies to the channel of the parent message, not the primary agent channel', async () => { + const secondaryAgentMessage = { + id: 'jargon-msg-1', + body: 'A clarification from jargon agent', fromAgent: true, channels: ['direct-user-123-jargon-agent-456'], - pseudonym: 'Jargon Filter Agent', - pseudonymId: 'jargon-agent-456', - conversation: 'test-conversation-id', + pseudonym: 'Jargon Filter', + createdAt: '2024-01-01T10:00:00Z', pause: false, visible: true, upVotes: [], downVotes: [], }; - // Switch to assistant tab before receiving the message - await waitFor(() => { - const assistantTabs = screen.queryAllByLabelText(/Berkie|Assistant/i); - expect(assistantTabs.length).toBeGreaterThan(0); + (createConversationFromData as jest.Mock).mockResolvedValue({ + agents: [ + { id: 'agent-123', agentType: 'eventAssistant' }, + { id: 'jargon-agent-456', agentType: 'jargonFilterAgent' }, + ], + type: { name: 'eventAssistant' }, + }); + + (RetrieveData as jest.Mock).mockImplementation((path: string) => { + if (path.startsWith('conversations/')) return Promise.resolve({ agents: [] }); + if (path.includes('jargon-agent-456')) return Promise.resolve([secondaryAgentMessage]); + return Promise.resolve([]); }); - const assistantTab = screen.getAllByLabelText(/Berkie|Assistant/i)[0]; - await userEvent.click(assistantTab); + const user = userEvent.setup(); - act(() => { - messageHandler(jargonMessage); + await act(async () => { + render(); }); - // Verify the jargon clarification content appears inline in the assistant panel + await waitFor(() => expect(screen.getAllByLabelText('Berkie').length).toBeGreaterThan(0)); + await user.click(screen.getAllByLabelText('Berkie')[0]); + await waitFor(() => { - expect(screen.getByText('An SLO is a reliability target.')).toBeInTheDocument(); + expect(screen.getByText('A clarification from jargon agent')).toBeInTheDocument(); }); }); }); @@ -1752,7 +1872,7 @@ describe('EventAssistantRoom', () => { body: 'Welcome to the event!', pseudonym: 'Berkie', fromAgent: true, - channels: [], + channels: ['direct-user-123-agent-123'], createdAt: '2024-01-01T09:59:00Z', conversation: 'test-conversation-id', pause: false, @@ -1972,7 +2092,7 @@ describe('EventAssistantRoom', () => { body: 'Intro message', pseudonym: 'Berkie', fromAgent: true, - channels: [], + channels: ['direct-user-123-agent-123'], createdAt: '2024-01-01T09:59:00Z', }; @@ -2080,7 +2200,7 @@ describe('EventAssistantRoom', () => { body: 'Intro message', pseudonym: 'Berkie', fromAgent: true, - channels: [], + channels: ['direct-user-123-agent-123'], createdAt: '2024-01-01T09:59:00Z', }; diff --git a/pages/assistant.tsx b/pages/assistant.tsx index 42e72ee..a56482e 100644 --- a/pages/assistant.tsx +++ b/pages/assistant.tsx @@ -99,9 +99,10 @@ function EventAssistantRoom({ authType: _authType }: { authType: AuthType }) { // reconnect re-fetches can always prepend them without stale-closure issues. const chatIntroRef = useRef([]); const assistantIntroRef = useRef([]); + const hasJoinedConvRef = useRef(false); const [agentId, setAgentId] = useState(null); const [agentActive, setAgentActive] = useState(true); - const [jargonFilterAgentId, setJargonFilterAgentId] = useState(null); + const [agentIds, setAgentIds] = useState([]); const [conversationFeatures, setConversationFeatures] = useState<{ name: string; enabled?: boolean }[]>([]); const conversationType = useConversationType(); const setConversationType = useSetConversationType(); @@ -220,7 +221,7 @@ function EventAssistantRoom({ authType: _authType }: { authType: AuthType }) { socket.off('message:new', messageHandler); socket.off('resources:updated', resourcesUpdatedHandler); }; - }, [socket, jargonFilterAgentId]); + }, [socket]); // Keep activeTabRef in sync with activeTab state useEffect(() => { @@ -315,12 +316,8 @@ function EventAssistantRoom({ authType: _authType }: { authType: AuthType }) { setAgentActive(false); } - const jargonAgent = conversation.agents.find( - (agent: components['schemas']['Agent']) => agent.agentType === 'jargonFilterAgent', - ); - if (jargonAgent) { - setJargonFilterAgentId(jargonAgent.id!); - } + const ids = conversation.agents.map((agent: components['schemas']['Agent']) => agent.id!); + setAgentIds(ids); } catch (error) { console.error('Error fetching conversation data:', error); setLocalError('Failed to fetch conversation data.'); @@ -342,9 +339,13 @@ function EventAssistantRoom({ authType: _authType }: { authType: AuthType }) { return; } - const agentChannels = agentId - ? buildDirectChannels(userId, [{ agentId }, ...(jargonFilterAgentId ? [{ agentId: jargonFilterAgentId }] : [])]) - : []; + const agentChannels = + agentIds.length > 0 + ? buildDirectChannels( + userId, + agentIds.map((id) => ({ agentId: id })), + ) + : []; const channels: components['schemas']['Channel'][] = [...agentChannels]; if (chatPasscode) { @@ -361,6 +362,8 @@ function EventAssistantRoom({ authType: _authType }: { authType: AuthType }) { } const joinConversation = () => { + if (hasJoinedConvRef.current) return; + hasJoinedConvRef.current = true; console.log('Joining conversation'); // Always read the current token so re-joins after a refresh use the // new token rather than the one captured at socket-creation time. @@ -378,8 +381,10 @@ function EventAssistantRoom({ authType: _authType }: { authType: AuthType }) { const intros: PseudonymousMessage[] = response?.intros ?? []; chatIntroRef.current = intros.filter((m) => Array.isArray(m.channels) && m.channels.includes('chat')); - // Any intro not in chat belongs to the direct (assistant) channel. - assistantIntroRef.current = intros.filter((m) => !Array.isArray(m.channels) || !m.channels.includes('chat')); + // Only intros on the eventAssistant's direct channel. + assistantIntroRef.current = intros.filter( + (m) => Array.isArray(m.channels) && m.channels.some((c) => c.includes(`-${agentId}`)), + ); // Only push intros into state on the very first join. Re-joins // must not add them again — the initial fetch effects will prepend @@ -399,48 +404,43 @@ function EventAssistantRoom({ authType: _authType }: { authType: AuthType }) { }, ); }; - - // Initial join - joinConversation(); - // Re-join on every subsequent reconnection (e.g. after token refresh) - socket.on('connect', joinConversation); + const onConnect = () => { + hasJoinedConvRef.current = false; + joinConversation(); + }; + socket.on('connect', onConnect); + + // Only join immediately if the socket is already connected — otherwise + // let the connect event fire the first join to avoid a duplicate join + // when the socket is mid-handshake or reconnecting after a token refresh. + if (socket.connected) { + joinConversation(); + } return () => { - socket.off('connect', joinConversation); + socket.off('connect', onConnect); }; - }, [socket, agentId, agentActive, jargonFilterAgentId, userId, chatPasscode, router.query.conversationId]); + }, [socket, agentId, agentActive, agentIds, userId, chatPasscode, router.query.conversationId]); /** - * Helper function to fetch all assistant messages (direct + jargon filter) + * Helper function to fetch all assistant messages across all agent direct channels * and their threaded replies */ const fetchAllAssistantMessages = useCallback(async (): Promise => { if (!userId || !agentId || !router.query.conversationId) return; try { - const directChannelName = `direct-${userId}-${agentId}`; - const assistantMessages = await RetrieveData( - `messages/${router.query.conversationId}?channel=${directChannelName}`, - Api.get().getAccessToken(), + const allFetched = await Promise.all( + agentIds.map((id) => + RetrieveData(`messages/${router.query.conversationId}?channel=direct-${userId}-${id}`, Api.get().getAccessToken()), + ), ); // Filter intro messages from older conversations where intros were persisted - let allMessages = (Array.isArray(assistantMessages) ? assistantMessages : []).filter( - (m) => parseMessageBody(m.body)?.type !== 'intro', - ); - - // Fetch jargon filter agent's messages, if enabled - if (jargonFilterAgentId) { - const jargonChannelName = `direct-${userId}-${jargonFilterAgentId}`; - const jargonMessages = await RetrieveData( - `messages/${router.query.conversationId}?channel=${jargonChannelName}`, - Api.get().getAccessToken(), - ); - if (Array.isArray(jargonMessages)) { - allMessages = [...allMessages, ...jargonMessages]; - } - } + let allMessages = allFetched + .flatMap((result) => (Array.isArray(result) ? result : [])) + .filter((m) => parseMessageBody(m.body)?.type !== 'intro'); // Sort all messages by creation time allMessages.sort((a, b) => new Date(a.createdAt!).getTime() - new Date(b.createdAt!).getTime()); @@ -452,7 +452,7 @@ function EventAssistantRoom({ authType: _authType }: { authType: AuthType }) { } catch (error) { console.error('Error fetching assistant messages:', error); } - }, [userId, agentId, jargonFilterAgentId, router.query.conversationId]); + }, [userId, agentId, agentIds, router.query.conversationId]); /** * Helper function to track reply count changes and identify unread messages @@ -617,12 +617,13 @@ function EventAssistantRoom({ authType: _authType }: { authType: AuthType }) { let channels = effectiveTab === 'chat' ? [{ name: 'chat', passcode: chatPasscode }] : [{ name: `direct-${userId}-${agentId}` }]; - // If replying to a message in assistant tab, check if parent came from jargon filter agent - // and use that channel instead - if (effectiveTab !== 'chat' && parentMessageId && jargonFilterAgentId) { + // If replying to a message in assistant tab, route to whichever direct channel + // the parent message was on (supports any agent, not just the primary one) + if (effectiveTab !== 'chat' && parentMessageId) { const parentMessage = assistantMessages.find((m) => m.id === parentMessageId); - if (parentMessage?.channels?.some((c) => c.includes(jargonFilterAgentId))) { - channels = [{ name: `direct-${userId}-${jargonFilterAgentId}` }]; + const parentDirectChannel = parentMessage?.channels?.find((c) => c.startsWith('direct-') && !c.includes(agentId!)); + if (parentDirectChannel) { + channels = [{ name: parentDirectChannel }]; } } diff --git a/utils/useSessionJoin.ts b/utils/useSessionJoin.ts index ccea8ef..771b989 100644 --- a/utils/useSessionJoin.ts +++ b/utils/useSessionJoin.ts @@ -174,6 +174,13 @@ export function useSessionJoin( socket.on('disconnect', handleDisconnect); socket.on('connect_error', handleConnectError); + // If the socket is already connected when this effect runs (e.g. StrictMode + // remount or token-refresh reconnect already completed), sync state immediately + // since the connect event won't fire again for the current connection. + if (socket.connected) { + handleConnect(); + } + return () => { socket.off('error', handleError); socket.off('connect', handleConnect);