import { eq, sql, and } from "drizzle-orm"; import { db, usage, customers, limits, Usage, Limit, Transaction, orgs } from "@server/db"; import { FeatureId, getFeatureMeterId } from "./features"; import logger from "@server/logger"; import { build } from "@server/build"; import cache from "@server/lib/cache"; export function noop() { if (build !== "saas") { return true; } return false; } export class UsageService { constructor() { if (noop()) { return; } } /** * Truncate a number to 11 decimal places to prevent precision issues */ private truncateValue(value: number): number { return Math.round(value * 100000000000) / 100000000000; // 11 decimal places } public async add( orgId: string, featureId: FeatureId, value: number, transaction: any = null ): Promise { if (noop()) { return null; } let orgIdToUse = await this.getBillingOrg(orgId, transaction); // Truncate value to 11 decimal places value = this.truncateValue(value); // Implement retry logic for deadlock handling const maxRetries = 3; let attempt = 0; while (attempt <= maxRetries) { try { let usage; if (transaction) { usage = await this.internalAddUsage( orgIdToUse, featureId, value, transaction ); } else { await db.transaction(async (trx) => { usage = await this.internalAddUsage( orgIdToUse, featureId, value, trx ); }); } return usage || null; } catch (error: any) { // Check if this is a deadlock error const isDeadlock = error?.code === "40P01" || error?.cause?.code === "40P01" || (error?.message && error.message.includes("deadlock")); if (isDeadlock && attempt < maxRetries) { attempt++; // Exponential backoff with jitter: 50-150ms, 100-300ms, 200-600ms const baseDelay = Math.pow(2, attempt - 1) * 50; const jitter = Math.random() * baseDelay; const delay = baseDelay + jitter; logger.warn( `Deadlock detected for ${orgIdToUse}/${featureId}, retrying attempt ${attempt}/${maxRetries} after ${delay.toFixed(0)}ms` ); await new Promise((resolve) => setTimeout(resolve, delay)); continue; } logger.error( `Failed to add usage for ${orgIdToUse}/${featureId} after ${attempt} attempts:`, error ); break; } } return null; } private async internalAddUsage( orgId: string, // here the orgId is the billing org already resolved by getBillingOrg in updateCount featureId: FeatureId, value: number, trx: Transaction ): Promise { // Truncate value to 11 decimal places value = this.truncateValue(value); const usageId = `${orgId}-${featureId}`; const meterId = getFeatureMeterId(featureId); // Use upsert: insert if not exists, otherwise increment const [returnUsage] = await trx .insert(usage) .values({ usageId, featureId, orgId, meterId, instantaneousValue: value || 0, latestValue: value || 0, updatedAt: Math.floor(Date.now() / 1000) }) .onConflictDoUpdate({ target: usage.usageId, set: { instantaneousValue: sql`COALESCE(${usage.instantaneousValue}, 0) + ${value}` } }) .returning(); logger.debug( `Added usage for org ${orgId} feature ${featureId}: +${value}, new instantaneousValue: ${returnUsage.instantaneousValue}` ); return returnUsage; } // Helper function to get today's date as string (YYYY-MM-DD) getTodayDateString(): string { return new Date().toISOString().split("T")[0]; } // Helper function to get date string from Date object getDateString(date: number): string { return new Date(date * 1000).toISOString().split("T")[0]; } async updateCount( orgId: string, featureId: FeatureId, value?: number, customerId?: string ): Promise { if (noop()) { return; } let orgIdToUse = await this.getBillingOrg(orgId); try { // Truncate value to 11 decimal places if provided if (value !== undefined && value !== null) { value = this.truncateValue(value); } let currentUsage: Usage | null = null; await db.transaction(async (trx) => { // Get existing meter record const usageId = `${orgIdToUse}-${featureId}`; // Get current usage record [currentUsage] = await trx .select() .from(usage) .where(eq(usage.usageId, usageId)) .limit(1); if (currentUsage) { await trx .update(usage) .set({ instantaneousValue: value, updatedAt: Math.floor(Date.now() / 1000) }) .where(eq(usage.usageId, usageId)); } else { // First record for this meter const meterId = getFeatureMeterId(featureId); await trx.insert(usage).values({ usageId, featureId, orgId: orgIdToUse, meterId, instantaneousValue: value || 0, latestValue: value || 0, updatedAt: Math.floor(Date.now() / 1000) }); } }); // if (privateConfig.getRawPrivateConfig().flags.usage_reporting) { // await this.logStripeEvent(featureId, value || 0, customerId); // } } catch (error) { logger.error( `Failed to update count usage for ${orgIdToUse}/${featureId}:`, error ); } } private async getCustomerId( orgId: string, featureId: FeatureId ): Promise { let orgIdToUse = await this.getBillingOrg(orgId); const cacheKey = `customer_${orgIdToUse}_${featureId}`; const cached = cache.get(cacheKey); if (cached) { return cached; } try { // Query subscription data const [customer] = await db .select({ customerId: customers.customerId }) .from(customers) .where(eq(customers.orgId, orgIdToUse)) .limit(1); if (!customer) { return null; } const customerId = customer.customerId; // Cache the result cache.set(cacheKey, customerId, 300); // 5 minute TTL return customerId; } catch (error) { logger.error( `Failed to get subscription data for ${orgIdToUse}/${featureId}:`, error ); return null; } } public async getUsage( orgId: string, featureId: FeatureId, trx: Transaction | typeof db = db ): Promise { if (noop()) { return null; } let orgIdToUse = await this.getBillingOrg(orgId, trx); const usageId = `${orgIdToUse}-${featureId}`; try { const [result] = await trx .select() .from(usage) .where(eq(usage.usageId, usageId)) .limit(1); if (!result) { // Lets create one if it doesn't exist using upsert to handle race conditions logger.info( `Creating new usage record for ${orgIdToUse}/${featureId}` ); const meterId = getFeatureMeterId(featureId); try { const [newUsage] = await trx .insert(usage) .values({ usageId, featureId, orgId: orgIdToUse, meterId, latestValue: 0, updatedAt: Math.floor(Date.now() / 1000) }) .onConflictDoNothing() .returning(); if (newUsage) { return newUsage; } else { // Record was created by another process, fetch it const [existingUsage] = await trx .select() .from(usage) .where(eq(usage.usageId, usageId)) .limit(1); return existingUsage || null; } } catch (insertError) { // Fallback: try to fetch existing record in case of any insert issues logger.warn( `Insert failed for ${orgIdToUse}/${featureId}, attempting to fetch existing record:`, insertError ); const [existingUsage] = await trx .select() .from(usage) .where(eq(usage.usageId, usageId)) .limit(1); return existingUsage || null; } } return result; } catch (error) { logger.error( `Failed to get usage for ${orgIdToUse}/${featureId}:`, error ); throw error; } } public async getBillingOrg( orgId: string, trx: Transaction | typeof db = db ): Promise { let orgIdToUse = orgId; // get the org const [org] = await trx .select() .from(orgs) .where(eq(orgs.orgId, orgId)) .limit(1); if (!org) { throw new Error(`Organization with ID ${orgId} not found`); } if (!org.isBillingOrg) { if (org.billingOrgId) { orgIdToUse = org.billingOrgId; } else { throw new Error( `Organization ${orgId} is not a billing org and does not have a billingOrgId set` ); } } return orgIdToUse; } public async checkLimitSet( orgId: string, featureId?: FeatureId, usage?: Usage, trx: Transaction | typeof db = db ): Promise { if (noop()) { return false; } let orgIdToUse = await this.getBillingOrg(orgId, trx); // This method should check the current usage against the limits set for the organization // and kick out all of the sites on the org let hasExceededLimits = false; try { let orgLimits: Limit[] = []; if (featureId) { // Get all limits set for this organization orgLimits = await trx .select() .from(limits) .where( and( eq(limits.orgId, orgIdToUse), eq(limits.featureId, featureId) ) ); } else { // Get all limits set for this organization orgLimits = await trx .select() .from(limits) .where(eq(limits.orgId, orgIdToUse)); } if (orgLimits.length === 0) { logger.debug(`No limits set for org ${orgIdToUse}`); return false; } // Check each limit against current usage for (const limit of orgLimits) { let currentUsage: Usage | null; if (usage) { currentUsage = usage; } else { currentUsage = await this.getUsage( orgIdToUse, limit.featureId as FeatureId, trx ); } const usageValue = currentUsage?.instantaneousValue || currentUsage?.latestValue || 0; logger.debug( `Current usage for org ${orgIdToUse} on feature ${limit.featureId}: ${usageValue}` ); logger.debug( `Limit for org ${orgIdToUse} on feature ${limit.featureId}: ${limit.value}` ); if ( currentUsage && limit.value !== null && usageValue > limit.value ) { logger.debug( `Org ${orgIdToUse} has exceeded limit for ${limit.featureId}: ` + `${usageValue} > ${limit.value}` ); hasExceededLimits = true; break; // Exit early if any limit is exceeded } } } catch (error) { logger.error(`Error checking limits for org ${orgIdToUse}:`, error); } return hasExceededLimits; } } export const usageService = new UsageService();