mirror of
https://github.com/fosrl/pangolin.git
synced 2026-05-08 17:29:54 +00:00
Compare commits
1 Commits
dependabot
...
s3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cbdc74768f |
@@ -3079,7 +3079,34 @@
|
||||
"S3DestEditTitle": "Edit Destination",
|
||||
"S3DestAddTitle": "Add S3 Destination",
|
||||
"S3DestEditDescription": "Update the configuration for this S3 event streaming destination.",
|
||||
"S3DestAddDescription": "Configure a new S3 endpoint to receive your organization's events.",
|
||||
"S3DestAddDescription": "Configure a new Amazon S3 (or S3-compatible) bucket to receive your organization's events.",
|
||||
"s3DestTabSettings": "Settings",
|
||||
"s3DestTabFormat": "Format",
|
||||
"s3DestNameLabel": "Name",
|
||||
"s3DestNamePlaceholder": "My S3 destination",
|
||||
"s3DestAccessKeyIdLabel": "AWS Access Key ID",
|
||||
"s3DestSecretAccessKeyLabel": "AWS Secret Access Key",
|
||||
"s3DestSecretAccessKeyPlaceholder": "Your AWS secret access key",
|
||||
"s3DestRegionLabel": "AWS Region",
|
||||
"s3DestBucketLabel": "Bucket Name",
|
||||
"s3DestPrefixLabel": "Key Prefix (optional)",
|
||||
"s3DestPrefixDescription": "Optional path prefix prepended to every object key. Objects are stored at {prefix}/{logType}/{YYYY}/{MM}/{DD}/{filename}.",
|
||||
"s3DestEndpointLabel": "Custom Endpoint (optional)",
|
||||
"s3DestEndpointDescription": "Override the S3 endpoint for S3-compatible storage such as MinIO or Cloudflare R2. Leave blank for standard AWS S3.",
|
||||
"s3DestGzipLabel": "Gzip compression",
|
||||
"s3DestGzipDescription": "Compress each uploaded object with gzip. Reduces storage costs and upload size.",
|
||||
"s3DestFormatTitle": "File Format",
|
||||
"s3DestFormatDescription": "How events are serialised inside each uploaded object.",
|
||||
"s3DestFormatJsonArrayDescription": "Each object is a JSON array of event records. Compatible with most analytics tools.",
|
||||
"s3DestFormatNdjsonDescription": "Each object contains one JSON record per line (newline-delimited JSON). Compatible with Athena, BigQuery, and Spark.",
|
||||
"s3DestFormatCsvTitle": "CSV",
|
||||
"s3DestFormatCsvDescription": "Each object is an RFC-4180 CSV file with a header row. Column names are derived from the event data fields.",
|
||||
"s3DestSaveChanges": "Save Changes",
|
||||
"s3DestCreateDestination": "Create Destination",
|
||||
"s3DestUpdatedSuccess": "Destination updated successfully",
|
||||
"s3DestCreatedSuccess": "Destination created successfully",
|
||||
"s3DestUpdateFailed": "Failed to update destination",
|
||||
"s3DestCreateFailed": "Failed to create destination",
|
||||
"datadogDestEditTitle": "Edit Destination",
|
||||
"datadogDestAddTitle": "Add Datadog Destination",
|
||||
"datadogDestEditDescription": "Update the configuration for this Datadog event streaming destination.",
|
||||
|
||||
@@ -30,10 +30,12 @@ import {
|
||||
LOG_TYPES,
|
||||
LogEvent,
|
||||
DestinationFailureState,
|
||||
HttpConfig
|
||||
HttpConfig,
|
||||
S3Config
|
||||
} from "./types";
|
||||
import { LogDestinationProvider } from "./providers/LogDestinationProvider";
|
||||
import { HttpLogDestination } from "./providers/HttpLogDestination";
|
||||
import { S3LogDestination } from "./providers/S3LogDestination";
|
||||
import type { EventStreamingDestination } from "@server/db";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -72,11 +74,11 @@ const MAX_CATCHUP_BATCHES = 20;
|
||||
* After the last entry the max value is re-used.
|
||||
*/
|
||||
const BACKOFF_SCHEDULE_MS = [
|
||||
60_000, // 1 min (failure 1)
|
||||
2 * 60_000, // 2 min (failure 2)
|
||||
5 * 60_000, // 5 min (failure 3)
|
||||
10 * 60_000, // 10 min (failure 4)
|
||||
30 * 60_000 // 30 min (failure 5+)
|
||||
60_000, // 1 min (failure 1)
|
||||
2 * 60_000, // 2 min (failure 2)
|
||||
5 * 60_000, // 5 min (failure 3)
|
||||
10 * 60_000, // 10 min (failure 4)
|
||||
30 * 60_000 // 30 min (failure 5+)
|
||||
];
|
||||
|
||||
/**
|
||||
@@ -204,7 +206,10 @@ export class LogStreamingManager {
|
||||
this.pollTimer = null;
|
||||
this.runPoll()
|
||||
.catch((err) =>
|
||||
logger.error("LogStreamingManager: unexpected poll error", err)
|
||||
logger.error(
|
||||
"LogStreamingManager: unexpected poll error",
|
||||
err
|
||||
)
|
||||
)
|
||||
.finally(() => {
|
||||
if (this.isRunning) {
|
||||
@@ -275,10 +280,13 @@ export class LogStreamingManager {
|
||||
}
|
||||
|
||||
// Decrypt and parse config – skip destination if either step fails
|
||||
let configFromDb: HttpConfig;
|
||||
let configFromDb: unknown;
|
||||
try {
|
||||
const decryptedConfig = decrypt(dest.config, config.getRawConfig().server.secret!);
|
||||
configFromDb = JSON.parse(decryptedConfig) as HttpConfig;
|
||||
const decryptedConfig = decrypt(
|
||||
dest.config,
|
||||
config.getRawConfig().server.secret!
|
||||
);
|
||||
configFromDb = JSON.parse(decryptedConfig);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`LogStreamingManager: destination ${dest.destinationId} has invalid or undecryptable config`,
|
||||
@@ -362,7 +370,10 @@ export class LogStreamingManager {
|
||||
.from(eventStreamingCursors)
|
||||
.where(
|
||||
and(
|
||||
eq(eventStreamingCursors.destinationId, dest.destinationId),
|
||||
eq(
|
||||
eventStreamingCursors.destinationId,
|
||||
dest.destinationId
|
||||
),
|
||||
eq(eventStreamingCursors.logType, logType)
|
||||
)
|
||||
)
|
||||
@@ -431,9 +442,7 @@ export class LogStreamingManager {
|
||||
|
||||
if (rows.length === 0) break;
|
||||
|
||||
const events = rows.map((row) =>
|
||||
this.rowToLogEvent(logType, row)
|
||||
);
|
||||
const events = rows.map((row) => this.rowToLogEvent(logType, row));
|
||||
|
||||
// Throws on failure – caught by the caller which applies back-off
|
||||
await provider.send(events);
|
||||
@@ -677,8 +686,7 @@ export class LogStreamingManager {
|
||||
break;
|
||||
}
|
||||
|
||||
const orgId =
|
||||
typeof row.orgId === "string" ? row.orgId : "";
|
||||
const orgId = typeof row.orgId === "string" ? row.orgId : "";
|
||||
|
||||
return {
|
||||
id: row.id,
|
||||
@@ -708,6 +716,8 @@ export class LogStreamingManager {
|
||||
switch (type) {
|
||||
case "http":
|
||||
return new HttpLogDestination(config as HttpConfig);
|
||||
case "s3":
|
||||
return new S3LogDestination(config as S3Config);
|
||||
// Future providers:
|
||||
// case "datadog": return new DatadogLogDestination(config as DatadogConfig);
|
||||
default:
|
||||
|
||||
279
server/private/lib/logStreaming/providers/S3LogDestination.ts
Normal file
279
server/private/lib/logStreaming/providers/S3LogDestination.ts
Normal file
@@ -0,0 +1,279 @@
|
||||
/*
|
||||
* This file is part of a proprietary work.
|
||||
*
|
||||
* Copyright (c) 2025-2026 Fossorial, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This file is licensed under the Fossorial Commercial License.
|
||||
* You may not use this file except in compliance with the License.
|
||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||
*
|
||||
* This file is not licensed under the AGPLv3.
|
||||
*/
|
||||
|
||||
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
|
||||
import { gzip as gzipCallback } from "zlib";
|
||||
import { promisify } from "util";
|
||||
import { randomUUID } from "crypto";
|
||||
import logger from "@server/logger";
|
||||
import { LogEvent, S3Config, S3PayloadFormat } from "../types";
|
||||
import { LogDestinationProvider } from "./LogDestinationProvider";
|
||||
|
||||
const gzipAsync = promisify(gzipCallback);
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Constants
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** Maximum time (ms) to wait for a single S3 PutObject response. */
|
||||
const REQUEST_TIMEOUT_MS = 60_000;
|
||||
|
||||
/** Default payload format when none is specified in the config. */
|
||||
const DEFAULT_FORMAT: S3PayloadFormat = "json_array";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// S3LogDestination
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Forwards a batch of log events to an S3-compatible object store by
|
||||
* uploading a single object per `send()` call.
|
||||
*
|
||||
* **Object key layout**
|
||||
* ```
|
||||
* {prefix}/{logType}/{YYYY}/{MM}/{DD}/{HH}-{mm}-{ss}-{uuid}.{ext}[.gz]
|
||||
* ```
|
||||
* - `prefix` – from `config.prefix` (default: empty – key starts at logType)
|
||||
* - `logType` – one of "request", "action", "access", "connection"
|
||||
* - Date components are derived from the upload time (UTC)
|
||||
* - `ext` – `json` | `ndjson` | `csv`
|
||||
* - `.gz` – appended when `config.gzip` is true
|
||||
*
|
||||
* **Payload formats** (controlled by `config.format`):
|
||||
* - `json_array` (default) – body is a JSON array of event objects.
|
||||
* - `ndjson` – one JSON object per line (newline-delimited).
|
||||
* - `csv` – RFC-4180 CSV with a header row; columns are the
|
||||
* union of all field names in the batch's event data.
|
||||
*
|
||||
* **Compression**: when `config.gzip` is `true` the body is gzip-compressed
|
||||
* before upload and `Content-Encoding: gzip` is set on the object.
|
||||
*
|
||||
* **Custom endpoint**: set `config.endpoint` to target any S3-compatible
|
||||
* storage service (e.g. MinIO, Cloudflare R2).
|
||||
*/
|
||||
export class S3LogDestination implements LogDestinationProvider {
|
||||
readonly type = "s3";
|
||||
|
||||
private readonly config: S3Config;
|
||||
|
||||
constructor(config: S3Config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// LogDestinationProvider implementation
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
async send(events: LogEvent[]): Promise<void> {
|
||||
if (events.length === 0) return;
|
||||
|
||||
const format = this.config.format ?? DEFAULT_FORMAT;
|
||||
const useGzip = this.config.gzip ?? false;
|
||||
const logType = events[0].logType;
|
||||
|
||||
const rawBody = this.serialize(events, format);
|
||||
const bodyBuffer = Buffer.from(rawBody, "utf-8");
|
||||
|
||||
let uploadBody: Buffer;
|
||||
let contentEncoding: string | undefined;
|
||||
|
||||
if (useGzip) {
|
||||
uploadBody = (await gzipAsync(bodyBuffer)) as Buffer;
|
||||
contentEncoding = "gzip";
|
||||
} else {
|
||||
uploadBody = bodyBuffer;
|
||||
}
|
||||
|
||||
const key = this.buildObjectKey(logType, format, useGzip);
|
||||
const contentType = this.contentType(format);
|
||||
|
||||
const clientConfig: ConstructorParameters<typeof S3Client>[0] = {
|
||||
region: this.config.region,
|
||||
credentials: {
|
||||
accessKeyId: this.config.accessKeyId,
|
||||
secretAccessKey: this.config.secretAccessKey
|
||||
},
|
||||
requestHandler: {
|
||||
requestTimeout: REQUEST_TIMEOUT_MS
|
||||
}
|
||||
};
|
||||
|
||||
if (this.config.endpoint?.trim()) {
|
||||
clientConfig.endpoint = this.config.endpoint.trim();
|
||||
}
|
||||
|
||||
const client = new S3Client(clientConfig);
|
||||
|
||||
try {
|
||||
await client.send(
|
||||
new PutObjectCommand({
|
||||
Bucket: this.config.bucket,
|
||||
Key: key,
|
||||
Body: uploadBody,
|
||||
ContentType: contentType,
|
||||
...(contentEncoding
|
||||
? { ContentEncoding: contentEncoding }
|
||||
: {})
|
||||
})
|
||||
);
|
||||
} catch (err: unknown) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
throw new Error(
|
||||
`S3LogDestination: failed to upload object "${key}" ` +
|
||||
`to bucket "${this.config.bucket}" – ${msg}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Internal helpers
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Construct a unique S3 object key for the given log type and format.
|
||||
* Keys are partitioned by logType and date so they can be queried or
|
||||
* lifecycle-managed independently.
|
||||
*/
|
||||
private buildObjectKey(
|
||||
logType: string,
|
||||
format: S3PayloadFormat,
|
||||
gzip: boolean
|
||||
): string {
|
||||
const now = new Date();
|
||||
const year = now.getUTCFullYear();
|
||||
const month = String(now.getUTCMonth() + 1).padStart(2, "0");
|
||||
const day = String(now.getUTCDate()).padStart(2, "0");
|
||||
const hh = String(now.getUTCHours()).padStart(2, "0");
|
||||
const mm = String(now.getUTCMinutes()).padStart(2, "0");
|
||||
const ss = String(now.getUTCSeconds()).padStart(2, "0");
|
||||
const uid = randomUUID();
|
||||
|
||||
const ext =
|
||||
format === "csv" ? "csv" : format === "ndjson" ? "ndjson" : "json";
|
||||
const fileName = `${hh}-${mm}-${ss}-${uid}.${ext}${gzip ? ".gz" : ""}`;
|
||||
|
||||
const rawPrefix = (this.config.prefix ?? "").trim().replace(/\/+$/, "");
|
||||
const parts = [
|
||||
rawPrefix,
|
||||
logType,
|
||||
`${year}/${month}/${day}`,
|
||||
fileName
|
||||
].filter((p) => p !== "");
|
||||
|
||||
return parts.join("/");
|
||||
}
|
||||
|
||||
private contentType(format: S3PayloadFormat): string {
|
||||
switch (format) {
|
||||
case "csv":
|
||||
return "text/csv; charset=utf-8";
|
||||
case "ndjson":
|
||||
return "application/x-ndjson";
|
||||
default:
|
||||
return "application/json";
|
||||
}
|
||||
}
|
||||
|
||||
private serialize(events: LogEvent[], format: S3PayloadFormat): string {
|
||||
switch (format) {
|
||||
case "json_array":
|
||||
return JSON.stringify(events.map(toPayload));
|
||||
case "ndjson":
|
||||
return events
|
||||
.map((e) => JSON.stringify(toPayload(e)))
|
||||
.join("\n");
|
||||
case "csv":
|
||||
return toCsv(events);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Payload helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function toPayload(event: LogEvent): unknown {
|
||||
return {
|
||||
event: event.logType,
|
||||
timestamp: new Date(event.timestamp * 1000).toISOString(),
|
||||
data: event.data
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a batch of events to RFC-4180 CSV.
|
||||
*
|
||||
* The column set is the union of `event`, `timestamp`, and all keys present in
|
||||
* `event.data` across the batch, preserving insertion order. Values that
|
||||
* contain commas, double-quotes, or newlines are quoted and escaped.
|
||||
*/
|
||||
function toCsv(events: LogEvent[]): string {
|
||||
if (events.length === 0) return "";
|
||||
|
||||
// Collect all unique data keys in stable order
|
||||
const keySet = new LinkedSet<string>();
|
||||
keySet.add("event");
|
||||
keySet.add("timestamp");
|
||||
for (const e of events) {
|
||||
for (const k of Object.keys(e.data)) {
|
||||
keySet.add(k);
|
||||
}
|
||||
}
|
||||
const headers = keySet.toArray();
|
||||
|
||||
const rows: string[] = [headers.map(csvEscape).join(",")];
|
||||
|
||||
for (const e of events) {
|
||||
const flat: Record<string, unknown> = {
|
||||
event: e.logType,
|
||||
timestamp: new Date(e.timestamp * 1000).toISOString(),
|
||||
...e.data
|
||||
};
|
||||
rows.push(
|
||||
headers.map((h) => csvEscape(flattenValue(flat[h]))).join(",")
|
||||
);
|
||||
}
|
||||
|
||||
return rows.join("\n");
|
||||
}
|
||||
|
||||
/** Flatten a value to a plain string suitable for a CSV cell. */
|
||||
function flattenValue(value: unknown): string {
|
||||
if (value === null || value === undefined) return "";
|
||||
if (typeof value === "object") return JSON.stringify(value);
|
||||
return String(value);
|
||||
}
|
||||
|
||||
/** RFC-4180 CSV escaping. */
|
||||
function csvEscape(value: string): string {
|
||||
if (/[",\n\r]/.test(value)) {
|
||||
return `"${value.replace(/"/g, '""')}"`;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Minimal ordered set (preserves insertion order, deduplicates)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
class LinkedSet<T> {
|
||||
private readonly map = new Map<T, true>();
|
||||
|
||||
add(value: T): void {
|
||||
this.map.set(value, true);
|
||||
}
|
||||
|
||||
toArray(): T[] {
|
||||
return Array.from(this.map.keys());
|
||||
}
|
||||
}
|
||||
@@ -107,6 +107,40 @@ export interface HttpConfig {
|
||||
bodyTemplate?: string;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// S3 destination configuration
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Controls how the batch of events is serialised into each S3 object.
|
||||
*
|
||||
* - `json_array` – `[{…}, {…}]` – default; each object is a JSON array.
|
||||
* - `ndjson` – `{…}\n{…}` – newline-delimited JSON, one object per line.
|
||||
* - `csv` – RFC-4180 CSV with a header row derived from the event fields.
|
||||
*/
|
||||
export type S3PayloadFormat = "json_array" | "ndjson" | "csv";
|
||||
|
||||
export interface S3Config {
|
||||
/** Human-readable label for the destination */
|
||||
name: string;
|
||||
/** AWS Access Key ID */
|
||||
accessKeyId: string;
|
||||
/** AWS Secret Access Key */
|
||||
secretAccessKey: string;
|
||||
/** AWS region (e.g. "us-east-1") */
|
||||
region: string;
|
||||
/** Target S3 bucket name */
|
||||
bucket: string;
|
||||
/** Optional key prefix – appended before the auto-generated path */
|
||||
prefix?: string;
|
||||
/** Override the S3 endpoint for S3-compatible storage (e.g. MinIO, R2) */
|
||||
endpoint?: string;
|
||||
/** How events are serialised into each object. Defaults to "json_array". */
|
||||
format: S3PayloadFormat;
|
||||
/** Whether to gzip-compress the object before upload. */
|
||||
gzip: boolean;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Per-destination per-log-type cursor (reflects the DB table)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"use client";
|
||||
|
||||
import { useState, useEffect, useCallback } from "react";
|
||||
import React, { useState, useEffect, useCallback } from "react";
|
||||
import { useParams } from "next/navigation";
|
||||
import { createApiClient, formatAxiosError } from "@app/lib/api";
|
||||
import { useEnvContext } from "@app/hooks/useEnvContext";
|
||||
@@ -38,7 +38,10 @@ import {
|
||||
HttpDestinationCredenza,
|
||||
parseHttpConfig
|
||||
} from "@app/components/HttpDestinationCredenza";
|
||||
import { S3DestinationCredenza } from "@app/components/S3DestinationCredenza";
|
||||
import {
|
||||
S3DestinationCredenza,
|
||||
parseS3Config
|
||||
} from "@app/components/S3DestinationCredenza";
|
||||
import { DatadogDestinationCredenza } from "@app/components/DatadogDestinationCredenza";
|
||||
import { useTranslations } from "next-intl";
|
||||
|
||||
@@ -64,6 +67,42 @@ interface DestinationCardProps {
|
||||
disabled?: boolean;
|
||||
}
|
||||
|
||||
function getDestinationDisplay(destination: Destination): {
|
||||
name: string;
|
||||
typeLabel: string;
|
||||
detail: string;
|
||||
icon: React.ReactNode;
|
||||
} {
|
||||
if (destination.type === "s3") {
|
||||
const cfg = parseS3Config(destination.config);
|
||||
const detail = cfg.bucket
|
||||
? `s3://${cfg.bucket}${cfg.prefix ? `/${cfg.prefix.replace(/^\/+/, "")}` : ""}`
|
||||
: "";
|
||||
return {
|
||||
name: cfg.name,
|
||||
typeLabel: "Amazon S3",
|
||||
detail,
|
||||
icon: (
|
||||
<Image
|
||||
src="/third-party/s3.png"
|
||||
alt="Amazon S3"
|
||||
width={16}
|
||||
height={16}
|
||||
className="rounded-sm"
|
||||
/>
|
||||
)
|
||||
};
|
||||
}
|
||||
// Default: HTTP
|
||||
const cfg = parseHttpConfig(destination.config);
|
||||
return {
|
||||
name: cfg.name,
|
||||
typeLabel: "HTTP",
|
||||
detail: cfg.url,
|
||||
icon: <Globe className="h-3.5 w-3.5 text-black" />
|
||||
};
|
||||
}
|
||||
|
||||
function DestinationCard({
|
||||
destination,
|
||||
onToggle,
|
||||
@@ -73,25 +112,25 @@ function DestinationCard({
|
||||
disabled = false
|
||||
}: DestinationCardProps) {
|
||||
const t = useTranslations();
|
||||
const cfg = parseHttpConfig(destination.config);
|
||||
const { name, typeLabel, detail, icon } =
|
||||
getDestinationDisplay(destination);
|
||||
|
||||
return (
|
||||
<div className="relative flex flex-col rounded-lg border bg-card text-card-foreground p-5 gap-3">
|
||||
{/* Top row: icon + name/type + toggle */}
|
||||
<div className="flex items-start justify-between gap-3">
|
||||
<div className="flex items-center gap-3 min-w-0">
|
||||
{/* Squirkle icon: gray outer → white inner → black globe */}
|
||||
<div className="shrink-0 flex items-center justify-center w-10 h-10 rounded-2xl bg-muted">
|
||||
<div className="flex items-center justify-center w-6 h-6 rounded-xl bg-white shadow-sm">
|
||||
<Globe className="h-3.5 w-3.5 text-black" />
|
||||
{icon}
|
||||
</div>
|
||||
</div>
|
||||
<div className="min-w-0">
|
||||
<p className="font-semibold text-sm leading-tight truncate">
|
||||
{cfg.name || t("streamingUnnamedDestination")}
|
||||
{name || t("streamingUnnamedDestination")}
|
||||
</p>
|
||||
<p className="text-xs text-muted-foreground truncate mt-0.5">
|
||||
HTTP
|
||||
{typeLabel}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
@@ -105,9 +144,9 @@ function DestinationCard({
|
||||
/>
|
||||
</div>
|
||||
|
||||
{/* URL preview */}
|
||||
{/* Detail preview (URL for HTTP, s3:// path for S3) */}
|
||||
<p className="text-xs text-muted-foreground truncate">
|
||||
{cfg.url || (
|
||||
{detail || (
|
||||
<span className="italic">
|
||||
{t("streamingNoUrlConfigured")}
|
||||
</span>
|
||||
@@ -485,7 +524,7 @@ export default function StreamingDestinationsPage() {
|
||||
if (!v) setDeleteTarget(null);
|
||||
}}
|
||||
string={
|
||||
parseHttpConfig(deleteTarget.config).name ||
|
||||
getDestinationDisplay(deleteTarget).name ||
|
||||
t("streamingDeleteDialogThisDestination")
|
||||
}
|
||||
title={t("streamingDeleteTitle")}
|
||||
@@ -493,7 +532,7 @@ export default function StreamingDestinationsPage() {
|
||||
<p>
|
||||
{t("streamingDeleteDialogAreYouSure")}{" "}
|
||||
<span>
|
||||
{parseHttpConfig(deleteTarget.config).name ||
|
||||
{getDestinationDisplay(deleteTarget).name ||
|
||||
t("streamingDeleteDialogThisDestination")}
|
||||
</span>
|
||||
{t("streamingDeleteDialogPermanentlyRemoved")}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"use client";
|
||||
|
||||
|
||||
import { useState, useEffect } from "react";
|
||||
import {
|
||||
Credenza,
|
||||
CredenzaBody,
|
||||
@@ -12,13 +12,62 @@ import {
|
||||
CredenzaTitle
|
||||
} from "@app/components/Credenza";
|
||||
import { Button } from "@app/components/ui/button";
|
||||
import { ContactSalesBanner } from "@app/components/ContactSalesBanner";
|
||||
import { Input } from "@app/components/ui/input";
|
||||
import { Label } from "@app/components/ui/label";
|
||||
import { Switch } from "@app/components/ui/switch";
|
||||
import { HorizontalTabs } from "@app/components/HorizontalTabs";
|
||||
import { RadioGroup, RadioGroupItem } from "@app/components/ui/radio-group";
|
||||
import { Checkbox } from "@app/components/ui/checkbox";
|
||||
import { createApiClient, formatAxiosError } from "@app/lib/api";
|
||||
import { useEnvContext } from "@app/hooks/useEnvContext";
|
||||
import { toast } from "@app/hooks/useToast";
|
||||
import { useTranslations } from "next-intl";
|
||||
import { Destination } from "@app/components/HttpDestinationCredenza";
|
||||
|
||||
// ── Types ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
export type S3PayloadFormat = "json_array" | "ndjson" | "csv";
|
||||
|
||||
export interface S3Config {
|
||||
name: string;
|
||||
accessKeyId: string;
|
||||
secretAccessKey: string;
|
||||
region: string;
|
||||
bucket: string;
|
||||
prefix: string;
|
||||
endpoint: string;
|
||||
format: S3PayloadFormat;
|
||||
gzip: boolean;
|
||||
}
|
||||
|
||||
// ── Helpers ────────────────────────────────────────────────────────────────────
|
||||
|
||||
export const defaultS3Config = (): S3Config => ({
|
||||
name: "",
|
||||
accessKeyId: "",
|
||||
secretAccessKey: "",
|
||||
region: "us-east-1",
|
||||
bucket: "",
|
||||
prefix: "",
|
||||
endpoint: "",
|
||||
format: "json_array",
|
||||
gzip: false
|
||||
});
|
||||
|
||||
export function parseS3Config(raw: string): S3Config {
|
||||
try {
|
||||
return { ...defaultS3Config(), ...JSON.parse(raw) };
|
||||
} catch {
|
||||
return defaultS3Config();
|
||||
}
|
||||
}
|
||||
|
||||
// ── Component ──────────────────────────────────────────────────────────────────
|
||||
|
||||
export interface S3DestinationCredenzaProps {
|
||||
open: boolean;
|
||||
onOpenChange: (open: boolean) => void;
|
||||
editing: any;
|
||||
editing: Destination | null;
|
||||
orgId: string;
|
||||
onSaved: () => void;
|
||||
}
|
||||
@@ -28,18 +77,84 @@ export function S3DestinationCredenza({
|
||||
onOpenChange,
|
||||
editing,
|
||||
orgId,
|
||||
onSaved,
|
||||
onSaved
|
||||
}: S3DestinationCredenzaProps) {
|
||||
const api = createApiClient(useEnvContext());
|
||||
const t = useTranslations();
|
||||
|
||||
const [saving, setSaving] = useState(false);
|
||||
const [cfg, setCfg] = useState<S3Config>(defaultS3Config());
|
||||
const [sendAccessLogs, setSendAccessLogs] = useState(false);
|
||||
const [sendActionLogs, setSendActionLogs] = useState(false);
|
||||
const [sendConnectionLogs, setSendConnectionLogs] = useState(false);
|
||||
const [sendRequestLogs, setSendRequestLogs] = useState(false);
|
||||
|
||||
useEffect(() => {
|
||||
if (open) {
|
||||
setCfg(editing ? parseS3Config(editing.config) : defaultS3Config());
|
||||
setSendAccessLogs(editing?.sendAccessLogs ?? false);
|
||||
setSendActionLogs(editing?.sendActionLogs ?? false);
|
||||
setSendConnectionLogs(editing?.sendConnectionLogs ?? false);
|
||||
setSendRequestLogs(editing?.sendRequestLogs ?? false);
|
||||
}
|
||||
}, [open, editing]);
|
||||
|
||||
const update = (patch: Partial<S3Config>) =>
|
||||
setCfg((prev) => ({ ...prev, ...patch }));
|
||||
|
||||
const isValid =
|
||||
cfg.name.trim() !== "" &&
|
||||
cfg.accessKeyId.trim() !== "" &&
|
||||
cfg.secretAccessKey.trim() !== "" &&
|
||||
cfg.region.trim() !== "" &&
|
||||
cfg.bucket.trim() !== "";
|
||||
|
||||
async function handleSave() {
|
||||
if (!isValid) return;
|
||||
setSaving(true);
|
||||
try {
|
||||
const payload = {
|
||||
type: "s3",
|
||||
config: JSON.stringify(cfg),
|
||||
sendAccessLogs,
|
||||
sendActionLogs,
|
||||
sendConnectionLogs,
|
||||
sendRequestLogs
|
||||
};
|
||||
if (editing) {
|
||||
await api.post(
|
||||
`/org/${orgId}/event-streaming-destination/${editing.destinationId}`,
|
||||
payload
|
||||
);
|
||||
toast({ title: t("s3DestUpdatedSuccess") });
|
||||
} else {
|
||||
await api.put(
|
||||
`/org/${orgId}/event-streaming-destination`,
|
||||
payload
|
||||
);
|
||||
toast({ title: t("s3DestCreatedSuccess") });
|
||||
}
|
||||
onSaved();
|
||||
onOpenChange(false);
|
||||
} catch (e) {
|
||||
toast({
|
||||
variant: "destructive",
|
||||
title: editing
|
||||
? t("s3DestUpdateFailed")
|
||||
: t("s3DestCreateFailed"),
|
||||
description: formatAxiosError(e, t("streamingUnexpectedError"))
|
||||
});
|
||||
} finally {
|
||||
setSaving(false);
|
||||
}
|
||||
}
|
||||
|
||||
return (
|
||||
<Credenza open={open} onOpenChange={onOpenChange}>
|
||||
<CredenzaContent className="sm:max-w-2xl">
|
||||
<CredenzaHeader>
|
||||
<CredenzaTitle>
|
||||
{editing
|
||||
? t("S3DestEditTitle")
|
||||
: t("S3DestAddTitle")}
|
||||
{editing ? t("S3DestEditTitle") : t("S3DestAddTitle")}
|
||||
</CredenzaTitle>
|
||||
<CredenzaDescription>
|
||||
{editing
|
||||
@@ -49,13 +164,367 @@ export function S3DestinationCredenza({
|
||||
</CredenzaHeader>
|
||||
|
||||
<CredenzaBody>
|
||||
<ContactSalesBanner />
|
||||
<HorizontalTabs
|
||||
clientSide
|
||||
items={[
|
||||
{ title: t("s3DestTabSettings"), href: "" },
|
||||
{ title: t("s3DestTabFormat"), href: "" },
|
||||
{ title: t("httpDestTabLogs"), href: "" }
|
||||
]}
|
||||
>
|
||||
{/* ── Settings tab ────────────────────────────── */}
|
||||
<div className="space-y-6 mt-4 p-1">
|
||||
{/* Name */}
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="s3-name">
|
||||
{t("s3DestNameLabel")}
|
||||
</Label>
|
||||
<Input
|
||||
id="s3-name"
|
||||
placeholder={t("s3DestNamePlaceholder")}
|
||||
value={cfg.name}
|
||||
onChange={(e) =>
|
||||
update({ name: e.target.value })
|
||||
}
|
||||
/>
|
||||
</div>
|
||||
|
||||
{/* AWS Access Key ID */}
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="s3-access-key-id">
|
||||
{t("s3DestAccessKeyIdLabel")}
|
||||
</Label>
|
||||
<Input
|
||||
id="s3-access-key-id"
|
||||
placeholder="AKIAIOSFODNN7EXAMPLE"
|
||||
value={cfg.accessKeyId}
|
||||
onChange={(e) =>
|
||||
update({
|
||||
accessKeyId: e.target.value
|
||||
})
|
||||
}
|
||||
autoComplete="off"
|
||||
/>
|
||||
</div>
|
||||
|
||||
{/* AWS Secret Access Key */}
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="s3-secret-key">
|
||||
{t("s3DestSecretAccessKeyLabel")}
|
||||
</Label>
|
||||
<Input
|
||||
id="s3-secret-key"
|
||||
type="password"
|
||||
placeholder={t(
|
||||
"s3DestSecretAccessKeyPlaceholder"
|
||||
)}
|
||||
value={cfg.secretAccessKey}
|
||||
onChange={(e) =>
|
||||
update({
|
||||
secretAccessKey: e.target.value
|
||||
})
|
||||
}
|
||||
autoComplete="new-password"
|
||||
/>
|
||||
</div>
|
||||
|
||||
{/* Region */}
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="s3-region">
|
||||
{t("s3DestRegionLabel")}
|
||||
</Label>
|
||||
<Input
|
||||
id="s3-region"
|
||||
placeholder="us-east-1"
|
||||
value={cfg.region}
|
||||
onChange={(e) =>
|
||||
update({ region: e.target.value })
|
||||
}
|
||||
/>
|
||||
</div>
|
||||
|
||||
{/* Bucket */}
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="s3-bucket">
|
||||
{t("s3DestBucketLabel")}
|
||||
</Label>
|
||||
<Input
|
||||
id="s3-bucket"
|
||||
placeholder="my-logs-bucket"
|
||||
value={cfg.bucket}
|
||||
onChange={(e) =>
|
||||
update({ bucket: e.target.value })
|
||||
}
|
||||
/>
|
||||
</div>
|
||||
|
||||
{/* Prefix */}
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="s3-prefix">
|
||||
{t("s3DestPrefixLabel")}
|
||||
</Label>
|
||||
<Input
|
||||
id="s3-prefix"
|
||||
placeholder="pangolin/logs"
|
||||
value={cfg.prefix}
|
||||
onChange={(e) =>
|
||||
update({ prefix: e.target.value })
|
||||
}
|
||||
/>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
{t("s3DestPrefixDescription")}
|
||||
</p>
|
||||
</div>
|
||||
|
||||
{/* Custom endpoint (optional – for S3-compatible storage) */}
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="s3-endpoint">
|
||||
{t("s3DestEndpointLabel")}
|
||||
</Label>
|
||||
<Input
|
||||
id="s3-endpoint"
|
||||
placeholder="https://s3.example.com"
|
||||
value={cfg.endpoint}
|
||||
onChange={(e) =>
|
||||
update({ endpoint: e.target.value })
|
||||
}
|
||||
/>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
{t("s3DestEndpointDescription")}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{/* ── Format tab ───────────────────────────────── */}
|
||||
<div className="space-y-6 mt-4 p-1">
|
||||
{/* Gzip compression toggle */}
|
||||
<div className="flex items-start gap-3 rounded-md border p-3">
|
||||
<Switch
|
||||
id="s3-gzip"
|
||||
checked={cfg.gzip}
|
||||
onCheckedChange={(v) => update({ gzip: v })}
|
||||
className="mt-0.5"
|
||||
/>
|
||||
<div>
|
||||
<Label
|
||||
htmlFor="s3-gzip"
|
||||
className="cursor-pointer font-medium"
|
||||
>
|
||||
{t("s3DestGzipLabel")}
|
||||
</Label>
|
||||
<p className="text-xs text-muted-foreground mt-0.5">
|
||||
{t("s3DestGzipDescription")}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{/* Payload format selector */}
|
||||
<div className="space-y-3">
|
||||
<div>
|
||||
<label className="font-medium block">
|
||||
{t("s3DestFormatTitle")}
|
||||
</label>
|
||||
<p className="text-sm text-muted-foreground mt-0.5">
|
||||
{t("s3DestFormatDescription")}
|
||||
</p>
|
||||
</div>
|
||||
|
||||
<RadioGroup
|
||||
value={cfg.format}
|
||||
onValueChange={(v) =>
|
||||
update({
|
||||
format: v as S3PayloadFormat
|
||||
})
|
||||
}
|
||||
className="gap-2"
|
||||
>
|
||||
{/* JSON Array */}
|
||||
<label className="flex items-start gap-3 rounded-md border p-3 cursor-pointer has-[:checked]:border-primary has-[:checked]:bg-primary/5">
|
||||
<RadioGroupItem
|
||||
value="json_array"
|
||||
className="mt-0.5"
|
||||
/>
|
||||
<div>
|
||||
<p className="text-sm font-medium leading-none">
|
||||
{t(
|
||||
"httpDestFormatJsonArrayTitle"
|
||||
)}
|
||||
</p>
|
||||
<p className="text-xs text-muted-foreground mt-1">
|
||||
{t(
|
||||
"s3DestFormatJsonArrayDescription"
|
||||
)}
|
||||
</p>
|
||||
</div>
|
||||
</label>
|
||||
|
||||
{/* NDJSON */}
|
||||
<label className="flex items-start gap-3 rounded-md border p-3 cursor-pointer has-[:checked]:border-primary has-[:checked]:bg-primary/5">
|
||||
<RadioGroupItem
|
||||
value="ndjson"
|
||||
className="mt-0.5"
|
||||
/>
|
||||
<div>
|
||||
<p className="text-sm font-medium leading-none">
|
||||
{t("httpDestFormatNdjsonTitle")}
|
||||
</p>
|
||||
<p className="text-xs text-muted-foreground mt-1">
|
||||
{t(
|
||||
"s3DestFormatNdjsonDescription"
|
||||
)}
|
||||
</p>
|
||||
</div>
|
||||
</label>
|
||||
|
||||
{/* CSV */}
|
||||
<label className="flex items-start gap-3 rounded-md border p-3 cursor-pointer has-[:checked]:border-primary has-[:checked]:bg-primary/5">
|
||||
<RadioGroupItem
|
||||
value="csv"
|
||||
className="mt-0.5"
|
||||
/>
|
||||
<div>
|
||||
<p className="text-sm font-medium leading-none">
|
||||
{t("s3DestFormatCsvTitle")}
|
||||
</p>
|
||||
<p className="text-xs text-muted-foreground mt-1">
|
||||
{t(
|
||||
"s3DestFormatCsvDescription"
|
||||
)}
|
||||
</p>
|
||||
</div>
|
||||
</label>
|
||||
</RadioGroup>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{/* ── Logs tab ──────────────────────────────────── */}
|
||||
<div className="space-y-6 mt-4 p-1">
|
||||
<div>
|
||||
<label className="font-medium block">
|
||||
{t("httpDestLogTypesTitle")}
|
||||
</label>
|
||||
<p className="text-sm text-muted-foreground mt-0.5">
|
||||
{t("httpDestLogTypesDescription")}
|
||||
</p>
|
||||
</div>
|
||||
|
||||
<div className="space-y-3">
|
||||
<div className="flex items-start gap-3 rounded-md border p-3">
|
||||
<Checkbox
|
||||
id="s3-log-access"
|
||||
checked={sendAccessLogs}
|
||||
onCheckedChange={(v) =>
|
||||
setSendAccessLogs(v === true)
|
||||
}
|
||||
className="mt-0.5"
|
||||
/>
|
||||
<div>
|
||||
<Label
|
||||
htmlFor="s3-log-access"
|
||||
className="cursor-pointer font-medium"
|
||||
>
|
||||
{t("httpDestAccessLogsTitle")}
|
||||
</Label>
|
||||
<p className="text-xs text-muted-foreground mt-0.5">
|
||||
{t("httpDestAccessLogsDescription")}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="flex items-start gap-3 rounded-md border p-3">
|
||||
<Checkbox
|
||||
id="s3-log-action"
|
||||
checked={sendActionLogs}
|
||||
onCheckedChange={(v) =>
|
||||
setSendActionLogs(v === true)
|
||||
}
|
||||
className="mt-0.5"
|
||||
/>
|
||||
<div>
|
||||
<Label
|
||||
htmlFor="s3-log-action"
|
||||
className="cursor-pointer font-medium"
|
||||
>
|
||||
{t("httpDestActionLogsTitle")}
|
||||
</Label>
|
||||
<p className="text-xs text-muted-foreground mt-0.5">
|
||||
{t("httpDestActionLogsDescription")}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="flex items-start gap-3 rounded-md border p-3">
|
||||
<Checkbox
|
||||
id="s3-log-connection"
|
||||
checked={sendConnectionLogs}
|
||||
onCheckedChange={(v) =>
|
||||
setSendConnectionLogs(v === true)
|
||||
}
|
||||
className="mt-0.5"
|
||||
/>
|
||||
<div>
|
||||
<Label
|
||||
htmlFor="s3-log-connection"
|
||||
className="cursor-pointer font-medium"
|
||||
>
|
||||
{t("httpDestConnectionLogsTitle")}
|
||||
</Label>
|
||||
<p className="text-xs text-muted-foreground mt-0.5">
|
||||
{t(
|
||||
"httpDestConnectionLogsDescription"
|
||||
)}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="flex items-start gap-3 rounded-md border p-3">
|
||||
<Checkbox
|
||||
id="s3-log-request"
|
||||
checked={sendRequestLogs}
|
||||
onCheckedChange={(v) =>
|
||||
setSendRequestLogs(v === true)
|
||||
}
|
||||
className="mt-0.5"
|
||||
/>
|
||||
<div>
|
||||
<Label
|
||||
htmlFor="s3-log-request"
|
||||
className="cursor-pointer font-medium"
|
||||
>
|
||||
{t("httpDestRequestLogsTitle")}
|
||||
</Label>
|
||||
<p className="text-xs text-muted-foreground mt-0.5">
|
||||
{t(
|
||||
"httpDestRequestLogsDescription"
|
||||
)}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</HorizontalTabs>
|
||||
</CredenzaBody>
|
||||
|
||||
<CredenzaFooter>
|
||||
<CredenzaClose asChild>
|
||||
<Button variant="outline">{t("cancel")}</Button>
|
||||
<Button
|
||||
type="button"
|
||||
variant="outline"
|
||||
disabled={saving}
|
||||
>
|
||||
{t("cancel")}
|
||||
</Button>
|
||||
</CredenzaClose>
|
||||
<Button
|
||||
type="button"
|
||||
onClick={handleSave}
|
||||
loading={saving}
|
||||
disabled={!isValid || saving}
|
||||
>
|
||||
{editing
|
||||
? t("s3DestSaveChanges")
|
||||
: t("s3DestCreateDestination")}
|
||||
</Button>
|
||||
</CredenzaFooter>
|
||||
</CredenzaContent>
|
||||
</Credenza>
|
||||
|
||||
Reference in New Issue
Block a user