From fe30bb280e5d80ca66f915b45a5a0c25b501df3a Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 31 Mar 2026 14:04:58 -0700 Subject: [PATCH] Add option for how to batch --- .../providers/HttpLogDestination.ts | 118 +++++++++++++----- server/private/lib/logStreaming/types.ts | 17 +++ src/components/HttpDestinationCredenza.tsx | 111 +++++++++++++++- 3 files changed, 215 insertions(+), 31 deletions(-) diff --git a/server/private/lib/logStreaming/providers/HttpLogDestination.ts b/server/private/lib/logStreaming/providers/HttpLogDestination.ts index eac3c42e2..5e149f814 100644 --- a/server/private/lib/logStreaming/providers/HttpLogDestination.ts +++ b/server/private/lib/logStreaming/providers/HttpLogDestination.ts @@ -12,7 +12,7 @@ */ import logger from "@server/logger"; -import { LogEvent, HttpConfig } from "../types"; +import { LogEvent, HttpConfig, PayloadFormat } from "../types"; import { LogDestinationProvider } from "./LogDestinationProvider"; // --------------------------------------------------------------------------- @@ -22,6 +22,9 @@ import { LogDestinationProvider } from "./LogDestinationProvider"; /** Maximum time (ms) to wait for a single HTTP response. */ const REQUEST_TIMEOUT_MS = 30_000; +/** Default payload format when none is specified in the config. */ +const DEFAULT_FORMAT: PayloadFormat = "json_array"; + // --------------------------------------------------------------------------- // HttpLogDestination // --------------------------------------------------------------------------- @@ -32,16 +35,31 @@ const REQUEST_TIMEOUT_MS = 30_000; * * **Payload format** * - * Without a body template the payload is a JSON array, one object per event: - * ```json - * [ - * { "event": "request", "timestamp": "2024-01-01T00:00:00.000Z", "data": { … } }, - * … - * ] - * ``` + * **Payload formats** (controlled by `config.format`): * - * With a body template each event is rendered through the template and the - * resulting objects are wrapped in the same outer array. Template placeholders: + * - `json_array` (default) — one POST per batch, body is a JSON array: + * ```json + * [ + * { "event": "request", "timestamp": "2024-01-01T00:00:00.000Z", "data": { … } }, + * … + * ] + * ``` + * `Content-Type: application/json` + * + * - `ndjson` — one POST per batch, body is newline-delimited JSON (one object + * per line, no outer array). Required by Splunk HEC, Elastic/OpenSearch, + * and Grafana Loki: + * ``` + * {"event":"request","timestamp":"…","data":{…}} + * {"event":"action","timestamp":"…","data":{…}} + * ``` + * `Content-Type: application/x-ndjson` + * + * - `json_single` — one POST **per event**, body is a plain JSON object. + * Use only for endpoints that cannot handle batches at all. + * + * With a body template each event is rendered through the template before + * serialisation. Template placeholders: * - `{{event}}` → the LogType string ("request", "action", etc.) * - `{{timestamp}}` → ISO-8601 UTC datetime string * - `{{data}}` → raw inline JSON object (**no surrounding quotes**) @@ -67,9 +85,41 @@ export class HttpLogDestination implements LogDestinationProvider { async send(events: LogEvent[]): Promise { if (events.length === 0) return; - const headers = this.buildHeaders(); - const payload = this.buildPayload(events); - const body = JSON.stringify(payload); + const format = this.config.format ?? DEFAULT_FORMAT; + + if (format === "json_single") { + // One HTTP POST per event – send sequentially so a failure on one + // event throws and lets the manager retry the whole batch from the + // same cursor position. + for (const event of events) { + await this.postRequest( + this.buildSingleBody(event), + "application/json" + ); + } + return; + } + + if (format === "ndjson") { + const body = this.buildNdjsonBody(events); + await this.postRequest(body, "application/x-ndjson"); + return; + } + + // json_array (default) + const body = JSON.stringify(this.buildArrayPayload(events)); + await this.postRequest(body, "application/json"); + } + + // ----------------------------------------------------------------------- + // Internal HTTP sender + // ----------------------------------------------------------------------- + + private async postRequest( + body: string, + contentType: string + ): Promise { + const headers = this.buildHeaders(contentType); const controller = new AbortController(); const timeoutHandle = setTimeout( @@ -124,9 +174,9 @@ export class HttpLogDestination implements LogDestinationProvider { // Header construction // ----------------------------------------------------------------------- - private buildHeaders(): Record { + private buildHeaders(contentType: string): Record { const headers: Record = { - "Content-Type": "application/json" + "Content-Type": contentType }; // Authentication @@ -176,24 +226,36 @@ export class HttpLogDestination implements LogDestinationProvider { // Payload construction // ----------------------------------------------------------------------- - /** - * Build the JSON-serialisable value that will be sent as the request body. - * - * - No template → `Array<{ event, timestamp, data }>` - * - With template → `Array` - */ - private buildPayload(events: LogEvent[]): unknown { + /** Single default event object (no surrounding array). */ + private buildEventObject(event: LogEvent): unknown { if (this.config.useBodyTemplate && this.config.bodyTemplate?.trim()) { - return events.map((event) => - this.renderTemplate(this.config.bodyTemplate!, event) - ); + return this.renderTemplate(this.config.bodyTemplate!, event); } - - return events.map((event) => ({ + return { event: event.logType, timestamp: epochSecondsToIso(event.timestamp), data: event.data - })); + }; + } + + /** JSON array payload – used for `json_array` format. */ + private buildArrayPayload(events: LogEvent[]): unknown[] { + return events.map((e) => this.buildEventObject(e)); + } + + /** + * NDJSON payload – one JSON object per line, no outer array. + * Each line must be a complete, valid JSON object. + */ + private buildNdjsonBody(events: LogEvent[]): string { + return events + .map((e) => JSON.stringify(this.buildEventObject(e))) + .join("\n"); + } + + /** Single-event body – used for `json_single` format. */ + private buildSingleBody(event: LogEvent): string { + return JSON.stringify(this.buildEventObject(event)); } /** diff --git a/server/private/lib/logStreaming/types.ts b/server/private/lib/logStreaming/types.ts index 585adb2a2..03fe88cad 100644 --- a/server/private/lib/logStreaming/types.ts +++ b/server/private/lib/logStreaming/types.ts @@ -57,6 +57,18 @@ export interface LogBatch { export type AuthType = "none" | "bearer" | "basic" | "custom"; +/** + * Controls how the batch of events is serialised into the HTTP request body. + * + * - `json_array` – `[{…}, {…}]` — default; one POST per batch wrapped in a + * JSON array. Works with most generic webhooks and Datadog. + * - `ndjson` – `{…}\n{…}` — newline-delimited JSON, one object per + * line. Required by Splunk HEC, Elastic/OpenSearch, Loki. + * - `json_single` – one HTTP POST per event, body is a plain JSON object. + * Use only for endpoints that cannot handle batches at all. + */ +export type PayloadFormat = "json_array" | "ndjson" | "json_single"; + export interface HttpConfig { /** Human-readable label for the destination */ name: string; @@ -75,6 +87,11 @@ export interface HttpConfig { /** Additional static headers appended to every request */ headers: Array<{ key: string; value: string }>; /** Whether to render a custom body template instead of the default shape */ + /** + * How events are serialised into the request body. + * Defaults to `"json_array"` when absent. + */ + format?: PayloadFormat; useBodyTemplate: boolean; /** * Handlebars-style template for the JSON body of each event. diff --git a/src/components/HttpDestinationCredenza.tsx b/src/components/HttpDestinationCredenza.tsx index 653b766b0..205c17c84 100644 --- a/src/components/HttpDestinationCredenza.tsx +++ b/src/components/HttpDestinationCredenza.tsx @@ -29,6 +29,8 @@ import { build } from "@server/build"; export type AuthType = "none" | "bearer" | "basic" | "custom"; +export type PayloadFormat = "json_array" | "ndjson" | "json_single"; + export interface HttpConfig { name: string; url: string; @@ -38,6 +40,7 @@ export interface HttpConfig { customHeaderName?: string; customHeaderValue?: string; headers: Array<{ key: string; value: string }>; + format: PayloadFormat; useBodyTemplate: boolean; bodyTemplate?: string; } @@ -67,6 +70,7 @@ export const defaultHttpConfig = (): HttpConfig => ({ customHeaderName: "", customHeaderValue: "", headers: [], + format: "json_array", useBodyTemplate: false, bodyTemplate: "" }); @@ -278,7 +282,7 @@ export function HttpDestinationCredenza({ items={[ { title: "Settings", href: "" }, { title: "Headers", href: "" }, - { title: "Body Template", href: "" }, + { title: "Body", href: "" }, { title: "Logs", href: "" } ]} > @@ -539,7 +543,7 @@ export function HttpDestinationCredenza({ /> - {/* ── Body Template tab ─────────────────────────── */} + {/* ── Body tab ─────────────────────────── */}
)} + + {/* Payload Format */} +
+
+ +

+ How events are serialised into each + request body. +

+
+ + + update({ + format: v as PayloadFormat + }) + } + className="gap-2" + > + {/* JSON Array */} +
+ +
+ +

+ One request per batch, body is + a JSON array{" "} + + [{"{...}"}, {"{...}"}] + + . Compatible with most generic + webhooks and Datadog. +

+
+
+ + {/* NDJSON */} +
+ +
+ +

+ One request per batch, body is + newline-delimited JSON — one + object per line, no outer + array. Required by{" "} + Splunk HEC,{" "} + + Elastic / OpenSearch + + , and{" "} + Grafana Loki. +

+
+
+ + {/* Single event per request */} +
+ +
+ +

+ Sends a separate HTTP POST for + each individual event. Use only + for endpoints that cannot + handle batches. +

+
+
+
+
{/* ── Logs tab ──────────────────────────────────── */} @@ -728,4 +833,4 @@ export function HttpDestinationCredenza({ ); -} \ No newline at end of file +}