Handle new usage tracking with multi org

This commit is contained in:
Owen
2026-02-17 17:09:48 -08:00
parent 79cf7c84dc
commit 4d6240c987
17 changed files with 432 additions and 584 deletions

View File

@@ -1,12 +1,8 @@
import { eq, sql, and } from "drizzle-orm";
import { v4 as uuidv4 } from "uuid";
import { PutObjectCommand } from "@aws-sdk/client-s3";
import {
db,
usage,
customers,
sites,
newts,
limits,
Usage,
Limit,
@@ -15,21 +11,9 @@ import {
} from "@server/db";
import { FeatureId, getFeatureMeterId } from "./features";
import logger from "@server/logger";
import { sendToClient } from "#dynamic/routers/ws";
import { build } from "@server/build";
import { s3Client } from "@server/lib/s3";
import cache from "@server/lib/cache";
interface StripeEvent {
identifier?: string;
timestamp: number;
event_name: string;
payload: {
value: number;
stripe_customer_id: string;
};
}
export function noop() {
if (build !== "saas") {
return true;
@@ -38,41 +22,11 @@ export function noop() {
}
export class UsageService {
// private bucketName: string | undefined;
// private events: StripeEvent[] = [];
// private lastUploadTime: number = Date.now();
// private isUploading: boolean = false;
constructor() {
if (noop()) {
return;
}
// this.bucketName = process.env.S3_BUCKET || undefined;
// // Periodically check and upload events
// setInterval(() => {
// this.checkAndUploadEvents().catch((err) => {
// logger.error("Error in periodic event upload:", err);
// });
// }, 30000); // every 30 seconds
// // Handle graceful shutdown on SIGTERM
// process.on("SIGTERM", async () => {
// logger.info(
// "SIGTERM received, uploading events before shutdown..."
// );
// await this.forceUpload();
// logger.info("Events uploaded, proceeding with shutdown");
// });
// // Handle SIGINT as well (Ctrl+C)
// process.on("SIGINT", async () => {
// logger.info("SIGINT received, uploading events before shutdown...");
// await this.forceUpload();
// logger.info("Events uploaded, proceeding with shutdown");
// process.exit(0);
// });
}
/**
@@ -103,16 +57,6 @@ export class UsageService {
while (attempt <= maxRetries) {
try {
// // Get subscription data for this org (with caching)
// const customerId = await this.getCustomerId(orgIdToUse, featureId);
// if (!customerId) {
// logger.warn(
// `No subscription data found for org ${orgIdToUse} and feature ${featureId}`
// );
// return null;
// }
let usage;
if (transaction) {
usage = await this.internalAddUsage(
@@ -132,11 +76,6 @@ export class UsageService {
});
}
// Log event for Stripe
// if (privateConfig.getRawPrivateConfig().flags.usage_reporting) {
// await this.logStripeEvent(featureId, value, customerId);
// }
return usage || null;
} catch (error: any) {
// Check if this is a deadlock error
@@ -191,13 +130,14 @@ export class UsageService {
featureId,
orgId,
meterId,
instantaneousValue: value,
latestValue: value,
updatedAt: Math.floor(Date.now() / 1000)
})
.onConflictDoUpdate({
target: usage.usageId,
set: {
latestValue: sql`${usage.latestValue} + ${value}`
instantaneousValue: sql`${usage.instantaneousValue} + ${value}`
}
})
.returning();
@@ -228,17 +168,6 @@ export class UsageService {
let orgIdToUse = await this.getBillingOrg(orgId);
try {
// if (!customerId) {
// customerId =
// (await this.getCustomerId(orgIdToUse, featureId)) || undefined;
// if (!customerId) {
// logger.warn(
// `No subscription data found for org ${orgIdToUse} and feature ${featureId}`
// );
// return;
// }
// }
// Truncate value to 11 decimal places if provided
if (value !== undefined && value !== null) {
value = this.truncateValue(value);
@@ -523,114 +452,6 @@ export class UsageService {
return hasExceededLimits;
}
// private async logStripeEvent(
// featureId: FeatureId,
// value: number,
// customerId: string
// ): Promise<void> {
// // Truncate value to 11 decimal places before sending to Stripe
// const truncatedValue = this.truncateValue(value);
// const event: StripeEvent = {
// identifier: uuidv4(),
// timestamp: Math.floor(new Date().getTime() / 1000),
// event_name: featureId,
// payload: {
// value: truncatedValue,
// stripe_customer_id: customerId
// }
// };
// this.addEventToMemory(event);
// await this.checkAndUploadEvents();
// }
// private addEventToMemory(event: StripeEvent): void {
// if (!this.bucketName) {
// logger.warn(
// "S3 bucket name is not configured, skipping event storage."
// );
// return;
// }
// this.events.push(event);
// }
// private async checkAndUploadEvents(): Promise<void> {
// const now = Date.now();
// const timeSinceLastUpload = now - this.lastUploadTime;
// // Check if at least 1 minute has passed since last upload
// if (timeSinceLastUpload >= 60000 && this.events.length > 0) {
// await this.uploadEventsToS3();
// }
// }
// private async uploadEventsToS3(): Promise<void> {
// if (!this.bucketName) {
// logger.warn(
// "S3 bucket name is not configured, skipping S3 upload."
// );
// return;
// }
// if (this.events.length === 0) {
// return;
// }
// // Check if already uploading
// if (this.isUploading) {
// logger.debug("Already uploading events, skipping");
// return;
// }
// this.isUploading = true;
// try {
// // Take a snapshot of current events and clear the array
// const eventsToUpload = [...this.events];
// this.events = [];
// this.lastUploadTime = Date.now();
// const fileName = this.generateEventFileName();
// const fileContent = JSON.stringify(eventsToUpload, null, 2);
// // Upload to S3
// const uploadCommand = new PutObjectCommand({
// Bucket: this.bucketName,
// Key: fileName,
// Body: fileContent,
// ContentType: "application/json"
// });
// await s3Client.send(uploadCommand);
// logger.info(
// `Uploaded ${fileName} to S3 with ${eventsToUpload.length} events`
// );
// } catch (error) {
// logger.error("Failed to upload events to S3:", error);
// // Note: Events are lost if upload fails. In a production system,
// // you might want to add the events back to the array or implement retry logic
// } finally {
// this.isUploading = false;
// }
// }
// private generateEventFileName(): string {
// const timestamp = new Date().toISOString().replace(/[:.]/g, "-");
// const uuid = uuidv4().substring(0, 8);
// return `events-${timestamp}-${uuid}.json`;
// }
// public async forceUpload(): Promise<void> {
// if (this.events.length > 0) {
// // Force upload regardless of time
// this.lastUploadTime = 0; // Reset to force upload
// await this.uploadEventsToS3();
// }
// }
}
export const usageService = new UsageService();