From 7052052742d5d95ced888182082d78bc51debf93 Mon Sep 17 00:00:00 2001 From: gnezim Date: Wed, 15 Apr 2026 00:41:28 +0300 Subject: [PATCH] Add SignalRConnection ref-counted wrapper with tests Reference-counted connection management with grace period, dynamic import to keep @microsoft/signalr out of SSR bundle, and singleton sharing via getSharedConnection. --- src/shared/signalr/connection.test.ts | 253 ++++++++++++++++++++++++++ src/shared/signalr/connection.ts | 206 +++++++++++++++++++++ 2 files changed, 459 insertions(+) create mode 100644 src/shared/signalr/connection.test.ts create mode 100644 src/shared/signalr/connection.ts diff --git a/src/shared/signalr/connection.test.ts b/src/shared/signalr/connection.test.ts new file mode 100644 index 00000000..8bb1a200 --- /dev/null +++ b/src/shared/signalr/connection.test.ts @@ -0,0 +1,253 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { + SignalRConnection, + getSharedConnection, + _resetSharedConnections, + type ConnectionStatus, +} from "./connection.js"; + +// ---- helpers ---- + +function createMockHub() { + const handlers: Record void)[]> = {}; + let onCloseCallback: ((error?: Error) => void) | null = null; + let onReconnectingCallback: ((error?: Error) => void) | null = null; + let onReconnectedCallback: ((connectionId?: string) => void) | null = null; + + const hub = { + start: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + on: vi.fn((method: string, handler: (...args: unknown[]) => void) => { + if (!handlers[method]) handlers[method] = []; + handlers[method]!.push(handler); + }), + off: vi.fn((method: string, handler: (...args: unknown[]) => void) => { + const list = handlers[method]; + if (list) { + const idx = list.indexOf(handler); + if (idx >= 0) list.splice(idx, 1); + } + }), + onclose: vi.fn((cb: (error?: Error) => void) => { + onCloseCallback = cb; + }), + onreconnecting: vi.fn((cb: (error?: Error) => void) => { + onReconnectingCallback = cb; + }), + onreconnected: vi.fn((cb: (connectionId?: string) => void) => { + onReconnectedCallback = cb; + }), + // Test helpers to simulate events + _simulateMessage(channel: string, ...args: unknown[]) { + for (const h of handlers[channel] ?? []) h(...args); + }, + _simulateClose(error?: Error) { + onCloseCallback?.(error); + }, + _simulateReconnecting(error?: Error) { + onReconnectingCallback?.(error); + }, + _simulateReconnected(connectionId?: string) { + onReconnectedCallback?.(connectionId); + }, + }; + + return hub; +} + +function createConnection( + hub: ReturnType, + gracePeriodMs = 5000, +): SignalRConnection { + const conn = new SignalRConnection({ + hubUrl: "https://hub.test/flights", + gracePeriodMs, + }); + conn._buildConnection = vi.fn().mockResolvedValue(hub); + return conn; +} + +// ---- tests ---- + +describe("SignalRConnection", () => { + beforeEach(() => { + vi.useFakeTimers(); + _resetSharedConnections(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("two rapid subscribes produce exactly one start() call", async () => { + const hub = createMockHub(); + const conn = createConnection(hub); + + const handler1 = vi.fn(); + const handler2 = vi.fn(); + + conn.subscribe("flights", handler1); + conn.subscribe("flights", handler2); + + // Let the async build + start resolve + await vi.runAllTimersAsync(); + + expect(hub.start).toHaveBeenCalledTimes(1); + }); + + it("unmount + remount within grace period reuses connection", async () => { + const hub = createMockHub(); + const conn = createConnection(hub, 5000); + + const handler1 = vi.fn(); + const unsub1 = conn.subscribe("flights", handler1); + await vi.runAllTimersAsync(); + + expect(hub.start).toHaveBeenCalledTimes(1); + + // Unsubscribe (starts grace timer) + unsub1(); + + // Advance time partially (within grace period) + vi.advanceTimersByTime(2000); + + // Remount -- subscribe again + const handler2 = vi.fn(); + conn.subscribe("flights", handler2); + await vi.runAllTimersAsync(); + + // Still only one start() -- connection was reused + expect(hub.start).toHaveBeenCalledTimes(1); + expect(hub.stop).not.toHaveBeenCalled(); + }); + + it("unmount + remount after grace period creates fresh connection", async () => { + const hub1 = createMockHub(); + const hub2 = createMockHub(); + let buildCallCount = 0; + + const conn = new SignalRConnection({ + hubUrl: "https://hub.test/flights", + gracePeriodMs: 100, + }); + conn._buildConnection = vi.fn().mockImplementation(async () => { + buildCallCount++; + return buildCallCount === 1 ? hub1 : hub2; + }); + + const handler1 = vi.fn(); + const unsub1 = conn.subscribe("flights", handler1); + await vi.runAllTimersAsync(); + + expect(hub1.start).toHaveBeenCalledTimes(1); + + // Unsubscribe + unsub1(); + + // Advance past grace period (100ms) so connection stops + await vi.advanceTimersByTimeAsync(200); + + expect(hub1.stop).toHaveBeenCalledTimes(1); + + // Remount -- subscribe again, should create fresh connection + const handler2 = vi.fn(); + conn.subscribe("flights", handler2); + await vi.runAllTimersAsync(); + + expect(hub2.start).toHaveBeenCalledTimes(1); + expect(buildCallCount).toBe(2); + }); + + it("status transitions fire onStatusChange handlers", async () => { + const hub = createMockHub(); + const conn = createConnection(hub); + + const statuses: ConnectionStatus[] = []; + conn.onStatusChange((s) => statuses.push(s)); + + expect(conn.status).toBe("idle"); + + conn.subscribe("flights", vi.fn()); + await vi.runAllTimersAsync(); + + expect(statuses).toContain("connecting"); + expect(statuses).toContain("live"); + expect(conn.status).toBe("live"); + + // Simulate reconnecting + hub._simulateReconnecting(); + expect(conn.status).toBe("reconnecting"); + expect(statuses).toContain("reconnecting"); + + // Simulate reconnected + hub._simulateReconnected("abc"); + expect(conn.status).toBe("live"); + + // Simulate close + hub._simulateClose(); + expect(conn.status).toBe("offline"); + expect(statuses).toContain("offline"); + }); + + it("onStatusChange unsubscribe stops notifications", async () => { + const hub = createMockHub(); + const conn = createConnection(hub); + + const statuses: ConnectionStatus[] = []; + const unsub = conn.onStatusChange((s) => statuses.push(s)); + + conn.subscribe("flights", vi.fn()); + await vi.runAllTimersAsync(); + + unsub(); + hub._simulateClose(); + + // Should not have "offline" since we unsubscribed + expect(statuses).not.toContain("offline"); + }); + + it("unsubscribe is idempotent", async () => { + const hub = createMockHub(); + const conn = createConnection(hub, 100); + + const unsub = conn.subscribe("flights", vi.fn()); + await vi.runAllTimersAsync(); + + unsub(); + unsub(); // Second call should be a no-op + + // Grace timer should only be started once + await vi.advanceTimersByTimeAsync(200); + expect(hub.stop).toHaveBeenCalledTimes(1); + }); + + it("destroy tears down immediately", async () => { + const hub = createMockHub(); + const conn = createConnection(hub); + + conn.subscribe("flights", vi.fn()); + await vi.runAllTimersAsync(); + + await conn.destroy(); + expect(hub.stop).toHaveBeenCalledTimes(1); + expect(conn.status).toBe("idle"); + }); +}); + +describe("getSharedConnection", () => { + beforeEach(() => { + _resetSharedConnections(); + }); + + it("returns same instance for same hubUrl", () => { + const a = getSharedConnection({ hubUrl: "https://hub.test/a" }); + const b = getSharedConnection({ hubUrl: "https://hub.test/a" }); + expect(a).toBe(b); + }); + + it("returns different instances for different hubUrls", () => { + const a = getSharedConnection({ hubUrl: "https://hub.test/a" }); + const b = getSharedConnection({ hubUrl: "https://hub.test/b" }); + expect(a).not.toBe(b); + }); +}); diff --git a/src/shared/signalr/connection.ts b/src/shared/signalr/connection.ts new file mode 100644 index 00000000..8bc5624a --- /dev/null +++ b/src/shared/signalr/connection.ts @@ -0,0 +1,206 @@ +/** + * Reference-counted SignalR connection wrapper. + * + * Uses dynamic import("@microsoft/signalr") so the library never enters + * the SSR bundle. Connections are lazily started on first subscriber and + * closed gracefully after the last unsubscriber + a configurable grace period. + */ + +export interface HubOptions { + hubUrl: string; + reconnectDelaysMs?: number[]; + gracePeriodMs?: number; +} + +export type ConnectionStatus = + | "idle" + | "connecting" + | "live" + | "reconnecting" + | "offline"; + +type StatusHandler = (status: ConnectionStatus) => void; +type MessageHandler = (message: unknown) => void; + +/** Minimal subset of HubConnection we actually use -- aids testability. */ +interface HubConnectionLike { + start(): Promise; + stop(): Promise; + on(methodName: string, handler: (...args: unknown[]) => void): void; + off(methodName: string, handler: (...args: unknown[]) => void): void; + onclose(callback: (error?: Error) => void): void; + onreconnecting(callback: (error?: Error) => void): void; + onreconnected(callback: (connectionId?: string) => void): void; +} + +export class SignalRConnection { + private readonly hubUrl: string; + private readonly reconnectDelaysMs: number[]; + private readonly gracePeriodMs: number; + + private connection: HubConnectionLike | null = null; + private buildPromise: Promise | null = null; + private _status: ConnectionStatus = "idle"; + private refCount = 0; + private graceTimer: ReturnType | null = null; + + private readonly statusHandlers = new Set(); + private readonly channelHandlers = new Map>(); + + /** Overridable for testing -- avoids importing @microsoft/signalr at module level. */ + _buildConnection: ((url: string, delays: number[]) => Promise) | null = null; + + constructor(options: HubOptions) { + this.hubUrl = options.hubUrl; + this.reconnectDelaysMs = options.reconnectDelaysMs ?? [0, 2000, 10000, 30000]; + this.gracePeriodMs = options.gracePeriodMs ?? 5000; + } + + get status(): ConnectionStatus { + return this._status; + } + + /** + * Subscribe to messages on a channel. Returns an unsubscribe function. + * The underlying connection starts lazily on the first subscriber. + */ + subscribe(channel: string, handler: MessageHandler): () => void { + let handlers = this.channelHandlers.get(channel); + if (!handlers) { + handlers = new Set(); + this.channelHandlers.set(channel, handlers); + } + handlers.add(handler); + + this.refCount++; + this.cancelGraceTimer(); + void this.ensureConnected(channel, handler); + + let removed = false; + return () => { + if (removed) return; + removed = true; + handlers!.delete(handler); + if (handlers!.size === 0) { + this.channelHandlers.delete(channel); + this.connection?.off(channel, handler as (...args: unknown[]) => void); + } + this.refCount--; + if (this.refCount <= 0) { + this.refCount = 0; + this.startGraceTimer(); + } + }; + } + + /** Listen for connection status changes. Returns an unsubscribe function. */ + onStatusChange(handler: StatusHandler): () => void { + this.statusHandlers.add(handler); + return () => { + this.statusHandlers.delete(handler); + }; + } + + /** Tear down connection immediately (used by tests / cleanup). */ + async destroy(): Promise { + this.cancelGraceTimer(); + this.refCount = 0; + if (this.connection) { + await this.connection.stop(); + this.connection = null; + this.buildPromise = null; + } + this.setStatus("idle"); + } + + // ---- internals ---- + + private async ensureConnected(channel: string, handler: MessageHandler): Promise { + if (!this.buildPromise) { + this.buildPromise = this.buildAndStart(); + } + await this.buildPromise; + + // Register handler on the live connection + this.connection?.on(channel, handler as (...args: unknown[]) => void); + } + + private async buildAndStart(): Promise { + this.setStatus("connecting"); + + const build = this._buildConnection ?? defaultBuildConnection; + this.connection = await build(this.hubUrl, this.reconnectDelaysMs); + + this.connection.onclose(() => { + this.setStatus("offline"); + }); + this.connection.onreconnecting(() => { + this.setStatus("reconnecting"); + }); + this.connection.onreconnected(() => { + this.setStatus("live"); + }); + + await this.connection.start(); + this.setStatus("live"); + } + + private setStatus(s: ConnectionStatus): void { + if (this._status === s) return; + this._status = s; + for (const h of this.statusHandlers) { + h(s); + } + } + + private startGraceTimer(): void { + this.cancelGraceTimer(); + this.graceTimer = setTimeout(() => { + this.graceTimer = null; + if (this.refCount === 0 && this.connection) { + void this.connection.stop().then(() => { + this.connection = null; + this.buildPromise = null; + this.setStatus("idle"); + }); + } + }, this.gracePeriodMs); + } + + private cancelGraceTimer(): void { + if (this.graceTimer !== null) { + clearTimeout(this.graceTimer); + this.graceTimer = null; + } + } +} + +// ---- Shared singleton map ---- + +const sharedConnections = new Map(); + +export function getSharedConnection(options: HubOptions): SignalRConnection { + const existing = sharedConnections.get(options.hubUrl); + if (existing) return existing; + const conn = new SignalRConnection(options); + sharedConnections.set(options.hubUrl, conn); + return conn; +} + +/** Reset the singleton map (test helper). */ +export function _resetSharedConnections(): void { + sharedConnections.clear(); +} + +// ---- Default builder (dynamic import) ---- + +async function defaultBuildConnection( + url: string, + delays: number[], +): Promise { + const { HubConnectionBuilder, HttpTransportType } = await import("@microsoft/signalr"); + return new HubConnectionBuilder() + .withUrl(url, { transport: HttpTransportType.WebSockets }) + .withAutomaticReconnect(delays) + .build(); +}