plan/react-rewrite #1
@@ -0,0 +1,253 @@
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
import {
|
||||
SignalRConnection,
|
||||
getSharedConnection,
|
||||
_resetSharedConnections,
|
||||
type ConnectionStatus,
|
||||
} from "./connection.js";
|
||||
|
||||
// ---- helpers ----
|
||||
|
||||
function createMockHub() {
|
||||
const handlers: Record<string, ((...args: unknown[]) => void)[]> = {};
|
||||
let onCloseCallback: ((error?: Error) => void) | null = null;
|
||||
let onReconnectingCallback: ((error?: Error) => void) | null = null;
|
||||
let onReconnectedCallback: ((connectionId?: string) => void) | null = null;
|
||||
|
||||
const hub = {
|
||||
start: vi.fn().mockResolvedValue(undefined),
|
||||
stop: vi.fn().mockResolvedValue(undefined),
|
||||
on: vi.fn((method: string, handler: (...args: unknown[]) => void) => {
|
||||
if (!handlers[method]) handlers[method] = [];
|
||||
handlers[method]!.push(handler);
|
||||
}),
|
||||
off: vi.fn((method: string, handler: (...args: unknown[]) => void) => {
|
||||
const list = handlers[method];
|
||||
if (list) {
|
||||
const idx = list.indexOf(handler);
|
||||
if (idx >= 0) list.splice(idx, 1);
|
||||
}
|
||||
}),
|
||||
onclose: vi.fn((cb: (error?: Error) => void) => {
|
||||
onCloseCallback = cb;
|
||||
}),
|
||||
onreconnecting: vi.fn((cb: (error?: Error) => void) => {
|
||||
onReconnectingCallback = cb;
|
||||
}),
|
||||
onreconnected: vi.fn((cb: (connectionId?: string) => void) => {
|
||||
onReconnectedCallback = cb;
|
||||
}),
|
||||
// Test helpers to simulate events
|
||||
_simulateMessage(channel: string, ...args: unknown[]) {
|
||||
for (const h of handlers[channel] ?? []) h(...args);
|
||||
},
|
||||
_simulateClose(error?: Error) {
|
||||
onCloseCallback?.(error);
|
||||
},
|
||||
_simulateReconnecting(error?: Error) {
|
||||
onReconnectingCallback?.(error);
|
||||
},
|
||||
_simulateReconnected(connectionId?: string) {
|
||||
onReconnectedCallback?.(connectionId);
|
||||
},
|
||||
};
|
||||
|
||||
return hub;
|
||||
}
|
||||
|
||||
function createConnection(
|
||||
hub: ReturnType<typeof createMockHub>,
|
||||
gracePeriodMs = 5000,
|
||||
): SignalRConnection {
|
||||
const conn = new SignalRConnection({
|
||||
hubUrl: "https://hub.test/flights",
|
||||
gracePeriodMs,
|
||||
});
|
||||
conn._buildConnection = vi.fn().mockResolvedValue(hub);
|
||||
return conn;
|
||||
}
|
||||
|
||||
// ---- tests ----
|
||||
|
||||
describe("SignalRConnection", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
_resetSharedConnections();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("two rapid subscribes produce exactly one start() call", async () => {
|
||||
const hub = createMockHub();
|
||||
const conn = createConnection(hub);
|
||||
|
||||
const handler1 = vi.fn();
|
||||
const handler2 = vi.fn();
|
||||
|
||||
conn.subscribe("flights", handler1);
|
||||
conn.subscribe("flights", handler2);
|
||||
|
||||
// Let the async build + start resolve
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(hub.start).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("unmount + remount within grace period reuses connection", async () => {
|
||||
const hub = createMockHub();
|
||||
const conn = createConnection(hub, 5000);
|
||||
|
||||
const handler1 = vi.fn();
|
||||
const unsub1 = conn.subscribe("flights", handler1);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(hub.start).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Unsubscribe (starts grace timer)
|
||||
unsub1();
|
||||
|
||||
// Advance time partially (within grace period)
|
||||
vi.advanceTimersByTime(2000);
|
||||
|
||||
// Remount -- subscribe again
|
||||
const handler2 = vi.fn();
|
||||
conn.subscribe("flights", handler2);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Still only one start() -- connection was reused
|
||||
expect(hub.start).toHaveBeenCalledTimes(1);
|
||||
expect(hub.stop).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("unmount + remount after grace period creates fresh connection", async () => {
|
||||
const hub1 = createMockHub();
|
||||
const hub2 = createMockHub();
|
||||
let buildCallCount = 0;
|
||||
|
||||
const conn = new SignalRConnection({
|
||||
hubUrl: "https://hub.test/flights",
|
||||
gracePeriodMs: 100,
|
||||
});
|
||||
conn._buildConnection = vi.fn().mockImplementation(async () => {
|
||||
buildCallCount++;
|
||||
return buildCallCount === 1 ? hub1 : hub2;
|
||||
});
|
||||
|
||||
const handler1 = vi.fn();
|
||||
const unsub1 = conn.subscribe("flights", handler1);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(hub1.start).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Unsubscribe
|
||||
unsub1();
|
||||
|
||||
// Advance past grace period (100ms) so connection stops
|
||||
await vi.advanceTimersByTimeAsync(200);
|
||||
|
||||
expect(hub1.stop).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Remount -- subscribe again, should create fresh connection
|
||||
const handler2 = vi.fn();
|
||||
conn.subscribe("flights", handler2);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(hub2.start).toHaveBeenCalledTimes(1);
|
||||
expect(buildCallCount).toBe(2);
|
||||
});
|
||||
|
||||
it("status transitions fire onStatusChange handlers", async () => {
|
||||
const hub = createMockHub();
|
||||
const conn = createConnection(hub);
|
||||
|
||||
const statuses: ConnectionStatus[] = [];
|
||||
conn.onStatusChange((s) => statuses.push(s));
|
||||
|
||||
expect(conn.status).toBe("idle");
|
||||
|
||||
conn.subscribe("flights", vi.fn());
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(statuses).toContain("connecting");
|
||||
expect(statuses).toContain("live");
|
||||
expect(conn.status).toBe("live");
|
||||
|
||||
// Simulate reconnecting
|
||||
hub._simulateReconnecting();
|
||||
expect(conn.status).toBe("reconnecting");
|
||||
expect(statuses).toContain("reconnecting");
|
||||
|
||||
// Simulate reconnected
|
||||
hub._simulateReconnected("abc");
|
||||
expect(conn.status).toBe("live");
|
||||
|
||||
// Simulate close
|
||||
hub._simulateClose();
|
||||
expect(conn.status).toBe("offline");
|
||||
expect(statuses).toContain("offline");
|
||||
});
|
||||
|
||||
it("onStatusChange unsubscribe stops notifications", async () => {
|
||||
const hub = createMockHub();
|
||||
const conn = createConnection(hub);
|
||||
|
||||
const statuses: ConnectionStatus[] = [];
|
||||
const unsub = conn.onStatusChange((s) => statuses.push(s));
|
||||
|
||||
conn.subscribe("flights", vi.fn());
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
unsub();
|
||||
hub._simulateClose();
|
||||
|
||||
// Should not have "offline" since we unsubscribed
|
||||
expect(statuses).not.toContain("offline");
|
||||
});
|
||||
|
||||
it("unsubscribe is idempotent", async () => {
|
||||
const hub = createMockHub();
|
||||
const conn = createConnection(hub, 100);
|
||||
|
||||
const unsub = conn.subscribe("flights", vi.fn());
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
unsub();
|
||||
unsub(); // Second call should be a no-op
|
||||
|
||||
// Grace timer should only be started once
|
||||
await vi.advanceTimersByTimeAsync(200);
|
||||
expect(hub.stop).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("destroy tears down immediately", async () => {
|
||||
const hub = createMockHub();
|
||||
const conn = createConnection(hub);
|
||||
|
||||
conn.subscribe("flights", vi.fn());
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
await conn.destroy();
|
||||
expect(hub.stop).toHaveBeenCalledTimes(1);
|
||||
expect(conn.status).toBe("idle");
|
||||
});
|
||||
});
|
||||
|
||||
describe("getSharedConnection", () => {
|
||||
beforeEach(() => {
|
||||
_resetSharedConnections();
|
||||
});
|
||||
|
||||
it("returns same instance for same hubUrl", () => {
|
||||
const a = getSharedConnection({ hubUrl: "https://hub.test/a" });
|
||||
const b = getSharedConnection({ hubUrl: "https://hub.test/a" });
|
||||
expect(a).toBe(b);
|
||||
});
|
||||
|
||||
it("returns different instances for different hubUrls", () => {
|
||||
const a = getSharedConnection({ hubUrl: "https://hub.test/a" });
|
||||
const b = getSharedConnection({ hubUrl: "https://hub.test/b" });
|
||||
expect(a).not.toBe(b);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,206 @@
|
||||
/**
|
||||
* Reference-counted SignalR connection wrapper.
|
||||
*
|
||||
* Uses dynamic import("@microsoft/signalr") so the library never enters
|
||||
* the SSR bundle. Connections are lazily started on first subscriber and
|
||||
* closed gracefully after the last unsubscriber + a configurable grace period.
|
||||
*/
|
||||
|
||||
export interface HubOptions {
|
||||
hubUrl: string;
|
||||
reconnectDelaysMs?: number[];
|
||||
gracePeriodMs?: number;
|
||||
}
|
||||
|
||||
export type ConnectionStatus =
|
||||
| "idle"
|
||||
| "connecting"
|
||||
| "live"
|
||||
| "reconnecting"
|
||||
| "offline";
|
||||
|
||||
type StatusHandler = (status: ConnectionStatus) => void;
|
||||
type MessageHandler = (message: unknown) => void;
|
||||
|
||||
/** Minimal subset of HubConnection we actually use -- aids testability. */
|
||||
interface HubConnectionLike {
|
||||
start(): Promise<void>;
|
||||
stop(): Promise<void>;
|
||||
on(methodName: string, handler: (...args: unknown[]) => void): void;
|
||||
off(methodName: string, handler: (...args: unknown[]) => void): void;
|
||||
onclose(callback: (error?: Error) => void): void;
|
||||
onreconnecting(callback: (error?: Error) => void): void;
|
||||
onreconnected(callback: (connectionId?: string) => void): void;
|
||||
}
|
||||
|
||||
export class SignalRConnection {
|
||||
private readonly hubUrl: string;
|
||||
private readonly reconnectDelaysMs: number[];
|
||||
private readonly gracePeriodMs: number;
|
||||
|
||||
private connection: HubConnectionLike | null = null;
|
||||
private buildPromise: Promise<void> | null = null;
|
||||
private _status: ConnectionStatus = "idle";
|
||||
private refCount = 0;
|
||||
private graceTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
private readonly statusHandlers = new Set<StatusHandler>();
|
||||
private readonly channelHandlers = new Map<string, Set<MessageHandler>>();
|
||||
|
||||
/** Overridable for testing -- avoids importing @microsoft/signalr at module level. */
|
||||
_buildConnection: ((url: string, delays: number[]) => Promise<HubConnectionLike>) | null = null;
|
||||
|
||||
constructor(options: HubOptions) {
|
||||
this.hubUrl = options.hubUrl;
|
||||
this.reconnectDelaysMs = options.reconnectDelaysMs ?? [0, 2000, 10000, 30000];
|
||||
this.gracePeriodMs = options.gracePeriodMs ?? 5000;
|
||||
}
|
||||
|
||||
get status(): ConnectionStatus {
|
||||
return this._status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to messages on a channel. Returns an unsubscribe function.
|
||||
* The underlying connection starts lazily on the first subscriber.
|
||||
*/
|
||||
subscribe(channel: string, handler: MessageHandler): () => void {
|
||||
let handlers = this.channelHandlers.get(channel);
|
||||
if (!handlers) {
|
||||
handlers = new Set();
|
||||
this.channelHandlers.set(channel, handlers);
|
||||
}
|
||||
handlers.add(handler);
|
||||
|
||||
this.refCount++;
|
||||
this.cancelGraceTimer();
|
||||
void this.ensureConnected(channel, handler);
|
||||
|
||||
let removed = false;
|
||||
return () => {
|
||||
if (removed) return;
|
||||
removed = true;
|
||||
handlers!.delete(handler);
|
||||
if (handlers!.size === 0) {
|
||||
this.channelHandlers.delete(channel);
|
||||
this.connection?.off(channel, handler as (...args: unknown[]) => void);
|
||||
}
|
||||
this.refCount--;
|
||||
if (this.refCount <= 0) {
|
||||
this.refCount = 0;
|
||||
this.startGraceTimer();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/** Listen for connection status changes. Returns an unsubscribe function. */
|
||||
onStatusChange(handler: StatusHandler): () => void {
|
||||
this.statusHandlers.add(handler);
|
||||
return () => {
|
||||
this.statusHandlers.delete(handler);
|
||||
};
|
||||
}
|
||||
|
||||
/** Tear down connection immediately (used by tests / cleanup). */
|
||||
async destroy(): Promise<void> {
|
||||
this.cancelGraceTimer();
|
||||
this.refCount = 0;
|
||||
if (this.connection) {
|
||||
await this.connection.stop();
|
||||
this.connection = null;
|
||||
this.buildPromise = null;
|
||||
}
|
||||
this.setStatus("idle");
|
||||
}
|
||||
|
||||
// ---- internals ----
|
||||
|
||||
private async ensureConnected(channel: string, handler: MessageHandler): Promise<void> {
|
||||
if (!this.buildPromise) {
|
||||
this.buildPromise = this.buildAndStart();
|
||||
}
|
||||
await this.buildPromise;
|
||||
|
||||
// Register handler on the live connection
|
||||
this.connection?.on(channel, handler as (...args: unknown[]) => void);
|
||||
}
|
||||
|
||||
private async buildAndStart(): Promise<void> {
|
||||
this.setStatus("connecting");
|
||||
|
||||
const build = this._buildConnection ?? defaultBuildConnection;
|
||||
this.connection = await build(this.hubUrl, this.reconnectDelaysMs);
|
||||
|
||||
this.connection.onclose(() => {
|
||||
this.setStatus("offline");
|
||||
});
|
||||
this.connection.onreconnecting(() => {
|
||||
this.setStatus("reconnecting");
|
||||
});
|
||||
this.connection.onreconnected(() => {
|
||||
this.setStatus("live");
|
||||
});
|
||||
|
||||
await this.connection.start();
|
||||
this.setStatus("live");
|
||||
}
|
||||
|
||||
private setStatus(s: ConnectionStatus): void {
|
||||
if (this._status === s) return;
|
||||
this._status = s;
|
||||
for (const h of this.statusHandlers) {
|
||||
h(s);
|
||||
}
|
||||
}
|
||||
|
||||
private startGraceTimer(): void {
|
||||
this.cancelGraceTimer();
|
||||
this.graceTimer = setTimeout(() => {
|
||||
this.graceTimer = null;
|
||||
if (this.refCount === 0 && this.connection) {
|
||||
void this.connection.stop().then(() => {
|
||||
this.connection = null;
|
||||
this.buildPromise = null;
|
||||
this.setStatus("idle");
|
||||
});
|
||||
}
|
||||
}, this.gracePeriodMs);
|
||||
}
|
||||
|
||||
private cancelGraceTimer(): void {
|
||||
if (this.graceTimer !== null) {
|
||||
clearTimeout(this.graceTimer);
|
||||
this.graceTimer = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Shared singleton map ----
|
||||
|
||||
const sharedConnections = new Map<string, SignalRConnection>();
|
||||
|
||||
export function getSharedConnection(options: HubOptions): SignalRConnection {
|
||||
const existing = sharedConnections.get(options.hubUrl);
|
||||
if (existing) return existing;
|
||||
const conn = new SignalRConnection(options);
|
||||
sharedConnections.set(options.hubUrl, conn);
|
||||
return conn;
|
||||
}
|
||||
|
||||
/** Reset the singleton map (test helper). */
|
||||
export function _resetSharedConnections(): void {
|
||||
sharedConnections.clear();
|
||||
}
|
||||
|
||||
// ---- Default builder (dynamic import) ----
|
||||
|
||||
async function defaultBuildConnection(
|
||||
url: string,
|
||||
delays: number[],
|
||||
): Promise<HubConnectionLike> {
|
||||
const { HubConnectionBuilder, HttpTransportType } = await import("@microsoft/signalr");
|
||||
return new HubConnectionBuilder()
|
||||
.withUrl(url, { transport: HttpTransportType.WebSockets })
|
||||
.withAutomaticReconnect(delays)
|
||||
.build();
|
||||
}
|
||||
Reference in New Issue
Block a user