Pass through transaction

This commit is contained in:
Owen
2025-10-17 14:04:49 -07:00
parent e5a436593f
commit c07abf8ff9
4 changed files with 29 additions and 19 deletions

View File

@@ -612,7 +612,8 @@ export class UsageService {
public async getUsage( public async getUsage(
orgId: string, orgId: string,
featureId: FeatureId featureId: FeatureId,
trx: Transaction | typeof db = db
): Promise<Usage | null> { ): Promise<Usage | null> {
if (noop()) { if (noop()) {
return null; return null;
@@ -621,7 +622,7 @@ export class UsageService {
const usageId = `${orgId}-${featureId}`; const usageId = `${orgId}-${featureId}`;
try { try {
const [result] = await db const [result] = await trx
.select() .select()
.from(usage) .from(usage)
.where(eq(usage.usageId, usageId)) .where(eq(usage.usageId, usageId))
@@ -635,7 +636,7 @@ export class UsageService {
const meterId = getFeatureMeterId(featureId); const meterId = getFeatureMeterId(featureId);
try { try {
const [newUsage] = await db const [newUsage] = await trx
.insert(usage) .insert(usage)
.values({ .values({
usageId, usageId,
@@ -652,7 +653,7 @@ export class UsageService {
return newUsage; return newUsage;
} else { } else {
// Record was created by another process, fetch it // Record was created by another process, fetch it
const [existingUsage] = await db const [existingUsage] = await trx
.select() .select()
.from(usage) .from(usage)
.where(eq(usage.usageId, usageId)) .where(eq(usage.usageId, usageId))
@@ -665,7 +666,7 @@ export class UsageService {
`Insert failed for ${orgId}/${featureId}, attempting to fetch existing record:`, `Insert failed for ${orgId}/${featureId}, attempting to fetch existing record:`,
insertError insertError
); );
const [existingUsage] = await db const [existingUsage] = await trx
.select() .select()
.from(usage) .from(usage)
.where(eq(usage.usageId, usageId)) .where(eq(usage.usageId, usageId))
@@ -812,7 +813,8 @@ export class UsageService {
orgId: string, orgId: string,
kickSites = false, kickSites = false,
featureId?: FeatureId, featureId?: FeatureId,
usage?: Usage usage?: Usage,
trx: Transaction | typeof db = db
): Promise<boolean> { ): Promise<boolean> {
if (noop()) { if (noop()) {
return false; return false;
@@ -825,7 +827,7 @@ export class UsageService {
let orgLimits: Limit[] = []; let orgLimits: Limit[] = [];
if (featureId) { if (featureId) {
// Get all limits set for this organization // Get all limits set for this organization
orgLimits = await db orgLimits = await trx
.select() .select()
.from(limits) .from(limits)
.where( .where(
@@ -836,7 +838,7 @@ export class UsageService {
); );
} else { } else {
// Get all limits set for this organization // Get all limits set for this organization
orgLimits = await db orgLimits = await trx
.select() .select()
.from(limits) .from(limits)
.where(eq(limits.orgId, orgId)); .where(eq(limits.orgId, orgId));
@@ -855,7 +857,8 @@ export class UsageService {
} else { } else {
currentUsage = await this.getUsage( currentUsage = await this.getUsage(
orgId, orgId,
limit.featureId as FeatureId limit.featureId as FeatureId,
trx
); );
} }
@@ -890,7 +893,7 @@ export class UsageService {
); );
// Get all sites for this organization // Get all sites for this organization
const orgSites = await db const orgSites = await trx
.select() .select()
.from(sites) .from(sites)
.where(eq(sites.orgId, orgId)); .where(eq(sites.orgId, orgId));
@@ -902,7 +905,7 @@ export class UsageService {
// Send termination messages to newt sites // Send termination messages to newt sites
for (const site of orgSites) { for (const site of orgSites) {
if (site.type === "newt") { if (site.type === "newt") {
const [newt] = await db const [newt] = await trx
.select() .select()
.from(newts) .from(newts)
.where(eq(newts.siteId, site.siteId)) .where(eq(newts.siteId, site.siteId))
@@ -917,7 +920,7 @@ export class UsageService {
}; };
// Don't await to prevent blocking // Don't await to prevent blocking
sendToClient(newt.newtId, payload).catch( await sendToClient(newt.newtId, payload).catch(
(error: any) => { (error: any) => {
logger.error( logger.error(
`Failed to send termination message to newt ${newt.newtId}:`, `Failed to send termination message to newt ${newt.newtId}:`,

View File

@@ -1,4 +1,4 @@
import { db, exitNodes } from "@server/db"; import { db, exitNodes, Transaction } from "@server/db";
import logger from "@server/logger"; import logger from "@server/logger";
import { ExitNodePingResult } from "@server/routers/newt"; import { ExitNodePingResult } from "@server/routers/newt";
import { eq } from "drizzle-orm"; import { eq } from "drizzle-orm";
@@ -59,7 +59,11 @@ export function selectBestExitNode(
return pingResults[0]; return pingResults[0];
} }
export async function checkExitNodeOrg(exitNodeId: number, orgId: string) { export async function checkExitNodeOrg(
exitNodeId: number,
orgId: string,
trx?: Transaction | typeof db
): Promise<boolean> {
return false; return false;
} }

View File

@@ -18,7 +18,8 @@ import {
resources, resources,
targets, targets,
sites, sites,
targetHealthCheck targetHealthCheck,
Transaction
} from "@server/db"; } from "@server/db";
import logger from "@server/logger"; import logger from "@server/logger";
import { ExitNodePingResult } from "@server/routers/newt"; import { ExitNodePingResult } from "@server/routers/newt";
@@ -333,8 +334,8 @@ export function selectBestExitNode(
return fallbackNode; return fallbackNode;
} }
export async function checkExitNodeOrg(exitNodeId: number, orgId: string) { export async function checkExitNodeOrg(exitNodeId: number, orgId: string, trx: Transaction | typeof db = db) {
const [exitNodeOrg] = await db const [exitNodeOrg] = await trx
.select() .select()
.from(exitNodeOrgs) .from(exitNodeOrgs)
.where( .where(

View File

@@ -98,7 +98,8 @@ export async function updateSiteBandwidth(
if ( if (
await checkExitNodeOrg( await checkExitNodeOrg(
exitNodeId, exitNodeId,
updatedSite.orgId updatedSite.orgId,
trx
) )
) { ) {
// not allowed // not allowed
@@ -242,7 +243,8 @@ export async function updateSiteBandwidth(
if ( if (
await checkExitNodeOrg( await checkExitNodeOrg(
exitNodeId, exitNodeId,
updatedSite.orgId updatedSite.orgId,
trx
) )
) { ) {
// not allowed // not allowed