mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-02 16:56:39 +00:00
a
This commit is contained in:
@@ -10,7 +10,6 @@ import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||
import { messageHandlers } from "./messageHandlers";
|
||||
import logger from "@server/logger";
|
||||
import redisManager from "@server/db/redis";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
|
||||
// Custom interfaces
|
||||
@@ -54,14 +53,6 @@ interface HandlerContext {
|
||||
connectedClients: Map<string, WebSocket[]>;
|
||||
}
|
||||
|
||||
interface RedisMessage {
|
||||
type: 'direct' | 'broadcast';
|
||||
targetClientId?: string;
|
||||
excludeClientId?: string;
|
||||
message: WSMessage;
|
||||
fromNodeId: string;
|
||||
}
|
||||
|
||||
export type MessageHandler = (context: HandlerContext) => Promise<HandlerResponse | void>;
|
||||
|
||||
const router: Router = Router();
|
||||
@@ -69,41 +60,12 @@ const wss: WebSocketServer = new WebSocketServer({ noServer: true });
|
||||
|
||||
// Generate unique node ID for this instance
|
||||
const NODE_ID = uuidv4();
|
||||
const REDIS_CHANNEL = 'websocket_messages';
|
||||
|
||||
// Client tracking map (local to this node)
|
||||
let connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
|
||||
// Helper to get map key
|
||||
const getClientMapKey = (clientId: string) => clientId;
|
||||
|
||||
// Redis keys (generalized)
|
||||
const getConnectionsKey = (clientId: string) => `ws:connections:${clientId}`;
|
||||
const getNodeConnectionsKey = (nodeId: string, clientId: string) => `ws:node:${nodeId}:${clientId}`;
|
||||
|
||||
// Initialize Redis subscription for cross-node messaging
|
||||
const initializeRedisSubscription = async (): Promise<void> => {
|
||||
if (!redisManager.isRedisEnabled()) return;
|
||||
|
||||
await redisManager.subscribe(REDIS_CHANNEL, async (channel: string, message: string) => {
|
||||
try {
|
||||
const redisMessage: RedisMessage = JSON.parse(message);
|
||||
|
||||
// Ignore messages from this node
|
||||
if (redisMessage.fromNodeId === NODE_ID) return;
|
||||
|
||||
if (redisMessage.type === 'direct' && redisMessage.targetClientId) {
|
||||
// Send to specific client on this node
|
||||
await sendToClientLocal(redisMessage.targetClientId, redisMessage.message);
|
||||
} else if (redisMessage.type === 'broadcast') {
|
||||
// Broadcast to all clients on this node except excluded
|
||||
await broadcastToAllExceptLocal(redisMessage.message, redisMessage.excludeClientId);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Error processing Redis message:', error);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
// Helper functions for client management
|
||||
const addClient = async (clientType: ClientType, clientId: string, ws: AuthenticatedWebSocket): Promise<void> => {
|
||||
// Generate unique connection ID
|
||||
@@ -116,12 +78,6 @@ const addClient = async (clientType: ClientType, clientId: string, ws: Authentic
|
||||
existingClients.push(ws);
|
||||
connectedClients.set(mapKey, existingClients);
|
||||
|
||||
// Add to Redis tracking if enabled
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
await redisManager.sadd(getConnectionsKey(clientId), NODE_ID);
|
||||
await redisManager.hset(getNodeConnectionsKey(NODE_ID, clientId), connectionId, Date.now().toString());
|
||||
}
|
||||
|
||||
logger.info(`Client added to tracking - ${clientType.toUpperCase()} ID: ${clientId}, Connection ID: ${connectionId}, Total connections: ${existingClients.length}`);
|
||||
};
|
||||
|
||||
@@ -132,19 +88,10 @@ const removeClient = async (clientType: ClientType, clientId: string, ws: Authen
|
||||
if (updatedClients.length === 0) {
|
||||
connectedClients.delete(mapKey);
|
||||
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
await redisManager.srem(getConnectionsKey(clientId), NODE_ID);
|
||||
await redisManager.del(getNodeConnectionsKey(NODE_ID, clientId));
|
||||
}
|
||||
|
||||
logger.info(`All connections removed for ${clientType.toUpperCase()} ID: ${clientId}`);
|
||||
} else {
|
||||
connectedClients.set(mapKey, updatedClients);
|
||||
|
||||
if (redisManager.isRedisEnabled() && ws.connectionId) {
|
||||
await redisManager.hdel(getNodeConnectionsKey(NODE_ID, clientId), ws.connectionId);
|
||||
}
|
||||
|
||||
logger.info(`Connection removed - ${clientType.toUpperCase()} ID: ${clientId}, Remaining connections: ${updatedClients.length}`);
|
||||
}
|
||||
};
|
||||
@@ -178,64 +125,31 @@ const broadcastToAllExceptLocal = async (message: WSMessage, excludeClientId?: s
|
||||
});
|
||||
};
|
||||
|
||||
// Cross-node message sending (via Redis)
|
||||
// Cross-node message sending
|
||||
const sendToClient = async (clientId: string, message: WSMessage): Promise<boolean> => {
|
||||
// Try to send locally first
|
||||
const localSent = await sendToClientLocal(clientId, message);
|
||||
|
||||
// If Redis is enabled, also send via Redis pub/sub to other nodes
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
const redisMessage: RedisMessage = {
|
||||
type: 'direct',
|
||||
targetClientId: clientId,
|
||||
message,
|
||||
fromNodeId: NODE_ID
|
||||
};
|
||||
|
||||
await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage));
|
||||
}
|
||||
|
||||
return localSent;
|
||||
};
|
||||
|
||||
const broadcastToAllExcept = async (message: WSMessage, excludeClientId?: string): Promise<void> => {
|
||||
// Broadcast locally
|
||||
await broadcastToAllExceptLocal(message, excludeClientId);
|
||||
|
||||
// If Redis is enabled, also broadcast via Redis pub/sub to other nodes
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
const redisMessage: RedisMessage = {
|
||||
type: 'broadcast',
|
||||
excludeClientId,
|
||||
message,
|
||||
fromNodeId: NODE_ID
|
||||
};
|
||||
|
||||
await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage));
|
||||
}
|
||||
};
|
||||
|
||||
// Check if a client has active connections across all nodes
|
||||
const hasActiveConnections = async (clientId: string): Promise<boolean> => {
|
||||
if (!redisManager.isRedisEnabled()) {
|
||||
const mapKey = getClientMapKey(clientId);
|
||||
const clients = connectedClients.get(mapKey);
|
||||
return !!(clients && clients.length > 0);
|
||||
}
|
||||
|
||||
const activeNodes = await redisManager.smembers(getConnectionsKey(clientId));
|
||||
return activeNodes.length > 0;
|
||||
};
|
||||
|
||||
// Get all active nodes for a client
|
||||
const getActiveNodes = async (clientType: ClientType, clientId: string): Promise<string[]> => {
|
||||
if (!redisManager.isRedisEnabled()) {
|
||||
const mapKey = getClientMapKey(clientId);
|
||||
const clients = connectedClients.get(mapKey);
|
||||
return (clients && clients.length > 0) ? [NODE_ID] : [];
|
||||
}
|
||||
|
||||
return await redisManager.smembers(getConnectionsKey(clientId));
|
||||
};
|
||||
|
||||
// Token verification middleware
|
||||
@@ -391,16 +305,6 @@ const handleWSUpgrade = (server: HttpServer): void => {
|
||||
});
|
||||
};
|
||||
|
||||
// Initialize Redis subscription when the module is loaded
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
initializeRedisSubscription().catch(error => {
|
||||
logger.error('Failed to initialize Redis subscription:', error);
|
||||
});
|
||||
logger.info(`WebSocket handler initialized with Redis support - Node ID: ${NODE_ID}`);
|
||||
} else {
|
||||
logger.debug('WebSocket handler initialized in local mode (Redis disabled)');
|
||||
}
|
||||
|
||||
// Cleanup function for graceful shutdown
|
||||
const cleanup = async (): Promise<void> => {
|
||||
try {
|
||||
@@ -413,14 +317,6 @@ const cleanup = async (): Promise<void> => {
|
||||
});
|
||||
});
|
||||
|
||||
// Clean up Redis tracking for this node
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
const keys = await redisManager.getClient()?.keys(`ws:node:${NODE_ID}:*`) || [];
|
||||
if (keys.length > 0) {
|
||||
await Promise.all(keys.map(key => redisManager.del(key)));
|
||||
}
|
||||
}
|
||||
|
||||
logger.info('WebSocket cleanup completed');
|
||||
} catch (error) {
|
||||
logger.error('Error during WebSocket cleanup:', error);
|
||||
|
||||
Reference in New Issue
Block a user