Merge branch 'dev' into resource-policies

Owen
2026-05-12 21:12:40 -07:00
100 changed files with 4604 additions and 1452 deletions

View File

@@ -485,6 +485,133 @@ async function syncAcmeCertsFromHttp(endpoint: string): Promise<void> {
}
}
async function storeCertForDomain(
domain: string,
certPem: string,
keyPem: string,
validatedX509: crypto.X509Certificate
): Promise<void> {
const wildcard = domain.startsWith("*.");
const existing = await db
.select()
.from(certificates)
.where(eq(certificates.domain, domain))
.limit(1);
let oldCertPem: string | null = null;
let oldKeyPem: string | null = null;
if (existing.length > 0 && existing[0].certFile) {
try {
const storedCertPem = decrypt(
existing[0].certFile,
config.getRawConfig().server.secret!
);
const wildcardUnchanged = existing[0].wildcard === wildcard;
if (storedCertPem === certPem && wildcardUnchanged) {
return;
}
oldCertPem = storedCertPem;
if (existing[0].keyFile) {
try {
oldKeyPem = decrypt(
existing[0].keyFile,
config.getRawConfig().server.secret!
);
} catch (keyErr) {
logger.debug(
`acmeCertSync: could not decrypt stored key for ${domain}: ${keyErr}`
);
}
}
} catch (err) {
logger.debug(
`acmeCertSync: could not decrypt stored cert for ${domain}, will update: ${err}`
);
}
}
// Date parsing never throws; an unparseable validTo yields NaN, so
// guard explicitly instead of relying on a catch.
let expiresAt: number | null = null;
const validToMs = new Date(validatedX509.validTo).getTime();
if (Number.isNaN(validToMs)) {
logger.debug(
`acmeCertSync: could not parse cert expiry for ${domain}`
);
} else {
expiresAt = Math.floor(validToMs / 1000);
}
const encryptedCert = encrypt(
certPem,
config.getRawConfig().server.secret!
);
const encryptedKey = encrypt(keyPem, config.getRawConfig().server.secret!);
const now = Math.floor(Date.now() / 1000);
const domainId = await findDomainId(domain);
if (domainId) {
logger.debug(
`acmeCertSync: resolved domainId "${domainId}" for cert domain "${domain}"`
);
} else {
logger.debug(
`acmeCertSync: no matching domain record found for cert domain "${domain}"`
);
}
if (existing.length > 0) {
logger.debug(
`acmeCertSync: updating existing certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await db
.update(certificates)
.set({
certFile: encryptedCert,
keyFile: encryptedKey,
status: "valid",
expiresAt,
updatedAt: now,
wildcard,
...(domainId !== null && { domainId })
})
.where(eq(certificates.domain, domain));
logger.debug(
`acmeCertSync: updated certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await pushCertUpdateToAffectedNewts(
domain,
domainId,
oldCertPem,
oldKeyPem
);
} else {
logger.debug(
`acmeCertSync: inserting new certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await db.insert(certificates).values({
domain,
domainId,
certFile: encryptedCert,
keyFile: encryptedKey,
status: "valid",
expiresAt,
createdAt: now,
updatedAt: now,
wildcard
});
logger.debug(
`acmeCertSync: inserted new certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await pushCertUpdateToAffectedNewts(domain, domainId, null, null);
}
}
function findAcmeJsonFiles(dirPath: string): string[] {
const results: string[] = [];
let entries: fs.Dirent[];
@@ -575,18 +702,16 @@ async function syncAcmeCerts(acmeJsonPath: string): Promise<void> {
}
for (const cert of allCerts) {
const domain = cert?.domain?.main;
const mainDomain = cert?.domain?.main;
if (!domain || typeof domain !== "string") {
if (!mainDomain || typeof mainDomain !== "string") {
logger.debug(`acmeCertSync: skipping cert with missing domain`);
continue;
}
const { wildcard } = detectWildcard(domain, cert.domain?.sans);
if (!cert.certificate || !cert.key) {
logger.debug(
`acmeCertSync: skipping cert for ${domain} - empty certificate or key field`
`acmeCertSync: skipping cert for ${mainDomain} - empty certificate or key field`
);
continue;
}
@@ -598,14 +723,14 @@ async function syncAcmeCerts(acmeJsonPath: string): Promise<void> {
keyPem = Buffer.from(cert.key, "base64").toString("utf8");
} catch (err) {
logger.debug(
`acmeCertSync: skipping cert for ${domain} - failed to base64-decode cert/key: ${err}`
`acmeCertSync: skipping cert for ${mainDomain} - failed to base64-decode cert/key: ${err}`
);
continue;
}
if (!certPem.trim() || !keyPem.trim()) {
logger.debug(
`acmeCertSync: skipping cert for ${domain} - blank PEM after base64 decode`
`acmeCertSync: skipping cert for ${mainDomain} - blank PEM after base64 decode`
);
continue;
}
@@ -616,7 +741,7 @@ async function syncAcmeCerts(acmeJsonPath: string): Promise<void> {
const firstCertPemForValidation = extractFirstCert(certPem);
if (!firstCertPemForValidation) {
logger.debug(
`acmeCertSync: skipping cert for ${domain} - no PEM certificate block found`
`acmeCertSync: skipping cert for ${mainDomain} - no PEM certificate block found`
);
continue;
}
@@ -628,7 +753,7 @@ async function syncAcmeCerts(acmeJsonPath: string): Promise<void> {
);
} catch (err) {
logger.debug(
`acmeCertSync: skipping cert for ${domain} - invalid X.509 certificate: ${err}`
`acmeCertSync: skipping cert for ${mainDomain} - invalid X.509 certificate: ${err}`
);
continue;
}
@@ -638,139 +763,40 @@ async function syncAcmeCerts(acmeJsonPath: string): Promise<void> {
crypto.createPrivateKey(keyPem);
} catch (err) {
logger.debug(
`acmeCertSync: skipping cert for ${domain} - invalid private key: ${err}`
`acmeCertSync: skipping cert for ${mainDomain} - invalid private key: ${err}`
);
continue;
}
// Check if cert already exists in DB
const existing = await db
.select()
.from(certificates)
.where(and(eq(certificates.domain, domain)))
.limit(1);
let oldCertPem: string | null = null;
let oldKeyPem: string | null = null;
if (existing.length > 0 && existing[0].certFile) {
try {
const storedCertPem = decrypt(
existing[0].certFile,
config.getRawConfig().server.secret!
);
const wildcardUnchanged = existing[0].wildcard === wildcard;
if (storedCertPem === certPem && wildcardUnchanged) {
// logger.debug(
// `acmeCertSync: cert for ${domain} is unchanged, skipping`
// );
continue;
// Collect all domains covered by this cert: main + every SAN.
// Each domain gets its own row in the certificates table so that
// lookups by any hostname on the cert succeed independently.
const allDomains = new Set<string>([mainDomain]);
if (Array.isArray(cert.domain?.sans)) {
for (const san of cert.domain.sans) {
if (typeof san === "string" && san.trim()) {
allDomains.add(san.trim());
}
// Cert has changed; capture old values so we can send a correct
// update message to the newt after the DB write.
oldCertPem = storedCertPem;
if (existing[0].keyFile) {
try {
oldKeyPem = decrypt(
existing[0].keyFile,
config.getRawConfig().server.secret!
);
} catch (keyErr) {
logger.debug(
`acmeCertSync: could not decrypt stored key for ${domain}: ${keyErr}`
);
}
}
} catch (err) {
// Decryption failure means we should proceed with the update
logger.debug(
`acmeCertSync: could not decrypt stored cert for ${domain}, will update: ${err}`
);
}
}
// Parse cert expiry from the validated X.509 certificate
let expiresAt: number | null = null;
try {
expiresAt = Math.floor(
new Date(validatedX509.validTo).getTime() / 1000
);
} catch (err) {
logger.debug(
`acmeCertSync: could not parse cert expiry for ${domain}: ${err}`
);
}
const encryptedCert = encrypt(
certPem,
config.getRawConfig().server.secret!
logger.debug(
`acmeCertSync: cert for ${mainDomain} covers ${allDomains.size} domain(s): ${[...allDomains].join(", ")}`
);
const encryptedKey = encrypt(
keyPem,
config.getRawConfig().server.secret!
);
const now = Math.floor(Date.now() / 1000);
const domainId = await findDomainId(domain);
if (domainId) {
logger.debug(
`acmeCertSync: resolved domainId "${domainId}" for cert domain "${domain}"`
);
} else {
logger.debug(
`acmeCertSync: no matching domain record found for cert domain "${domain}"`
);
}
if (existing.length > 0) {
logger.debug(
`acmeCertSync: updating existing certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await db
.update(certificates)
.set({
certFile: encryptedCert,
keyFile: encryptedKey,
status: "valid",
expiresAt,
updatedAt: now,
wildcard,
...(domainId !== null && { domainId })
})
.where(eq(certificates.domain, domain));
logger.debug(
`acmeCertSync: updated certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await pushCertUpdateToAffectedNewts(
domain,
domainId,
oldCertPem,
oldKeyPem
);
} else {
logger.debug(
`acmeCertSync: inserting new certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
await db.insert(certificates).values({
domain,
domainId,
certFile: encryptedCert,
keyFile: encryptedKey,
status: "valid",
expiresAt,
createdAt: now,
updatedAt: now,
wildcard
});
logger.debug(
`acmeCertSync: inserted new certificate for ${domain} (expires ${expiresAt ? new Date(expiresAt * 1000).toISOString() : "unknown"})`
);
// For a brand-new cert, push to any SSL resources that were waiting for it
await pushCertUpdateToAffectedNewts(domain, domainId, null, null);
for (const domain of allDomains) {
try {
await storeCertForDomain(
domain,
certPem,
keyPem,
validatedX509
);
} catch (err) {
logger.error(
`acmeCertSync: error storing cert for domain "${domain}": ${err}`
);
}
}
}
}
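
The net effect of the refactor above: one traefik cert entry now fans out into one `certificates` row per covered hostname via `storeCertForDomain`. A minimal sketch of the fan-out, using a hypothetical acme.json entry (the `domain.main`/`domain.sans` shape mirrors what the loop reads):

```
// Sketch only: a hypothetical traefik acme.json cert entry.
const cert = {
    domain: {
        main: "example.com",
        sans: ["*.example.com", "api.example.com", "  "]
    },
    certificate: "<base64 PEM>",
    key: "<base64 PEM>"
};

// Fan-out as implemented above: main domain plus every non-blank SAN,
// deduplicated through a Set so repeated hostnames yield a single row.
const allDomains = new Set<string>([cert.domain.main]);
for (const san of cert.domain.sans ?? []) {
    if (typeof san === "string" && san.trim()) {
        allDomains.add(san.trim());
    }
}
// -> Set { "example.com", "*.example.com", "api.example.com" }
// storeCertForDomain() is then called once per entry.
```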

View File

@@ -29,7 +29,10 @@ import { decrypt } from "@server/lib/crypto";
import logger from "@server/logger";
import { sendAlertWebhook } from "./sendAlertWebhook";
import { sendAlertEmail } from "./sendAlertEmail";
import { AlertContext, WebhookAlertConfig } from "@server/routers/alertRule/types";
import {
AlertContext,
WebhookAlertConfig
} from "@server/routers/alertRule/types";
/**
* Core alert processing pipeline.
@@ -99,7 +102,10 @@ export async function processAlerts(context: AlertContext): Promise<void> {
baseConditions,
or(
eq(alertRules.allHealthChecks, true),
eq(alertHealthChecks.healthCheckId, context.healthCheckId)
eq(
alertHealthChecks.healthCheckId,
context.healthCheckId
)
)
)
);
@@ -208,14 +214,19 @@ async function processRule(
for (const action of emailActions) {
try {
const recipients = await resolveEmailRecipients(action.emailActionId);
const recipients = await resolveEmailRecipients(
action.emailActionId
);
if (recipients.length > 0) {
await sendAlertEmail(recipients, context);
await db
.update(alertEmailActions)
.set({ lastSentAt: now })
.where(
eq(alertEmailActions.emailActionId, action.emailActionId)
eq(
alertEmailActions.emailActionId,
action.emailActionId
)
);
}
} catch (err) {
@@ -269,7 +280,7 @@ async function processRule(
)
);
} catch (err) {
logger.error(
logger.warn(
`processAlerts: failed to send alert webhook for action ${action.webhookActionId}`,
err
);
@@ -289,7 +300,9 @@ async function processRule(
* - All users in a role (by `roleId`, resolved via `userOrgRoles`)
* - Direct external email addresses
*/
async function resolveEmailRecipients(emailActionId: number): Promise<string[]> {
async function resolveEmailRecipients(
emailActionId: number
): Promise<string[]> {
const rows = await db
.select()
.from(alertEmailRecipients)
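
The hunk cuts off before the resolution logic, but the doc comment names three recipient sources. A hedged sketch of how a resolver like this could fold them into one deduplicated list; the `RecipientRow` fields and the two lookup helpers are assumptions for illustration, not the actual schema:

```
// Sketch only: assumes each recipient row carries at most one of
// userId / roleId / email, matching the three documented sources.
type RecipientRow = {
    userId: string | null;
    roleId: number | null;
    email: string | null;
};

async function collectRecipients(
    rows: RecipientRow[],
    lookupUserEmail: (userId: string) => Promise<string | null>,
    lookupRoleEmails: (roleId: number) => Promise<string[]>
): Promise<string[]> {
    const out = new Set<string>(); // deduplicate across sources
    for (const row of rows) {
        if (row.email) {
            out.add(row.email); // direct external address
        } else if (row.userId) {
            const email = await lookupUserEmail(row.userId);
            if (email) out.add(email);
        } else if (row.roleId) {
            for (const email of await lookupRoleEmails(row.roleId)) {
                out.add(email);
            }
        }
    }
    return [...out];
}
```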

View File

@@ -236,15 +236,43 @@ interface TemplateContext {
}
/**
* Render a body template with {{event}}, {{timestamp}}, {{status}}, and
* {{data}} placeholders, mirroring the logic in HttpLogDestination.
* Render a body template with {{event}}, {{timestamp}}, {{status}}, {{data}},
* and individual data-field placeholders (e.g. {{orgId}}, {{siteId}}, …).
*
* {{data}} is replaced first (as raw JSON) so that any literal "{{…}}"
* strings inside data values are not re-expanded.
* Replacement order:
* 1. {{data}} → raw JSON of the full data object (prevents re-expansion of
* nested values that might look like placeholders).
* 2. Top-level scalar fields from data (string values are JSON-escaped;
* numbers and booleans are rendered as-is). Unknown placeholders are
* left untouched.
* 3. The fixed top-level keys: event, timestamp, status.
*/
function renderTemplate(template: string, ctx: TemplateContext): string {
const rendered = template
.replace(/\{\{data\}\}/g, JSON.stringify(ctx.data))
// Step 1: expand {{data}} first so its contents are already serialised
// and won't be touched by later passes.
let rendered = template.replace(/\{\{data\}\}/g, JSON.stringify(ctx.data));
// Step 2: expand individual data fields. Only replace placeholders whose
// key actually exists in ctx.data; leave everything else as-is.
for (const [key, value] of Object.entries(ctx.data)) {
if (value === null || value === undefined) continue;
const placeholder = new RegExp(
`\\{\\{${key.replace(/[.*+?^${}()|[\]\\]/g, "\\$&")}\\}\\}`,
"g"
);
let serialised: string;
if (typeof value === "string") {
serialised = escapeJsonString(value);
} else if (typeof value === "number" || typeof value === "boolean") {
serialised = String(value);
} else {
serialised = escapeJsonString(JSON.stringify(value));
}
rendered = rendered.replace(placeholder, serialised);
}
// Step 3: expand the fixed top-level keys.
rendered = rendered
.replace(/\{\{event\}\}/g, escapeJsonString(ctx.event))
.replace(/\{\{timestamp\}\}/g, escapeJsonString(ctx.timestamp))
.replace(/\{\{status\}\}/g, escapeJsonString(ctx.status));
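
A short worked example of the three passes (hypothetical template and context; the output assumes `renderTemplate` and `escapeJsonString` as defined in this file):

```
// Hypothetical template with a fixed key, two data-field keys, and {{data}}.
const template =
    '{"event":"{{event}}","org":"{{orgId}}","count":{{count}},"payload":{{data}}}';
const ctx = {
    event: "healthcheck.down",
    timestamp: "2026-05-12T21:12:40Z",
    status: "firing",
    data: { orgId: "acme", count: 3 }
};

// Pass 1 serialises ctx.data into {{data}}; pass 2 fills {{orgId}} (string,
// JSON-escaped) and {{count}} (number, as-is); pass 3 fills {{event}}.
// renderTemplate(template, ctx) yields:
// {"event":"healthcheck.down","org":"acme","count":3,
//  "payload":{"orgId":"acme","count":3}}
```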

View File

@@ -97,6 +97,13 @@ export class PrivateConfig {
);
}
process.env.BRANDING_HIDE_POWERED_BY =
this.rawPrivateConfig.branding?.hide_powered_by === true ||
this.rawPrivateConfig.branding?.resource_auth_page
?.hide_powered_by === true
? "true"
: "false";
process.env.LOGIN_PAGE_SUBTITLE_TEXT =
this.rawPrivateConfig.branding?.login_page?.subtitle_text || "";

View File

@@ -46,6 +46,7 @@ export interface ConnectionLogRecord {
orgId: string;
siteId: number;
clientId: number | null;
clientEndpoint: string | null;
userId: string | null;
sourceAddr: string;
destAddr: string;

View File

@@ -30,10 +30,12 @@ import {
LOG_TYPES,
LogEvent,
DestinationFailureState,
HttpConfig
HttpConfig,
S3Config
} from "./types";
import { LogDestinationProvider } from "./providers/LogDestinationProvider";
import { HttpLogDestination } from "./providers/HttpLogDestination";
import { S3LogDestination } from "./providers/S3LogDestination";
import type { EventStreamingDestination } from "@server/db";
// ---------------------------------------------------------------------------
@@ -72,11 +74,11 @@ const MAX_CATCHUP_BATCHES = 20;
* After the last entry the max value is re-used.
*/
const BACKOFF_SCHEDULE_MS = [
60_000, // 1 min (failure 1)
2 * 60_000, // 2 min (failure 2)
5 * 60_000, // 5 min (failure 3)
10 * 60_000, // 10 min (failure 4)
30 * 60_000 // 30 min (failure 5+)
60_000, // 1 min (failure 1)
2 * 60_000, // 2 min (failure 2)
5 * 60_000, // 5 min (failure 3)
10 * 60_000, // 10 min (failure 4)
30 * 60_000 // 30 min (failure 5+)
];
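
A sketch of the lookup the comment above describes: the consecutive-failure count indexes into the schedule, and anything past the end reuses the final 30-minute value (the helper name here is illustrative, not from the file):

```
// Sketch: map a 1-based consecutive-failure count to a back-off delay.
function backoffDelayMs(failureCount: number): number {
    const idx = Math.min(failureCount - 1, BACKOFF_SCHEDULE_MS.length - 1);
    return BACKOFF_SCHEDULE_MS[Math.max(idx, 0)];
}
// backoffDelayMs(1) === 60_000; backoffDelayMs(7) === 30 * 60_000
```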
/**
@@ -204,7 +206,10 @@ export class LogStreamingManager {
this.pollTimer = null;
this.runPoll()
.catch((err) =>
logger.error("LogStreamingManager: unexpected poll error", err)
logger.error(
"LogStreamingManager: unexpected poll error",
err
)
)
.finally(() => {
if (this.isRunning) {
@@ -275,10 +280,13 @@ export class LogStreamingManager {
}
// Decrypt and parse config; skip destination if either step fails
let configFromDb: HttpConfig;
let configFromDb: unknown;
try {
const decryptedConfig = decrypt(dest.config, config.getRawConfig().server.secret!);
configFromDb = JSON.parse(decryptedConfig) as HttpConfig;
const decryptedConfig = decrypt(
dest.config,
config.getRawConfig().server.secret!
);
configFromDb = JSON.parse(decryptedConfig);
} catch (err) {
logger.error(
`LogStreamingManager: destination ${dest.destinationId} has invalid or undecryptable config`,
@@ -305,6 +313,7 @@ export class LogStreamingManager {
if (enabledTypes.length === 0) return;
let anyFailure = false;
let firstError: string | null = null;
for (const logType of enabledTypes) {
if (!this.isRunning) break;
@@ -312,6 +321,10 @@ export class LogStreamingManager {
await this.processLogType(dest, provider, logType);
} catch (err) {
anyFailure = true;
if (firstError === null) {
firstError =
err instanceof Error ? err.message : String(err);
}
logger.error(
`LogStreamingManager: failed to process "${logType}" logs ` +
`for destination ${dest.destinationId}`,
@@ -322,6 +335,10 @@ export class LogStreamingManager {
if (anyFailure) {
this.recordFailure(dest.destinationId);
await this.setDestinationError(
dest.destinationId,
firstError ?? "Unknown error"
);
} else {
// Any success resets the failure/back-off state
if (this.failures.has(dest.destinationId)) {
@@ -330,6 +347,7 @@ export class LogStreamingManager {
`LogStreamingManager: destination ${dest.destinationId} recovered`
);
}
await this.clearDestinationError(dest.destinationId);
}
}
@@ -362,7 +380,10 @@ export class LogStreamingManager {
.from(eventStreamingCursors)
.where(
and(
eq(eventStreamingCursors.destinationId, dest.destinationId),
eq(
eventStreamingCursors.destinationId,
dest.destinationId
),
eq(eventStreamingCursors.logType, logType)
)
)
@@ -431,9 +452,7 @@ export class LogStreamingManager {
if (rows.length === 0) break;
const events = rows.map((row) =>
this.rowToLogEvent(logType, row)
);
const events = rows.map((row) => this.rowToLogEvent(logType, row));
// Throws on failure; caught by the caller, which applies back-off
await provider.send(events);
@@ -677,8 +696,7 @@ export class LogStreamingManager {
break;
}
const orgId =
typeof row.orgId === "string" ? row.orgId : "";
const orgId = typeof row.orgId === "string" ? row.orgId : "";
return {
id: row.id,
@@ -708,6 +726,8 @@ export class LogStreamingManager {
switch (type) {
case "http":
return new HttpLogDestination(config as HttpConfig);
case "s3":
return new S3LogDestination(config as S3Config);
// Future providers:
// case "datadog": return new DatadogLogDestination(config as DatadogConfig);
default:
@@ -749,6 +769,45 @@ export class LogStreamingManager {
// DB helpers
// -------------------------------------------------------------------------
private async setDestinationError(
destinationId: number,
errorMessage: string
): Promise<void> {
// Truncate to 1000 chars so it fits comfortably in the text column.
const truncated = errorMessage.slice(0, 1000);
try {
await db
.update(eventStreamingDestinations)
.set({ lastError: truncated, lastErrorAt: Date.now() })
.where(
eq(eventStreamingDestinations.destinationId, destinationId)
);
} catch (err) {
logger.warn(
`LogStreamingManager: could not persist error status for destination ${destinationId}`,
err
);
}
}
private async clearDestinationError(destinationId: number): Promise<void> {
try {
// Only update if there is actually an error stored, to avoid
// unnecessary writes on every successful poll cycle.
await db
.update(eventStreamingDestinations)
.set({ lastError: null, lastErrorAt: null })
.where(
eq(eventStreamingDestinations.destinationId, destinationId)
);
} catch (err) {
logger.warn(
`LogStreamingManager: could not clear error status for destination ${destinationId}`,
err
);
}
}
private async loadEnabledDestinations(): Promise<
EventStreamingDestination[]
> {

View File

@@ -0,0 +1,279 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { gzip as gzipCallback } from "zlib";
import { promisify } from "util";
import { randomUUID } from "crypto";
import logger from "@server/logger";
import { LogEvent, S3Config, S3PayloadFormat } from "../types";
import { LogDestinationProvider } from "./LogDestinationProvider";
const gzipAsync = promisify(gzipCallback);
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** Maximum time (ms) to wait for a single S3 PutObject response. */
const REQUEST_TIMEOUT_MS = 60_000;
/** Default payload format when none is specified in the config. */
const DEFAULT_FORMAT: S3PayloadFormat = "json_array";
// ---------------------------------------------------------------------------
// S3LogDestination
// ---------------------------------------------------------------------------
/**
* Forwards a batch of log events to an S3-compatible object store by
* uploading a single object per `send()` call.
*
* **Object key layout**
* ```
* {prefix}/{logType}/{YYYY}/{MM}/{DD}/{HH}-{mm}-{ss}-{uuid}.{ext}[.gz]
* ```
* - `prefix`: from `config.prefix` (default: empty, so the key starts at `logType`)
* - `logType`: one of "request", "action", "access", "connection"
* - Date components are derived from the upload time (UTC)
* - `ext`: `json` | `ndjson` | `csv`
* - `.gz`: appended when `config.gzip` is true
*
* **Payload formats** (controlled by `config.format`):
* - `json_array` (default): the body is a JSON array of event objects.
* - `ndjson`: one JSON object per line (newline-delimited).
* - `csv`: RFC-4180 CSV with a header row; columns are the
* union of all field names in the batch's event data.
*
* **Compression**: when `config.gzip` is `true` the body is gzip-compressed
* before upload and `Content-Encoding: gzip` is set on the object.
*
* **Custom endpoint**: set `config.endpoint` to target any S3-compatible
* storage service (e.g. MinIO, Cloudflare R2).
*/
export class S3LogDestination implements LogDestinationProvider {
readonly type = "s3";
private readonly config: S3Config;
constructor(config: S3Config) {
this.config = config;
}
// -----------------------------------------------------------------------
// LogDestinationProvider implementation
// -----------------------------------------------------------------------
async send(events: LogEvent[]): Promise<void> {
if (events.length === 0) return;
const format = this.config.format ?? DEFAULT_FORMAT;
const useGzip = this.config.gzip ?? false;
const logType = events[0].logType;
const rawBody = this.serialize(events, format);
const bodyBuffer = Buffer.from(rawBody, "utf-8");
let uploadBody: Buffer;
let contentEncoding: string | undefined;
if (useGzip) {
uploadBody = (await gzipAsync(bodyBuffer)) as Buffer;
contentEncoding = "gzip";
} else {
uploadBody = bodyBuffer;
}
const key = this.buildObjectKey(logType, format, useGzip);
const contentType = this.contentType(format);
const clientConfig: ConstructorParameters<typeof S3Client>[0] = {
region: this.config.region,
credentials: {
accessKeyId: this.config.accessKeyId,
secretAccessKey: this.config.secretAccessKey
},
requestHandler: {
requestTimeout: REQUEST_TIMEOUT_MS
}
};
if (this.config.endpoint?.trim()) {
clientConfig.endpoint = this.config.endpoint.trim();
}
const client = new S3Client(clientConfig);
try {
await client.send(
new PutObjectCommand({
Bucket: this.config.bucket,
Key: key,
Body: uploadBody,
ContentType: contentType,
...(contentEncoding
? { ContentEncoding: contentEncoding }
: {})
})
);
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : String(err);
throw new Error(
`S3LogDestination: failed to upload object "${key}" ` +
`to bucket "${this.config.bucket}": ${msg}`
);
}
}
// -----------------------------------------------------------------------
// Internal helpers
// -----------------------------------------------------------------------
/**
* Construct a unique S3 object key for the given log type and format.
* Keys are partitioned by logType and date so they can be queried or
* lifecycle-managed independently.
*/
private buildObjectKey(
logType: string,
format: S3PayloadFormat,
gzip: boolean
): string {
const now = new Date();
const year = now.getUTCFullYear();
const month = String(now.getUTCMonth() + 1).padStart(2, "0");
const day = String(now.getUTCDate()).padStart(2, "0");
const hh = String(now.getUTCHours()).padStart(2, "0");
const mm = String(now.getUTCMinutes()).padStart(2, "0");
const ss = String(now.getUTCSeconds()).padStart(2, "0");
const uid = randomUUID();
const ext =
format === "csv" ? "csv" : format === "ndjson" ? "ndjson" : "json";
const fileName = `${hh}-${mm}-${ss}-${uid}.${ext}${gzip ? ".gz" : ""}`;
const rawPrefix = (this.config.prefix ?? "").trim().replace(/\/+$/, "");
const parts = [
rawPrefix,
logType,
`${year}/${month}/${day}`,
fileName
].filter((p) => p !== "");
return parts.join("/");
}
private contentType(format: S3PayloadFormat): string {
switch (format) {
case "csv":
return "text/csv; charset=utf-8";
case "ndjson":
return "application/x-ndjson";
default:
return "application/json";
}
}
private serialize(events: LogEvent[], format: S3PayloadFormat): string {
switch (format) {
case "json_array":
return JSON.stringify(events.map(toPayload));
case "ndjson":
return events
.map((e) => JSON.stringify(toPayload(e)))
.join("\n");
case "csv":
return toCsv(events);
}
}
}
// ---------------------------------------------------------------------------
// Payload helpers
// ---------------------------------------------------------------------------
function toPayload(event: LogEvent): unknown {
return {
event: event.logType,
timestamp: new Date(event.timestamp * 1000).toISOString(),
data: event.data
};
}
/**
* Convert a batch of events to RFC-4180 CSV.
*
* The column set is the union of `event`, `timestamp`, and all keys present in
* `event.data` across the batch, preserving insertion order. Values that
* contain commas, double-quotes, or newlines are quoted and escaped.
*/
function toCsv(events: LogEvent[]): string {
if (events.length === 0) return "";
// Collect all unique data keys in stable order
const keySet = new LinkedSet<string>();
keySet.add("event");
keySet.add("timestamp");
for (const e of events) {
for (const k of Object.keys(e.data)) {
keySet.add(k);
}
}
const headers = keySet.toArray();
const rows: string[] = [headers.map(csvEscape).join(",")];
for (const e of events) {
const flat: Record<string, unknown> = {
event: e.logType,
timestamp: new Date(e.timestamp * 1000).toISOString(),
...e.data
};
rows.push(
headers.map((h) => csvEscape(flattenValue(flat[h]))).join(",")
);
}
return rows.join("\n");
}
/** Flatten a value to a plain string suitable for a CSV cell. */
function flattenValue(value: unknown): string {
if (value === null || value === undefined) return "";
if (typeof value === "object") return JSON.stringify(value);
return String(value);
}
/** RFC-4180 CSV escaping. */
function csvEscape(value: string): string {
if (/[",\n\r]/.test(value)) {
return `"${value.replace(/"/g, '""')}"`;
}
return value;
}
// ---------------------------------------------------------------------------
// Minimal ordered set (preserves insertion order, deduplicates)
// ---------------------------------------------------------------------------
class LinkedSet<T> {
private readonly map = new Map<T, true>();
add(value: T): void {
this.map.set(value, true);
}
toArray(): T[] {
return Array.from(this.map.keys());
}
}
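
A usage sketch for the new provider; every credential and endpoint value is a placeholder, and the event object carries only the `LogEvent` fields this class actually reads (`logType`, `timestamp`, `data`, plus `id`):

```
// Sketch: ship one batch to a MinIO-compatible endpoint as gzipped NDJSON.
const dest = new S3LogDestination({
    name: "audit-archive",
    accessKeyId: "<access-key-id>",
    secretAccessKey: "<secret-access-key>",
    region: "us-east-1",
    bucket: "pangolin-logs",
    prefix: "prod",
    endpoint: "https://minio.internal:9000",
    format: "ndjson",
    gzip: true
});

// Uploads an object keyed like:
//   prod/action/2026/05/12/21-12-40-<uuid>.ndjson.gz
await dest.send([
    {
        id: 1,
        logType: "action",
        timestamp: Math.floor(Date.now() / 1000),
        data: { orgId: "acme", action: "resource.created" }
    } as LogEvent
]);
```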

View File

@@ -107,6 +107,40 @@ export interface HttpConfig {
bodyTemplate?: string;
}
// ---------------------------------------------------------------------------
// S3 destination configuration
// ---------------------------------------------------------------------------
/**
* Controls how the batch of events is serialised into each S3 object.
*
* - `json_array`: `[{…}, {…}]`; the default. The whole batch is serialised
* as a single JSON array of event objects.
* - `ndjson`: `{…}\n{…}`; newline-delimited JSON, one object per line.
* - `csv`: RFC-4180 CSV with a header row derived from the event fields.
*/
export type S3PayloadFormat = "json_array" | "ndjson" | "csv";
export interface S3Config {
/** Human-readable label for the destination */
name: string;
/** AWS Access Key ID */
accessKeyId: string;
/** AWS Secret Access Key */
secretAccessKey: string;
/** AWS region (e.g. "us-east-1") */
region: string;
/** Target S3 bucket name */
bucket: string;
/** Optional key prefix prepended to the auto-generated path */
prefix?: string;
/** Override the S3 endpoint for S3-compatible storage (e.g. MinIO, R2) */
endpoint?: string;
/** How events are serialised into each object. Defaults to "json_array". */
format?: S3PayloadFormat;
/** Whether to gzip-compress the object before upload. Defaults to false. */
gzip?: boolean;
}
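
For reference, the same two-event batch under each `S3PayloadFormat` (illustrative values; whitespace added for readability):

```
json_array (default): one JSON array for the whole batch
  [{"event":"action","timestamp":"2026-05-12T21:12:40.000Z","data":{"orgId":"acme"}},
   {"event":"action","timestamp":"2026-05-12T21:12:41.000Z","data":{"orgId":"beta"}}]

ndjson: one object per line
  {"event":"action","timestamp":"2026-05-12T21:12:40.000Z","data":{"orgId":"acme"}}
  {"event":"action","timestamp":"2026-05-12T21:12:41.000Z","data":{"orgId":"beta"}}

csv: header is the union of event, timestamp, and data keys
  event,timestamp,orgId
  action,2026-05-12T21:12:40.000Z,acme
  action,2026-05-12T21:12:41.000Z,beta
```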
// ---------------------------------------------------------------------------
// Per-destination per-log-type cursor (reflects the DB table)
// ---------------------------------------------------------------------------

View File

@@ -141,6 +141,7 @@ export const privateConfigSchema = z
)
.optional(),
hide_auth_layout_footer: z.boolean().optional().default(false),
hide_powered_by: z.boolean().optional(),
login_page: z
.object({
subtitle_text: z.string().optional()

View File

@@ -124,15 +124,11 @@ function getWhere(data: Q) {
data.clientId
? eq(connectionAuditLog.clientId, data.clientId)
: undefined,
data.siteId
? eq(connectionAuditLog.siteId, data.siteId)
: undefined,
data.siteId ? eq(connectionAuditLog.siteId, data.siteId) : undefined,
data.siteResourceId
? eq(connectionAuditLog.siteResourceId, data.siteResourceId)
: undefined,
data.userId
? eq(connectionAuditLog.userId, data.userId)
: undefined
data.userId ? eq(connectionAuditLog.userId, data.userId) : undefined
);
}
@@ -144,6 +140,7 @@ export function queryConnection(data: Q) {
orgId: connectionAuditLog.orgId,
siteId: connectionAuditLog.siteId,
clientId: connectionAuditLog.clientId,
clientEndpoint: connectionAuditLog.clientEndpoint,
userId: connectionAuditLog.userId,
sourceAddr: connectionAuditLog.sourceAddr,
destAddr: connectionAuditLog.destAddr,
@@ -203,10 +200,7 @@ async function enrichWithDetails(
];
// Fetch resource details from main database
const resourceMap = new Map<
number,
{ name: string; niceId: string }
>();
const resourceMap = new Map<number, { name: string; niceId: string }>();
if (siteResourceIds.length > 0) {
const resourceDetails = await primaryDb
.select({
@@ -268,10 +262,7 @@ async function enrichWithDetails(
}
// Fetch user details from main database
const userMap = new Map<
string,
{ email: string | null }
>();
const userMap = new Map<string, { email: string | null }>();
if (userIds.length > 0) {
const userDetails = await primaryDb
.select({
@@ -290,29 +281,25 @@ async function enrichWithDetails(
return logs.map((log) => ({
...log,
resourceName: log.siteResourceId
? resourceMap.get(log.siteResourceId)?.name ?? null
? (resourceMap.get(log.siteResourceId)?.name ?? null)
: null,
resourceNiceId: log.siteResourceId
? resourceMap.get(log.siteResourceId)?.niceId ?? null
: null,
siteName: log.siteId
? siteMap.get(log.siteId)?.name ?? null
? (resourceMap.get(log.siteResourceId)?.niceId ?? null)
: null,
siteName: log.siteId ? (siteMap.get(log.siteId)?.name ?? null) : null,
siteNiceId: log.siteId
? siteMap.get(log.siteId)?.niceId ?? null
? (siteMap.get(log.siteId)?.niceId ?? null)
: null,
clientName: log.clientId
? clientMap.get(log.clientId)?.name ?? null
? (clientMap.get(log.clientId)?.name ?? null)
: null,
clientNiceId: log.clientId
? clientMap.get(log.clientId)?.niceId ?? null
? (clientMap.get(log.clientId)?.niceId ?? null)
: null,
clientType: log.clientId
? clientMap.get(log.clientId)?.type ?? null
? (clientMap.get(log.clientId)?.type ?? null)
: null,
userEmail: log.userId
? userMap.get(log.userId)?.email ?? null
: null
userEmail: log.userId ? (userMap.get(log.userId)?.email ?? null) : null
}));
}
@@ -521,4 +508,4 @@ export async function queryConnectionAuditLogs(
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
);
}
}
}

View File

@@ -51,6 +51,8 @@ export type ListEventStreamingDestinationsResponse = {
type: string;
config: string;
enabled: boolean;
lastError: string | null;
lastErrorAt: number | null;
createdAt: number;
updatedAt: number;
sendConnectionLogs: boolean;
@@ -79,7 +81,8 @@ async function query(orgId: string, limit: number, offset: number) {
registry.registerPath({
method: "get",
path: "/org/{orgId}/event-streaming-destination",
description: "List all event streaming destinations for a specific organization.",
description:
"List all event streaming destinations for a specific organization.",
tags: [OpenAPITags.Org],
request: {
query: querySchema,

View File

@@ -11,7 +11,7 @@
* This file is not licensed under the AGPLv3.
*/
import { db } from "@server/db";
import { clientSitesAssociationsCache, db } from "@server/db";
import { MessageHandler } from "@server/routers/ws";
import { sites, Newt, clients, orgs } from "@server/db";
import { and, eq, inArray } from "drizzle-orm";
@@ -146,7 +146,11 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
// each unique sourceAddr + the org's CIDR suffix and do a targeted IN query.
const ipToClient = new Map<
string,
{ clientId: number; userId: string | null }
{
clientId: number;
userId: string | null;
clientEndpoint: string | null;
}
>();
if (cidrSuffix) {
@@ -172,9 +176,21 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
.select({
clientId: clients.clientId,
userId: clients.userId,
subnet: clients.subnet
subnet: clients.subnet,
clientEndpoint: clientSitesAssociationsCache.endpoint
})
.from(clients)
.leftJoin(
// this join should be one-to-one
clientSitesAssociationsCache,
and(
eq(
clients.clientId,
clientSitesAssociationsCache.clientId
),
eq(clientSitesAssociationsCache.siteId, newt.siteId)
)
)
.where(
and(
eq(clients.orgId, orgId),
@@ -189,7 +205,8 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
);
ipToClient.set(ip, {
clientId: c.clientId,
userId: c.userId
userId: c.userId,
clientEndpoint: c.clientEndpoint
});
}
}
@@ -234,6 +251,7 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
orgId,
siteId: newt.siteId,
clientId: clientInfo?.clientId ?? null,
clientEndpoint: clientInfo?.clientEndpoint ?? null,
userId: clientInfo?.userId ?? null,
sourceAddr: session.sourceAddr,
destAddr: session.destAddr,
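
The map built above is keyed by a client's WireGuard IP and consulted per session. A hedged sketch of the lookup path; the split-on-`/` derivation of the IP from `clients.subnet` is an assumption, since the actual helper falls outside this hunk:

```
// Sketch: derive the map key from a stored subnet. Keys are set from
// subnetToIp(c.subnet); lookups use the raw session.sourceAddr.
function subnetToIp(subnet: string): string {
    return subnet.split("/")[0]; // "100.89.128.4/24" -> "100.89.128.4"
}

// When inserting the audit row, every field degrades to null if the
// source address matches no known client:
const clientInfo = ipToClient.get(session.sourceAddr);
// clientInfo?.clientId ?? null, clientInfo?.userId ?? null,
// clientInfo?.clientEndpoint ?? null
```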

View File

@@ -14,7 +14,7 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import stoi from "@server/lib/stoi";
import { clients, db } from "@server/db";
import { clients, db, primaryDb, Client } from "@server/db";
import { userOrgRoles, userOrgs, roles } from "@server/db";
import { eq, and } from "drizzle-orm";
import response from "@server/lib/response";
@@ -98,15 +98,6 @@ export async function addUserRole(
);
}
if (existingUser[0].isOwner) {
return next(
createHttpError(
HttpCode.FORBIDDEN,
"Cannot change the role of the owner of the organization"
)
);
}
const roleExists = await db
.select()
.from(roles)
@@ -122,8 +113,12 @@ export async function addUserRole(
);
}
let newUserRole: { userId: string; orgId: string; roleId: number } | null =
null;
let newUserRole: {
userId: string;
orgId: string;
roleId: number;
} | null = null;
let orgClientsToRebuild: Client[] = [];
await db.transaction(async (trx) => {
const inserted = await trx
.insert(userOrgRoles)
@@ -149,11 +144,19 @@ export async function addUserRole(
)
);
for (const orgClient of orgClients) {
await rebuildClientAssociationsFromClient(orgClient, trx);
}
orgClientsToRebuild = orgClients;
});
for (const orgClient of orgClientsToRebuild) {
rebuildClientAssociationsFromClient(orgClient, primaryDb).catch(
(e) => {
logger.error(
`Failed to rebuild client associations for client ${orgClient.clientId} after adding role: ${e}`
);
}
);
}
return response(res, {
data: newUserRole ?? { userId, orgId: role.orgId, roleId },
success: true,
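
All three role handlers in this commit follow the same shape: snapshot the affected clients inside the transaction, commit, then rebuild associations asynchronously against `primaryDb` so the transaction stays short. The pattern, reduced to a sketch:

```
// Sketch of the pattern shared by addUserRole / removeUserRole /
// setUserOrgRoles: do the role writes transactionally, but defer the
// (potentially slow) association rebuilds until after commit.
let toRebuild: Client[] = [];
await db.transaction(async (trx) => {
    // ... role table writes via trx ...
    toRebuild = await trx
        .select()
        .from(clients)
        .where(and(eq(clients.userId, userId), eq(clients.orgId, orgId)));
});

for (const client of toRebuild) {
    // Fire-and-forget: a rebuild failure is logged, not surfaced to the
    // HTTP response, and uses primaryDb rather than the closed trx.
    rebuildClientAssociationsFromClient(client, primaryDb).catch((e) =>
        logger.error(`rebuild failed for client ${client.clientId}: ${e}`)
    );
}
```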

View File

@@ -14,7 +14,7 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import stoi from "@server/lib/stoi";
import { db } from "@server/db";
import { db, primaryDb, Client } from "@server/db";
import { userOrgRoles, userOrgs, roles, clients } from "@server/db";
import { eq, and } from "drizzle-orm";
import response from "@server/lib/response";
@@ -98,11 +98,11 @@ export async function removeUserRole(
);
}
if (existingUser.isOwner) {
if (existingUser.isOwner && role.isAdmin === true) {
return next(
createHttpError(
HttpCode.FORBIDDEN,
"Cannot change the roles of the owner of the organization"
"Cannot remove the administrator role from the organization owner"
)
);
}
@@ -129,6 +129,7 @@ export async function removeUserRole(
}
}
let orgClientsToRebuild: Client[] = [];
await db.transaction(async (trx) => {
await trx
.delete(userOrgRoles)
@@ -150,11 +151,19 @@ export async function removeUserRole(
)
);
for (const orgClient of orgClients) {
await rebuildClientAssociationsFromClient(orgClient, trx);
}
orgClientsToRebuild = orgClients;
});
for (const orgClient of orgClientsToRebuild) {
rebuildClientAssociationsFromClient(orgClient, primaryDb).catch(
(e) => {
logger.error(
`Failed to rebuild client associations for client ${orgClient.clientId} after removing role: ${e}`
);
}
);
}
return response(res, {
data: { userId, orgId: role.orgId, roleId },
success: true,

View File

@@ -13,7 +13,7 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { clients, db } from "@server/db";
import { clients, db, primaryDb, Client } from "@server/db";
import { userOrgRoles, userOrgs, roles } from "@server/db";
import { eq, and, inArray } from "drizzle-orm";
import response from "@server/lib/response";
@@ -87,17 +87,8 @@ export async function setUserOrgRoles(
);
}
if (existingUser.isOwner) {
return next(
createHttpError(
HttpCode.FORBIDDEN,
"Cannot change the roles of the owner of the organization"
)
);
}
const orgRoles = await db
.select({ roleId: roles.roleId })
.select({ roleId: roles.roleId, isAdmin: roles.isAdmin })
.from(roles)
.where(
and(
@@ -115,6 +106,19 @@ export async function setUserOrgRoles(
);
}
if (existingUser.isOwner) {
const hasAdminRole = orgRoles.some((r) => r.isAdmin === true);
if (!hasAdminRole) {
return next(
createHttpError(
HttpCode.FORBIDDEN,
"The organization owner must retain an administrator role"
)
);
}
}
let orgClientsToRebuild: Client[] = [];
await db.transaction(async (trx) => {
await trx
.delete(userOrgRoles)
@@ -142,11 +146,19 @@ export async function setUserOrgRoles(
and(eq(clients.userId, userId), eq(clients.orgId, orgId))
);
for (const orgClient of orgClients) {
await rebuildClientAssociationsFromClient(orgClient, trx);
}
orgClientsToRebuild = orgClients;
});
for (const orgClient of orgClientsToRebuild) {
rebuildClientAssociationsFromClient(orgClient, primaryDb).catch(
(e) => {
logger.error(
`Failed to rebuild client associations for client ${orgClient.clientId} after setting roles: ${e}`
);
}
);
}
return response(res, {
data: { userId, orgId, roleIds: uniqueRoleIds },
success: true,