mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-02 08:46:38 +00:00
Merge dev into fix/log-analytics-adjustments
This commit is contained in:
@@ -37,7 +37,14 @@ import { validateRemoteExitNodeSessionToken } from "#private/auth/sessions/remot
|
||||
import { rateLimitService } from "#private/lib/rateLimit";
|
||||
import { messageHandlers } from "@server/routers/ws/messageHandlers";
|
||||
import { messageHandlers as privateMessageHandlers } from "#private/routers/ws/messageHandlers";
|
||||
import { AuthenticatedWebSocket, ClientType, WSMessage, TokenPayload, WebSocketRequest, RedisMessage } from "@server/routers/ws";
|
||||
import {
|
||||
AuthenticatedWebSocket,
|
||||
ClientType,
|
||||
WSMessage,
|
||||
TokenPayload,
|
||||
WebSocketRequest,
|
||||
RedisMessage
|
||||
} from "@server/routers/ws";
|
||||
import { validateSessionToken } from "@server/auth/sessions/app";
|
||||
|
||||
// Merge public and private message handlers
|
||||
@@ -55,9 +62,9 @@ const processMessage = async (
|
||||
try {
|
||||
const message: WSMessage = JSON.parse(data.toString());
|
||||
|
||||
logger.debug(
|
||||
`Processing message from ${clientType.toUpperCase()} ID: ${clientId}, type: ${message.type}`
|
||||
);
|
||||
// logger.debug(
|
||||
// `Processing message from ${clientType.toUpperCase()} ID: ${clientId}, type: ${message.type}`
|
||||
// );
|
||||
|
||||
if (!message.type || typeof message.type !== "string") {
|
||||
throw new Error("Invalid message format: missing or invalid type");
|
||||
@@ -216,7 +223,7 @@ const initializeRedisSubscription = async (): Promise<void> => {
|
||||
// Each node is responsible for restoring its own connection state to Redis
|
||||
// This approach is more efficient than cross-node coordination because:
|
||||
// 1. Each node knows its own connections (source of truth)
|
||||
// 2. No network overhead from broadcasting state between nodes
|
||||
// 2. No network overhead from broadcasting state between nodes
|
||||
// 3. No race conditions from simultaneous updates
|
||||
// 4. Redis becomes eventually consistent as each node restores independently
|
||||
// 5. Simpler logic with better fault tolerance
|
||||
@@ -233,8 +240,10 @@ const recoverConnectionState = async (): Promise<void> => {
|
||||
// Each node simply restores its own local connections to Redis
|
||||
// This is the source of truth - no need for cross-node coordination
|
||||
await restoreLocalConnectionsToRedis();
|
||||
|
||||
logger.info("Redis connection state recovery completed - restored local state");
|
||||
|
||||
logger.info(
|
||||
"Redis connection state recovery completed - restored local state"
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error("Error during Redis recovery:", error);
|
||||
} finally {
|
||||
@@ -251,8 +260,10 @@ const restoreLocalConnectionsToRedis = async (): Promise<void> => {
|
||||
try {
|
||||
// Restore all current local connections to Redis
|
||||
for (const [clientId, clients] of connectedClients.entries()) {
|
||||
const validClients = clients.filter(client => client.readyState === WebSocket.OPEN);
|
||||
|
||||
const validClients = clients.filter(
|
||||
(client) => client.readyState === WebSocket.OPEN
|
||||
);
|
||||
|
||||
if (validClients.length > 0) {
|
||||
// Add this node to the client's connection list
|
||||
await redisManager.sadd(getConnectionsKey(clientId), NODE_ID);
|
||||
@@ -303,7 +314,10 @@ const addClient = async (
|
||||
Date.now().toString()
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error("Failed to add client to Redis tracking (connection still functional locally):", error);
|
||||
logger.error(
|
||||
"Failed to add client to Redis tracking (connection still functional locally):",
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -326,9 +340,14 @@ const removeClient = async (
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
try {
|
||||
await redisManager.srem(getConnectionsKey(clientId), NODE_ID);
|
||||
await redisManager.del(getNodeConnectionsKey(NODE_ID, clientId));
|
||||
await redisManager.del(
|
||||
getNodeConnectionsKey(NODE_ID, clientId)
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error("Failed to remove client from Redis tracking (cleanup will occur on recovery):", error);
|
||||
logger.error(
|
||||
"Failed to remove client from Redis tracking (cleanup will occur on recovery):",
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,7 +364,10 @@ const removeClient = async (
|
||||
ws.connectionId
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error("Failed to remove specific connection from Redis tracking:", error);
|
||||
logger.error(
|
||||
"Failed to remove specific connection from Redis tracking:",
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -372,7 +394,9 @@ const sendToClientLocal = async (
|
||||
}
|
||||
});
|
||||
|
||||
logger.debug(`sendToClient: Message type ${message.type} sent to clientId ${clientId}`);
|
||||
logger.debug(
|
||||
`sendToClient: Message type ${message.type} sent to clientId ${clientId}`
|
||||
);
|
||||
|
||||
return true;
|
||||
};
|
||||
@@ -411,14 +435,22 @@ const sendToClient = async (
|
||||
fromNodeId: NODE_ID
|
||||
};
|
||||
|
||||
await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage));
|
||||
await redisManager.publish(
|
||||
REDIS_CHANNEL,
|
||||
JSON.stringify(redisMessage)
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error("Failed to send message via Redis, message may be lost:", error);
|
||||
logger.error(
|
||||
"Failed to send message via Redis, message may be lost:",
|
||||
error
|
||||
);
|
||||
// Continue execution - local delivery already attempted
|
||||
}
|
||||
} else if (!localSent && !redisManager.isRedisEnabled()) {
|
||||
// Redis is disabled or unavailable - log that we couldn't deliver to remote nodes
|
||||
logger.debug(`Could not deliver message to ${clientId} - not connected locally and Redis unavailable`);
|
||||
logger.debug(
|
||||
`Could not deliver message to ${clientId} - not connected locally and Redis unavailable`
|
||||
);
|
||||
}
|
||||
|
||||
return localSent;
|
||||
@@ -441,13 +473,21 @@ const broadcastToAllExcept = async (
|
||||
fromNodeId: NODE_ID
|
||||
};
|
||||
|
||||
await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage));
|
||||
await redisManager.publish(
|
||||
REDIS_CHANNEL,
|
||||
JSON.stringify(redisMessage)
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error("Failed to broadcast message via Redis, remote nodes may not receive it:", error);
|
||||
logger.error(
|
||||
"Failed to broadcast message via Redis, remote nodes may not receive it:",
|
||||
error
|
||||
);
|
||||
// Continue execution - local broadcast already completed
|
||||
}
|
||||
} else {
|
||||
logger.debug("Redis unavailable - broadcast limited to local node only");
|
||||
logger.debug(
|
||||
"Redis unavailable - broadcast limited to local node only"
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -512,8 +552,10 @@ const verifyToken = async (
|
||||
return null;
|
||||
}
|
||||
|
||||
if (olm.userId) { // this is a user device and we need to check the user token
|
||||
const { session: userSession, user } = await validateSessionToken(userToken);
|
||||
if (olm.userId) {
|
||||
// this is a user device and we need to check the user token
|
||||
const { session: userSession, user } =
|
||||
await validateSessionToken(userToken);
|
||||
if (!userSession || !user) {
|
||||
return null;
|
||||
}
|
||||
@@ -668,7 +710,7 @@ const handleWSUpgrade = (server: HttpServer): void => {
|
||||
url.searchParams.get("token") ||
|
||||
request.headers["sec-websocket-protocol"] ||
|
||||
"";
|
||||
const userToken = url.searchParams.get('userToken') || '';
|
||||
const userToken = url.searchParams.get("userToken") || "";
|
||||
let clientType = url.searchParams.get(
|
||||
"clientType"
|
||||
) as ClientType;
|
||||
@@ -690,7 +732,11 @@ const handleWSUpgrade = (server: HttpServer): void => {
|
||||
return;
|
||||
}
|
||||
|
||||
const tokenPayload = await verifyToken(token, clientType, userToken);
|
||||
const tokenPayload = await verifyToken(
|
||||
token,
|
||||
clientType,
|
||||
userToken
|
||||
);
|
||||
if (!tokenPayload) {
|
||||
logger.debug(
|
||||
"Unauthorized connection attempt: invalid token..."
|
||||
@@ -724,50 +770,68 @@ const handleWSUpgrade = (server: HttpServer): void => {
|
||||
// Add periodic connection state sync to handle Redis disconnections/reconnections
|
||||
const startPeriodicStateSync = (): void => {
|
||||
// Lightweight sync every 5 minutes - just restore our own state
|
||||
setInterval(async () => {
|
||||
if (redisManager.isRedisEnabled() && !isRedisRecoveryInProgress) {
|
||||
try {
|
||||
await restoreLocalConnectionsToRedis();
|
||||
logger.debug("Periodic connection state sync completed");
|
||||
} catch (error) {
|
||||
logger.error("Error during periodic connection state sync:", error);
|
||||
setInterval(
|
||||
async () => {
|
||||
if (redisManager.isRedisEnabled() && !isRedisRecoveryInProgress) {
|
||||
try {
|
||||
await restoreLocalConnectionsToRedis();
|
||||
logger.debug("Periodic connection state sync completed");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
"Error during periodic connection state sync:",
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 5 * 60 * 1000); // 5 minutes
|
||||
},
|
||||
5 * 60 * 1000
|
||||
); // 5 minutes
|
||||
|
||||
// Cleanup stale connections every 15 minutes
|
||||
setInterval(async () => {
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
try {
|
||||
await cleanupStaleConnections();
|
||||
logger.debug("Periodic connection cleanup completed");
|
||||
} catch (error) {
|
||||
logger.error("Error during periodic connection cleanup:", error);
|
||||
setInterval(
|
||||
async () => {
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
try {
|
||||
await cleanupStaleConnections();
|
||||
logger.debug("Periodic connection cleanup completed");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
"Error during periodic connection cleanup:",
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 15 * 60 * 1000); // 15 minutes
|
||||
},
|
||||
15 * 60 * 1000
|
||||
); // 15 minutes
|
||||
};
|
||||
|
||||
const cleanupStaleConnections = async (): Promise<void> => {
|
||||
if (!redisManager.isRedisEnabled()) return;
|
||||
|
||||
try {
|
||||
const nodeKeys = await redisManager.getClient()?.keys(`ws:node:${NODE_ID}:*`) || [];
|
||||
|
||||
const nodeKeys =
|
||||
(await redisManager.getClient()?.keys(`ws:node:${NODE_ID}:*`)) ||
|
||||
[];
|
||||
|
||||
for (const nodeKey of nodeKeys) {
|
||||
const connections = await redisManager.hgetall(nodeKey);
|
||||
const clientId = nodeKey.replace(`ws:node:${NODE_ID}:`, '');
|
||||
const clientId = nodeKey.replace(`ws:node:${NODE_ID}:`, "");
|
||||
const localClients = connectedClients.get(clientId) || [];
|
||||
const localConnectionIds = localClients
|
||||
.filter(client => client.readyState === WebSocket.OPEN)
|
||||
.map(client => client.connectionId)
|
||||
.filter((client) => client.readyState === WebSocket.OPEN)
|
||||
.map((client) => client.connectionId)
|
||||
.filter(Boolean);
|
||||
|
||||
// Remove Redis entries for connections that no longer exist locally
|
||||
for (const [connectionId, timestamp] of Object.entries(connections)) {
|
||||
for (const [connectionId, timestamp] of Object.entries(
|
||||
connections
|
||||
)) {
|
||||
if (!localConnectionIds.includes(connectionId)) {
|
||||
await redisManager.hdel(nodeKey, connectionId);
|
||||
logger.debug(`Cleaned up stale connection: ${connectionId} for client: ${clientId}`);
|
||||
logger.debug(
|
||||
`Cleaned up stale connection: ${connectionId} for client: ${clientId}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -776,7 +840,9 @@ const cleanupStaleConnections = async (): Promise<void> => {
|
||||
if (Object.keys(remainingConnections).length === 0) {
|
||||
await redisManager.srem(getConnectionsKey(clientId), NODE_ID);
|
||||
await redisManager.del(nodeKey);
|
||||
logger.debug(`Cleaned up empty connection tracking for client: ${clientId}`);
|
||||
logger.debug(
|
||||
`Cleaned up empty connection tracking for client: ${clientId}`
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -789,38 +855,38 @@ if (redisManager.isRedisEnabled()) {
|
||||
initializeRedisSubscription().catch((error) => {
|
||||
logger.error("Failed to initialize Redis subscription:", error);
|
||||
});
|
||||
|
||||
|
||||
// Register recovery callback with Redis manager
|
||||
// When Redis reconnects, each node simply restores its own local state
|
||||
redisManager.onReconnection(async () => {
|
||||
logger.info("Redis reconnected, starting WebSocket state recovery...");
|
||||
await recoverConnectionState();
|
||||
});
|
||||
|
||||
|
||||
// Start periodic state synchronization
|
||||
startPeriodicStateSync();
|
||||
|
||||
|
||||
logger.info(
|
||||
`WebSocket handler initialized with Redis support - Node ID: ${NODE_ID}`
|
||||
);
|
||||
} else {
|
||||
logger.debug(
|
||||
"WebSocket handler initialized in local mode"
|
||||
);
|
||||
logger.debug("WebSocket handler initialized in local mode");
|
||||
}
|
||||
|
||||
// Disconnect a specific client and force them to reconnect
|
||||
const disconnectClient = async (clientId: string): Promise<boolean> => {
|
||||
const mapKey = getClientMapKey(clientId);
|
||||
const clients = connectedClients.get(mapKey);
|
||||
|
||||
|
||||
if (!clients || clients.length === 0) {
|
||||
logger.debug(`No connections found for client ID: ${clientId}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
logger.info(`Disconnecting client ID: ${clientId} (${clients.length} connection(s))`);
|
||||
|
||||
logger.info(
|
||||
`Disconnecting client ID: ${clientId} (${clients.length} connection(s))`
|
||||
);
|
||||
|
||||
// Close all connections for this client
|
||||
clients.forEach((client) => {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
|
||||
Reference in New Issue
Block a user