mirror of
https://github.com/fosrl/pangolin.git
synced 2026-04-16 06:46:37 +00:00
Transitioning the hc table and firing the alerts
This commit is contained in:
@@ -201,6 +201,7 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) {
|
||||
internalPort: targets.internalPort,
|
||||
enabled: targets.enabled,
|
||||
protocol: resources.protocol,
|
||||
hcId: targetHealthCheck.targetHealthCheckId,
|
||||
hcEnabled: targetHealthCheck.hcEnabled,
|
||||
hcPath: targetHealthCheck.hcPath,
|
||||
hcScheme: targetHealthCheck.hcScheme,
|
||||
@@ -272,6 +273,7 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) {
|
||||
|
||||
return {
|
||||
id: target.targetId,
|
||||
hcId: target.hcId,
|
||||
hcEnabled: target.hcEnabled,
|
||||
hcPath: target.hcPath,
|
||||
hcScheme: target.hcScheme,
|
||||
|
||||
@@ -4,7 +4,7 @@ import {
|
||||
} from "#dynamic/routers/ws";
|
||||
import { eq, lt, isNull, and, or, ne, not } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { fireSiteOfflineAlert } from "#dynamic/lib/alerts";
|
||||
import { fireSiteOfflineAlert, fireSiteOnlineAlert } from "#dynamic/lib/alerts";
|
||||
|
||||
// Track if the offline checker interval is running
|
||||
let offlineCheckerInterval: NodeJS.Timeout | null = null;
|
||||
@@ -101,6 +101,8 @@ export const startNewtOfflineChecker = (): void => {
|
||||
.targetHealthCheckId
|
||||
)
|
||||
);
|
||||
|
||||
// TODO: should we be firing an alert here when the health check goes to unknown?
|
||||
}
|
||||
|
||||
await fireSiteOfflineAlert(staleSite.orgId, staleSite.siteId, staleSite.name);
|
||||
@@ -111,6 +113,8 @@ export const startNewtOfflineChecker = (): void => {
|
||||
const allWireguardSites = await db
|
||||
.select({
|
||||
siteId: sites.siteId,
|
||||
orgId: sites.orgId,
|
||||
name: sites.name,
|
||||
online: sites.online,
|
||||
lastBandwidthUpdate: sites.lastBandwidthUpdate
|
||||
})
|
||||
@@ -142,6 +146,8 @@ export const startNewtOfflineChecker = (): void => {
|
||||
.update(sites)
|
||||
.set({ online: false })
|
||||
.where(eq(sites.siteId, site.siteId));
|
||||
|
||||
await fireSiteOfflineAlert(site.orgId, site.siteId, site.name);
|
||||
} else if (
|
||||
lastBandwidthUpdate >= wireguardOfflineThreshold &&
|
||||
!site.online
|
||||
@@ -154,6 +160,8 @@ export const startNewtOfflineChecker = (): void => {
|
||||
.update(sites)
|
||||
.set({ online: true })
|
||||
.where(eq(sites.siteId, site.siteId));
|
||||
|
||||
await fireSiteOnlineAlert(site.orgId, site.siteId, site.name);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import { db } from "@server/db";
|
||||
import { sites, clients, olms } from "@server/db";
|
||||
import { inArray } from "drizzle-orm";
|
||||
import { and, eq, inArray } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { fireSiteOnlineAlert } from "#dynamic/lib/alerts";
|
||||
|
||||
/**
|
||||
* Ping Accumulator
|
||||
@@ -110,15 +111,44 @@ async function flushSitePingsToDb(): Promise<void> {
|
||||
const siteIds = batch.map(([id]) => id);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
await db
|
||||
const newlyOnlineSites = await withRetry(async () => {
|
||||
// Only update sites that were offline — these are the
|
||||
// offline→online transitions. .returning() gives us exactly
|
||||
// the site IDs that changed state.
|
||||
const transitioned = await db
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: maxTimestamp
|
||||
})
|
||||
.where(inArray(sites.siteId, siteIds));
|
||||
.where(
|
||||
and(
|
||||
inArray(sites.siteId, siteIds),
|
||||
eq(sites.online, false)
|
||||
)
|
||||
)
|
||||
.returning({ siteId: sites.siteId, orgId: sites.orgId, name: sites.name });
|
||||
|
||||
// Update lastPing for sites that were already online.
|
||||
// After the update above, the newly-online sites now have
|
||||
// online = true, so this catches all remaining sites in the
|
||||
// batch and keeps lastPing current for them too.
|
||||
await db
|
||||
.update(sites)
|
||||
.set({ lastPing: maxTimestamp })
|
||||
.where(
|
||||
and(
|
||||
inArray(sites.siteId, siteIds),
|
||||
eq(sites.online, true)
|
||||
)
|
||||
);
|
||||
|
||||
return transitioned;
|
||||
}, "flushSitePingsToDb");
|
||||
|
||||
for (const site of newlyOnlineSites) {
|
||||
await fireSiteOnlineAlert(site.orgId, site.siteId, site.name);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import { Target, TargetHealthCheck, db, targetHealthCheck } from "@server/db";
|
||||
import { Target, TargetHealthCheck } from "@server/db";
|
||||
import { sendToClient } from "#dynamic/routers/ws";
|
||||
import logger from "@server/logger";
|
||||
import { eq, inArray } from "drizzle-orm";
|
||||
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||
|
||||
export async function addTargets(
|
||||
@@ -18,17 +17,23 @@ export async function addTargets(
|
||||
}:${target.port}`;
|
||||
});
|
||||
|
||||
await sendToClient(newtId, {
|
||||
type: `newt/${protocol}/add`,
|
||||
data: {
|
||||
targets: payloadTargets
|
||||
}
|
||||
}, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
|
||||
await sendToClient(
|
||||
newtId,
|
||||
{
|
||||
type: `newt/${protocol}/add`,
|
||||
data: {
|
||||
targets: payloadTargets
|
||||
}
|
||||
},
|
||||
{ incrementConfigVersion: true, compress: canCompress(version, "newt") }
|
||||
);
|
||||
|
||||
// Create a map for quick lookup
|
||||
const healthCheckMap = new Map<number, TargetHealthCheck>();
|
||||
healthCheckData.forEach((hc) => {
|
||||
healthCheckMap.set(hc.targetId, hc);
|
||||
if (hc.targetId !== null) {
|
||||
healthCheckMap.set(hc.targetId, hc);
|
||||
}
|
||||
});
|
||||
|
||||
const healthCheckTargets = targets.map((target) => {
|
||||
@@ -79,6 +84,7 @@ export async function addTargets(
|
||||
|
||||
return {
|
||||
id: target.targetId,
|
||||
hcId: hc.targetHealthCheckId,
|
||||
hcEnabled: hc.hcEnabled,
|
||||
hcPath: hc.hcPath,
|
||||
hcScheme: hc.hcScheme,
|
||||
@@ -102,12 +108,16 @@ export async function addTargets(
|
||||
(target) => target !== null
|
||||
);
|
||||
|
||||
await sendToClient(newtId, {
|
||||
type: `newt/healthcheck/add`,
|
||||
data: {
|
||||
targets: validHealthCheckTargets
|
||||
}
|
||||
}, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
|
||||
await sendToClient(
|
||||
newtId,
|
||||
{
|
||||
type: `newt/healthcheck/add`,
|
||||
data: {
|
||||
targets: validHealthCheckTargets
|
||||
}
|
||||
},
|
||||
{ incrementConfigVersion: true, compress: canCompress(version, "newt") }
|
||||
);
|
||||
}
|
||||
|
||||
export async function removeTargets(
|
||||
@@ -123,21 +133,29 @@ export async function removeTargets(
|
||||
}:${target.port}`;
|
||||
});
|
||||
|
||||
await sendToClient(newtId, {
|
||||
type: `newt/${protocol}/remove`,
|
||||
data: {
|
||||
targets: payloadTargets
|
||||
}
|
||||
}, { incrementConfigVersion: true });
|
||||
await sendToClient(
|
||||
newtId,
|
||||
{
|
||||
type: `newt/${protocol}/remove`,
|
||||
data: {
|
||||
targets: payloadTargets
|
||||
}
|
||||
},
|
||||
{ incrementConfigVersion: true }
|
||||
);
|
||||
|
||||
const healthCheckTargets = targets.map((target) => {
|
||||
return target.targetId;
|
||||
});
|
||||
|
||||
await sendToClient(newtId, {
|
||||
type: `newt/healthcheck/remove`,
|
||||
data: {
|
||||
ids: healthCheckTargets
|
||||
}
|
||||
}, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
|
||||
await sendToClient(
|
||||
newtId,
|
||||
{
|
||||
type: `newt/healthcheck/remove`,
|
||||
data: {
|
||||
ids: healthCheckTargets
|
||||
}
|
||||
},
|
||||
{ incrementConfigVersion: true, compress: canCompress(version, "newt") }
|
||||
);
|
||||
}
|
||||
|
||||
@@ -275,8 +275,8 @@ export async function createTarget(
|
||||
|
||||
return response<CreateTargetResponse>(res, {
|
||||
data: {
|
||||
...newTarget[0],
|
||||
...healthCheck[0]
|
||||
...healthCheck[0],
|
||||
...newTarget[0]
|
||||
},
|
||||
success: true,
|
||||
error: false,
|
||||
|
||||
@@ -15,8 +15,8 @@ const getTargetSchema = z.strictObject({
|
||||
});
|
||||
|
||||
type GetTargetResponse = Target &
|
||||
Omit<TargetHealthCheck, "hcHeaders"> & {
|
||||
hcHeaders: { name: string; value: string }[] | null;
|
||||
Partial<Omit<TargetHealthCheck, "hcHeaders" | "targetId">> & {
|
||||
hcHeaders: { name: string; value: string }[] | null | undefined;
|
||||
};
|
||||
|
||||
registry.registerPath({
|
||||
@@ -70,20 +70,19 @@ export async function getTarget(
|
||||
.limit(1);
|
||||
|
||||
// Parse hcHeaders from JSON string back to array
|
||||
let parsedHcHeaders = null;
|
||||
let parsedHcHeaders: { name: string; value: string }[] | null = null;
|
||||
if (targetHc?.hcHeaders) {
|
||||
try {
|
||||
parsedHcHeaders = JSON.parse(targetHc.hcHeaders);
|
||||
} catch (error) {
|
||||
// If parsing fails, keep as string for backward compatibility
|
||||
parsedHcHeaders = targetHc.hcHeaders;
|
||||
// If parsing fails, keep as null for safety
|
||||
}
|
||||
}
|
||||
|
||||
return response<GetTargetResponse>(res, {
|
||||
data: {
|
||||
...target[0],
|
||||
...targetHc,
|
||||
...target[0],
|
||||
hcHeaders: parsedHcHeaders
|
||||
},
|
||||
success: true,
|
||||
|
||||
@@ -3,7 +3,10 @@ import { MessageHandler } from "@server/routers/ws";
|
||||
import { Newt } from "@server/db";
|
||||
import { eq, and } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { unknown } from "zod";
|
||||
import {
|
||||
fireHealthCheckHealthyAlert,
|
||||
fireHealthCheckNotHealthyAlert
|
||||
} from "#dynamic/lib/alerts";
|
||||
|
||||
interface TargetHealthStatus {
|
||||
status: string;
|
||||
@@ -11,7 +14,7 @@ interface TargetHealthStatus {
|
||||
checkCount: number;
|
||||
lastError?: string;
|
||||
config: {
|
||||
id: string;
|
||||
id: string; // this could be the hc id or the target id, depending on the version of newt
|
||||
hcEnabled: boolean;
|
||||
hcPath?: string;
|
||||
hcScheme?: string;
|
||||
@@ -23,6 +26,9 @@ interface TargetHealthStatus {
|
||||
hcTimeout?: number;
|
||||
hcHeaders?: any;
|
||||
hcMethod?: string;
|
||||
hcTlsServerName?: string;
|
||||
hcHealthyThreshold?: number;
|
||||
hcUnhealthyThreshold?: number;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -78,6 +84,10 @@ export const handleHealthcheckStatusMessage: MessageHandler = async (
|
||||
.select({
|
||||
targetId: targets.targetId,
|
||||
siteId: targets.siteId,
|
||||
orgId: targetHealthCheck.orgId,
|
||||
targetHealthCheckId: targetHealthCheck.targetHealthCheckId,
|
||||
resourceOrgId: resources.orgId,
|
||||
name: targetHealthCheck.name,
|
||||
hcStatus: targetHealthCheck.hcHealth
|
||||
})
|
||||
.from(targets)
|
||||
@@ -86,7 +96,10 @@ export const handleHealthcheckStatusMessage: MessageHandler = async (
|
||||
eq(targets.resourceId, resources.resourceId)
|
||||
)
|
||||
.innerJoin(sites, eq(targets.siteId, sites.siteId))
|
||||
.innerJoin(targetHealthCheck, eq(targets.targetId, targetHealthCheck.targetId))
|
||||
.innerJoin(
|
||||
targetHealthCheck,
|
||||
eq(targets.targetId, targetHealthCheck.targetId)
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(targets.targetId, targetIdNum),
|
||||
@@ -123,6 +136,21 @@ export const handleHealthcheckStatusMessage: MessageHandler = async (
|
||||
.where(eq(targetHealthCheck.targetId, targetIdNum))
|
||||
.execute();
|
||||
|
||||
// Per the original note, an earlier check in this handler only lets us reach
// this point when the reported status differs from the stored one, so this is
// a genuine state transition and it is the right place to fire an alert.
//
// Fire the alert that matches the NEW state. The two calls were previously
// swapped, so an "unhealthy" report raised the "healthy" alert and vice
// versa. Any other status value raises no alert.
//
// For backwards compatibility, fall back to the resource's orgId when the
// health check row itself does not carry one.
const alertOrgId = targetCheck.orgId || targetCheck.resourceOrgId;
if (healthStatus.status === "unhealthy") {
    await fireHealthCheckNotHealthyAlert(
        alertOrgId,
        targetCheck.targetHealthCheckId,
        targetCheck.name
    );
} else if (healthStatus.status === "healthy") {
    await fireHealthCheckHealthyAlert(
        alertOrgId,
        targetCheck.targetHealthCheckId,
        targetCheck.name
    );
}
|
||||
|
||||
logger.debug(
|
||||
`Updated health status for target ${targetId} to ${healthStatus.status}`
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user