Move things around; rename to olm

This commit is contained in:
Owen
2025-02-21 10:13:41 -05:00
parent 41983ce356
commit e112fcba29
13 changed files with 642 additions and 93 deletions

View File

@@ -3,10 +3,11 @@ import { Server as HttpServer } from "http";
import { WebSocket, WebSocketServer } from "ws";
import { IncomingMessage } from "http";
import { Socket } from "net";
import { Newt, newts, NewtSession } from "@server/db/schema";
import { Newt, newts, NewtSession, Olm, olms, OlmSession } from "@server/db/schema";
import { eq } from "drizzle-orm";
import db from "@server/db";
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
import { messageHandlers } from "./messageHandlers";
import logger from "@server/logger";
@@ -15,13 +16,17 @@ interface WebSocketRequest extends IncomingMessage {
token?: string;
}
type ClientType = 'newt' | 'olm';
interface AuthenticatedWebSocket extends WebSocket {
newt?: Newt;
client?: Newt | Olm;
clientType?: ClientType;
}
interface TokenPayload {
newt: Newt;
session: NewtSession;
client: Newt | Olm;
session: NewtSession | OlmSession;
clientType: ClientType;
}
interface WSMessage {
@@ -33,15 +38,16 @@ interface HandlerResponse {
message: WSMessage;
broadcast?: boolean;
excludeSender?: boolean;
targetNewtId?: string;
targetClientId?: string;
}
interface HandlerContext {
message: WSMessage;
senderWs: WebSocket;
newt: Newt | undefined;
sendToClient: (newtId: string, message: WSMessage) => boolean;
broadcastToAllExcept: (message: WSMessage, excludeNewtId?: string) => void;
client: Newt | Olm | undefined;
clientType: ClientType;
sendToClient: (clientId: string, message: WSMessage) => boolean;
broadcastToAllExcept: (message: WSMessage, excludeClientId?: string) => void;
connectedClients: Map<string, WebSocket[]>;
}
@@ -54,34 +60,32 @@ const wss: WebSocketServer = new WebSocketServer({ noServer: true });
let connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
// Helper functions for client management
const addClient = (newtId: string, ws: AuthenticatedWebSocket): void => {
const existingClients = connectedClients.get(newtId) || [];
const addClient = (clientId: string, ws: AuthenticatedWebSocket, clientType: ClientType): void => {
const existingClients = connectedClients.get(clientId) || [];
existingClients.push(ws);
connectedClients.set(newtId, existingClients);
logger.info(`Client added to tracking - Newt ID: ${newtId}, Total connections: ${existingClients.length}`);
connectedClients.set(clientId, existingClients);
logger.info(`Client added to tracking - ${clientType.toUpperCase()} ID: ${clientId}, Total connections: ${existingClients.length}`);
};
const removeClient = (newtId: string, ws: AuthenticatedWebSocket): void => {
const existingClients = connectedClients.get(newtId) || [];
const removeClient = (clientId: string, ws: AuthenticatedWebSocket, clientType: ClientType): void => {
const existingClients = connectedClients.get(clientId) || [];
const updatedClients = existingClients.filter(client => client !== ws);
if (updatedClients.length === 0) {
connectedClients.delete(newtId);
logger.info(`All connections removed for Newt ID: ${newtId}`);
connectedClients.delete(clientId);
logger.info(`All connections removed for ${clientType.toUpperCase()} ID: ${clientId}`);
} else {
connectedClients.set(newtId, updatedClients);
logger.info(`Connection removed - Newt ID: ${newtId}, Remaining connections: ${updatedClients.length}`);
connectedClients.set(clientId, updatedClients);
logger.info(`Connection removed - ${clientType.toUpperCase()} ID: ${clientId}, Remaining connections: ${updatedClients.length}`);
}
};
// Helper functions for sending messages
const sendToClient = (newtId: string, message: WSMessage): boolean => {
const clients = connectedClients.get(newtId);
const sendToClient = (clientId: string, message: WSMessage): boolean => {
const clients = connectedClients.get(clientId);
if (!clients || clients.length === 0) {
logger.info(`No active connections found for Newt ID: ${newtId}`);
logger.info(`No active connections found for Client ID: ${clientId}`);
return false;
}
const messageString = JSON.stringify(message);
clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
@@ -91,9 +95,9 @@ const sendToClient = (newtId: string, message: WSMessage): boolean => {
return true;
};
const broadcastToAllExcept = (message: WSMessage, excludeNewtId?: string): void => {
connectedClients.forEach((clients, newtId) => {
if (newtId !== excludeNewtId) {
const broadcastToAllExcept = (message: WSMessage, excludeClientId?: string): void => {
connectedClients.forEach((clients, clientId) => {
if (clientId !== excludeClientId) {
clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(message));
@@ -103,84 +107,88 @@ const broadcastToAllExcept = (message: WSMessage, excludeNewtId?: string): void
});
};
// Token verification middleware (unchanged)
const verifyToken = async (token: string): Promise<TokenPayload | null> => {
// Token verification middleware
const verifyToken = async (token: string, clientType: ClientType): Promise<TokenPayload | null> => {
try {
const { session, newt } = await validateNewtSessionToken(token);
if (!session || !newt) {
return null;
if (clientType === 'newt') {
const { session, newt } = await validateNewtSessionToken(token);
if (!session || !newt) {
return null;
}
const existingNewt = await db
.select()
.from(newts)
.where(eq(newts.newtId, newt.newtId));
if (!existingNewt || !existingNewt[0]) {
return null;
}
return { client: existingNewt[0], session, clientType };
} else {
const { session, olm } = await validateOlmSessionToken(token);
if (!session || !olm) {
return null;
}
const existingOlm = await db
.select()
.from(olms)
.where(eq(olms.olmId, olm.olmId));
if (!existingOlm || !existingOlm[0]) {
return null;
}
return { client: existingOlm[0], session, clientType };
}
const existingNewt = await db
.select()
.from(newts)
.where(eq(newts.newtId, newt.newtId));
if (!existingNewt || !existingNewt[0]) {
return null;
}
return { newt: existingNewt[0], session };
} catch (error) {
logger.error("Token verification failed:", error);
return null;
}
};
const setupConnection = (ws: AuthenticatedWebSocket, newt: Newt): void => {
const setupConnection = (ws: AuthenticatedWebSocket, client: Newt | Olm, clientType: ClientType): void => {
logger.info("Establishing websocket connection");
if (!newt) {
logger.error("Connection attempt without newt");
if (!client) {
logger.error("Connection attempt without client");
return ws.terminate();
}
ws.newt = newt;
ws.client = client;
ws.clientType = clientType;
// Add client to tracking
addClient(newt.newtId, ws);
const clientId = clientType === 'newt' ? (client as Newt).newtId : (client as Olm).olmId;
addClient(clientId, ws, clientType);
ws.on("message", async (data) => {
try {
const message: WSMessage = JSON.parse(data.toString());
// logger.info(`Message received from Newt ID ${newtId}:`, message);
// Validate message format
if (!message.type || typeof message.type !== "string") {
throw new Error("Invalid message format: missing or invalid type");
}
// Get the appropriate handler for the message type
const handler = messageHandlers[message.type];
if (!handler) {
throw new Error(`Unsupported message type: ${message.type}`);
}
// Process the message and get response
const response = await handler({
message,
senderWs: ws,
newt: ws.newt,
client: ws.client,
clientType: ws.clientType!,
sendToClient,
broadcastToAllExcept,
connectedClients
});
// Send response if one was returned
if (response) {
if (response.broadcast) {
// Broadcast to all clients except sender if specified
broadcastToAllExcept(response.message, response.excludeSender ? newt.newtId : undefined);
} else if (response.targetNewtId) {
// Send to specific client if targetNewtId is provided
sendToClient(response.targetNewtId, response.message);
broadcastToAllExcept(response.message, response.excludeSender ? clientId : undefined);
} else if (response.targetClientId) {
sendToClient(response.targetClientId, response.message);
} else {
// Send back to sender
ws.send(JSON.stringify(response.message));
}
}
} catch (error) {
logger.error("Message handling error:", error);
ws.send(JSON.stringify({
@@ -194,18 +202,18 @@ const setupConnection = (ws: AuthenticatedWebSocket, newt: Newt): void => {
});
ws.on("close", () => {
removeClient(newt.newtId, ws);
logger.info(`Client disconnected - Newt ID: ${newt.newtId}`);
removeClient(clientId, ws, clientType);
logger.info(`Client disconnected - ${clientType.toUpperCase()} ID: ${clientId}`);
});
ws.on("error", (error: Error) => {
logger.error(`WebSocket error for Newt ID ${newt.newtId}:`, error);
logger.error(`WebSocket error for ${clientType.toUpperCase()} ID ${clientId}:`, error);
});
logger.info(`WebSocket connection established - Newt ID: ${newt.newtId}`);
logger.info(`WebSocket connection established - ${clientType.toUpperCase()} ID: ${clientId}`);
};
// Router endpoint (unchanged)
// Router endpoint
router.get("/ws", (req: Request, res: Response) => {
res.status(200).send("WebSocket endpoint");
});
@@ -214,18 +222,22 @@ router.get("/ws", (req: Request, res: Response) => {
const handleWSUpgrade = (server: HttpServer): void => {
server.on("upgrade", async (request: WebSocketRequest, socket: Socket, head: Buffer) => {
try {
const token = request.url?.includes("?")
? new URLSearchParams(request.url.split("?")[1]).get("token") || ""
: request.headers["sec-websocket-protocol"];
const url = new URL(request.url || '', `http://${request.headers.host}`);
const token = url.searchParams.get('token') || request.headers["sec-websocket-protocol"] || '';
let clientType = url.searchParams.get('clientType') as ClientType;
if (!token) {
logger.warn("Unauthorized connection attempt: no token...");
if (!clientType) {
clientType = "newt";
}
if (!token || !clientType || !['newt', 'olm'].includes(clientType)) {
logger.warn("Unauthorized connection attempt: invalid token or client type...");
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
socket.destroy();
return;
}
const tokenPayload = await verifyToken(token);
const tokenPayload = await verifyToken(token, clientType);
if (!tokenPayload) {
logger.warn("Unauthorized connection attempt: invalid token...");
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
@@ -234,7 +246,7 @@ const handleWSUpgrade = (server: HttpServer): void => {
}
wss.handleUpgrade(request, socket, head, (ws: AuthenticatedWebSocket) => {
setupConnection(ws, tokenPayload.newt);
setupConnection(ws, tokenPayload.client, tokenPayload.clientType);
});
} catch (error) {
logger.error("WebSocket upgrade error:", error);
@@ -250,4 +262,4 @@ export {
sendToClient,
broadcastToAllExcept,
connectedClients
};
};