From a04e2a5e009eb0bdddc0da4d1fe07ebe151fe595 Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 15 Apr 2026 17:46:04 -0700 Subject: [PATCH] Transititioning the hc table and firing the alerts --- server/db/pg/schema/schema.ts | 10 ++- server/db/sqlite/schema/schema.ts | 5 +- server/routers/newt/buildConfiguration.ts | 2 + server/routers/newt/offlineChecker.ts | 10 ++- server/routers/newt/pingAccumulator.ts | 38 ++++++++-- server/routers/newt/targets.ts | 72 ++++++++++++------- server/routers/target/createTarget.ts | 4 +- server/routers/target/getTarget.ts | 11 ++- .../target/handleHealthcheckStatusMessage.ts | 34 ++++++++- 9 files changed, 139 insertions(+), 47 deletions(-) diff --git a/server/db/pg/schema/schema.ts b/server/db/pg/schema/schema.ts index c542af33b..f39a125a3 100644 --- a/server/db/pg/schema/schema.ts +++ b/server/db/pg/schema/schema.ts @@ -186,9 +186,13 @@ export const targets = pgTable("targets", { export const targetHealthCheck = pgTable("targetHealthCheck", { targetHealthCheckId: serial("targetHealthCheckId").primaryKey(), - targetId: integer("targetId") - .notNull() - .references(() => targets.targetId, { onDelete: "cascade" }), + targetId: integer("targetId").references(() => targets.targetId, { + onDelete: "cascade" + }), + orgId: varchar("orgId").references(() => orgs.orgId, { + onDelete: "cascade" + }), + name: varchar("name"), hcEnabled: boolean("hcEnabled").notNull().default(false), hcPath: varchar("hcPath"), hcScheme: varchar("hcScheme"), diff --git a/server/db/sqlite/schema/schema.ts b/server/db/sqlite/schema/schema.ts index 5ec932c3c..9939f0309 100644 --- a/server/db/sqlite/schema/schema.ts +++ b/server/db/sqlite/schema/schema.ts @@ -210,8 +210,11 @@ export const targetHealthCheck = sqliteTable("targetHealthCheck", { autoIncrement: true }), targetId: integer("targetId") - .notNull() .references(() => targets.targetId, { onDelete: "cascade" }), + orgId: text("orgId").references(() => orgs.orgId, { + onDelete: "cascade" + }), + name: 
text("name").notNull(), hcEnabled: integer("hcEnabled", { mode: "boolean" }) .notNull() .default(false), diff --git a/server/routers/newt/buildConfiguration.ts b/server/routers/newt/buildConfiguration.ts index fc0abd9cf..28b6373e0 100644 --- a/server/routers/newt/buildConfiguration.ts +++ b/server/routers/newt/buildConfiguration.ts @@ -201,6 +201,7 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) { internalPort: targets.internalPort, enabled: targets.enabled, protocol: resources.protocol, + hcId: targetHealthCheck.targetHealthCheckId, hcEnabled: targetHealthCheck.hcEnabled, hcPath: targetHealthCheck.hcPath, hcScheme: targetHealthCheck.hcScheme, @@ -272,6 +273,7 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) { return { id: target.targetId, + hcId: target.hcId, hcEnabled: target.hcEnabled, hcPath: target.hcPath, hcScheme: target.hcScheme, diff --git a/server/routers/newt/offlineChecker.ts b/server/routers/newt/offlineChecker.ts index 20e924efa..3343a92fd 100644 --- a/server/routers/newt/offlineChecker.ts +++ b/server/routers/newt/offlineChecker.ts @@ -4,7 +4,7 @@ import { } from "#dynamic/routers/ws"; import { eq, lt, isNull, and, or, ne, not } from "drizzle-orm"; import logger from "@server/logger"; -import { fireSiteOfflineAlert } from "#dynamic/lib/alerts"; +import { fireSiteOfflineAlert, fireSiteOnlineAlert } from "#dynamic/lib/alerts"; // Track if the offline checker interval is running let offlineCheckerInterval: NodeJS.Timeout | null = null; @@ -101,6 +101,8 @@ export const startNewtOfflineChecker = (): void => { .targetHealthCheckId ) ); + + // TODO: should we be firing an alert here when the health check goes to unknown? 
} await fireSiteOfflineAlert(staleSite.orgId, staleSite.siteId, staleSite.name); @@ -111,6 +113,8 @@ export const startNewtOfflineChecker = (): void => { const allWireguardSites = await db .select({ siteId: sites.siteId, + orgId: sites.orgId, + name: sites.name, online: sites.online, lastBandwidthUpdate: sites.lastBandwidthUpdate }) @@ -142,6 +146,8 @@ export const startNewtOfflineChecker = (): void => { .update(sites) .set({ online: false }) .where(eq(sites.siteId, site.siteId)); + + await fireSiteOfflineAlert(site.orgId, site.siteId, site.name); } else if ( lastBandwidthUpdate >= wireguardOfflineThreshold && !site.online @@ -154,6 +160,8 @@ export const startNewtOfflineChecker = (): void => { .update(sites) .set({ online: true }) .where(eq(sites.siteId, site.siteId)); + + await fireSiteOnlineAlert(site.orgId, site.siteId, site.name); } } } catch (error) { diff --git a/server/routers/newt/pingAccumulator.ts b/server/routers/newt/pingAccumulator.ts index fe2cde216..8f2154c39 100644 --- a/server/routers/newt/pingAccumulator.ts +++ b/server/routers/newt/pingAccumulator.ts @@ -1,7 +1,8 @@ import { db } from "@server/db"; import { sites, clients, olms } from "@server/db"; -import { inArray } from "drizzle-orm"; +import { and, eq, inArray } from "drizzle-orm"; import logger from "@server/logger"; +import { fireSiteOnlineAlert } from "#dynamic/lib/alerts"; /** * Ping Accumulator @@ -110,15 +111,44 @@ async function flushSitePingsToDb(): Promise { const siteIds = batch.map(([id]) => id); try { - await withRetry(async () => { - await db + const newlyOnlineSites = await withRetry(async () => { + // Only update sites that were offline — these are the + // offline→online transitions. .returning() gives us exactly + // the site IDs that changed state. 
+ const transitioned = await db .update(sites) .set({ online: true, lastPing: maxTimestamp }) - .where(inArray(sites.siteId, siteIds)); + .where( + and( + inArray(sites.siteId, siteIds), + eq(sites.online, false) + ) + ) + .returning({ siteId: sites.siteId, orgId: sites.orgId, name: sites.name }); + + // Update lastPing for sites that were already online. + // After the update above, the newly-online sites now have + // online = true, so this catches all remaining sites in the + // batch and keeps lastPing current for them too. + await db + .update(sites) + .set({ lastPing: maxTimestamp }) + .where( + and( + inArray(sites.siteId, siteIds), + eq(sites.online, true) + ) + ); + + return transitioned; }, "flushSitePingsToDb"); + + for (const site of newlyOnlineSites) { + await fireSiteOnlineAlert(site.orgId, site.siteId, site.name); + } } catch (error) { logger.error( `Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`, diff --git a/server/routers/newt/targets.ts b/server/routers/newt/targets.ts index afc983472..cd0814bb8 100644 --- a/server/routers/newt/targets.ts +++ b/server/routers/newt/targets.ts @@ -1,7 +1,6 @@ -import { Target, TargetHealthCheck, db, targetHealthCheck } from "@server/db"; +import { Target, TargetHealthCheck } from "@server/db"; import { sendToClient } from "#dynamic/routers/ws"; import logger from "@server/logger"; -import { eq, inArray } from "drizzle-orm"; import { canCompress } from "@server/lib/clientVersionChecks"; export async function addTargets( @@ -18,17 +17,23 @@ export async function addTargets( }:${target.port}`; }); - await sendToClient(newtId, { - type: `newt/${protocol}/add`, - data: { - targets: payloadTargets - } - }, { incrementConfigVersion: true, compress: canCompress(version, "newt") }); + await sendToClient( + newtId, + { + type: `newt/${protocol}/add`, + data: { + targets: payloadTargets + } + }, + { incrementConfigVersion: true, compress: canCompress(version, "newt") } + ); // Create a map 
for quick lookup const healthCheckMap = new Map(); healthCheckData.forEach((hc) => { - healthCheckMap.set(hc.targetId, hc); + if (hc.targetId !== null) { + healthCheckMap.set(hc.targetId, hc); + } }); const healthCheckTargets = targets.map((target) => { @@ -79,6 +84,7 @@ export async function addTargets( return { id: target.targetId, + hcId: hc.targetHealthCheckId, hcEnabled: hc.hcEnabled, hcPath: hc.hcPath, hcScheme: hc.hcScheme, @@ -102,12 +108,16 @@ export async function addTargets( (target) => target !== null ); - await sendToClient(newtId, { - type: `newt/healthcheck/add`, - data: { - targets: validHealthCheckTargets - } - }, { incrementConfigVersion: true, compress: canCompress(version, "newt") }); + await sendToClient( + newtId, + { + type: `newt/healthcheck/add`, + data: { + targets: validHealthCheckTargets + } + }, + { incrementConfigVersion: true, compress: canCompress(version, "newt") } + ); } export async function removeTargets( @@ -123,21 +133,29 @@ export async function removeTargets( }:${target.port}`; }); - await sendToClient(newtId, { - type: `newt/${protocol}/remove`, - data: { - targets: payloadTargets - } - }, { incrementConfigVersion: true }); + await sendToClient( + newtId, + { + type: `newt/${protocol}/remove`, + data: { + targets: payloadTargets + } + }, + { incrementConfigVersion: true } + ); const healthCheckTargets = targets.map((target) => { return target.targetId; }); - await sendToClient(newtId, { - type: `newt/healthcheck/remove`, - data: { - ids: healthCheckTargets - } - }, { incrementConfigVersion: true, compress: canCompress(version, "newt") }); + await sendToClient( + newtId, + { + type: `newt/healthcheck/remove`, + data: { + ids: healthCheckTargets + } + }, + { incrementConfigVersion: true, compress: canCompress(version, "newt") } + ); } diff --git a/server/routers/target/createTarget.ts b/server/routers/target/createTarget.ts index 129a70abf..a4d2e7e54 100644 --- a/server/routers/target/createTarget.ts +++ 
b/server/routers/target/createTarget.ts @@ -275,8 +275,8 @@ export async function createTarget( return response(res, { data: { - ...newTarget[0], - ...healthCheck[0] + ...healthCheck[0], + ...newTarget[0] }, success: true, error: false, diff --git a/server/routers/target/getTarget.ts b/server/routers/target/getTarget.ts index 749e1399b..281c39906 100644 --- a/server/routers/target/getTarget.ts +++ b/server/routers/target/getTarget.ts @@ -15,8 +15,8 @@ const getTargetSchema = z.strictObject({ }); type GetTargetResponse = Target & - Omit & { - hcHeaders: { name: string; value: string }[] | null; + Partial> & { + hcHeaders: { name: string; value: string }[] | null | undefined; }; registry.registerPath({ @@ -70,20 +70,19 @@ export async function getTarget( .limit(1); // Parse hcHeaders from JSON string back to array - let parsedHcHeaders = null; + let parsedHcHeaders: { name: string; value: string }[] | null = null; if (targetHc?.hcHeaders) { try { parsedHcHeaders = JSON.parse(targetHc.hcHeaders); } catch (error) { - // If parsing fails, keep as string for backward compatibility - parsedHcHeaders = targetHc.hcHeaders; + // If parsing fails, keep as null for safety } } return response(res, { data: { - ...target[0], ...targetHc, + ...target[0], hcHeaders: parsedHcHeaders }, success: true, diff --git a/server/routers/target/handleHealthcheckStatusMessage.ts b/server/routers/target/handleHealthcheckStatusMessage.ts index 7ea1730ce..ef2244c39 100644 --- a/server/routers/target/handleHealthcheckStatusMessage.ts +++ b/server/routers/target/handleHealthcheckStatusMessage.ts @@ -3,7 +3,10 @@ import { MessageHandler } from "@server/routers/ws"; import { Newt } from "@server/db"; import { eq, and } from "drizzle-orm"; import logger from "@server/logger"; -import { unknown } from "zod"; +import { + fireHealthCheckHealthyAlert, + fireHealthCheckNotHealthyAlert +} from "#dynamic/lib/alerts"; interface TargetHealthStatus { status: string; @@ -11,7 +14,7 @@ interface 
TargetHealthStatus { checkCount: number; lastError?: string; config: { - id: string; + id: string; // this could be the hc id or the target id, depending on the version of newt hcEnabled: boolean; hcPath?: string; hcScheme?: string; @@ -23,6 +26,9 @@ interface TargetHealthStatus { hcTimeout?: number; hcHeaders?: any; hcMethod?: string; + hcTlsServerName?: string; + hcHealthyThreshold?: number; + hcUnhealthyThreshold?: number; }; } @@ -78,6 +84,10 @@ export const handleHealthcheckStatusMessage: MessageHandler = async ( .select({ targetId: targets.targetId, siteId: targets.siteId, + orgId: targetHealthCheck.orgId, + targetHealthCheckId: targetHealthCheck.targetHealthCheckId, + resourceOrgId: resources.orgId, + name: targetHealthCheck.name, hcStatus: targetHealthCheck.hcHealth }) .from(targets) @@ -86,7 +96,10 @@ export const handleHealthcheckStatusMessage: MessageHandler = async ( eq(targets.resourceId, resources.resourceId) ) .innerJoin(sites, eq(targets.siteId, sites.siteId)) - .innerJoin(targetHealthCheck, eq(targets.targetId, targetHealthCheck.targetId)) + .innerJoin( + targetHealthCheck, + eq(targets.targetId, targetHealthCheck.targetId) + ) .where( and( eq(targets.targetId, targetIdNum), @@ -123,6 +136,21 @@ export const handleHealthcheckStatusMessage: MessageHandler = async ( .where(eq(targetHealthCheck.targetId, targetIdNum)) .execute(); + // because we are checking above if there was a change we can fire the alert here because it changed + if (healthStatus.status === "unhealthy") { + await fireHealthCheckNotHealthyAlert( + targetCheck.orgId || targetCheck.resourceOrgId, // for backwards compatibility, check both orgId fields because the target health checks don't have the orgId + targetCheck.targetHealthCheckId, + targetCheck.name + ); + } else if (healthStatus.status === "healthy") { + await fireHealthCheckHealthyAlert( + targetCheck.orgId || targetCheck.resourceOrgId, // for backwards compatibility, check both orgId fields because the target health checks 
don't have the orgId + targetCheck.targetHealthCheckId, + targetCheck.name + ); + } + logger.debug( `Updated health status for target ${targetId} to ${healthStatus.status}` );