mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-06 18:56:39 +00:00
Add ping; working on newt tunnel reliability
This commit is contained in:
@@ -153,6 +153,7 @@ export const clients = sqliteTable("clients", {
|
|||||||
megabytesIn: integer("bytesIn"),
|
megabytesIn: integer("bytesIn"),
|
||||||
megabytesOut: integer("bytesOut"),
|
megabytesOut: integer("bytesOut"),
|
||||||
lastBandwidthUpdate: text("lastBandwidthUpdate"),
|
lastBandwidthUpdate: text("lastBandwidthUpdate"),
|
||||||
|
lastPing: text("lastPing"),
|
||||||
type: text("type").notNull(), // "olm"
|
type: text("type").notNull(), // "olm"
|
||||||
online: integer("online", { mode: "boolean" }).notNull().default(false),
|
online: integer("online", { mode: "boolean" }).notNull().default(false),
|
||||||
endpoint: text("endpoint"),
|
endpoint: text("endpoint"),
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import { handleNewtRegisterMessage, handleReceiveBandwidthMessage } from "./newt";
|
import { handleNewtRegisterMessage, handleReceiveBandwidthMessage, handleGetConfigMessage } from "./newt";
|
||||||
import { handleOlmRegisterMessage, handleOlmRelayMessage } from "./olm";
|
import { handleOlmRegisterMessage, handleOlmRelayMessage, handleOlmPingMessage } from "./olm";
|
||||||
import { handleGetConfigMessage } from "./newt/handleGetConfigMessage";
|
|
||||||
import { MessageHandler } from "./ws";
|
import { MessageHandler } from "./ws";
|
||||||
|
|
||||||
export const messageHandlers: Record<string, MessageHandler> = {
|
export const messageHandlers: Record<string, MessageHandler> = {
|
||||||
@@ -8,5 +7,6 @@ export const messageHandlers: Record<string, MessageHandler> = {
|
|||||||
"olm/wg/register": handleOlmRegisterMessage,
|
"olm/wg/register": handleOlmRegisterMessage,
|
||||||
"newt/wg/get-config": handleGetConfigMessage,
|
"newt/wg/get-config": handleGetConfigMessage,
|
||||||
"newt/receive-bandwidth": handleReceiveBandwidthMessage,
|
"newt/receive-bandwidth": handleReceiveBandwidthMessage,
|
||||||
"olm/wg/relay": handleOlmRelayMessage
|
"olm/wg/relay": handleOlmRelayMessage,
|
||||||
|
"olm/ping": handleOlmPingMessage
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -105,18 +105,22 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
|
|||||||
.innerJoin(clientSites, eq(clients.clientId, clientSites.clientId))
|
.innerJoin(clientSites, eq(clients.clientId, clientSites.clientId))
|
||||||
.where(eq(clientSites.siteId, siteId));
|
.where(eq(clientSites.siteId, siteId));
|
||||||
|
|
||||||
const now = new Date().getTime() / 1000;
|
|
||||||
const peers = await Promise.all(
|
const peers = await Promise.all(
|
||||||
clientsRes
|
clientsRes
|
||||||
.filter((client) => {
|
.filter((client) => {
|
||||||
// This filter wasn't returning anything - fixed to properly filter clients
|
if (!client.clients.pubKey) {
|
||||||
if (
|
|
||||||
!client.clients.lastHolePunch ||
|
|
||||||
now - client.clients.lastHolePunch > 6
|
|
||||||
) {
|
|
||||||
logger.warn("Client last hole punch is too old");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (!client.clients.subnet) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!client.clients.endpoint) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!client.clients.online) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
})
|
})
|
||||||
.map(async (client) => {
|
.map(async (client) => {
|
||||||
|
|||||||
@@ -37,22 +37,6 @@ export const handleReceiveBandwidthMessage: MessageHandler = async (context) =>
|
|||||||
if (!client) {
|
if (!client) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let online = client.online;
|
|
||||||
|
|
||||||
// if the bandwidth for the client is > 0 then set it to online. if it has been less than 0 (no update) for 5 minutes then set it to offline
|
|
||||||
if (bytesIn > 0) { // only track the bytes in because we are always sending bytes out with persistent keep alive
|
|
||||||
online = true;
|
|
||||||
} else if (client.lastBandwidthUpdate) {
|
|
||||||
const lastBandwidthUpdate = new Date(
|
|
||||||
client.lastBandwidthUpdate
|
|
||||||
);
|
|
||||||
const currentTime = new Date();
|
|
||||||
const diff =
|
|
||||||
currentTime.getTime() - lastBandwidthUpdate.getTime();
|
|
||||||
if (diff < 300000) {
|
|
||||||
online = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the client's bandwidth usage
|
// Update the client's bandwidth usage
|
||||||
await trx
|
await trx
|
||||||
@@ -61,7 +45,6 @@ export const handleReceiveBandwidthMessage: MessageHandler = async (context) =>
|
|||||||
megabytesOut: (client.megabytesIn || 0) + bytesIn,
|
megabytesOut: (client.megabytesIn || 0) + bytesIn,
|
||||||
megabytesIn: (client.megabytesOut || 0) + bytesOut,
|
megabytesIn: (client.megabytesOut || 0) + bytesOut,
|
||||||
lastBandwidthUpdate: new Date().toISOString(),
|
lastBandwidthUpdate: new Date().toISOString(),
|
||||||
online
|
|
||||||
})
|
})
|
||||||
.where(eq(clients.clientId, client.clientId));
|
.where(eq(clients.clientId, client.clientId));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
export * from "./createNewt";
|
export * from "./createNewt";
|
||||||
export * from "./getNewtToken";
|
export * from "./getNewtToken";
|
||||||
export * from "./handleNewtRegisterMessage";
|
export * from "./handleNewtRegisterMessage";
|
||||||
export* from "./handleReceiveBandwidthMessage";
|
export * from "./handleReceiveBandwidthMessage";
|
||||||
|
export * from "./handleGetConfigMessage";
|
||||||
94
server/routers/olm/handleOlmPingMessage.ts
Normal file
94
server/routers/olm/handleOlmPingMessage.ts
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
import db from "@server/db";
|
||||||
|
import { MessageHandler } from "../ws";
|
||||||
|
import { clients, Olm } from "@server/db/schema";
|
||||||
|
import { eq, lt, isNull } from "drizzle-orm";
|
||||||
|
import logger from "@server/logger";
|
||||||
|
import { time } from "console";
|
||||||
|
|
||||||
|
// Track if the offline checker interval is running
|
||||||
|
let offlineCheckerInterval: NodeJS.Timeout | null = null;
|
||||||
|
const OFFLINE_CHECK_INTERVAL = 30 * 1000; // Check every 30 seconds
|
||||||
|
const OFFLINE_THRESHOLD_MS = 2 * 60 * 1000; // 2 minutes
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts the background interval that checks for clients that haven't pinged recently
|
||||||
|
* and marks them as offline
|
||||||
|
*/
|
||||||
|
export const startOfflineChecker = (): void => {
|
||||||
|
if (offlineCheckerInterval) {
|
||||||
|
return; // Already running
|
||||||
|
}
|
||||||
|
|
||||||
|
offlineCheckerInterval = setInterval(async () => {
|
||||||
|
try {
|
||||||
|
const twoMinutesAgo = new Date(Date.now() - OFFLINE_THRESHOLD_MS);
|
||||||
|
|
||||||
|
// Find clients that haven't pinged in the last 2 minutes and mark them as offline
|
||||||
|
await db
|
||||||
|
.update(clients)
|
||||||
|
.set({ online: false })
|
||||||
|
.where(
|
||||||
|
eq(clients.online, true) &&
|
||||||
|
(lt(clients.lastPing, twoMinutesAgo.toISOString()) || isNull(clients.lastPing))
|
||||||
|
);
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
logger.error("Error in offline checker interval", { error });
|
||||||
|
}
|
||||||
|
}, OFFLINE_CHECK_INTERVAL);
|
||||||
|
|
||||||
|
logger.info("Started offline checker interval");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops the background interval that checks for offline clients
|
||||||
|
*/
|
||||||
|
export const stopOfflineChecker = (): void => {
|
||||||
|
if (offlineCheckerInterval) {
|
||||||
|
clearInterval(offlineCheckerInterval);
|
||||||
|
offlineCheckerInterval = null;
|
||||||
|
logger.info("Stopped offline checker interval");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles ping messages from clients and responds with pong
|
||||||
|
*/
|
||||||
|
export const handleOlmPingMessage: MessageHandler = async (context) => {
|
||||||
|
const { message, client: c, sendToClient } = context;
|
||||||
|
const olm = c as Olm;
|
||||||
|
|
||||||
|
if (!olm) {
|
||||||
|
logger.warn("Olm not found");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!olm.clientId) {
|
||||||
|
logger.warn("Olm has no client ID!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Update the client's last ping timestamp
|
||||||
|
await db
|
||||||
|
.update(clients)
|
||||||
|
.set({
|
||||||
|
lastPing: new Date().toISOString(),
|
||||||
|
online: true,
|
||||||
|
})
|
||||||
|
.where(eq(clients.clientId, olm.clientId));
|
||||||
|
} catch (error) {
|
||||||
|
logger.error("Error handling ping message", { error });
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
message: {
|
||||||
|
type: "pong",
|
||||||
|
data: {
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
broadcast: false,
|
||||||
|
excludeSender: false
|
||||||
|
};
|
||||||
|
};
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
export * from "./handleOlmRegisterMessage";
|
export * from "./handleOlmRegisterMessage";
|
||||||
export * from "./getOlmToken";
|
export * from "./getOlmToken";
|
||||||
export * from "./createOlm";
|
export * from "./createOlm";
|
||||||
export * from "./handleOlmRelayMessage";
|
export * from "./handleOlmRelayMessage";
|
||||||
|
export * from "./handleOlmPingMessage";
|
||||||
Reference in New Issue
Block a user