mirror of
https://github.com/fosrl/pangolin.git
synced 2026-04-01 07:26:38 +00:00
Add option for how to batch
This commit is contained in:
@@ -12,7 +12,7 @@
|
||||
*/
|
||||
|
||||
import logger from "@server/logger";
|
||||
import { LogEvent, HttpConfig } from "../types";
|
||||
import { LogEvent, HttpConfig, PayloadFormat } from "../types";
|
||||
import { LogDestinationProvider } from "./LogDestinationProvider";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -22,6 +22,9 @@ import { LogDestinationProvider } from "./LogDestinationProvider";
|
||||
/** Maximum time (ms) to wait for a single HTTP response. */
|
||||
const REQUEST_TIMEOUT_MS = 30_000;
|
||||
|
||||
/** Default payload format when none is specified in the config. */
|
||||
const DEFAULT_FORMAT: PayloadFormat = "json_array";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// HttpLogDestination
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -32,16 +35,31 @@ const REQUEST_TIMEOUT_MS = 30_000;
|
||||
*
|
||||
* **Payload format**
|
||||
*
|
||||
* Without a body template the payload is a JSON array, one object per event:
|
||||
* ```json
|
||||
* [
|
||||
* { "event": "request", "timestamp": "2024-01-01T00:00:00.000Z", "data": { … } },
|
||||
* …
|
||||
* ]
|
||||
* ```
|
||||
* **Payload formats** (controlled by `config.format`):
|
||||
*
|
||||
* With a body template each event is rendered through the template and the
|
||||
* resulting objects are wrapped in the same outer array. Template placeholders:
|
||||
* - `json_array` (default) — one POST per batch, body is a JSON array:
|
||||
* ```json
|
||||
* [
|
||||
* { "event": "request", "timestamp": "2024-01-01T00:00:00.000Z", "data": { … } },
|
||||
* …
|
||||
* ]
|
||||
* ```
|
||||
* `Content-Type: application/json`
|
||||
*
|
||||
* - `ndjson` — one POST per batch, body is newline-delimited JSON (one object
|
||||
* per line, no outer array). Required by Splunk HEC, Elastic/OpenSearch,
|
||||
* and Grafana Loki:
|
||||
* ```
|
||||
* {"event":"request","timestamp":"…","data":{…}}
|
||||
* {"event":"action","timestamp":"…","data":{…}}
|
||||
* ```
|
||||
* `Content-Type: application/x-ndjson`
|
||||
*
|
||||
* - `json_single` — one POST **per event**, body is a plain JSON object.
|
||||
* Use only for endpoints that cannot handle batches at all.
|
||||
*
|
||||
* With a body template each event is rendered through the template before
|
||||
* serialisation. Template placeholders:
|
||||
* - `{{event}}` → the LogType string ("request", "action", etc.)
|
||||
* - `{{timestamp}}` → ISO-8601 UTC datetime string
|
||||
* - `{{data}}` → raw inline JSON object (**no surrounding quotes**)
|
||||
@@ -67,9 +85,41 @@ export class HttpLogDestination implements LogDestinationProvider {
|
||||
async send(events: LogEvent[]): Promise<void> {
|
||||
if (events.length === 0) return;
|
||||
|
||||
const headers = this.buildHeaders();
|
||||
const payload = this.buildPayload(events);
|
||||
const body = JSON.stringify(payload);
|
||||
const format = this.config.format ?? DEFAULT_FORMAT;
|
||||
|
||||
if (format === "json_single") {
|
||||
// One HTTP POST per event – send sequentially so a failure on one
|
||||
// event throws and lets the manager retry the whole batch from the
|
||||
// same cursor position.
|
||||
for (const event of events) {
|
||||
await this.postRequest(
|
||||
this.buildSingleBody(event),
|
||||
"application/json"
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (format === "ndjson") {
|
||||
const body = this.buildNdjsonBody(events);
|
||||
await this.postRequest(body, "application/x-ndjson");
|
||||
return;
|
||||
}
|
||||
|
||||
// json_array (default)
|
||||
const body = JSON.stringify(this.buildArrayPayload(events));
|
||||
await this.postRequest(body, "application/json");
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Internal HTTP sender
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
private async postRequest(
|
||||
body: string,
|
||||
contentType: string
|
||||
): Promise<void> {
|
||||
const headers = this.buildHeaders(contentType);
|
||||
|
||||
const controller = new AbortController();
|
||||
const timeoutHandle = setTimeout(
|
||||
@@ -124,9 +174,9 @@ export class HttpLogDestination implements LogDestinationProvider {
|
||||
// Header construction
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
private buildHeaders(): Record<string, string> {
|
||||
private buildHeaders(contentType: string): Record<string, string> {
|
||||
const headers: Record<string, string> = {
|
||||
"Content-Type": "application/json"
|
||||
"Content-Type": contentType
|
||||
};
|
||||
|
||||
// Authentication
|
||||
@@ -176,24 +226,36 @@ export class HttpLogDestination implements LogDestinationProvider {
|
||||
// Payload construction
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Build the JSON-serialisable value that will be sent as the request body.
|
||||
*
|
||||
* - No template → `Array<{ event, timestamp, data }>`
|
||||
* - With template → `Array<parsed-template-result>`
|
||||
*/
|
||||
private buildPayload(events: LogEvent[]): unknown {
|
||||
/** Single default event object (no surrounding array). */
|
||||
private buildEventObject(event: LogEvent): unknown {
|
||||
if (this.config.useBodyTemplate && this.config.bodyTemplate?.trim()) {
|
||||
return events.map((event) =>
|
||||
this.renderTemplate(this.config.bodyTemplate!, event)
|
||||
);
|
||||
return this.renderTemplate(this.config.bodyTemplate!, event);
|
||||
}
|
||||
|
||||
return events.map((event) => ({
|
||||
return {
|
||||
event: event.logType,
|
||||
timestamp: epochSecondsToIso(event.timestamp),
|
||||
data: event.data
|
||||
}));
|
||||
};
|
||||
}
|
||||
|
||||
/** JSON array payload – used for `json_array` format. */
|
||||
private buildArrayPayload(events: LogEvent[]): unknown[] {
|
||||
return events.map((e) => this.buildEventObject(e));
|
||||
}
|
||||
|
||||
/**
|
||||
* NDJSON payload – one JSON object per line, no outer array.
|
||||
* Each line must be a complete, valid JSON object.
|
||||
*/
|
||||
private buildNdjsonBody(events: LogEvent[]): string {
|
||||
return events
|
||||
.map((e) => JSON.stringify(this.buildEventObject(e)))
|
||||
.join("\n");
|
||||
}
|
||||
|
||||
/** Single-event body – used for `json_single` format. */
|
||||
private buildSingleBody(event: LogEvent): string {
|
||||
return JSON.stringify(this.buildEventObject(event));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -57,6 +57,18 @@ export interface LogBatch {
|
||||
|
||||
export type AuthType = "none" | "bearer" | "basic" | "custom";
|
||||
|
||||
/**
|
||||
* Controls how the batch of events is serialised into the HTTP request body.
|
||||
*
|
||||
* - `json_array` – `[{…}, {…}]` — default; one POST per batch wrapped in a
|
||||
* JSON array. Works with most generic webhooks and Datadog.
|
||||
* - `ndjson` – `{…}\n{…}` — newline-delimited JSON, one object per
|
||||
* line. Required by Splunk HEC, Elastic/OpenSearch, Loki.
|
||||
* - `json_single` – one HTTP POST per event, body is a plain JSON object.
|
||||
* Use only for endpoints that cannot handle batches at all.
|
||||
*/
|
||||
export type PayloadFormat = "json_array" | "ndjson" | "json_single";
|
||||
|
||||
export interface HttpConfig {
|
||||
/** Human-readable label for the destination */
|
||||
name: string;
|
||||
@@ -75,6 +87,11 @@ export interface HttpConfig {
|
||||
/** Additional static headers appended to every request */
|
||||
headers: Array<{ key: string; value: string }>;
|
||||
/** Whether to render a custom body template instead of the default shape */
|
||||
/**
|
||||
* How events are serialised into the request body.
|
||||
* Defaults to `"json_array"` when absent.
|
||||
*/
|
||||
format?: PayloadFormat;
|
||||
useBodyTemplate: boolean;
|
||||
/**
|
||||
* Handlebars-style template for the JSON body of each event.
|
||||
|
||||
Reference in New Issue
Block a user