Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
034ff5a396 Bump node from 24-alpine to 26-alpine
Bumps node from 24-alpine to 26-alpine.

---
updated-dependencies:
- dependency-name: node
  dependency-version: 26-alpine
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-05-07 01:34:47 +00:00
33 changed files with 337 additions and 1374 deletions

View File

@@ -1,4 +1,4 @@
FROM node:24-alpine FROM node:26-alpine
WORKDIR /app WORKDIR /app

View File

@@ -3079,34 +3079,7 @@
"S3DestEditTitle": "Edit Destination", "S3DestEditTitle": "Edit Destination",
"S3DestAddTitle": "Add S3 Destination", "S3DestAddTitle": "Add S3 Destination",
"S3DestEditDescription": "Update the configuration for this S3 event streaming destination.", "S3DestEditDescription": "Update the configuration for this S3 event streaming destination.",
"S3DestAddDescription": "Configure a new Amazon S3 (or S3-compatible) bucket to receive your organization's events.", "S3DestAddDescription": "Configure a new S3 endpoint to receive your organization's events.",
"s3DestTabSettings": "Settings",
"s3DestTabFormat": "Format",
"s3DestNameLabel": "Name",
"s3DestNamePlaceholder": "My S3 destination",
"s3DestAccessKeyIdLabel": "AWS Access Key ID",
"s3DestSecretAccessKeyLabel": "AWS Secret Access Key",
"s3DestSecretAccessKeyPlaceholder": "Your AWS secret access key",
"s3DestRegionLabel": "AWS Region",
"s3DestBucketLabel": "Bucket Name",
"s3DestPrefixLabel": "Key Prefix (optional)",
"s3DestPrefixDescription": "Optional path prefix prepended to every object key. Objects are stored at {prefix}/{logType}/{YYYY}/{MM}/{DD}/{filename}.",
"s3DestEndpointLabel": "Custom Endpoint (optional)",
"s3DestEndpointDescription": "Override the S3 endpoint for S3-compatible storage such as MinIO or Cloudflare R2. Leave blank for standard AWS S3.",
"s3DestGzipLabel": "Gzip compression",
"s3DestGzipDescription": "Compress each uploaded object with gzip. Reduces storage costs and upload size.",
"s3DestFormatTitle": "File Format",
"s3DestFormatDescription": "How events are serialised inside each uploaded object.",
"s3DestFormatJsonArrayDescription": "Each object is a JSON array of event records. Compatible with most analytics tools.",
"s3DestFormatNdjsonDescription": "Each object contains one JSON record per line (newline-delimited JSON). Compatible with Athena, BigQuery, and Spark.",
"s3DestFormatCsvTitle": "CSV",
"s3DestFormatCsvDescription": "Each object is an RFC-4180 CSV file with a header row. Column names are derived from the event data fields.",
"s3DestSaveChanges": "Save Changes",
"s3DestCreateDestination": "Create Destination",
"s3DestUpdatedSuccess": "Destination updated successfully",
"s3DestCreatedSuccess": "Destination created successfully",
"s3DestUpdateFailed": "Failed to update destination",
"s3DestCreateFailed": "Failed to create destination",
"datadogDestEditTitle": "Edit Destination", "datadogDestEditTitle": "Edit Destination",
"datadogDestAddTitle": "Add Datadog Destination", "datadogDestAddTitle": "Add Datadog Destination",
"datadogDestEditDescription": "Update the configuration for this Datadog event streaming destination.", "datadogDestEditDescription": "Update the configuration for this Datadog event streaming destination.",

View File

@@ -87,7 +87,7 @@ function createDb() {
export const db = createDb(); export const db = createDb();
export default db; export default db;
export const primaryDb = db.$primary as typeof db; // is this typeof a problem - techincally they are different types export const primaryDb = db.$primary;
export type Transaction = Parameters< export type Transaction = Parameters<
Parameters<(typeof db)["transaction"]>[0] Parameters<(typeof db)["transaction"]>[0]
>[0]; >[0];

View File

@@ -25,9 +25,9 @@ import { tierMatrix } from "./billing/tierMatrix";
export async function calculateUserClientsForOrgs( export async function calculateUserClientsForOrgs(
userId: string, userId: string,
trx: Transaction | typeof db = db trx?: Transaction
): Promise<void> { ): Promise<void> {
const execute = async (transaction: Transaction | typeof db) => { const execute = async (transaction: Transaction) => {
const orgCache = new Map<string, typeof orgs.$inferSelect | null>(); const orgCache = new Map<string, typeof orgs.$inferSelect | null>();
const adminRoleCache = new Map< const adminRoleCache = new Map<
string, string,
@@ -437,7 +437,7 @@ export async function calculateUserClientsForOrgs(
async function cleanupOrphanedClients( async function cleanupOrphanedClients(
userId: string, userId: string,
trx: Transaction | typeof db, trx: Transaction,
userOrgIds: string[] = [] userOrgIds: string[] = []
): Promise<void> { ): Promise<void> {
// Find all OLM clients for this user that should be deleted // Find all OLM clients for this user that should be deleted

View File

@@ -124,7 +124,7 @@ export function computeBuckets(
let totalDowntime = 0; let totalDowntime = 0;
for (let d = 0; d < days; d++) { for (let d = 0; d < days; d++) {
const dayStartSec = todayMidnightSec - (days - 1 - d) * 86400; const dayStartSec = todayMidnightSec - (days - d) * 86400;
const dayEndSec = dayStartSec + 86400; const dayEndSec = dayStartSec + 86400;
const dayEvents = events.filter( const dayEvents = events.filter(

View File

@@ -485,133 +485,6 @@ async function syncAcmeCertsFromHttp(endpoint: string): Promise<void> {
} }
} }
async function storeCertForDomain(
domain: string,
certPem: string,
keyPem: string,
validatedX509: crypto.X509Certificate
): Promise<void> {
const wildcard = domain.startsWith("*.");
const existing = await db
.select()
.from(certificates)
.where(eq(certificates.domain, domain))
.limit(1);
let oldCertPem: string | null = null;
let oldKeyPem: string | null = null;
if (existing.length > 0 && existing[0].certFile) {
try {
const storedCertPem = decrypt(
existing[0].certFile,
config.getRawConfig().server.secret!
);
const wildcardUnchanged = existing[0].wildcard === wildcard;
if (storedCertPem === certPem && wildcardUnchanged) {
return;
}
oldCertPem = storedCertPem;
if (existing[0].keyFile) {
try {
oldKeyPem = decrypt(
existing[0].keyFile,
config.getRawConfig().server.secret!
);
} catch (keyErr) {
logger.debug(
`acmeCertSync: could not decrypt stored key for ${domain}: ${keyErr}`
);
}
}
} catch (err) {
logger.debug(
`acmeCertSync: could not decrypt stored cert for ${domain}, will update: ${err}`
);
}
}
let expiresAt: number | null = null;
try {
expiresAt = Math.floor(
new Date(validatedX509.validTo).getTime() / 1000
);
} catch (err) {
logger.debug(
`acmeCertSync: could not parse cert expiry for ${domain}: ${err}`
);
}
const encryptedCert = encrypt(
certPem,
config.getRawConfig().server.secret!
);
const encryptedKey = encrypt(keyPem, config.getRawConfig().server.secret!);
const now = Math.floor(Date.now() / 1000);
const domainId = await findDomainId(domain);
if (domainId) {
logger.debug(
`acmeCertSync: resolved domainId "${domainId}" for cert domain "${domain}"`
);
} else {
logger.debug(
`acmeCertSync: no matching domain record found for cert domain "${domain}"`
);
}
if (existing.length > 0) {
logger.debug(
`acmeCertSync: updating existing certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await db
.update(certificates)
.set({
certFile: encryptedCert,
keyFile: encryptedKey,
status: "valid",
expiresAt,
updatedAt: now,
wildcard,
...(domainId !== null && { domainId })
})
.where(eq(certificates.domain, domain));
logger.debug(
`acmeCertSync: updated certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await pushCertUpdateToAffectedNewts(
domain,
domainId,
oldCertPem,
oldKeyPem
);
} else {
logger.debug(
`acmeCertSync: inserting new certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await db.insert(certificates).values({
domain,
domainId,
certFile: encryptedCert,
keyFile: encryptedKey,
status: "valid",
expiresAt,
createdAt: now,
updatedAt: now,
wildcard
});
logger.debug(
`acmeCertSync: inserted new certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await pushCertUpdateToAffectedNewts(domain, domainId, null, null);
}
}
function findAcmeJsonFiles(dirPath: string): string[] { function findAcmeJsonFiles(dirPath: string): string[] {
const results: string[] = []; const results: string[] = [];
let entries: fs.Dirent[]; let entries: fs.Dirent[];
@@ -702,16 +575,18 @@ async function syncAcmeCerts(acmeJsonPath: string): Promise<void> {
} }
for (const cert of allCerts) { for (const cert of allCerts) {
const mainDomain = cert?.domain?.main; const domain = cert?.domain?.main;
if (!mainDomain || typeof mainDomain !== "string") { if (!domain || typeof domain !== "string") {
logger.debug(`acmeCertSync: skipping cert with missing domain`); logger.debug(`acmeCertSync: skipping cert with missing domain`);
continue; continue;
} }
const { wildcard } = detectWildcard(domain, cert.domain?.sans);
if (!cert.certificate || !cert.key) { if (!cert.certificate || !cert.key) {
logger.debug( logger.debug(
`acmeCertSync: skipping cert for ${mainDomain} - empty certificate or key field` `acmeCertSync: skipping cert for ${domain} - empty certificate or key field`
); );
continue; continue;
} }
@@ -723,14 +598,14 @@ async function syncAcmeCerts(acmeJsonPath: string): Promise<void> {
keyPem = Buffer.from(cert.key, "base64").toString("utf8"); keyPem = Buffer.from(cert.key, "base64").toString("utf8");
} catch (err) { } catch (err) {
logger.debug( logger.debug(
`acmeCertSync: skipping cert for ${mainDomain} - failed to base64-decode cert/key: ${err}` `acmeCertSync: skipping cert for ${domain} - failed to base64-decode cert/key: ${err}`
); );
continue; continue;
} }
if (!certPem.trim() || !keyPem.trim()) { if (!certPem.trim() || !keyPem.trim()) {
logger.debug( logger.debug(
`acmeCertSync: skipping cert for ${mainDomain} - blank PEM after base64 decode` `acmeCertSync: skipping cert for ${domain} - blank PEM after base64 decode`
); );
continue; continue;
} }
@@ -741,7 +616,7 @@ async function syncAcmeCerts(acmeJsonPath: string): Promise<void> {
const firstCertPemForValidation = extractFirstCert(certPem); const firstCertPemForValidation = extractFirstCert(certPem);
if (!firstCertPemForValidation) { if (!firstCertPemForValidation) {
logger.debug( logger.debug(
`acmeCertSync: skipping cert for ${mainDomain} - no PEM certificate block found` `acmeCertSync: skipping cert for ${domain} - no PEM certificate block found`
); );
continue; continue;
} }
@@ -753,7 +628,7 @@ async function syncAcmeCerts(acmeJsonPath: string): Promise<void> {
); );
} catch (err) { } catch (err) {
logger.debug( logger.debug(
`acmeCertSync: skipping cert for ${mainDomain} - invalid X.509 certificate: ${err}` `acmeCertSync: skipping cert for ${domain} - invalid X.509 certificate: ${err}`
); );
continue; continue;
} }
@@ -763,40 +638,139 @@ async function syncAcmeCerts(acmeJsonPath: string): Promise<void> {
crypto.createPrivateKey(keyPem); crypto.createPrivateKey(keyPem);
} catch (err) { } catch (err) {
logger.debug( logger.debug(
`acmeCertSync: skipping cert for ${mainDomain} - invalid private key: ${err}` `acmeCertSync: skipping cert for ${domain} - invalid private key: ${err}`
); );
continue; continue;
} }
// Collect all domains covered by this cert: main + every SAN. // Check if cert already exists in DB
// Each domain gets its own row in the certificates table so that const existing = await db
// lookups by any hostname on the cert succeed independently. .select()
const allDomains = new Set<string>([mainDomain]); .from(certificates)
if (Array.isArray(cert.domain?.sans)) { .where(and(eq(certificates.domain, domain)))
for (const san of cert.domain.sans) { .limit(1);
if (typeof san === "string" && san.trim()) {
allDomains.add(san.trim()); let oldCertPem: string | null = null;
let oldKeyPem: string | null = null;
if (existing.length > 0 && existing[0].certFile) {
try {
const storedCertPem = decrypt(
existing[0].certFile,
config.getRawConfig().server.secret!
);
const wildcardUnchanged = existing[0].wildcard === wildcard;
if (storedCertPem === certPem && wildcardUnchanged) {
// logger.debug(
// `acmeCertSync: cert for ${domain} is unchanged, skipping`
// );
continue;
} }
// Cert has changed; capture old values so we can send a correct
// update message to the newt after the DB write.
oldCertPem = storedCertPem;
if (existing[0].keyFile) {
try {
oldKeyPem = decrypt(
existing[0].keyFile,
config.getRawConfig().server.secret!
);
} catch (keyErr) {
logger.debug(
`acmeCertSync: could not decrypt stored key for ${domain}: ${keyErr}`
);
}
}
} catch (err) {
// Decryption failure means we should proceed with the update
logger.debug(
`acmeCertSync: could not decrypt stored cert for ${domain}, will update: ${err}`
);
} }
} }
logger.debug( // Parse cert expiry from the validated X.509 certificate
`acmeCertSync: cert for ${mainDomain} covers ${allDomains.size} domain(s): ${[...allDomains].join(", ")}` let expiresAt: number | null = null;
); try {
expiresAt = Math.floor(
new Date(validatedX509.validTo).getTime() / 1000
);
} catch (err) {
logger.debug(
`acmeCertSync: could not parse cert expiry for ${domain}: ${err}`
);
}
for (const domain of allDomains) { const encryptedCert = encrypt(
try { certPem,
await storeCertForDomain( config.getRawConfig().server.secret!
domain, );
certPem, const encryptedKey = encrypt(
keyPem, keyPem,
validatedX509 config.getRawConfig().server.secret!
); );
} catch (err) { const now = Math.floor(Date.now() / 1000);
logger.error(
`acmeCertSync: error storing cert for domain "${domain}": ${err}` const domainId = await findDomainId(domain);
); if (domainId) {
} logger.debug(
`acmeCertSync: resolved domainId "${domainId}" for cert domain "${domain}"`
);
} else {
logger.debug(
`acmeCertSync: no matching domain record found for cert domain "${domain}"`
);
}
if (existing.length > 0) {
logger.debug(
`acmeCertSync: updating existing certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await db
.update(certificates)
.set({
certFile: encryptedCert,
keyFile: encryptedKey,
status: "valid",
expiresAt,
updatedAt: now,
wildcard,
...(domainId !== null && { domainId })
})
.where(eq(certificates.domain, domain));
logger.debug(
`acmeCertSync: updated certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await pushCertUpdateToAffectedNewts(
domain,
domainId,
oldCertPem,
oldKeyPem
);
} else {
logger.debug(
`acmeCertSync: inserting new certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await db.insert(certificates).values({
domain,
domainId,
certFile: encryptedCert,
keyFile: encryptedKey,
status: "valid",
expiresAt,
createdAt: now,
updatedAt: now,
wildcard
});
logger.debug(
`acmeCertSync: inserted new certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
// For a brand-new cert, push to any SSL resources that were waiting for it
await pushCertUpdateToAffectedNewts(domain, domainId, null, null);
} }
} }
} }

View File

@@ -30,12 +30,10 @@ import {
LOG_TYPES, LOG_TYPES,
LogEvent, LogEvent,
DestinationFailureState, DestinationFailureState,
HttpConfig, HttpConfig
S3Config
} from "./types"; } from "./types";
import { LogDestinationProvider } from "./providers/LogDestinationProvider"; import { LogDestinationProvider } from "./providers/LogDestinationProvider";
import { HttpLogDestination } from "./providers/HttpLogDestination"; import { HttpLogDestination } from "./providers/HttpLogDestination";
import { S3LogDestination } from "./providers/S3LogDestination";
import type { EventStreamingDestination } from "@server/db"; import type { EventStreamingDestination } from "@server/db";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -74,11 +72,11 @@ const MAX_CATCHUP_BATCHES = 20;
* After the last entry the max value is re-used. * After the last entry the max value is re-used.
*/ */
const BACKOFF_SCHEDULE_MS = [ const BACKOFF_SCHEDULE_MS = [
60_000, // 1 min (failure 1) 60_000, // 1 min (failure 1)
2 * 60_000, // 2 min (failure 2) 2 * 60_000, // 2 min (failure 2)
5 * 60_000, // 5 min (failure 3) 5 * 60_000, // 5 min (failure 3)
10 * 60_000, // 10 min (failure 4) 10 * 60_000, // 10 min (failure 4)
30 * 60_000 // 30 min (failure 5+) 30 * 60_000 // 30 min (failure 5+)
]; ];
/** /**
@@ -206,10 +204,7 @@ export class LogStreamingManager {
this.pollTimer = null; this.pollTimer = null;
this.runPoll() this.runPoll()
.catch((err) => .catch((err) =>
logger.error( logger.error("LogStreamingManager: unexpected poll error", err)
"LogStreamingManager: unexpected poll error",
err
)
) )
.finally(() => { .finally(() => {
if (this.isRunning) { if (this.isRunning) {
@@ -280,13 +275,10 @@ export class LogStreamingManager {
} }
// Decrypt and parse config skip destination if either step fails // Decrypt and parse config skip destination if either step fails
let configFromDb: unknown; let configFromDb: HttpConfig;
try { try {
const decryptedConfig = decrypt( const decryptedConfig = decrypt(dest.config, config.getRawConfig().server.secret!);
dest.config, configFromDb = JSON.parse(decryptedConfig) as HttpConfig;
config.getRawConfig().server.secret!
);
configFromDb = JSON.parse(decryptedConfig);
} catch (err) { } catch (err) {
logger.error( logger.error(
`LogStreamingManager: destination ${dest.destinationId} has invalid or undecryptable config`, `LogStreamingManager: destination ${dest.destinationId} has invalid or undecryptable config`,
@@ -370,10 +362,7 @@ export class LogStreamingManager {
.from(eventStreamingCursors) .from(eventStreamingCursors)
.where( .where(
and( and(
eq( eq(eventStreamingCursors.destinationId, dest.destinationId),
eventStreamingCursors.destinationId,
dest.destinationId
),
eq(eventStreamingCursors.logType, logType) eq(eventStreamingCursors.logType, logType)
) )
) )
@@ -442,7 +431,9 @@ export class LogStreamingManager {
if (rows.length === 0) break; if (rows.length === 0) break;
const events = rows.map((row) => this.rowToLogEvent(logType, row)); const events = rows.map((row) =>
this.rowToLogEvent(logType, row)
);
// Throws on failure caught by the caller which applies back-off // Throws on failure caught by the caller which applies back-off
await provider.send(events); await provider.send(events);
@@ -686,7 +677,8 @@ export class LogStreamingManager {
break; break;
} }
const orgId = typeof row.orgId === "string" ? row.orgId : ""; const orgId =
typeof row.orgId === "string" ? row.orgId : "";
return { return {
id: row.id, id: row.id,
@@ -716,8 +708,6 @@ export class LogStreamingManager {
switch (type) { switch (type) {
case "http": case "http":
return new HttpLogDestination(config as HttpConfig); return new HttpLogDestination(config as HttpConfig);
case "s3":
return new S3LogDestination(config as S3Config);
// Future providers: // Future providers:
// case "datadog": return new DatadogLogDestination(config as DatadogConfig); // case "datadog": return new DatadogLogDestination(config as DatadogConfig);
default: default:

View File

@@ -1,279 +0,0 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { gzip as gzipCallback } from "zlib";
import { promisify } from "util";
import { randomUUID } from "crypto";
import logger from "@server/logger";
import { LogEvent, S3Config, S3PayloadFormat } from "../types";
import { LogDestinationProvider } from "./LogDestinationProvider";
const gzipAsync = promisify(gzipCallback);
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** Maximum time (ms) to wait for a single S3 PutObject response. */
const REQUEST_TIMEOUT_MS = 60_000;
/** Default payload format when none is specified in the config. */
const DEFAULT_FORMAT: S3PayloadFormat = "json_array";
// ---------------------------------------------------------------------------
// S3LogDestination
// ---------------------------------------------------------------------------
/**
* Forwards a batch of log events to an S3-compatible object store by
* uploading a single object per `send()` call.
*
* **Object key layout**
* ```
* {prefix}/{logType}/{YYYY}/{MM}/{DD}/{HH}-{mm}-{ss}-{uuid}.{ext}[.gz]
* ```
* - `prefix` from `config.prefix` (default: empty key starts at logType)
* - `logType` one of "request", "action", "access", "connection"
* - Date components are derived from the upload time (UTC)
* - `ext` `json` | `ndjson` | `csv`
* - `.gz` appended when `config.gzip` is true
*
* **Payload formats** (controlled by `config.format`):
* - `json_array` (default) body is a JSON array of event objects.
* - `ndjson` one JSON object per line (newline-delimited).
* - `csv` RFC-4180 CSV with a header row; columns are the
* union of all field names in the batch's event data.
*
* **Compression**: when `config.gzip` is `true` the body is gzip-compressed
* before upload and `Content-Encoding: gzip` is set on the object.
*
* **Custom endpoint**: set `config.endpoint` to target any S3-compatible
* storage service (e.g. MinIO, Cloudflare R2).
*/
export class S3LogDestination implements LogDestinationProvider {
readonly type = "s3";
private readonly config: S3Config;
constructor(config: S3Config) {
this.config = config;
}
// -----------------------------------------------------------------------
// LogDestinationProvider implementation
// -----------------------------------------------------------------------
async send(events: LogEvent[]): Promise<void> {
if (events.length === 0) return;
const format = this.config.format ?? DEFAULT_FORMAT;
const useGzip = this.config.gzip ?? false;
const logType = events[0].logType;
const rawBody = this.serialize(events, format);
const bodyBuffer = Buffer.from(rawBody, "utf-8");
let uploadBody: Buffer;
let contentEncoding: string | undefined;
if (useGzip) {
uploadBody = (await gzipAsync(bodyBuffer)) as Buffer;
contentEncoding = "gzip";
} else {
uploadBody = bodyBuffer;
}
const key = this.buildObjectKey(logType, format, useGzip);
const contentType = this.contentType(format);
const clientConfig: ConstructorParameters<typeof S3Client>[0] = {
region: this.config.region,
credentials: {
accessKeyId: this.config.accessKeyId,
secretAccessKey: this.config.secretAccessKey
},
requestHandler: {
requestTimeout: REQUEST_TIMEOUT_MS
}
};
if (this.config.endpoint?.trim()) {
clientConfig.endpoint = this.config.endpoint.trim();
}
const client = new S3Client(clientConfig);
try {
await client.send(
new PutObjectCommand({
Bucket: this.config.bucket,
Key: key,
Body: uploadBody,
ContentType: contentType,
...(contentEncoding
? { ContentEncoding: contentEncoding }
: {})
})
);
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : String(err);
throw new Error(
`S3LogDestination: failed to upload object "${key}" ` +
`to bucket "${this.config.bucket}" ${msg}`
);
}
}
// -----------------------------------------------------------------------
// Internal helpers
// -----------------------------------------------------------------------
/**
* Construct a unique S3 object key for the given log type and format.
* Keys are partitioned by logType and date so they can be queried or
* lifecycle-managed independently.
*/
private buildObjectKey(
logType: string,
format: S3PayloadFormat,
gzip: boolean
): string {
const now = new Date();
const year = now.getUTCFullYear();
const month = String(now.getUTCMonth() + 1).padStart(2, "0");
const day = String(now.getUTCDate()).padStart(2, "0");
const hh = String(now.getUTCHours()).padStart(2, "0");
const mm = String(now.getUTCMinutes()).padStart(2, "0");
const ss = String(now.getUTCSeconds()).padStart(2, "0");
const uid = randomUUID();
const ext =
format === "csv" ? "csv" : format === "ndjson" ? "ndjson" : "json";
const fileName = `${hh}-${mm}-${ss}-${uid}.${ext}${gzip ? ".gz" : ""}`;
const rawPrefix = (this.config.prefix ?? "").trim().replace(/\/+$/, "");
const parts = [
rawPrefix,
logType,
`${year}/${month}/${day}`,
fileName
].filter((p) => p !== "");
return parts.join("/");
}
private contentType(format: S3PayloadFormat): string {
switch (format) {
case "csv":
return "text/csv; charset=utf-8";
case "ndjson":
return "application/x-ndjson";
default:
return "application/json";
}
}
private serialize(events: LogEvent[], format: S3PayloadFormat): string {
switch (format) {
case "json_array":
return JSON.stringify(events.map(toPayload));
case "ndjson":
return events
.map((e) => JSON.stringify(toPayload(e)))
.join("\n");
case "csv":
return toCsv(events);
}
}
}
// ---------------------------------------------------------------------------
// Payload helpers
// ---------------------------------------------------------------------------
function toPayload(event: LogEvent): unknown {
return {
event: event.logType,
timestamp: new Date(event.timestamp * 1000).toISOString(),
data: event.data
};
}
/**
* Convert a batch of events to RFC-4180 CSV.
*
* The column set is the union of `event`, `timestamp`, and all keys present in
* `event.data` across the batch, preserving insertion order. Values that
* contain commas, double-quotes, or newlines are quoted and escaped.
*/
function toCsv(events: LogEvent[]): string {
if (events.length === 0) return "";
// Collect all unique data keys in stable order
const keySet = new LinkedSet<string>();
keySet.add("event");
keySet.add("timestamp");
for (const e of events) {
for (const k of Object.keys(e.data)) {
keySet.add(k);
}
}
const headers = keySet.toArray();
const rows: string[] = [headers.map(csvEscape).join(",")];
for (const e of events) {
const flat: Record<string, unknown> = {
event: e.logType,
timestamp: new Date(e.timestamp * 1000).toISOString(),
...e.data
};
rows.push(
headers.map((h) => csvEscape(flattenValue(flat[h]))).join(",")
);
}
return rows.join("\n");
}
/** Flatten a value to a plain string suitable for a CSV cell. */
function flattenValue(value: unknown): string {
if (value === null || value === undefined) return "";
if (typeof value === "object") return JSON.stringify(value);
return String(value);
}
/** RFC-4180 CSV escaping. */
function csvEscape(value: string): string {
if (/[",\n\r]/.test(value)) {
return `"${value.replace(/"/g, '""')}"`;
}
return value;
}
// ---------------------------------------------------------------------------
// Minimal ordered set (preserves insertion order, deduplicates)
// ---------------------------------------------------------------------------
class LinkedSet<T> {
private readonly map = new Map<T, true>();
add(value: T): void {
this.map.set(value, true);
}
toArray(): T[] {
return Array.from(this.map.keys());
}
}

View File

@@ -107,40 +107,6 @@ export interface HttpConfig {
bodyTemplate?: string; bodyTemplate?: string;
} }
// ---------------------------------------------------------------------------
// S3 destination configuration
// ---------------------------------------------------------------------------
/**
* Controls how the batch of events is serialised into each S3 object.
*
* - `json_array` `[{…}, {…}]` default; each object is a JSON array.
* - `ndjson` `{…}\n{…}` newline-delimited JSON, one object per line.
* - `csv` RFC-4180 CSV with a header row derived from the event fields.
*/
export type S3PayloadFormat = "json_array" | "ndjson" | "csv";
export interface S3Config {
/** Human-readable label for the destination */
name: string;
/** AWS Access Key ID */
accessKeyId: string;
/** AWS Secret Access Key */
secretAccessKey: string;
/** AWS region (e.g. "us-east-1") */
region: string;
/** Target S3 bucket name */
bucket: string;
/** Optional key prefix appended before the auto-generated path */
prefix?: string;
/** Override the S3 endpoint for S3-compatible storage (e.g. MinIO, R2) */
endpoint?: string;
/** How events are serialised into each object. Defaults to "json_array". */
format: S3PayloadFormat;
/** Whether to gzip-compress the object before upload. */
gzip: boolean;
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Per-destination per-log-type cursor (reflects the DB table) // Per-destination per-log-type cursor (reflects the DB table)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@@ -14,7 +14,7 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import stoi from "@server/lib/stoi"; import stoi from "@server/lib/stoi";
import { clients, db, primaryDb, Client } from "@server/db"; import { clients, db } from "@server/db";
import { userOrgRoles, userOrgs, roles } from "@server/db"; import { userOrgRoles, userOrgs, roles } from "@server/db";
import { eq, and } from "drizzle-orm"; import { eq, and } from "drizzle-orm";
import response from "@server/lib/response"; import response from "@server/lib/response";
@@ -122,12 +122,8 @@ export async function addUserRole(
); );
} }
let newUserRole: { let newUserRole: { userId: string; orgId: string; roleId: number } | null =
userId: string; null;
orgId: string;
roleId: number;
} | null = null;
let orgClientsToRebuild: Client[] = [];
await db.transaction(async (trx) => { await db.transaction(async (trx) => {
const inserted = await trx const inserted = await trx
.insert(userOrgRoles) .insert(userOrgRoles)
@@ -153,19 +149,11 @@ export async function addUserRole(
) )
); );
orgClientsToRebuild = orgClients; for (const orgClient of orgClients) {
await rebuildClientAssociationsFromClient(orgClient, trx);
}
}); });
for (const orgClient of orgClientsToRebuild) {
rebuildClientAssociationsFromClient(orgClient, primaryDb).catch(
(e) => {
logger.error(
`Failed to rebuild client associations for client ${orgClient.clientId} after adding role: ${e}`
);
}
);
}
return response(res, { return response(res, {
data: newUserRole ?? { userId, orgId: role.orgId, roleId }, data: newUserRole ?? { userId, orgId: role.orgId, roleId },
success: true, success: true,

View File

@@ -14,7 +14,7 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import stoi from "@server/lib/stoi"; import stoi from "@server/lib/stoi";
import { db, primaryDb, Client } from "@server/db"; import { db } from "@server/db";
import { userOrgRoles, userOrgs, roles, clients } from "@server/db"; import { userOrgRoles, userOrgs, roles, clients } from "@server/db";
import { eq, and } from "drizzle-orm"; import { eq, and } from "drizzle-orm";
import response from "@server/lib/response"; import response from "@server/lib/response";
@@ -129,7 +129,6 @@ export async function removeUserRole(
} }
} }
let orgClientsToRebuild: Client[] = [];
await db.transaction(async (trx) => { await db.transaction(async (trx) => {
await trx await trx
.delete(userOrgRoles) .delete(userOrgRoles)
@@ -151,19 +150,11 @@ export async function removeUserRole(
) )
); );
orgClientsToRebuild = orgClients; for (const orgClient of orgClients) {
await rebuildClientAssociationsFromClient(orgClient, trx);
}
}); });
for (const orgClient of orgClientsToRebuild) {
rebuildClientAssociationsFromClient(orgClient, primaryDb).catch(
(e) => {
logger.error(
`Failed to rebuild client associations for client ${orgClient.clientId} after removing role: ${e}`
);
}
);
}
return response(res, { return response(res, {
data: { userId, orgId: role.orgId, roleId }, data: { userId, orgId: role.orgId, roleId },
success: true, success: true,

View File

@@ -13,7 +13,7 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import { clients, db, primaryDb, Client } from "@server/db"; import { clients, db } from "@server/db";
import { userOrgRoles, userOrgs, roles } from "@server/db"; import { userOrgRoles, userOrgs, roles } from "@server/db";
import { eq, and, inArray } from "drizzle-orm"; import { eq, and, inArray } from "drizzle-orm";
import response from "@server/lib/response"; import response from "@server/lib/response";
@@ -115,7 +115,6 @@ export async function setUserOrgRoles(
); );
} }
let orgClientsToRebuild: Client[] = [];
await db.transaction(async (trx) => { await db.transaction(async (trx) => {
await trx await trx
.delete(userOrgRoles) .delete(userOrgRoles)
@@ -143,19 +142,11 @@ export async function setUserOrgRoles(
and(eq(clients.userId, userId), eq(clients.orgId, orgId)) and(eq(clients.userId, userId), eq(clients.orgId, orgId))
); );
orgClientsToRebuild = orgClients; for (const orgClient of orgClients) {
await rebuildClientAssociationsFromClient(orgClient, trx);
}
}); });
for (const orgClient of orgClientsToRebuild) {
rebuildClientAssociationsFromClient(orgClient, primaryDb).catch(
(e) => {
logger.error(
`Failed to rebuild client associations for client ${orgClient.clientId} after setting roles: ${e}`
);
}
);
}
return response(res, { return response(res, {
data: { userId, orgId, roleIds: uniqueRoleIds }, data: { userId, orgId, roleIds: uniqueRoleIds },
success: true, success: true,

View File

@@ -1,6 +1,6 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import { db, orgs, userOrgs, users, primaryDb } from "@server/db"; import { db, orgs, userOrgs, users } from "@server/db";
import { eq, and, inArray, not } from "drizzle-orm"; import { eq, and, inArray, not } from "drizzle-orm";
import response from "@server/lib/response"; import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode"; import HttpCode from "@server/types/HttpCode";
@@ -218,18 +218,13 @@ export async function deleteMyAccount(
await db.transaction(async (trx) => { await db.transaction(async (trx) => {
await trx.delete(users).where(eq(users.userId, userId)); await trx.delete(users).where(eq(users.userId, userId));
await calculateUserClientsForOrgs(userId, trx);
// loop through the other orgs and decrement the count // loop through the other orgs and decrement the count
for (const userOrg of otherOrgsTheUserWasIn) { for (const userOrg of otherOrgsTheUserWasIn) {
await usageService.add(userOrg.orgId, FeatureId.USERS, -1, trx); await usageService.add(userOrg.orgId, FeatureId.USERS, -1, trx);
} }
}); });
calculateUserClientsForOrgs(userId, primaryDb).catch((e) => {
logger.error(
`Failed to calculate user clients after deleting account for user ${userId}: ${e}`
);
});
try { try {
await invalidateSession(session.sessionId); await invalidateSession(session.sessionId);
} catch (error) { } catch (error) {

View File

@@ -1,6 +1,6 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import { db, primaryDb } from "@server/db"; import { db } from "@server/db";
import { import {
roles, roles,
Client, Client,
@@ -92,10 +92,7 @@ export async function createClient(
const { orgId } = parsedParams.data; const { orgId } = parsedParams.data;
if ( if (req.user && (!req.userOrgRoleIds || req.userOrgRoleIds.length === 0)) {
req.user &&
(!req.userOrgRoleIds || req.userOrgRoleIds.length === 0)
) {
return next( return next(
createHttpError(HttpCode.FORBIDDEN, "User does not have a role") createHttpError(HttpCode.FORBIDDEN, "User does not have a role")
); );
@@ -201,10 +198,7 @@ export async function createClient(
if (!randomExitNode) { if (!randomExitNode) {
return next( return next(
createHttpError( createHttpError(HttpCode.NOT_FOUND, `No exit nodes available. ${build == "saas" ? "Please contact support." : "You need to install gerbil to use the clients."}`)
HttpCode.NOT_FOUND,
`No exit nodes available. ${build == "saas" ? "Please contact support." : "You need to install gerbil to use the clients."}`
)
); );
} }
@@ -262,17 +256,9 @@ export async function createClient(
clientId: newClient.clientId, clientId: newClient.clientId,
dateCreated: moment().toISOString() dateCreated: moment().toISOString()
}); });
});
if (newClient) { await rebuildClientAssociationsFromClient(newClient, trx);
rebuildClientAssociationsFromClient(newClient, primaryDb).catch( });
(e) => {
logger.error(
`Failed to rebuild client associations after creating client: ${e}`
);
}
);
}
return response<CreateClientResponse>(res, { return response<CreateClientResponse>(res, {
data: newClient, data: newClient,

View File

@@ -1,6 +1,6 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import { db, primaryDb } from "@server/db"; import { db } from "@server/db";
import { import {
roles, roles,
Client, Client,
@@ -237,17 +237,9 @@ export async function createUserClient(
userId, userId,
clientId: newClient.clientId clientId: newClient.clientId
}); });
});
if (newClient) { await rebuildClientAssociationsFromClient(newClient, trx);
rebuildClientAssociationsFromClient(newClient, primaryDb).catch( });
(e) => {
logger.error(
`Failed to rebuild client associations after creating user client: ${e}`
);
}
);
}
return response<CreateClientAndOlmResponse>(res, { return response<CreateClientAndOlmResponse>(res, {
data: newClient, data: newClient,

View File

@@ -1,6 +1,6 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import { db, olms, primaryDb, Client, Olm } from "@server/db"; import { db, olms } from "@server/db";
import { clients, clientSitesAssociationsCache } from "@server/db"; import { clients, clientSitesAssociationsCache } from "@server/db";
import { eq } from "drizzle-orm"; import { eq } from "drizzle-orm";
import response from "@server/lib/response"; import response from "@server/lib/response";
@@ -71,17 +71,14 @@ export async function deleteClient(
); );
} }
let deletedClient: Client | undefined;
let olm: Olm | undefined;
await db.transaction(async (trx) => { await db.transaction(async (trx) => {
// Then delete the client itself // Then delete the client itself
[deletedClient] = await trx const [deletedClient] = await trx
.delete(clients) .delete(clients)
.where(eq(clients.clientId, clientId)) .where(eq(clients.clientId, clientId))
.returning(); .returning();
[olm] = await trx const [olm] = await trx
.select() .select()
.from(olms) .from(olms)
.where(eq(olms.clientId, clientId)) .where(eq(olms.clientId, clientId))
@@ -91,28 +88,13 @@ export async function deleteClient(
if (!client.userId && client.olmId) { if (!client.userId && client.olmId) {
await trx.delete(olms).where(eq(olms.olmId, client.olmId)); await trx.delete(olms).where(eq(olms.olmId, client.olmId));
} }
});
if (deletedClient) { await rebuildClientAssociationsFromClient(deletedClient, trx);
rebuildClientAssociationsFromClient(deletedClient, primaryDb).catch(
(e) => {
logger.error(
`Failed to rebuild client associations after deleting client ${clientId}: ${e}`
);
}
);
if (olm) { if (olm) {
sendTerminateClient( await sendTerminateClient(deletedClient.clientId, OlmErrorCodes.TERMINATED_DELETED, olm.olmId); // the olmId needs to be provided because it cant look it up after deletion
deletedClient.clientId,
OlmErrorCodes.TERMINATED_DELETED,
olm.olmId
).catch((e) => {
logger.error(
`Failed to send terminate message for client ${deletedClient?.clientId} after deleting client ${clientId}: ${e}`
);
});
} }
} });
return response(res, { return response(res, {
data: null, data: null,

View File

@@ -1,5 +1,5 @@
import { NextFunction, Request, Response } from "express"; import { NextFunction, Request, Response } from "express";
import { db, olms, primaryDb } from "@server/db"; import { db, olms } from "@server/db";
import HttpCode from "@server/types/HttpCode"; import HttpCode from "@server/types/HttpCode";
import { z } from "zod"; import { z } from "zod";
import createHttpError from "http-errors"; import createHttpError from "http-errors";
@@ -81,19 +81,16 @@ export async function createUserOlm(
const secretHash = await hashPassword(secret); const secretHash = await hashPassword(secret);
await db.insert(olms).values({ await db.transaction(async (trx) => {
olmId: olmId, await trx.insert(olms).values({
userId, olmId: olmId,
name, userId,
secretHash, name,
dateCreated: moment().toISOString() secretHash,
}); dateCreated: moment().toISOString()
});
calculateUserClientsForOrgs(userId, primaryDb).catch((e) => { await calculateUserClientsForOrgs(userId, trx);
console.error(
"Error calculating user clients after creating olm:",
e
);
}); });
return response<CreateOlmResponse>(res, { return response<CreateOlmResponse>(res, {

View File

@@ -1,5 +1,5 @@
import { NextFunction, Request, Response } from "express"; import { NextFunction, Request, Response } from "express";
import { Client, db, Olm, primaryDb } from "@server/db"; import { Client, db } from "@server/db";
import { olms, clients, clientSitesAssociationsCache } from "@server/db"; import { olms, clients, clientSitesAssociationsCache } from "@server/db";
import { eq } from "drizzle-orm"; import { eq } from "drizzle-orm";
import HttpCode from "@server/types/HttpCode"; import HttpCode from "@server/types/HttpCode";
@@ -49,7 +49,6 @@ export async function deleteUserOlm(
const { olmId } = parsedParams.data; const { olmId } = parsedParams.data;
let deletedClient: Client | undefined;
// Delete associated clients and the OLM in a transaction // Delete associated clients and the OLM in a transaction
await db.transaction(async (trx) => { await db.transaction(async (trx) => {
// Find all clients associated with this OLM // Find all clients associated with this OLM
@@ -58,6 +57,7 @@ export async function deleteUserOlm(
.from(clients) .from(clients)
.where(eq(clients.olmId, olmId)); .where(eq(clients.olmId, olmId));
let deletedClient: Client | null = null;
// Delete all associated clients // Delete all associated clients
if (associatedClients.length > 0) { if (associatedClients.length > 0) {
[deletedClient] = await trx [deletedClient] = await trx
@@ -67,27 +67,22 @@ export async function deleteUserOlm(
} }
// Finally, delete the OLM itself // Finally, delete the OLM itself
await trx.delete(olms).where(eq(olms.olmId, olmId)).returning(); const [olm] = await trx
}); .delete(olms)
.where(eq(olms.olmId, olmId))
.returning();
if (deletedClient) { if (deletedClient) {
rebuildClientAssociationsFromClient(deletedClient, primaryDb).catch( await rebuildClientAssociationsFromClient(deletedClient, trx);
(e) => { if (olm) {
logger.error( await sendTerminateClient(
`Failed to rebuild client-site associations after deleting OLM ${olmId}: ${e}` deletedClient.clientId,
); OlmErrorCodes.TERMINATED_DELETED,
olm.olmId
); // the olmId needs to be provided because it cant look it up after deletion
} }
); }
sendTerminateClient( });
deletedClient.clientId,
OlmErrorCodes.TERMINATED_DELETED,
olmId
).catch((e) => {
logger.error(
`Failed to send terminate message for client ${deletedClient?.clientId} after deleting OLM ${olmId}: ${e}`
);
});
}
return response(res, { return response(res, {
data: null, data: null,

View File

@@ -22,14 +22,14 @@ import { canCompress } from "@server/lib/clientVersionChecks";
import config from "@server/lib/config"; import config from "@server/lib/config";
export const handleOlmRegisterMessage: MessageHandler = async (context) => { export const handleOlmRegisterMessage: MessageHandler = async (context) => {
logger.info("[handleOlmRegisterMessage] Handling register olm message"); logger.info("Handling register olm message!");
const { message, client: c, sendToClient } = context; const { message, client: c, sendToClient } = context;
const olm = c as Olm; const olm = c as Olm;
const now = Math.floor(Date.now() / 1000); const now = Math.floor(Date.now() / 1000);
if (!olm) { if (!olm) {
logger.warn("[handleOlmRegisterMessage] Olm not found"); logger.warn("Olm not found");
return; return;
} }
@@ -46,19 +46,16 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
} = message.data; } = message.data;
if (!olm.clientId) { if (!olm.clientId) {
logger.warn("[handleOlmRegisterMessage] Olm client ID not found"); logger.warn("Olm client ID not found");
sendOlmError(OlmErrorCodes.CLIENT_ID_NOT_FOUND, olm.olmId); sendOlmError(OlmErrorCodes.CLIENT_ID_NOT_FOUND, olm.olmId);
return; return;
} }
logger.debug( logger.debug("Handling fingerprint insertion for olm register...", {
"[handleOlmRegisterMessage] Handling fingerprint insertion for olm register...", olmId: olm.olmId,
{ fingerprint,
olmId: olm.olmId, postures
fingerprint, });
postures
}
);
const isUserDevice = olm.userId !== null && olm.userId !== undefined; const isUserDevice = olm.userId !== null && olm.userId !== undefined;
@@ -88,17 +85,14 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
.limit(1); .limit(1);
if (!client) { if (!client) {
logger.warn("[handleOlmRegisterMessage] Client not found", { logger.warn("Client ID not found");
clientId: olm.clientId
});
sendOlmError(OlmErrorCodes.CLIENT_NOT_FOUND, olm.olmId); sendOlmError(OlmErrorCodes.CLIENT_NOT_FOUND, olm.olmId);
return; return;
} }
if (client.blocked) { if (client.blocked) {
logger.debug( logger.debug(
`[handleOlmRegisterMessage] Client ${client.clientId} is blocked. Ignoring register.`, `Client ${client.clientId} is blocked. Ignoring register.`
{ orgId: client.orgId }
); );
sendOlmError(OlmErrorCodes.CLIENT_BLOCKED, olm.olmId); sendOlmError(OlmErrorCodes.CLIENT_BLOCKED, olm.olmId);
return; return;
@@ -106,8 +100,7 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
if (client.approvalState == "pending") { if (client.approvalState == "pending") {
logger.debug( logger.debug(
`[handleOlmRegisterMessage] Client ${client.clientId} approval is pending. Ignoring register.`, `Client ${client.clientId} approval is pending. Ignoring register.`
{ orgId: client.orgId }
); );
sendOlmError(OlmErrorCodes.CLIENT_PENDING, olm.olmId); sendOlmError(OlmErrorCodes.CLIENT_PENDING, olm.olmId);
return; return;
@@ -135,18 +128,14 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
.limit(1); .limit(1);
if (!org) { if (!org) {
logger.warn("[handleOlmRegisterMessage] Org not found", { logger.warn("Org not found");
orgId: client.orgId
});
sendOlmError(OlmErrorCodes.ORG_NOT_FOUND, olm.olmId); sendOlmError(OlmErrorCodes.ORG_NOT_FOUND, olm.olmId);
return; return;
} }
if (orgId) { if (orgId) {
if (!olm.userId) { if (!olm.userId) {
logger.warn("[handleOlmRegisterMessage] Olm has no user ID", { logger.warn("Olm has no user ID");
orgId: client.orgId
});
sendOlmError(OlmErrorCodes.USER_ID_NOT_FOUND, olm.olmId); sendOlmError(OlmErrorCodes.USER_ID_NOT_FOUND, olm.olmId);
return; return;
} }
@@ -154,18 +143,12 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
const { session: userSession, user } = const { session: userSession, user } =
await validateSessionToken(userToken); await validateSessionToken(userToken);
if (!userSession || !user) { if (!userSession || !user) {
logger.warn( logger.warn("Invalid user session for olm register");
"[handleOlmRegisterMessage] Invalid user session for olm register",
{ orgId: client.orgId }
);
sendOlmError(OlmErrorCodes.INVALID_USER_SESSION, olm.olmId); sendOlmError(OlmErrorCodes.INVALID_USER_SESSION, olm.olmId);
return; return;
} }
if (user.userId !== olm.userId) { if (user.userId !== olm.userId) {
logger.warn( logger.warn("User ID mismatch for olm register");
"[handleOlmRegisterMessage] User ID mismatch for olm register",
{ orgId: client.orgId }
);
sendOlmError(OlmErrorCodes.USER_ID_MISMATCH, olm.olmId); sendOlmError(OlmErrorCodes.USER_ID_MISMATCH, olm.olmId);
return; return;
} }
@@ -180,15 +163,11 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
sessionId // this is the user token passed in the message sessionId // this is the user token passed in the message
}); });
logger.debug("[handleOlmRegisterMessage] Policy check result", { logger.debug("Policy check result:", policyCheck);
orgId: client.orgId,
policyCheck
});
if (policyCheck?.error) { if (policyCheck?.error) {
logger.error( logger.error(
`[handleOlmRegisterMessage] Error checking access policies for olm user ${olm.userId} in org ${orgId}: ${policyCheck?.error}`, `Error checking access policies for olm user ${olm.userId} in org ${orgId}: ${policyCheck?.error}`
{ orgId: client.orgId }
); );
sendOlmError(OlmErrorCodes.ORG_ACCESS_POLICY_DENIED, olm.olmId); sendOlmError(OlmErrorCodes.ORG_ACCESS_POLICY_DENIED, olm.olmId);
return; return;
@@ -196,8 +175,7 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
if (policyCheck.policies?.passwordAge?.compliant === false) { if (policyCheck.policies?.passwordAge?.compliant === false) {
logger.warn( logger.warn(
`[handleOlmRegisterMessage] Olm user ${olm.userId} has non-compliant password age for org ${orgId}`, `Olm user ${olm.userId} has non-compliant password age for org ${orgId}`
{ orgId: client.orgId }
); );
sendOlmError( sendOlmError(
OlmErrorCodes.ORG_ACCESS_POLICY_PASSWORD_EXPIRED, OlmErrorCodes.ORG_ACCESS_POLICY_PASSWORD_EXPIRED,
@@ -208,8 +186,7 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
policyCheck.policies?.maxSessionLength?.compliant === false policyCheck.policies?.maxSessionLength?.compliant === false
) { ) {
logger.warn( logger.warn(
`[handleOlmRegisterMessage] Olm user ${olm.userId} has non-compliant session length for org ${orgId}`, `Olm user ${olm.userId} has non-compliant session length for org ${orgId}`
{ orgId: client.orgId }
); );
sendOlmError( sendOlmError(
OlmErrorCodes.ORG_ACCESS_POLICY_SESSION_EXPIRED, OlmErrorCodes.ORG_ACCESS_POLICY_SESSION_EXPIRED,
@@ -218,8 +195,7 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
return; return;
} else if (policyCheck.policies?.requiredTwoFactor === false) { } else if (policyCheck.policies?.requiredTwoFactor === false) {
logger.warn( logger.warn(
`[handleOlmRegisterMessage] Olm user ${olm.userId} does not have 2FA enabled for org ${orgId}`, `Olm user ${olm.userId} does not have 2FA enabled for org ${orgId}`
{ orgId: client.orgId }
); );
sendOlmError( sendOlmError(
OlmErrorCodes.ORG_ACCESS_POLICY_2FA_REQUIRED, OlmErrorCodes.ORG_ACCESS_POLICY_2FA_REQUIRED,
@@ -228,8 +204,7 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
return; return;
} else if (!policyCheck.allowed) { } else if (!policyCheck.allowed) {
logger.warn( logger.warn(
`[handleOlmRegisterMessage] Olm user ${olm.userId} does not pass access policies for org ${orgId}: ${policyCheck.error}`, `Olm user ${olm.userId} does not pass access policies for org ${orgId}: ${policyCheck.error}`
{ orgId: client.orgId }
); );
sendOlmError(OlmErrorCodes.ORG_ACCESS_POLICY_DENIED, olm.olmId); sendOlmError(OlmErrorCodes.ORG_ACCESS_POLICY_DENIED, olm.olmId);
return; return;
@@ -251,39 +226,29 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
sitesCountResult.length > 0 ? sitesCountResult[0].count : 0; sitesCountResult.length > 0 ? sitesCountResult[0].count : 0;
// Prepare an array to store site configurations // Prepare an array to store site configurations
logger.debug( logger.debug(`Found ${sitesCount} sites for client ${client.clientId}`);
`[handleOlmRegisterMessage] Found ${sitesCount} sites for client ${client.clientId}`,
{ orgId: client.orgId }
);
let jitMode = false; let jitMode = false;
if (sitesCount > 250 && build == "saas") { if (sitesCount > 250 && build == "saas") {
// THIS IS THE MAX ON THE BUSINESS TIER // THIS IS THE MAX ON THE BUSINESS TIER
// we have too many sites // we have too many sites
// If we have too many sites we need to drop into fully JIT mode by not sending any of the sites // If we have too many sites we need to drop into fully JIT mode by not sending any of the sites
logger.info( logger.info("Too many sites (%d), dropping into JIT mode", sitesCount);
`[handleOlmRegisterMessage] Too many sites (${sitesCount}), dropping into JIT mode`,
{ orgId: client.orgId }
);
jitMode = true; jitMode = true;
} }
logger.debug( logger.debug(
`[handleOlmRegisterMessage] Olm client ID: ${client.clientId}, Public Key: ${publicKey}, Relay: ${relay}`, `Olm client ID: ${client.clientId}, Public Key: ${publicKey}, Relay: ${relay}`
{ orgId: client.orgId }
); );
if (!publicKey) { if (!publicKey) {
logger.warn("[handleOlmRegisterMessage] Public key not provided", { logger.warn("Public key not provided");
orgId: client.orgId
});
return; return;
} }
if (client.pubKey !== publicKey || client.archived) { if (client.pubKey !== publicKey || client.archived) {
logger.info( logger.info(
"[handleOlmRegisterMessage] Public key mismatch. Updating public key and clearing session info...", "Public key mismatch. Updating public key and clearing session info..."
{ orgId: client.orgId }
); );
// Update the client's public key // Update the client's public key
await db await db
@@ -309,13 +274,12 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
// TODO: I still think there is a better way to do this rather than locking it out here but ??? // TODO: I still think there is a better way to do this rather than locking it out here but ???
if (now - (client.lastHolePunch || 0) > 5 && sitesCount > 0) { if (now - (client.lastHolePunch || 0) > 5 && sitesCount > 0) {
logger.warn( logger.warn(
`[handleOlmRegisterMessage] Client last hole punch is too old and we have sites to send; skipping this register. The client is failing to hole punch and identify its network address with the server. Can the client reach the server on UDP port ${config.getRawConfig().gerbil.clients_start_port}?`, `Client last hole punch is too old and we have sites to send; skipping this register. The client is failing to hole punch and identify its network address with the server. Can the client reach the server on UDP port ${config.getRawConfig().gerbil.clients_start_port}?`
{ orgId: client.orgId }
); );
return; return;
} }
// NOTE: its important that the client here is the old client and the public key is the new key // NOTE: its important that the client here is the old client and the public key is the new key
const siteConfigurations = await buildSiteConfigurationForOlmClient( const siteConfigurations = await buildSiteConfigurationForOlmClient(
client, client,
publicKey, publicKey,

View File

@@ -5,8 +5,7 @@ import {
clients, clients,
clientSiteResources, clientSiteResources,
siteResources, siteResources,
apiKeyOrg, apiKeyOrg
primaryDb
} from "@server/db"; } from "@server/db";
import response from "@server/lib/response"; import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode"; import HttpCode from "@server/types/HttpCode";
@@ -221,12 +220,8 @@ export async function batchAddClientToSiteResources(
siteResourceId: siteResource.siteResourceId siteResourceId: siteResource.siteResourceId
}); });
} }
});
rebuildClientAssociationsFromClient(client, primaryDb).catch((e) => { await rebuildClientAssociationsFromClient(client, trx);
logger.error(
`Failed to rebuild client associations after batch adding site resources for client ${clientId}: ${e}`
);
}); });
return response(res, { return response(res, {

View File

@@ -10,8 +10,7 @@ import {
SiteResource, SiteResource,
siteResources, siteResources,
sites, sites,
userSiteResources, userSiteResources
primaryDb
} from "@server/db"; } from "@server/db";
import { getUniqueSiteResourceName } from "@server/db/names"; import { getUniqueSiteResourceName } from "@server/db/names";
import { import {
@@ -520,10 +519,12 @@ export async function createSiteResource(
// own transaction so it always executes on the primary — avoiding any // own transaction so it always executes on the primary — avoiding any
// replica-lag issues while still allowing the HTTP response to return // replica-lag issues while still allowing the HTTP response to return
// early. // early.
rebuildClientAssociationsFromSiteResource( db.transaction(async (trx) => {
newSiteResource!, await rebuildClientAssociationsFromSiteResource(
primaryDb newSiteResource!,
).catch((err) => { trx
);
}).catch((err) => {
logger.error( logger.error(
`Error rebuilding client associations for site resource ${newSiteResource!.siteResourceId}:`, `Error rebuilding client associations for site resource ${newSiteResource!.siteResourceId}:`,
err err

View File

@@ -1,6 +1,6 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import { db, newts, primaryDb, sites } from "@server/db"; import { db, newts, sites } from "@server/db";
import { siteResources } from "@server/db"; import { siteResources } from "@server/db";
import response from "@server/lib/response"; import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode"; import HttpCode from "@server/types/HttpCode";
@@ -73,10 +73,12 @@ export async function deleteSiteResource(
// own transaction so it always executes on the primary — avoiding any // own transaction so it always executes on the primary — avoiding any
// replica-lag issues while still allowing the HTTP response to return // replica-lag issues while still allowing the HTTP response to return
// early. // early.
rebuildClientAssociationsFromSiteResource( db.transaction(async (trx) => {
removedSiteResource, await rebuildClientAssociationsFromSiteResource(
primaryDb removedSiteResource,
).catch((err) => { trx
);
}).catch((err) => {
logger.error( logger.error(
`Error rebuilding client associations for site resource ${removedSiteResource!.siteResourceId}:`, `Error rebuilding client associations for site resource ${removedSiteResource!.siteResourceId}:`,
err err

View File

@@ -1,13 +1,7 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import { db, orgs, primaryDb } from "@server/db"; import { db, orgs } from "@server/db";
import { import { roles, userInviteRoles, userInvites, userOrgs, users } from "@server/db";
roles,
userInviteRoles,
userInvites,
userOrgs,
users
} from "@server/db";
import { eq, and, inArray } from "drizzle-orm"; import { eq, and, inArray } from "drizzle-orm";
import response from "@server/lib/response"; import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode"; import HttpCode from "@server/types/HttpCode";
@@ -152,7 +146,9 @@ export async function acceptInvite(
.from(userInviteRoles) .from(userInviteRoles)
.where(eq(userInviteRoles.inviteId, inviteId)); .where(eq(userInviteRoles.inviteId, inviteId));
const inviteRoleIds = [...new Set(inviteRoleRows.map((r) => r.roleId))]; const inviteRoleIds = [
...new Set(inviteRoleRows.map((r) => r.roleId))
];
if (inviteRoleIds.length === 0) { if (inviteRoleIds.length === 0) {
return next( return next(
createHttpError( createHttpError(
@@ -197,19 +193,13 @@ export async function acceptInvite(
.delete(userInvites) .delete(userInvites)
.where(eq(userInvites.inviteId, inviteId)); .where(eq(userInvites.inviteId, inviteId));
await calculateUserClientsForOrgs(existingUser[0].userId, trx);
logger.debug( logger.debug(
`User ${existingUser[0].userId} accepted invite to org ${existingInvite.orgId}` `User ${existingUser[0].userId} accepted invite to org ${existingInvite.orgId}`
); );
}); });
calculateUserClientsForOrgs(existingUser[0].userId, primaryDb).catch(
(e) => {
logger.error(
`Failed to calculate user clients after accepting invite for user ${existingUser[0].userId}: ${e}`
);
}
);
return response<AcceptInviteResponse>(res, { return response<AcceptInviteResponse>(res, {
data: { accepted: true, orgId: existingInvite.orgId }, data: { accepted: true, orgId: existingInvite.orgId },
success: true, success: true,

View File

@@ -1,7 +1,7 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import stoi from "@server/lib/stoi"; import stoi from "@server/lib/stoi";
import { clients, db, primaryDb, Client } from "@server/db"; import { clients, db } from "@server/db";
import { userOrgRoles, userOrgs, roles } from "@server/db"; import { userOrgRoles, userOrgs, roles } from "@server/db";
import { eq, and } from "drizzle-orm"; import { eq, and } from "drizzle-orm";
import response from "@server/lib/response"; import response from "@server/lib/response";
@@ -112,8 +112,6 @@ export async function addUserRoleLegacy(
); );
} }
let orgClientsToRebuild: Client[] = [];
await db.transaction(async (trx) => { await db.transaction(async (trx) => {
await trx await trx
.delete(userOrgRoles) .delete(userOrgRoles)
@@ -140,19 +138,11 @@ export async function addUserRoleLegacy(
) )
); );
orgClientsToRebuild = orgClients; for (const orgClient of orgClients) {
await rebuildClientAssociationsFromClient(orgClient, trx);
}
}); });
for (const orgClient of orgClientsToRebuild) {
rebuildClientAssociationsFromClient(orgClient, primaryDb).catch(
(e) => {
logger.error(
`Failed to rebuild client associations for client ${orgClient.clientId} after adding role: ${e}`
);
}
);
}
return response(res, { return response(res, {
data: { ...existingUser, roleId }, data: { ...existingUser, roleId },
success: true, success: true,

View File

@@ -1,6 +1,6 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { z } from "zod"; import { z } from "zod";
import { db, primaryDb } from "@server/db"; import { db } from "@server/db";
import { users } from "@server/db"; import { users } from "@server/db";
import { eq } from "drizzle-orm"; import { eq } from "drizzle-orm";
import response from "@server/lib/response"; import response from "@server/lib/response";
@@ -53,12 +53,8 @@ export async function adminRemoveUser(
await db.transaction(async (trx) => { await db.transaction(async (trx) => {
await trx.delete(users).where(eq(users.userId, userId)); await trx.delete(users).where(eq(users.userId, userId));
});
calculateUserClientsForOrgs(userId, primaryDb).catch((e) => { await calculateUserClientsForOrgs(userId, trx);
logger.error(
`Failed to calculate user clients after removing user ${userId}: ${e}`
);
}); });
return response(res, { return response(res, {

View File

@@ -6,7 +6,7 @@ import createHttpError from "http-errors";
import logger from "@server/logger"; import logger from "@server/logger";
import { fromError } from "zod-validation-error"; import { fromError } from "zod-validation-error";
import { OpenAPITags, registry } from "@server/openApi"; import { OpenAPITags, registry } from "@server/openApi";
import { db, orgs, primaryDb } from "@server/db"; import { db, orgs } from "@server/db";
import { and, eq, inArray } from "drizzle-orm"; import { and, eq, inArray } from "drizzle-orm";
import { idp, idpOidcConfig, roles, userOrgs, users } from "@server/db"; import { idp, idpOidcConfig, roles, userOrgs, users } from "@server/db";
import { generateId } from "@server/auth/sessions/app"; import { generateId } from "@server/auth/sessions/app";
@@ -34,7 +34,8 @@ const bodySchema = z
roleId: z.number().int().positive().optional() roleId: z.number().int().positive().optional()
}) })
.refine( .refine(
(d) => (d.roleIds != null && d.roleIds.length > 0) || d.roleId != null, (d) =>
(d.roleIds != null && d.roleIds.length > 0) || d.roleId != null,
{ message: "roleIds or roleId is required", path: ["roleIds"] } { message: "roleIds or roleId is required", path: ["roleIds"] }
) )
.transform((data) => ({ .transform((data) => ({
@@ -99,14 +100,8 @@ export async function createOrgUser(
} }
const { orgId } = parsedParams.data; const { orgId } = parsedParams.data;
const { const { username, email, name, type, idpId, roleIds: uniqueRoleIds } =
username, parsedBody.data;
email,
name,
type,
idpId,
roleIds: uniqueRoleIds
} = parsedBody.data;
if (build == "saas") { if (build == "saas") {
const usage = await usageService.getUsage(orgId, FeatureId.USERS); const usage = await usageService.getUsage(orgId, FeatureId.USERS);
@@ -237,7 +232,6 @@ export async function createOrgUser(
); );
} }
let userIdForClients: string | undefined;
await db.transaction(async (trx) => { await db.transaction(async (trx) => {
const [existingUser] = await trx const [existingUser] = await trx
.select() .select()
@@ -276,7 +270,7 @@ export async function createOrgUser(
{ {
orgId, orgId,
userId: existingUser.userId, userId: existingUser.userId,
autoProvisioned: false autoProvisioned: false,
}, },
uniqueRoleIds, uniqueRoleIds,
trx trx
@@ -298,30 +292,20 @@ export async function createOrgUser(
}) })
.returning(); .returning();
await assignUserToOrg( await assignUserToOrg(
org, org,
{ {
orgId, orgId,
userId: newUser.userId, userId: newUser.userId,
autoProvisioned: false autoProvisioned: false,
}, },
uniqueRoleIds, uniqueRoleIds,
trx trx
); );
} }
userIdForClients = userId; await calculateUserClientsForOrgs(userId, trx);
}); });
if (userIdForClients) {
calculateUserClientsForOrgs(userIdForClients, primaryDb).catch(
(e) => {
logger.error(
`Failed to calculate user clients after creating org user: ${e}`
);
}
);
}
} else { } else {
return next( return next(
createHttpError(HttpCode.BAD_REQUEST, "User type is required") createHttpError(HttpCode.BAD_REQUEST, "User type is required")

View File

@@ -7,8 +7,7 @@ import {
siteResources, siteResources,
sites, sites,
UserOrg, UserOrg,
userSiteResources, userSiteResources
primaryDb
} from "@server/db"; } from "@server/db";
import { userOrgs, userResources, users, userSites } from "@server/db"; import { userOrgs, userResources, users, userSites } from "@server/db";
import { and, count, eq, exists, inArray } from "drizzle-orm"; import { and, count, eq, exists, inArray } from "drizzle-orm";
@@ -92,12 +91,25 @@ export async function removeUserOrg(
await db.transaction(async (trx) => { await db.transaction(async (trx) => {
await removeUserFromOrg(org, userId, trx); await removeUserFromOrg(org, userId, trx);
});
calculateUserClientsForOrgs(userId, primaryDb).catch((e) => { // if (build === "saas") {
logger.error( // const [rootUser] = await trx
`Failed to calculate user clients after removing user ${userId} from org ${orgId}: ${e}` // .select()
); // .from(users)
// .where(eq(users.userId, userId));
//
// const [leftInOrgs] = await trx
// .select({ count: count() })
// .from(userOrgs)
// .where(eq(userOrgs.userId, userId));
//
// // if the user is not an internal user and does not belong to any org, delete the entire user
// if (rootUser?.type !== UserType.Internal && !leftInOrgs.count) {
// await trx.delete(users).where(eq(users.userId, userId));
// }
// }
await calculateUserClientsForOrgs(userId, trx);
}); });
return response(res, { return response(res, {

View File

@@ -44,7 +44,7 @@ export default async function migration() {
await db.execute(sql`BEGIN`); await db.execute(sql`BEGIN`);
await db.execute(sql` await db.execute(sql`
CREATE TABLE IF NOT EXISTS "trialNotifications" ( CREATE TABLE "trialNotifications" (
"notificationId" serial PRIMARY KEY NOT NULL, "notificationId" serial PRIMARY KEY NOT NULL,
"subscriptionId" varchar(255) NOT NULL, "subscriptionId" varchar(255) NOT NULL,
"notificationType" varchar(50) NOT NULL, "notificationType" varchar(50) NOT NULL,
@@ -52,6 +52,10 @@ export default async function migration() {
); );
`); `);
await db.execute(sql`
ALTER TABLE "trialNotifications" ADD CONSTRAINT "trialNotifications_subscriptionId_subscriptions_subscriptionId_fk" FOREIGN KEY ("subscriptionId") REFERENCES "public"."subscriptions"("subscriptionId") ON DELETE cascade ON UPDATE no action;
`);
await db.execute(sql`COMMIT`); await db.execute(sql`COMMIT`);
console.log("Migrated database"); console.log("Migrated database");
} catch (e) { } catch (e) {

View File

@@ -16,7 +16,7 @@ export default async function migration() {
db.transaction(() => { db.transaction(() => {
db.prepare( db.prepare(
` `
CREATE TABLE IF NOT EXISTS 'trialNotifications' ( CREATE TABLE 'trialNotifications' (
'notificationId' integer PRIMARY KEY AUTOINCREMENT NOT NULL, 'notificationId' integer PRIMARY KEY AUTOINCREMENT NOT NULL,
'subscriptionId' text NOT NULL, 'subscriptionId' text NOT NULL,
'notificationType' text NOT NULL, 'notificationType' text NOT NULL,

View File

@@ -1,6 +1,6 @@
"use client"; "use client";
import React, { useState, useEffect, useCallback } from "react"; import { useState, useEffect, useCallback } from "react";
import { useParams } from "next/navigation"; import { useParams } from "next/navigation";
import { createApiClient, formatAxiosError } from "@app/lib/api"; import { createApiClient, formatAxiosError } from "@app/lib/api";
import { useEnvContext } from "@app/hooks/useEnvContext"; import { useEnvContext } from "@app/hooks/useEnvContext";
@@ -38,10 +38,7 @@ import {
HttpDestinationCredenza, HttpDestinationCredenza,
parseHttpConfig parseHttpConfig
} from "@app/components/HttpDestinationCredenza"; } from "@app/components/HttpDestinationCredenza";
import { import { S3DestinationCredenza } from "@app/components/S3DestinationCredenza";
S3DestinationCredenza,
parseS3Config
} from "@app/components/S3DestinationCredenza";
import { DatadogDestinationCredenza } from "@app/components/DatadogDestinationCredenza"; import { DatadogDestinationCredenza } from "@app/components/DatadogDestinationCredenza";
import { useTranslations } from "next-intl"; import { useTranslations } from "next-intl";
@@ -67,42 +64,6 @@ interface DestinationCardProps {
disabled?: boolean; disabled?: boolean;
} }
function getDestinationDisplay(destination: Destination): {
name: string;
typeLabel: string;
detail: string;
icon: React.ReactNode;
} {
if (destination.type === "s3") {
const cfg = parseS3Config(destination.config);
const detail = cfg.bucket
? `s3://${cfg.bucket}${cfg.prefix ? `/${cfg.prefix.replace(/^\/+/, "")}` : ""}`
: "";
return {
name: cfg.name,
typeLabel: "Amazon S3",
detail,
icon: (
<Image
src="/third-party/s3.png"
alt="Amazon S3"
width={16}
height={16}
className="rounded-sm"
/>
)
};
}
// Default: HTTP
const cfg = parseHttpConfig(destination.config);
return {
name: cfg.name,
typeLabel: "HTTP",
detail: cfg.url,
icon: <Globe className="h-3.5 w-3.5 text-black" />
};
}
function DestinationCard({ function DestinationCard({
destination, destination,
onToggle, onToggle,
@@ -112,25 +73,25 @@ function DestinationCard({
disabled = false disabled = false
}: DestinationCardProps) { }: DestinationCardProps) {
const t = useTranslations(); const t = useTranslations();
const { name, typeLabel, detail, icon } = const cfg = parseHttpConfig(destination.config);
getDestinationDisplay(destination);
return ( return (
<div className="relative flex flex-col rounded-lg border bg-card text-card-foreground p-5 gap-3"> <div className="relative flex flex-col rounded-lg border bg-card text-card-foreground p-5 gap-3">
{/* Top row: icon + name/type + toggle */} {/* Top row: icon + name/type + toggle */}
<div className="flex items-start justify-between gap-3"> <div className="flex items-start justify-between gap-3">
<div className="flex items-center gap-3 min-w-0"> <div className="flex items-center gap-3 min-w-0">
{/* Squirkle icon: gray outer → white inner → black globe */}
<div className="shrink-0 flex items-center justify-center w-10 h-10 rounded-2xl bg-muted"> <div className="shrink-0 flex items-center justify-center w-10 h-10 rounded-2xl bg-muted">
<div className="flex items-center justify-center w-6 h-6 rounded-xl bg-white shadow-sm"> <div className="flex items-center justify-center w-6 h-6 rounded-xl bg-white shadow-sm">
{icon} <Globe className="h-3.5 w-3.5 text-black" />
</div> </div>
</div> </div>
<div className="min-w-0"> <div className="min-w-0">
<p className="font-semibold text-sm leading-tight truncate"> <p className="font-semibold text-sm leading-tight truncate">
{name || t("streamingUnnamedDestination")} {cfg.name || t("streamingUnnamedDestination")}
</p> </p>
<p className="text-xs text-muted-foreground truncate mt-0.5"> <p className="text-xs text-muted-foreground truncate mt-0.5">
{typeLabel} HTTP
</p> </p>
</div> </div>
</div> </div>
@@ -144,9 +105,9 @@ function DestinationCard({
/> />
</div> </div>
{/* Detail preview (URL for HTTP, s3:// path for S3) */} {/* URL preview */}
<p className="text-xs text-muted-foreground truncate"> <p className="text-xs text-muted-foreground truncate">
{detail || ( {cfg.url || (
<span className="italic"> <span className="italic">
{t("streamingNoUrlConfigured")} {t("streamingNoUrlConfigured")}
</span> </span>
@@ -524,7 +485,7 @@ export default function StreamingDestinationsPage() {
if (!v) setDeleteTarget(null); if (!v) setDeleteTarget(null);
}} }}
string={ string={
getDestinationDisplay(deleteTarget).name || parseHttpConfig(deleteTarget.config).name ||
t("streamingDeleteDialogThisDestination") t("streamingDeleteDialogThisDestination")
} }
title={t("streamingDeleteTitle")} title={t("streamingDeleteTitle")}
@@ -532,7 +493,7 @@ export default function StreamingDestinationsPage() {
<p> <p>
{t("streamingDeleteDialogAreYouSure")}{" "} {t("streamingDeleteDialogAreYouSure")}{" "}
<span> <span>
{getDestinationDisplay(deleteTarget).name || {parseHttpConfig(deleteTarget.config).name ||
t("streamingDeleteDialogThisDestination")} t("streamingDeleteDialogThisDestination")}
</span> </span>
{t("streamingDeleteDialogPermanentlyRemoved")} {t("streamingDeleteDialogPermanentlyRemoved")}

View File

@@ -55,9 +55,7 @@ export default async function ProxyResourcesPage(
pagination = responseData.pagination; pagination = responseData.pagination;
} catch (e) {} } catch (e) {}
const siteIdParam = parsePositiveInt( const siteIdParam = parsePositiveInt(searchParams.get("siteId") ?? undefined);
searchParams.get("siteId") ?? undefined
);
let initialFilterSite: { let initialFilterSite: {
siteId: number; siteId: number;
@@ -124,7 +122,6 @@ export default async function ProxyResourcesPage(
domainId: resource.domainId || undefined, domainId: resource.domainId || undefined,
fullDomain: resource.fullDomain ?? null, fullDomain: resource.fullDomain ?? null,
ssl: resource.ssl, ssl: resource.ssl,
wildcard: resource.wildcard,
targets: resource.targets?.map((target) => ({ targets: resource.targets?.map((target) => ({
targetId: target.targetId, targetId: target.targetId,
ip: target.ip, ip: target.ip,

View File

@@ -96,7 +96,6 @@ export type ResourceRow = {
targets?: TargetHealth[]; targets?: TargetHealth[];
health?: "healthy" | "degraded" | "unhealthy" | "unknown"; health?: "healthy" | "degraded" | "unhealthy" | "unknown";
sites: ResourceSiteRow[]; sites: ResourceSiteRow[];
wildcard?: boolean;
}; };
function StatusIcon({ function StatusIcon({
@@ -571,14 +570,10 @@ export default function ProxyResourcesTable({
/> />
) : null} ) : null}
<div className=""> <div className="">
{!resourceRow.wildcard ? ( <CopyToClipboard
<CopyToClipboard text={resourceRow.domain}
text={resourceRow.domain} isLink={true}
isLink={true} />
/>
) : (
<span>{resourceRow.domain}</span>
)}
</div> </div>
</div> </div>
); );

View File

@@ -1,6 +1,6 @@
"use client"; "use client";
import { useState, useEffect } from "react";
import { import {
Credenza, Credenza,
CredenzaBody, CredenzaBody,
@@ -12,62 +12,13 @@ import {
CredenzaTitle CredenzaTitle
} from "@app/components/Credenza"; } from "@app/components/Credenza";
import { Button } from "@app/components/ui/button"; import { Button } from "@app/components/ui/button";
import { Input } from "@app/components/ui/input"; import { ContactSalesBanner } from "@app/components/ContactSalesBanner";
import { Label } from "@app/components/ui/label";
import { Switch } from "@app/components/ui/switch";
import { HorizontalTabs } from "@app/components/HorizontalTabs";
import { RadioGroup, RadioGroupItem } from "@app/components/ui/radio-group";
import { Checkbox } from "@app/components/ui/checkbox";
import { createApiClient, formatAxiosError } from "@app/lib/api";
import { useEnvContext } from "@app/hooks/useEnvContext";
import { toast } from "@app/hooks/useToast";
import { useTranslations } from "next-intl"; import { useTranslations } from "next-intl";
import { Destination } from "@app/components/HttpDestinationCredenza";
// ── Types ──────────────────────────────────────────────────────────────────────
export type S3PayloadFormat = "json_array" | "ndjson" | "csv";
export interface S3Config {
name: string;
accessKeyId: string;
secretAccessKey: string;
region: string;
bucket: string;
prefix: string;
endpoint: string;
format: S3PayloadFormat;
gzip: boolean;
}
// ── Helpers ────────────────────────────────────────────────────────────────────
export const defaultS3Config = (): S3Config => ({
name: "",
accessKeyId: "",
secretAccessKey: "",
region: "us-east-1",
bucket: "",
prefix: "",
endpoint: "",
format: "json_array",
gzip: false
});
export function parseS3Config(raw: string): S3Config {
try {
return { ...defaultS3Config(), ...JSON.parse(raw) };
} catch {
return defaultS3Config();
}
}
// ── Component ──────────────────────────────────────────────────────────────────
export interface S3DestinationCredenzaProps { export interface S3DestinationCredenzaProps {
open: boolean; open: boolean;
onOpenChange: (open: boolean) => void; onOpenChange: (open: boolean) => void;
editing: Destination | null; editing: any;
orgId: string; orgId: string;
onSaved: () => void; onSaved: () => void;
} }
@@ -77,84 +28,18 @@ export function S3DestinationCredenza({
onOpenChange, onOpenChange,
editing, editing,
orgId, orgId,
onSaved onSaved,
}: S3DestinationCredenzaProps) { }: S3DestinationCredenzaProps) {
const api = createApiClient(useEnvContext());
const t = useTranslations(); const t = useTranslations();
const [saving, setSaving] = useState(false);
const [cfg, setCfg] = useState<S3Config>(defaultS3Config());
const [sendAccessLogs, setSendAccessLogs] = useState(false);
const [sendActionLogs, setSendActionLogs] = useState(false);
const [sendConnectionLogs, setSendConnectionLogs] = useState(false);
const [sendRequestLogs, setSendRequestLogs] = useState(false);
useEffect(() => {
if (open) {
setCfg(editing ? parseS3Config(editing.config) : defaultS3Config());
setSendAccessLogs(editing?.sendAccessLogs ?? false);
setSendActionLogs(editing?.sendActionLogs ?? false);
setSendConnectionLogs(editing?.sendConnectionLogs ?? false);
setSendRequestLogs(editing?.sendRequestLogs ?? false);
}
}, [open, editing]);
const update = (patch: Partial<S3Config>) =>
setCfg((prev) => ({ ...prev, ...patch }));
const isValid =
cfg.name.trim() !== "" &&
cfg.accessKeyId.trim() !== "" &&
cfg.secretAccessKey.trim() !== "" &&
cfg.region.trim() !== "" &&
cfg.bucket.trim() !== "";
async function handleSave() {
if (!isValid) return;
setSaving(true);
try {
const payload = {
type: "s3",
config: JSON.stringify(cfg),
sendAccessLogs,
sendActionLogs,
sendConnectionLogs,
sendRequestLogs
};
if (editing) {
await api.post(
`/org/${orgId}/event-streaming-destination/${editing.destinationId}`,
payload
);
toast({ title: t("s3DestUpdatedSuccess") });
} else {
await api.put(
`/org/${orgId}/event-streaming-destination`,
payload
);
toast({ title: t("s3DestCreatedSuccess") });
}
onSaved();
onOpenChange(false);
} catch (e) {
toast({
variant: "destructive",
title: editing
? t("s3DestUpdateFailed")
: t("s3DestCreateFailed"),
description: formatAxiosError(e, t("streamingUnexpectedError"))
});
} finally {
setSaving(false);
}
}
return ( return (
<Credenza open={open} onOpenChange={onOpenChange}> <Credenza open={open} onOpenChange={onOpenChange}>
<CredenzaContent className="sm:max-w-2xl"> <CredenzaContent className="sm:max-w-2xl">
<CredenzaHeader> <CredenzaHeader>
<CredenzaTitle> <CredenzaTitle>
{editing ? t("S3DestEditTitle") : t("S3DestAddTitle")} {editing
? t("S3DestEditTitle")
: t("S3DestAddTitle")}
</CredenzaTitle> </CredenzaTitle>
<CredenzaDescription> <CredenzaDescription>
{editing {editing
@@ -164,367 +49,13 @@ export function S3DestinationCredenza({
</CredenzaHeader> </CredenzaHeader>
<CredenzaBody> <CredenzaBody>
<HorizontalTabs <ContactSalesBanner />
clientSide
items={[
{ title: t("s3DestTabSettings"), href: "" },
{ title: t("s3DestTabFormat"), href: "" },
{ title: t("httpDestTabLogs"), href: "" }
]}
>
{/* ── Settings tab ────────────────────────────── */}
<div className="space-y-6 mt-4 p-1">
{/* Name */}
<div className="space-y-2">
<Label htmlFor="s3-name">
{t("s3DestNameLabel")}
</Label>
<Input
id="s3-name"
placeholder={t("s3DestNamePlaceholder")}
value={cfg.name}
onChange={(e) =>
update({ name: e.target.value })
}
/>
</div>
{/* AWS Access Key ID */}
<div className="space-y-2">
<Label htmlFor="s3-access-key-id">
{t("s3DestAccessKeyIdLabel")}
</Label>
<Input
id="s3-access-key-id"
placeholder="AKIAIOSFODNN7EXAMPLE"
value={cfg.accessKeyId}
onChange={(e) =>
update({
accessKeyId: e.target.value
})
}
autoComplete="off"
/>
</div>
{/* AWS Secret Access Key */}
<div className="space-y-2">
<Label htmlFor="s3-secret-key">
{t("s3DestSecretAccessKeyLabel")}
</Label>
<Input
id="s3-secret-key"
type="password"
placeholder={t(
"s3DestSecretAccessKeyPlaceholder"
)}
value={cfg.secretAccessKey}
onChange={(e) =>
update({
secretAccessKey: e.target.value
})
}
autoComplete="new-password"
/>
</div>
{/* Region */}
<div className="space-y-2">
<Label htmlFor="s3-region">
{t("s3DestRegionLabel")}
</Label>
<Input
id="s3-region"
placeholder="us-east-1"
value={cfg.region}
onChange={(e) =>
update({ region: e.target.value })
}
/>
</div>
{/* Bucket */}
<div className="space-y-2">
<Label htmlFor="s3-bucket">
{t("s3DestBucketLabel")}
</Label>
<Input
id="s3-bucket"
placeholder="my-logs-bucket"
value={cfg.bucket}
onChange={(e) =>
update({ bucket: e.target.value })
}
/>
</div>
{/* Prefix */}
<div className="space-y-2">
<Label htmlFor="s3-prefix">
{t("s3DestPrefixLabel")}
</Label>
<Input
id="s3-prefix"
placeholder="pangolin/logs"
value={cfg.prefix}
onChange={(e) =>
update({ prefix: e.target.value })
}
/>
<p className="text-xs text-muted-foreground">
{t("s3DestPrefixDescription")}
</p>
</div>
{/* Custom endpoint (optional for S3-compatible storage) */}
<div className="space-y-2">
<Label htmlFor="s3-endpoint">
{t("s3DestEndpointLabel")}
</Label>
<Input
id="s3-endpoint"
placeholder="https://s3.example.com"
value={cfg.endpoint}
onChange={(e) =>
update({ endpoint: e.target.value })
}
/>
<p className="text-xs text-muted-foreground">
{t("s3DestEndpointDescription")}
</p>
</div>
</div>
{/* ── Format tab ───────────────────────────────── */}
<div className="space-y-6 mt-4 p-1">
{/* Gzip compression toggle */}
<div className="flex items-start gap-3 rounded-md border p-3">
<Switch
id="s3-gzip"
checked={cfg.gzip}
onCheckedChange={(v) => update({ gzip: v })}
className="mt-0.5"
/>
<div>
<Label
htmlFor="s3-gzip"
className="cursor-pointer font-medium"
>
{t("s3DestGzipLabel")}
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
{t("s3DestGzipDescription")}
</p>
</div>
</div>
{/* Payload format selector */}
<div className="space-y-3">
<div>
<label className="font-medium block">
{t("s3DestFormatTitle")}
</label>
<p className="text-sm text-muted-foreground mt-0.5">
{t("s3DestFormatDescription")}
</p>
</div>
<RadioGroup
value={cfg.format}
onValueChange={(v) =>
update({
format: v as S3PayloadFormat
})
}
className="gap-2"
>
{/* JSON Array */}
<label className="flex items-start gap-3 rounded-md border p-3 cursor-pointer has-[:checked]:border-primary has-[:checked]:bg-primary/5">
<RadioGroupItem
value="json_array"
className="mt-0.5"
/>
<div>
<p className="text-sm font-medium leading-none">
{t(
"httpDestFormatJsonArrayTitle"
)}
</p>
<p className="text-xs text-muted-foreground mt-1">
{t(
"s3DestFormatJsonArrayDescription"
)}
</p>
</div>
</label>
{/* NDJSON */}
<label className="flex items-start gap-3 rounded-md border p-3 cursor-pointer has-[:checked]:border-primary has-[:checked]:bg-primary/5">
<RadioGroupItem
value="ndjson"
className="mt-0.5"
/>
<div>
<p className="text-sm font-medium leading-none">
{t("httpDestFormatNdjsonTitle")}
</p>
<p className="text-xs text-muted-foreground mt-1">
{t(
"s3DestFormatNdjsonDescription"
)}
</p>
</div>
</label>
{/* CSV */}
<label className="flex items-start gap-3 rounded-md border p-3 cursor-pointer has-[:checked]:border-primary has-[:checked]:bg-primary/5">
<RadioGroupItem
value="csv"
className="mt-0.5"
/>
<div>
<p className="text-sm font-medium leading-none">
{t("s3DestFormatCsvTitle")}
</p>
<p className="text-xs text-muted-foreground mt-1">
{t(
"s3DestFormatCsvDescription"
)}
</p>
</div>
</label>
</RadioGroup>
</div>
</div>
{/* ── Logs tab ──────────────────────────────────── */}
<div className="space-y-6 mt-4 p-1">
<div>
<label className="font-medium block">
{t("httpDestLogTypesTitle")}
</label>
<p className="text-sm text-muted-foreground mt-0.5">
{t("httpDestLogTypesDescription")}
</p>
</div>
<div className="space-y-3">
<div className="flex items-start gap-3 rounded-md border p-3">
<Checkbox
id="s3-log-access"
checked={sendAccessLogs}
onCheckedChange={(v) =>
setSendAccessLogs(v === true)
}
className="mt-0.5"
/>
<div>
<Label
htmlFor="s3-log-access"
className="cursor-pointer font-medium"
>
{t("httpDestAccessLogsTitle")}
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
{t("httpDestAccessLogsDescription")}
</p>
</div>
</div>
<div className="flex items-start gap-3 rounded-md border p-3">
<Checkbox
id="s3-log-action"
checked={sendActionLogs}
onCheckedChange={(v) =>
setSendActionLogs(v === true)
}
className="mt-0.5"
/>
<div>
<Label
htmlFor="s3-log-action"
className="cursor-pointer font-medium"
>
{t("httpDestActionLogsTitle")}
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
{t("httpDestActionLogsDescription")}
</p>
</div>
</div>
<div className="flex items-start gap-3 rounded-md border p-3">
<Checkbox
id="s3-log-connection"
checked={sendConnectionLogs}
onCheckedChange={(v) =>
setSendConnectionLogs(v === true)
}
className="mt-0.5"
/>
<div>
<Label
htmlFor="s3-log-connection"
className="cursor-pointer font-medium"
>
{t("httpDestConnectionLogsTitle")}
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
{t(
"httpDestConnectionLogsDescription"
)}
</p>
</div>
</div>
<div className="flex items-start gap-3 rounded-md border p-3">
<Checkbox
id="s3-log-request"
checked={sendRequestLogs}
onCheckedChange={(v) =>
setSendRequestLogs(v === true)
}
className="mt-0.5"
/>
<div>
<Label
htmlFor="s3-log-request"
className="cursor-pointer font-medium"
>
{t("httpDestRequestLogsTitle")}
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
{t(
"httpDestRequestLogsDescription"
)}
</p>
</div>
</div>
</div>
</div>
</HorizontalTabs>
</CredenzaBody> </CredenzaBody>
<CredenzaFooter> <CredenzaFooter>
<CredenzaClose asChild> <CredenzaClose asChild>
<Button <Button variant="outline">{t("cancel")}</Button>
type="button"
variant="outline"
disabled={saving}
>
{t("cancel")}
</Button>
</CredenzaClose> </CredenzaClose>
<Button
type="button"
onClick={handleSave}
loading={saving}
disabled={!isValid || saving}
>
{editing
? t("s3DestSaveChanges")
: t("s3DestCreateDestination")}
</Button>
</CredenzaFooter> </CredenzaFooter>
</CredenzaContent> </CredenzaContent>
</Credenza> </Credenza>