diff --git a/__tests__/hooks/useWaveIsTyping.test.tsx b/__tests__/hooks/useWaveIsTyping.test.tsx index c567e09fe4..ba9ab749e8 100644 --- a/__tests__/hooks/useWaveIsTyping.test.tsx +++ b/__tests__/hooks/useWaveIsTyping.test.tsx @@ -1,109 +1,28 @@ -import { act, renderHook, waitFor } from '@testing-library/react'; +import { renderHook, act } from '@testing-library/react'; import { useWaveIsTyping } from '@/hooks/useWaveIsTyping'; import { WsMessageType } from '@/helpers/Types'; -import { useWebSocket } from '@/services/websocket/useWebSocket'; -import { useWebSocketMessage } from '@/services/websocket/useWebSocketMessage'; -import { WebSocketStatus } from '@/services/websocket/WebSocketTypes'; -jest.mock('@/services/websocket/useWebSocket', () => ({ - useWebSocket: jest.fn(), -})); +const listeners: any[] = []; -jest.mock('@/services/websocket/useWebSocketMessage', () => ({ - useWebSocketMessage: jest.fn(), +jest.mock('@/hooks/useWaveWebSocket', () => ({ + useWaveWebSocket: () => ({ + socket: { + addEventListener: (_: string, cb: any) => listeners.push(cb), + removeEventListener: jest.fn(), + }, + }), })); -const sendMock = jest.fn(); - -const mockedUseWebSocket = useWebSocket as jest.MockedFunction; -const mockedUseWebSocketMessage = useWebSocketMessage as jest.MockedFunction; - -type MessageHandlers = Partial void>>>; - -let messageHandlers: MessageHandlers; - -const emitMessage = (type: WsMessageType, payload: any) => { - messageHandlers[type]?.forEach((handler) => handler(payload)); -}; - -beforeEach(() => { +test('reports typing status and clears after timeout', () => { jest.useFakeTimers(); - sendMock.mockReset(); - mockedUseWebSocket.mockReset(); - mockedUseWebSocketMessage.mockReset(); - - messageHandlers = {}; - - mockedUseWebSocket.mockReturnValue({ - send: sendMock, - status: WebSocketStatus.CONNECTED, - } as any); - - mockedUseWebSocketMessage.mockImplementation((type, callback) => { - if (!messageHandlers[type]) { - messageHandlers[type] = []; - } - messageHandlers[type]!.push(callback); - return { isConnected: true }; - }); -}); - -afterEach(() => { - jest.clearAllTimers(); - jest.useRealTimers(); - jest.clearAllMocks(); -}); - -test('reports typing status, handles drop updates, and clears after timeout', async () => { - const { result, unmount } = renderHook(() => useWaveIsTyping('wave', null)); - - await waitFor(() => - expect(sendMock).toHaveBeenCalledWith(WsMessageType.SUBSCRIBE_TO_WAVE, { - subscribe: true, - wave_id: 'wave', - }), - ); - - act(() => { - emitMessage(WsMessageType.USER_IS_TYPING, { - wave_id: 'wave', - profile: { handle: 'A', level: 1 }, - timestamp: Date.now(), - }); - }); + const { result } = renderHook(() => useWaveIsTyping('wave', null)); act(() => { - jest.advanceTimersByTime(1000); + listeners[0]({ data: JSON.stringify({ type: WsMessageType.USER_IS_TYPING, data: { wave_id: 'wave', profile: { handle: 'A', level: 1 } } }) }); }); + act(() => jest.advanceTimersByTime(1000)); expect(result.current).toContain('A is typing'); - act(() => { - emitMessage(WsMessageType.DROP_UPDATE, { - wave: { id: 'wave' }, - author: { handle: 'A', level: 1 }, - }); - }); - expect(result.current).toBe(''); - - act(() => { - emitMessage(WsMessageType.USER_IS_TYPING, { - wave_id: 'wave', - profile: { handle: 'A', level: 1 }, - timestamp: Date.now(), - }); - }); - - act(() => { - jest.advanceTimersByTime(6000); - }); + act(() => jest.advanceTimersByTime(6000)); expect(result.current).toBe(''); - - act(() => { - unmount(); - }); - - expect(sendMock).toHaveBeenLastCalledWith(WsMessageType.SUBSCRIBE_TO_WAVE, { - subscribe: false, - wave_id: 'wave', - }); }); diff --git a/__tests__/hooks/useWaveWebSocket.test.ts b/__tests__/hooks/useWaveWebSocket.test.ts new file mode 100644 index 0000000000..b3484f1d23 --- /dev/null +++ b/__tests__/hooks/useWaveWebSocket.test.ts @@ -0,0 +1,75 @@ +import { act, renderHook, waitFor } from "@testing-library/react"; +import { useWaveWebSocket } from "@/hooks/useWaveWebSocket"; + +class MockWebSocket { + static CONNECTING = 0; + static OPEN = 1; + static CLOSING = 2; + static CLOSED = 3; + readyState = MockWebSocket.CONNECTING; + onopen = null as ((ev?: any) => any) | null; + onclose = null as ((ev?: any) => any) | null; + onerror = null as ((ev?: any) => any) | null; + send = jest.fn(); + close = jest.fn(() => { + this.readyState = MockWebSocket.CLOSED; + this.onclose && this.onclose({}); + }); + triggerOpen() { + this.readyState = MockWebSocket.OPEN; + this.onopen && this.onopen({}); + } + constructor(public url: string) {} +} + +describe("useWaveWebSocket", () => { + let originalWs: any; + beforeEach(() => { + originalWs = global.WebSocket; + (global as any).WebSocket = jest.fn( + (url: string) => new MockWebSocket(url) + ); + jest.useFakeTimers(); + }); + afterEach(() => { + jest.useRealTimers(); + (global as any).WebSocket = originalWs; + jest.clearAllMocks(); + }); + + it("connects and sends subscribe message", async () => { + const { result } = renderHook(() => useWaveWebSocket("wave1")); + const instance = (globalThis.WebSocket as jest.Mock).mock.results[0] + .value as MockWebSocket; + act(() => { + instance.triggerOpen(); + }); + await waitFor(() => + expect(result.current.readyState).toBe(MockWebSocket.OPEN) + ); + expect(instance.send).toHaveBeenCalledWith( + JSON.stringify({ type: "SUBSCRIBE_TO_WAVE", wave_id: "wave1" }) + ); + }); + + it("schedules reconnect on close", () => { + const spy = jest.spyOn(globalThis, "setTimeout"); + renderHook(() => useWaveWebSocket("wave1")); + const instance = (globalThis.WebSocket as jest.Mock).mock.results[0] + .value as MockWebSocket; + act(() => { + instance.onclose && instance.onclose({}); + }); + expect(spy).toHaveBeenCalled(); + }); + + it("disconnect stops reconnecting", () => { + const { result } = renderHook(() => useWaveWebSocket("wave1")); + const instance = (globalThis.WebSocket as jest.Mock).mock.results[0] + .value as MockWebSocket; + act(() => { + result.current.disconnect(); + }); + expect(instance.close).toHaveBeenCalled(); + }); +}); diff --git a/__tests__/services/websocket/WebSocketProvider.test.tsx b/__tests__/services/websocket/WebSocketProvider.test.tsx index c689dca2af..46c4c11a1f 100644 --- a/__tests__/services/websocket/WebSocketProvider.test.tsx +++ b/__tests__/services/websocket/WebSocketProvider.test.tsx @@ -64,7 +64,6 @@ class MockWebSocket { describe('WebSocketProvider', () => { let originalWs: any; let mockGetAuthJwt: jest.MockedFunction; - let mathRandomSpy: jest.SpyInstance; beforeEach(() => { originalWs = global.WebSocket; @@ -77,13 +76,11 @@ describe('WebSocketProvider', () => { mockGetAuthJwt = authUtils.getAuthJwt as jest.MockedFunction; mockGetAuthJwt.mockReturnValue('fresh-token'); - mathRandomSpy = jest.spyOn(Math, 'random').mockReturnValue(0.5); jest.clearAllMocks(); }); afterEach(() => { (global as any).WebSocket = originalWs; - mathRandomSpy.mockRestore(); jest.useRealTimers(); }); diff --git a/hooks/useWaveIsTyping.ts b/hooks/useWaveIsTyping.ts index 87d08e3ef8..fe8a40c3ee 100644 --- a/hooks/useWaveIsTyping.ts +++ b/hooks/useWaveIsTyping.ts @@ -1,27 +1,37 @@ "use client"; -import { useCallback, useEffect, useRef, useState } from "react"; -import { ApiProfileMin } from "@/generated/models/ApiProfileMin"; +import { useEffect, useRef, useState } from "react"; +import { useWaveWebSocket } from "./useWaveWebSocket"; import { WsDropUpdateMessage, WsMessageType, WsTypingMessage, } from "@/helpers/Types"; -import { useWebSocket } from "@/services/websocket/useWebSocket"; -import { useWebSocketMessage } from "@/services/websocket/useWebSocketMessage"; -import { WebSocketStatus } from "@/services/websocket/WebSocketTypes"; +import { ApiProfileMin } from "@/generated/models/ApiProfileMin"; +/* ------------------------------------------------------------------ */ +/* Types */ +/* ------------------------------------------------------------------ */ interface TypingEntry { profile: ApiProfileMin; - lastTypingAt: number; + lastTypingAt: number; // our local receive time (ms) } -const TYPING_WINDOW_MS = 5000; -const CLEANUP_INTERVAL_MS = 1000; +/* ------------------------------------------------------------------ */ +/* Constants */ +/* ------------------------------------------------------------------ */ + +const TYPING_WINDOW_MS = 5_000; // still typing if ≤ 5 s old +const CLEANUP_INTERVAL_MS = 1_000; // prune/check once per second + +/* ------------------------------------------------------------------ */ +/* Helper to convert active typers → human string */ +/* ------------------------------------------------------------------ */ function buildTypingString(entries: TypingEntry[]): string { if (entries.length === 0) return ""; + // Highest‑level first (undefined → 0) const sorted = entries.sort( (a, b) => (b.profile.level ?? 0) - (a.profile.level ?? 0) ); @@ -34,106 +44,92 @@ function buildTypingString(entries: TypingEntry[]): string { if (names.length === 2) { return `${names[0]}, ${names[1]} are typing`; } - return `${names[0]}, ${names[1]} and ${names.length - 2} more people are typing`; + return `${names[0]}, ${names[1]} and ${ + names.length - 2 + } more people are typing`; } +/* ------------------------------------------------------------------ */ +/* Hook */ +/* ------------------------------------------------------------------ */ + +/** + * React hook that returns a live “is‑typing” label for a wave. + * + * @param waveId Wave/channel ID being viewed. + * @param myHandle Handle of current user (events from this handle are ignored). + */ export function useWaveIsTyping( waveId: string, myHandle: string | null ): string { - const { send, status } = useWebSocket(); + const { socket } = useWaveWebSocket(waveId); + /** Only the final string lives in state; everything else is in a ref. */ const [typingMessage, setTypingMessage] = useState(""); + /** Mutable store of active typers — doesn’t cause re‑renders. */ const typersRef = useRef>(new Map()); - const updateTypingString = useCallback(() => { - const entries = Array.from(typersRef.current.values()); - const newMessage = buildTypingString(entries); - - setTypingMessage((prev) => (prev === newMessage ? prev : newMessage)); - }, []); - + /* ----- 1. Reset when wave changes -------------------------------- */ useEffect(() => { typersRef.current.clear(); - updateTypingString(); - }, [waveId, updateTypingString]); + setTypingMessage(""); + }, [waveId]); + /* ----- 2. Handle incoming USER_IS_TYPING packets ----------------- */ useEffect(() => { - if (status !== WebSocketStatus.CONNECTED) { - return; - } - - send(WsMessageType.SUBSCRIBE_TO_WAVE, { - subscribe: true, - wave_id: waveId, - }); - - return () => { - send(WsMessageType.SUBSCRIBE_TO_WAVE, { - subscribe: false, - wave_id: waveId, + if (!socket) return; + + const onMessage = (event: MessageEvent) => { + let msg: WsTypingMessage | WsDropUpdateMessage; + try { + msg = JSON.parse(event.data); + } catch (err) { + console.error("Bad WebSocket JSON", err); + return; + } + if (msg.type === WsMessageType.DROP_UPDATE) { + typersRef.current.delete(msg.data?.author.handle ?? ""); + } + if (msg.type !== WsMessageType.USER_IS_TYPING) return; + const data = msg.data; + if (!data || data.wave_id !== waveId) return; + if (data.profile?.handle === myHandle) return; // ignore myself + if (!data.profile?.handle) return; + // Use local clock for freshness (avoids clock‑skew issues) + typersRef.current.set(data.profile.handle, { + profile: data.profile, + lastTypingAt: Date.now(), }); }; - }, [send, status, waveId]); - - useWebSocketMessage( - WsMessageType.USER_IS_TYPING, - useCallback( - (data) => { - if (!data || data.wave_id !== waveId) return; - if (data.profile?.handle === myHandle) return; - - const handle = data.profile?.handle; - if (!handle) return; - - typersRef.current.set(handle, { - profile: data.profile, - lastTypingAt: Date.now(), - }); - updateTypingString(); - }, - [myHandle, waveId, updateTypingString] - ) - ); - - useWebSocketMessage( - WsMessageType.DROP_UPDATE, - useCallback( - (drop) => { - if (drop?.author?.handle) { - const sameWave = drop.wave?.id === waveId; - if (!sameWave) return; - if (typersRef.current.delete(drop.author.handle)) { - updateTypingString(); - } - } - }, - [waveId, updateTypingString] - ) - ); - useEffect(() => { - if (status !== WebSocketStatus.CONNECTED) { - typersRef.current.clear(); - updateTypingString(); - } - }, [status, updateTypingString]); + socket.addEventListener("message", onMessage); + return () => socket.removeEventListener("message", onMessage); + }, [socket, waveId, myHandle]); + /* ----- 3. Periodic cleanup + state update ------------------------ */ useEffect(() => { const intervalId = setInterval(() => { const now = Date.now(); + // Prune stale typers typersRef.current.forEach((entry, handle) => { if (now - entry.lastTypingAt > TYPING_WINDOW_MS) { typersRef.current.delete(handle); } }); - updateTypingString(); + // Derive the new string + const newMessage = buildTypingString( + Array.from(typersRef.current.values()) + ); + + // Only trigger re‑render if text actually changed + setTypingMessage((prev) => (prev === newMessage ? prev : newMessage)); }, CLEANUP_INTERVAL_MS); return () => clearInterval(intervalId); - }, [updateTypingString]); + }, []); // stable for entire lifespan return typingMessage; } diff --git a/hooks/useWaveWebSocket.ts b/hooks/useWaveWebSocket.ts new file mode 100644 index 0000000000..33cb913423 --- /dev/null +++ b/hooks/useWaveWebSocket.ts @@ -0,0 +1,111 @@ +"use client"; + +import { publicEnv } from "@/config/env"; +import { useEffect, useRef, useState } from "react"; +import { WsMessageType } from "@/helpers/Types"; + +interface UseWaveWebSocketResult { + socket: WebSocket | null; + readyState: number; + /** + * Manually disconnects the WebSocket and prevents further reconnect attempts. + */ + disconnect: () => void; +} + +const RECONNECT_DELAY = 2000; +const MAX_RECONNECT_ATTEMPTS = 20; + +/** + * Custom hook to connect to a WebSocket for a given waveId. + * Automatically reconnects on disconnect, up to MAX_RECONNECT_ATTEMPTS, + * with a delay of RECONNECT_DELAY ms between attempts. + * Sends a "hello world" message upon successful connection. + */ +export function useWaveWebSocket(waveId: string): UseWaveWebSocketResult { + const socketRef = useRef(null); + const [readyState, setReadyState] = useState(WebSocket.CLOSED); + const reconnectAttemptsRef = useRef(0); + const reconnectTimeoutRef = useRef(null); + // flag controlling whether reconnection should be attempted + const shouldReconnectRef = useRef(true); + + useEffect(() => { + // allow reconnection again on (re-)mount or waveId change + shouldReconnectRef.current = true; + // determine base URL from environment + const url = + publicEnv.WS_ENDPOINT ?? + publicEnv.API_ENDPOINT?.replace("https://api", "wss://ws") ?? + "wss://default-fallback-url"; + + function connect() { + const ws = new WebSocket(url); + socketRef.current = ws; + setReadyState(ws.readyState); + + ws.onopen = () => { + setReadyState(ws.readyState); + reconnectAttemptsRef.current = 0; + ws.send( + JSON.stringify({ + type: WsMessageType.SUBSCRIBE_TO_WAVE, + wave_id: waveId, + }) + ); + }; + + ws.onclose = () => { + setReadyState(WebSocket.CLOSED); + // only reconnect if allowed + if ( + shouldReconnectRef.current && + reconnectAttemptsRef.current < MAX_RECONNECT_ATTEMPTS + ) { + reconnectAttemptsRef.current += 1; + reconnectTimeoutRef.current = window.setTimeout( + connect, + RECONNECT_DELAY + ); + } + }; + + ws.onerror = (error: Event) => { + console.error("WebSocket error:", error); + ws.close(); + }; + } + + connect(); + + return () => { + // disable future reconnects on cleanup + shouldReconnectRef.current = false; + if (reconnectTimeoutRef.current != null) { + clearTimeout(reconnectTimeoutRef.current); + } + if (socketRef.current) { + socketRef.current.close(); + } + }; + }, [waveId]); + + // manual disconnect function + const disconnect = () => { + // disable future reconnects + shouldReconnectRef.current = false; + // stop any pending reconnect + if (reconnectTimeoutRef.current !== null) { + clearTimeout(reconnectTimeoutRef.current); + reconnectTimeoutRef.current = null; + } + // close existing socket + if (socketRef.current) { + socketRef.current.close(); + socketRef.current = null; + } + setReadyState(WebSocket.CLOSED); + }; + + return { socket: socketRef.current, readyState, disconnect }; +} diff --git a/services/websocket/useWaveSubscriptionManager.ts b/services/websocket/useWaveSubscriptionManager.ts new file mode 100644 index 0000000000..67fb61b90f --- /dev/null +++ b/services/websocket/useWaveSubscriptionManager.ts @@ -0,0 +1,142 @@ +"use client"; + +import { useCallback, useEffect } from "react"; +import { WsMessageType } from "@/helpers/Types"; +import { useWebSocket } from "./useWebSocket"; +import { WebSocketStatus } from "./WebSocketTypes"; + +type SubscriptionRecord = { + count: number; + subscribed: boolean; +}; + +const subscriptionRegistry: Map = new Map(); + +function setRecord(waveId: string, updater: (record: SubscriptionRecord | undefined) => SubscriptionRecord | undefined) { + const next = updater(subscriptionRegistry.get(waveId)); + if (!next) { + subscriptionRegistry.delete(waveId); + return; + } + subscriptionRegistry.set(waveId, next); +} + +function ensurePositiveCount(count: number): number { + return count < 0 ? 0 : count; +} + +export function useWaveSubscriptionManager() { + const { send, status } = useWebSocket(); + + const debugLog = useCallback((message: string, waveId: string, extra?: Record) => { + // eslint-disable-next-line no-console -- intentional diagnostic logging + console.debug(`[WaveSubscription] ${message}`, { + waveId, + ...extra, + }); + }, []); + + const subscribeToWave = useCallback( + (waveId: string | null | undefined) => { + if (!waveId) return; + setRecord(waveId, (current) => { + const nextCount = ensurePositiveCount((current?.count ?? 0) + 1); + const alreadySubscribed = current?.subscribed ?? false; + const shouldSend = + !alreadySubscribed && status === WebSocketStatus.CONNECTED; + + if (shouldSend) { + send(WsMessageType.SUBSCRIBE_TO_WAVE, { + subscribe: true, + wave_id: waveId, + }); + debugLog("subscribe request sent", waveId, { + count: nextCount, + }); + } + + debugLog("subscription count updated", waveId, { + count: nextCount, + subscribed: alreadySubscribed || shouldSend, + }); + + return { + count: nextCount, + subscribed: alreadySubscribed || shouldSend, + }; + }); + }, + [debugLog, send, status] + ); + + const unsubscribeFromWave = useCallback( + (waveId: string | null | undefined) => { + if (!waveId) return; + setRecord(waveId, (current) => { + if (!current) return undefined; + const nextCount = ensurePositiveCount(current.count - 1); + + if (nextCount === 0) { + if (current.subscribed && status === WebSocketStatus.CONNECTED) { + send(WsMessageType.SUBSCRIBE_TO_WAVE, { + subscribe: false, + wave_id: waveId, + }); + debugLog("unsubscribe request sent", waveId, { count: 0 }); + } + return undefined; + } + + debugLog("subscription count updated", waveId, { + count: nextCount, + subscribed: current.subscribed, + }); + + return { + count: nextCount, + subscribed: current.subscribed, + }; + }); + }, + [debugLog, send, status] + ); + + useEffect(() => { + if (status === WebSocketStatus.CONNECTED) { + subscriptionRegistry.forEach((record, waveId) => { + if (record.count > 0 && !record.subscribed) { + send(WsMessageType.SUBSCRIBE_TO_WAVE, { + subscribe: true, + wave_id: waveId, + }); + debugLog("resubscribe on reconnect", waveId, { + count: record.count, + }); + subscriptionRegistry.set(waveId, { + count: record.count, + subscribed: true, + }); + } + }); + return; + } + + subscriptionRegistry.forEach((record, waveId) => { + if (record.subscribed) { + subscriptionRegistry.set(waveId, { + count: record.count, + subscribed: false, + }); + debugLog("mark unsubscribed (socket non-connected)", waveId, { + count: record.count, + status, + }); + } + }); + }, [debugLog, send, status]); + + return { + subscribeToWave, + unsubscribeFromWave, + }; +}