Add round trip tracking for any message

This commit is contained in:
Owen
2026-02-16 20:29:55 -08:00
parent 5092eb58fb
commit 3debc6c8d3
8 changed files with 317 additions and 32 deletions

View File

@@ -50,6 +50,7 @@ import createHttpError from "http-errors";
import { build } from "@server/build";
import { createStore } from "#dynamic/lib/rateLimitStore";
import { logActionAudit } from "#dynamic/middlewares";
import { checkRoundTripMessage } from "./ws";
// Root routes
export const unauthenticated = Router();
@@ -1123,6 +1124,8 @@ authenticated.get(
blueprints.getBlueprint
);
authenticated.get("/ws/round-trip-message/:messageId", checkRoundTripMessage);
// Auth routes
export const authRouter = Router();
unauthenticated.use("/auth", authRouter);

View File

@@ -0,0 +1,85 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { db, roundTripMessageTracker } from "@server/db";
import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import { eq } from "drizzle-orm";
import { OpenAPITags, registry } from "@server/openApi";
const checkRoundTripMessageParamsSchema = z
.object({
messageId: z
.string()
.transform(Number)
.pipe(z.number().int().positive())
})
.strict();
// registry.registerPath({
// method: "get",
// path: "/ws/round-trip-message/{messageId}",
// description:
// "Check if a round trip message has been completed by checking the roundTripMessageTracker table",
// tags: [OpenAPITags.WebSocket],
// request: {
// params: checkRoundTripMessageParamsSchema
// },
// responses: {}
// });
export async function checkRoundTripMessage(
req: Request,
res: Response,
next: NextFunction
): Promise<any> {
try {
const parsedParams = checkRoundTripMessageParamsSchema.safeParse(
req.params
);
if (!parsedParams.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedParams.error).toString()
)
);
}
const { messageId } = parsedParams.data;
// Get the round trip message from the tracker
const [message] = await db
.select()
.from(roundTripMessageTracker)
.where(eq(roundTripMessageTracker.messageId, messageId))
.limit(1);
if (!message) {
return next(
createHttpError(HttpCode.NOT_FOUND, "Message not found")
);
}
return response(res, {
data: {
messageId: message.messageId,
complete: message.complete,
sentAt: message.sentAt,
receivedAt: message.receivedAt,
error: message.error,
},
success: true,
error: false,
message: "Round trip message status retrieved successfully",
status: HttpCode.OK
});
} catch (error) {
logger.error(error);
return next(
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
);
}
}

View File

@@ -0,0 +1,49 @@
import { db, roundTripMessageTracker } from "@server/db";
import { MessageHandler } from "@server/routers/ws";
import { eq } from "drizzle-orm";
import logger from "@server/logger";
interface RoundTripCompleteMessage {
messageId: number;
complete: boolean;
error?: string;
}
export const handleRoundTripMessage: MessageHandler = async (
context
) => {
const { message, client: c } = context;
logger.info("Handling round trip message");
const data = message.data as RoundTripCompleteMessage;
try {
const { messageId, complete, error } = data;
if (!messageId) {
logger.error("Round trip message missing messageId");
return;
}
// Update the roundTripMessageTracker with completion status
await db
.update(roundTripMessageTracker)
.set({
complete: complete,
receivedAt: Math.floor(Date.now() / 1000),
error: error || null
})
.where(eq(roundTripMessageTracker.messageId, messageId));
logger.info(`Round trip message ${messageId} marked as complete: ${complete}`);
if (error) {
logger.warn(`Round trip message ${messageId} completed with error: ${error}`);
}
} catch (error) {
logger.error("Error processing round trip message:", error);
}
return;
};

View File

@@ -1,2 +1,3 @@
export * from "./ws";
export * from "./types";
export * from "./checkRoundTripMessage";

View File

@@ -18,6 +18,7 @@ import {
handleOlmDisconnecingMessage
} from "../olm";
import { handleHealthcheckStatusMessage } from "../target";
import { handleRoundTripMessage } from "./handleRoundTripMessage";
import { MessageHandler } from "./types";
export const messageHandlers: Record<string, MessageHandler> = {
@@ -35,7 +36,8 @@ export const messageHandlers: Record<string, MessageHandler> = {
"newt/socket/containers": handleDockerContainersMessage,
"newt/ping/request": handleNewtPingRequestMessage,
"newt/blueprint/apply": handleApplyBlueprintMessage,
"newt/healthcheck/status": handleHealthcheckStatusMessage
"newt/healthcheck/status": handleHealthcheckStatusMessage,
"ws/round-trip/complete": handleRoundTripMessage
};
startOlmOfflineChecker(); // this is to handle the offline check for olms