From 9a1b4bace1b1fa997c617f9c14feadf02e7c1df3 Mon Sep 17 00:00:00 2001 From: gnezim Date: Tue, 14 Apr 2026 23:55:12 +0300 Subject: [PATCH] Add JsonLinesHttpTransport with batching, backpressure, and redaction --- .../logger/json-lines-transport.test.ts | 131 ++++++++++++++++++ .../logger/json-lines-transport.ts | 93 +++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 src/observability/logger/json-lines-transport.test.ts create mode 100644 src/observability/logger/json-lines-transport.ts diff --git a/src/observability/logger/json-lines-transport.test.ts b/src/observability/logger/json-lines-transport.test.ts new file mode 100644 index 00000000..705c16e3 --- /dev/null +++ b/src/observability/logger/json-lines-transport.test.ts @@ -0,0 +1,131 @@ +import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; +import type { LogRecord } from "./types.js"; +import { JsonLinesHttpTransport } from "./json-lines-transport.js"; + +function record(overrides?: Partial): LogRecord { + return { + ts: "2025-01-01T00:00:00.000Z", + level: "info", + msg: "test", + fields: {}, + ...overrides, + }; +} + +describe("JsonLinesHttpTransport", () => { + let fetchSpy: ReturnType; + + beforeEach(() => { + vi.useFakeTimers(); + fetchSpy = vi.fn(async () => new Response(null, { status: 200 })); + vi.stubGlobal("fetch", fetchSpy); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it("flushes batch after batchSize records", async () => { + const transport = new JsonLinesHttpTransport({ + endpoint: "https://logs.example/ingest", + batchSize: 2, + flushIntervalMs: 60000, + maxBufferSize: 100, + fetchImpl: fetchSpy, + }); + + transport.write(record({ msg: "one" })); + expect(fetchSpy).not.toHaveBeenCalled(); + + transport.write(record({ msg: "two" })); + // Should have flushed after 2nd record + expect(fetchSpy).toHaveBeenCalledTimes(1); + + const body = fetchSpy.mock.calls[0]?.[1]?.body as string; + const lines = body.trim().split("\n"); + expect(lines).toHaveLength(2); + }); + + it("flushes after flushIntervalMs even if batch is not full", async () => { + const transport = new JsonLinesHttpTransport({ + endpoint: "https://logs.example/ingest", + batchSize: 100, + flushIntervalMs: 1000, + maxBufferSize: 100, + fetchImpl: fetchSpy, + }); + + transport.write(record()); + expect(fetchSpy).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1001); + expect(fetchSpy).toHaveBeenCalledTimes(1); + }); + + it("drops oldest records when buffer exceeds maxBufferSize", () => { + const transport = new JsonLinesHttpTransport({ + endpoint: "https://logs.example/ingest", + batchSize: 100, + flushIntervalMs: 60000, + maxBufferSize: 3, + fetchImpl: fetchSpy, + }); + + transport.write(record({ msg: "1" })); + transport.write(record({ msg: "2" })); + transport.write(record({ msg: "3" })); + transport.write(record({ msg: "4" })); + transport.write(record({ msg: "5" })); + + // Force flush to see what's in the buffer + transport.flush(); + const body = fetchSpy.mock.calls[0]?.[1]?.body as string; + const lines = body.trim().split("\n"); + expect(lines).toHaveLength(3); + // Should contain the 3 most recent + expect(lines[0]).toContain('"3"'); + expect(lines[1]).toContain('"4"'); + expect(lines[2]).toContain('"5"'); + }); + + it("redacts sensitive field names", async () => { + const transport = new JsonLinesHttpTransport({ + endpoint: "https://logs.example/ingest", + batchSize: 1, + flushIntervalMs: 60000, + maxBufferSize: 100, + redactFields: ["password", "token", "secret"], + fetchImpl: fetchSpy, + }); + + transport.write(record({ + fields: { password: "hunter2", token: "abc123", safe: "visible" }, + })); + + const body = fetchSpy.mock.calls[0]?.[1]?.body as string; + const parsed = JSON.parse(body.trim()); + expect(parsed.fields.password).toBe("[REDACTED]"); + expect(parsed.fields.token).toBe("[REDACTED]"); + expect(parsed.fields.safe).toBe("visible"); + }); + + it("flush() sends all buffered records and clears the buffer", async () => { + const transport = new JsonLinesHttpTransport({ + endpoint: "https://logs.example/ingest", + batchSize: 100, + flushIntervalMs: 60000, + maxBufferSize: 100, + fetchImpl: fetchSpy, + }); + + transport.write(record({ msg: "a" })); + transport.write(record({ msg: "b" })); + await transport.flush(); + expect(fetchSpy).toHaveBeenCalledTimes(1); + + // Flush again — nothing to send + await transport.flush(); + expect(fetchSpy).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/observability/logger/json-lines-transport.ts b/src/observability/logger/json-lines-transport.ts new file mode 100644 index 00000000..0f03139f --- /dev/null +++ b/src/observability/logger/json-lines-transport.ts @@ -0,0 +1,93 @@ +import type { LogRecord, LogTransport } from "./types.js"; + +export interface JsonLinesHttpTransportOptions { + endpoint: string; + batchSize?: number; + flushIntervalMs?: number; + maxBufferSize?: number; + redactFields?: string[]; + fetchImpl?: typeof fetch; +} + +export class JsonLinesHttpTransport implements LogTransport { + private readonly endpoint: string; + private readonly batchSize: number; + private readonly maxBufferSize: number; + private readonly redactFields: Set; + private readonly fetchFn: typeof fetch; + private buffer: LogRecord[] = []; + private timer: ReturnType | null = null; + + constructor(options: JsonLinesHttpTransportOptions) { + this.endpoint = options.endpoint; + this.batchSize = options.batchSize ?? 50; + this.maxBufferSize = options.maxBufferSize ?? 500; + this.redactFields = new Set( + (options.redactFields ?? ["password", "token", "secret", "authorization"]) + .map((f) => f.toLowerCase()), + ); + this.fetchFn = options.fetchImpl ?? globalThis.fetch; + + const intervalMs = options.flushIntervalMs ?? 5000; + this.timer = setInterval(() => { + if (this.buffer.length > 0) { + void this.sendBatch(); + } + }, intervalMs); + + // Unref so the timer doesn't keep the Node process alive + if (typeof this.timer === "object" && "unref" in this.timer) { + this.timer.unref(); + } + } + + write(record: LogRecord): void { + const redacted = this.redact(record); + this.buffer.push(redacted); + + // Backpressure: drop oldest if buffer exceeds max + while (this.buffer.length > this.maxBufferSize) { + this.buffer.shift(); + } + + // Flush if batch is full + if (this.buffer.length >= this.batchSize) { + void this.sendBatch(); + } + } + + async flush(): Promise { + if (this.buffer.length === 0) return; + await this.sendBatch(); + } + + private async sendBatch(): Promise { + const batch = this.buffer.splice(0, this.buffer.length); + if (batch.length === 0) return; + + const body = batch.map((r) => JSON.stringify(r)).join("\n"); + + try { + await this.fetchFn(this.endpoint, { + method: "POST", + headers: { "Content-Type": "application/x-ndjson" }, + body, + }); + } catch { + // Silently drop failed sends — logging a log failure causes recursion. + // In production, a metrics counter would track send failures. + } + } + + private redact(record: LogRecord): LogRecord { + if (this.redactFields.size === 0) return record; + + const fields = { ...record.fields }; + for (const key of Object.keys(fields)) { + if (this.redactFields.has(key.toLowerCase())) { + fields[key] = "[REDACTED]"; + } + } + return { ...record, fields }; + } +}