Add JsonLinesHttpTransport with batching, backpressure, and redaction
This commit is contained in:
@@ -0,0 +1,131 @@
|
||||
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
|
||||
import type { LogRecord } from "./types.js";
|
||||
import { JsonLinesHttpTransport } from "./json-lines-transport.js";
|
||||
|
||||
function record(overrides?: Partial<LogRecord>): LogRecord {
|
||||
return {
|
||||
ts: "2025-01-01T00:00:00.000Z",
|
||||
level: "info",
|
||||
msg: "test",
|
||||
fields: {},
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe("JsonLinesHttpTransport", () => {
|
||||
let fetchSpy: ReturnType<typeof vi.fn>;
|
||||
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
fetchSpy = vi.fn(async () => new Response(null, { status: 200 }));
|
||||
vi.stubGlobal("fetch", fetchSpy);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it("flushes batch after batchSize records", async () => {
|
||||
const transport = new JsonLinesHttpTransport({
|
||||
endpoint: "https://logs.example/ingest",
|
||||
batchSize: 2,
|
||||
flushIntervalMs: 60000,
|
||||
maxBufferSize: 100,
|
||||
fetchImpl: fetchSpy,
|
||||
});
|
||||
|
||||
transport.write(record({ msg: "one" }));
|
||||
expect(fetchSpy).not.toHaveBeenCalled();
|
||||
|
||||
transport.write(record({ msg: "two" }));
|
||||
// Should have flushed after 2nd record
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
const body = fetchSpy.mock.calls[0]?.[1]?.body as string;
|
||||
const lines = body.trim().split("\n");
|
||||
expect(lines).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("flushes after flushIntervalMs even if batch is not full", async () => {
|
||||
const transport = new JsonLinesHttpTransport({
|
||||
endpoint: "https://logs.example/ingest",
|
||||
batchSize: 100,
|
||||
flushIntervalMs: 1000,
|
||||
maxBufferSize: 100,
|
||||
fetchImpl: fetchSpy,
|
||||
});
|
||||
|
||||
transport.write(record());
|
||||
expect(fetchSpy).not.toHaveBeenCalled();
|
||||
|
||||
vi.advanceTimersByTime(1001);
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("drops oldest records when buffer exceeds maxBufferSize", () => {
|
||||
const transport = new JsonLinesHttpTransport({
|
||||
endpoint: "https://logs.example/ingest",
|
||||
batchSize: 100,
|
||||
flushIntervalMs: 60000,
|
||||
maxBufferSize: 3,
|
||||
fetchImpl: fetchSpy,
|
||||
});
|
||||
|
||||
transport.write(record({ msg: "1" }));
|
||||
transport.write(record({ msg: "2" }));
|
||||
transport.write(record({ msg: "3" }));
|
||||
transport.write(record({ msg: "4" }));
|
||||
transport.write(record({ msg: "5" }));
|
||||
|
||||
// Force flush to see what's in the buffer
|
||||
transport.flush();
|
||||
const body = fetchSpy.mock.calls[0]?.[1]?.body as string;
|
||||
const lines = body.trim().split("\n");
|
||||
expect(lines).toHaveLength(3);
|
||||
// Should contain the 3 most recent
|
||||
expect(lines[0]).toContain('"3"');
|
||||
expect(lines[1]).toContain('"4"');
|
||||
expect(lines[2]).toContain('"5"');
|
||||
});
|
||||
|
||||
it("redacts sensitive field names", async () => {
|
||||
const transport = new JsonLinesHttpTransport({
|
||||
endpoint: "https://logs.example/ingest",
|
||||
batchSize: 1,
|
||||
flushIntervalMs: 60000,
|
||||
maxBufferSize: 100,
|
||||
redactFields: ["password", "token", "secret"],
|
||||
fetchImpl: fetchSpy,
|
||||
});
|
||||
|
||||
transport.write(record({
|
||||
fields: { password: "hunter2", token: "abc123", safe: "visible" },
|
||||
}));
|
||||
|
||||
const body = fetchSpy.mock.calls[0]?.[1]?.body as string;
|
||||
const parsed = JSON.parse(body.trim());
|
||||
expect(parsed.fields.password).toBe("[REDACTED]");
|
||||
expect(parsed.fields.token).toBe("[REDACTED]");
|
||||
expect(parsed.fields.safe).toBe("visible");
|
||||
});
|
||||
|
||||
it("flush() sends all buffered records and clears the buffer", async () => {
|
||||
const transport = new JsonLinesHttpTransport({
|
||||
endpoint: "https://logs.example/ingest",
|
||||
batchSize: 100,
|
||||
flushIntervalMs: 60000,
|
||||
maxBufferSize: 100,
|
||||
fetchImpl: fetchSpy,
|
||||
});
|
||||
|
||||
transport.write(record({ msg: "a" }));
|
||||
transport.write(record({ msg: "b" }));
|
||||
await transport.flush();
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Flush again — nothing to send
|
||||
await transport.flush();
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,93 @@
|
||||
import type { LogRecord, LogTransport } from "./types.js";
|
||||
|
||||
export interface JsonLinesHttpTransportOptions {
|
||||
endpoint: string;
|
||||
batchSize?: number;
|
||||
flushIntervalMs?: number;
|
||||
maxBufferSize?: number;
|
||||
redactFields?: string[];
|
||||
fetchImpl?: typeof fetch;
|
||||
}
|
||||
|
||||
export class JsonLinesHttpTransport implements LogTransport {
|
||||
private readonly endpoint: string;
|
||||
private readonly batchSize: number;
|
||||
private readonly maxBufferSize: number;
|
||||
private readonly redactFields: Set<string>;
|
||||
private readonly fetchFn: typeof fetch;
|
||||
private buffer: LogRecord[] = [];
|
||||
private timer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
constructor(options: JsonLinesHttpTransportOptions) {
|
||||
this.endpoint = options.endpoint;
|
||||
this.batchSize = options.batchSize ?? 50;
|
||||
this.maxBufferSize = options.maxBufferSize ?? 500;
|
||||
this.redactFields = new Set(
|
||||
(options.redactFields ?? ["password", "token", "secret", "authorization"])
|
||||
.map((f) => f.toLowerCase()),
|
||||
);
|
||||
this.fetchFn = options.fetchImpl ?? globalThis.fetch;
|
||||
|
||||
const intervalMs = options.flushIntervalMs ?? 5000;
|
||||
this.timer = setInterval(() => {
|
||||
if (this.buffer.length > 0) {
|
||||
void this.sendBatch();
|
||||
}
|
||||
}, intervalMs);
|
||||
|
||||
// Unref so the timer doesn't keep the Node process alive
|
||||
if (typeof this.timer === "object" && "unref" in this.timer) {
|
||||
this.timer.unref();
|
||||
}
|
||||
}
|
||||
|
||||
write(record: LogRecord): void {
|
||||
const redacted = this.redact(record);
|
||||
this.buffer.push(redacted);
|
||||
|
||||
// Backpressure: drop oldest if buffer exceeds max
|
||||
while (this.buffer.length > this.maxBufferSize) {
|
||||
this.buffer.shift();
|
||||
}
|
||||
|
||||
// Flush if batch is full
|
||||
if (this.buffer.length >= this.batchSize) {
|
||||
void this.sendBatch();
|
||||
}
|
||||
}
|
||||
|
||||
async flush(): Promise<void> {
|
||||
if (this.buffer.length === 0) return;
|
||||
await this.sendBatch();
|
||||
}
|
||||
|
||||
private async sendBatch(): Promise<void> {
|
||||
const batch = this.buffer.splice(0, this.buffer.length);
|
||||
if (batch.length === 0) return;
|
||||
|
||||
const body = batch.map((r) => JSON.stringify(r)).join("\n");
|
||||
|
||||
try {
|
||||
await this.fetchFn(this.endpoint, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/x-ndjson" },
|
||||
body,
|
||||
});
|
||||
} catch {
|
||||
// Silently drop failed sends — logging a log failure causes recursion.
|
||||
// In production, a metrics counter would track send failures.
|
||||
}
|
||||
}
|
||||
|
||||
private redact(record: LogRecord): LogRecord {
|
||||
if (this.redactFields.size === 0) return record;
|
||||
|
||||
const fields = { ...record.fields };
|
||||
for (const key of Object.keys(fields)) {
|
||||
if (this.redactFields.has(key.toLowerCase())) {
|
||||
fields[key] = "[REDACTED]";
|
||||
}
|
||||
}
|
||||
return { ...record, fields };
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user