Make easier to run in dev - fix a couple of things

This commit is contained in:
Owen
2025-10-12 16:23:38 -07:00
parent f17a957058
commit a50c0d84e9
10 changed files with 265 additions and 123 deletions

View File

@@ -31,6 +31,17 @@ interface StripeEvent {
};
}
export function noop() {
if (
build !== "saas" ||
!process.env.S3_BUCKET ||
!process.env.LOCAL_FILE_PATH
) {
return true;
}
return false;
}
export class UsageService {
private cache: NodeCache;
private bucketName: string | undefined;
@@ -41,7 +52,7 @@ export class UsageService {
constructor() {
this.cache = new NodeCache({ stdTTL: 300 }); // 5 minute TTL
if (build !== "saas") {
if (noop()) {
return;
}
// this.bucketName = privateConfig.getRawPrivateConfig().stripe?.s3Bucket;
@@ -71,7 +82,9 @@ export class UsageService {
private async initializeEventsDirectory(): Promise<void> {
if (!this.eventsDir) {
logger.warn("Stripe local file path is not configured, skipping events directory initialization.");
logger.warn(
"Stripe local file path is not configured, skipping events directory initialization."
);
return;
}
try {
@@ -83,7 +96,9 @@ export class UsageService {
private async uploadPendingEventFilesOnStartup(): Promise<void> {
if (!this.eventsDir || !this.bucketName) {
logger.warn("Stripe local file path or bucket name is not configured, skipping leftover event file upload.");
logger.warn(
"Stripe local file path or bucket name is not configured, skipping leftover event file upload."
);
return;
}
try {
@@ -106,15 +121,17 @@ export class UsageService {
ContentType: "application/json"
});
await s3Client.send(uploadCommand);
// Check if file still exists before unlinking
try {
await fs.access(filePath);
await fs.unlink(filePath);
} catch (unlinkError) {
logger.debug(`Startup file ${file} was already deleted`);
logger.debug(
`Startup file ${file} was already deleted`
);
}
logger.info(
`Uploaded leftover event file ${file} to S3 with ${events.length} events`
);
@@ -124,7 +141,9 @@ export class UsageService {
await fs.access(filePath);
await fs.unlink(filePath);
} catch (unlinkError) {
logger.debug(`Empty startup file ${file} was already deleted`);
logger.debug(
`Empty startup file ${file} was already deleted`
);
}
}
} catch (err) {
@@ -135,8 +154,8 @@ export class UsageService {
}
}
}
} catch (err) {
logger.error("Failed to scan for leftover event files:", err);
} catch (error) {
logger.error("Failed to scan for leftover event files");
}
}
@@ -146,17 +165,17 @@ export class UsageService {
value: number,
transaction: any = null
): Promise<Usage | null> {
if (build !== "saas") {
if (noop()) {
return null;
}
// Truncate value to 11 decimal places
value = this.truncateValue(value);
// Implement retry logic for deadlock handling
const maxRetries = 3;
let attempt = 0;
while (attempt <= maxRetries) {
try {
// Get subscription data for this org (with caching)
@@ -179,7 +198,12 @@ export class UsageService {
);
} else {
await db.transaction(async (trx) => {
usage = await this.internalAddUsage(orgId, featureId, value, trx);
usage = await this.internalAddUsage(
orgId,
featureId,
value,
trx
);
});
}
@@ -189,25 +213,26 @@ export class UsageService {
return usage || null;
} catch (error: any) {
// Check if this is a deadlock error
const isDeadlock = error?.code === '40P01' ||
error?.cause?.code === '40P01' ||
(error?.message && error.message.includes('deadlock'));
const isDeadlock =
error?.code === "40P01" ||
error?.cause?.code === "40P01" ||
(error?.message && error.message.includes("deadlock"));
if (isDeadlock && attempt < maxRetries) {
attempt++;
// Exponential backoff with jitter: 50-150ms, 100-300ms, 200-600ms
const baseDelay = Math.pow(2, attempt - 1) * 50;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Deadlock detected for ${orgId}/${featureId}, retrying attempt ${attempt}/${maxRetries} after ${delay.toFixed(0)}ms`
);
await new Promise(resolve => setTimeout(resolve, delay));
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
logger.error(
`Failed to add usage for ${orgId}/${featureId} after ${attempt} attempts:`,
error
@@ -227,10 +252,10 @@ export class UsageService {
): Promise<Usage> {
// Truncate value to 11 decimal places
value = this.truncateValue(value);
const usageId = `${orgId}-${featureId}`;
const meterId = getFeatureMeterId(featureId);
// Use upsert: insert if not exists, otherwise increment
const [returnUsage] = await trx
.insert(usage)
@@ -247,7 +272,8 @@ export class UsageService {
set: {
latestValue: sql`${usage.latestValue} + ${value}`
}
}).returning();
})
.returning();
return returnUsage;
}
@@ -268,7 +294,7 @@ export class UsageService {
value?: number,
customerId?: string
): Promise<void> {
if (build !== "saas") {
if (noop()) {
return;
}
try {
@@ -339,7 +365,7 @@ export class UsageService {
.set({
latestValue: newRunningTotal,
instantaneousValue: value,
updatedAt: Math.floor(Date.now() / 1000)
updatedAt: Math.floor(Date.now() / 1000)
})
.where(eq(usage.usageId, usageId));
}
@@ -354,7 +380,7 @@ export class UsageService {
meterId,
instantaneousValue: truncatedValue,
latestValue: truncatedValue,
updatedAt: Math.floor(Date.now() / 1000)
updatedAt: Math.floor(Date.now() / 1000)
});
}
});
@@ -415,7 +441,7 @@ export class UsageService {
): Promise<void> {
// Truncate value to 11 decimal places before sending to Stripe
const truncatedValue = this.truncateValue(value);
const event: StripeEvent = {
identifier: uuidv4(),
timestamp: Math.floor(new Date().getTime() / 1000),
@@ -432,7 +458,9 @@ export class UsageService {
private async writeEventToFile(event: StripeEvent): Promise<void> {
if (!this.eventsDir || !this.bucketName) {
logger.warn("Stripe local file path or bucket name is not configured, skipping event file write.");
logger.warn(
"Stripe local file path or bucket name is not configured, skipping event file write."
);
return;
}
if (!this.currentEventFile) {
@@ -481,7 +509,9 @@ export class UsageService {
private async uploadFileToS3(): Promise<void> {
if (!this.bucketName || !this.eventsDir) {
logger.warn("Stripe local file path or bucket name is not configured, skipping S3 upload.");
logger.warn(
"Stripe local file path or bucket name is not configured, skipping S3 upload."
);
return;
}
if (!this.currentEventFile) {
@@ -493,7 +523,9 @@ export class UsageService {
// Check if this file is already being uploaded
if (this.uploadingFiles.has(fileName)) {
logger.debug(`File ${fileName} is already being uploaded, skipping`);
logger.debug(
`File ${fileName} is already being uploaded, skipping`
);
return;
}
@@ -505,7 +537,9 @@ export class UsageService {
try {
await fs.access(filePath);
} catch (error) {
logger.debug(`File ${fileName} does not exist, may have been already processed`);
logger.debug(
`File ${fileName} does not exist, may have been already processed`
);
this.uploadingFiles.delete(fileName);
// Reset current file if it was this file
if (this.currentEventFile === fileName) {
@@ -525,7 +559,9 @@ export class UsageService {
await fs.unlink(filePath);
} catch (unlinkError) {
// File may have been already deleted
logger.debug(`File ${fileName} was already deleted during cleanup`);
logger.debug(
`File ${fileName} was already deleted during cleanup`
);
}
this.currentEventFile = null;
this.uploadingFiles.delete(fileName);
@@ -548,7 +584,9 @@ export class UsageService {
await fs.unlink(filePath);
} catch (unlinkError) {
// File may have been already deleted by another process
logger.debug(`File ${fileName} was already deleted during upload`);
logger.debug(
`File ${fileName} was already deleted during upload`
);
}
logger.info(
@@ -559,10 +597,7 @@ export class UsageService {
this.currentEventFile = null;
this.currentFileStartTime = 0;
} catch (error) {
logger.error(
`Failed to upload ${fileName} to S3:`,
error
);
logger.error(`Failed to upload ${fileName} to S3:`, error);
} finally {
// Always remove from uploading set
this.uploadingFiles.delete(fileName);
@@ -579,7 +614,7 @@ export class UsageService {
orgId: string,
featureId: FeatureId
): Promise<Usage | null> {
if (build !== "saas") {
if (noop()) {
return null;
}
@@ -598,7 +633,7 @@ export class UsageService {
`Creating new usage record for ${orgId}/${featureId}`
);
const meterId = getFeatureMeterId(featureId);
try {
const [newUsage] = await db
.insert(usage)
@@ -653,7 +688,7 @@ export class UsageService {
orgId: string,
featureId: FeatureId
): Promise<Usage | null> {
if (build !== "saas") {
if (noop()) {
return null;
}
await this.updateDaily(orgId, featureId); // Ensure daily usage is updated
@@ -673,7 +708,9 @@ export class UsageService {
*/
private async uploadOldEventFiles(): Promise<void> {
if (!this.eventsDir || !this.bucketName) {
logger.warn("Stripe local file path or bucket name is not configured, skipping old event file upload.");
logger.warn(
"Stripe local file path or bucket name is not configured, skipping old event file upload."
);
return;
}
try {
@@ -681,15 +718,17 @@ export class UsageService {
const now = Date.now();
for (const file of files) {
if (!file.endsWith(".json")) continue;
// Skip files that are already being uploaded
if (this.uploadingFiles.has(file)) {
logger.debug(`Skipping file ${file} as it's already being uploaded`);
logger.debug(
`Skipping file ${file} as it's already being uploaded`
);
continue;
}
const filePath = path.join(this.eventsDir, file);
try {
// Check if file still exists before processing
try {
@@ -704,7 +743,7 @@ export class UsageService {
if (age >= 90000) {
// 1.5 minutes - Mark as being uploaded
this.uploadingFiles.add(file);
try {
const fileContent = await fs.readFile(
filePath,
@@ -720,15 +759,17 @@ export class UsageService {
ContentType: "application/json"
});
await s3Client.send(uploadCommand);
// Check if file still exists before unlinking
try {
await fs.access(filePath);
await fs.unlink(filePath);
} catch (unlinkError) {
logger.debug(`File ${file} was already deleted during interval upload`);
logger.debug(
`File ${file} was already deleted during interval upload`
);
}
logger.info(
`Interval: Uploaded event file ${file} to S3 with ${events.length} events`
);
@@ -743,7 +784,9 @@ export class UsageService {
await fs.access(filePath);
await fs.unlink(filePath);
} catch (unlinkError) {
logger.debug(`Empty file ${file} was already deleted`);
logger.debug(
`Empty file ${file} was already deleted`
);
}
}
} finally {
@@ -765,12 +808,17 @@ export class UsageService {
}
}
public async checkLimitSet(orgId: string, kickSites = false, featureId?: FeatureId, usage?: Usage): Promise<boolean> {
if (build !== "saas") {
public async checkLimitSet(
orgId: string,
kickSites = false,
featureId?: FeatureId,
usage?: Usage
): Promise<boolean> {
if (noop()) {
return false;
}
// This method should check the current usage against the limits set for the organization
// and kick out all of the sites on the org
// and kick out all of the sites on the org
let hasExceededLimits = false;
try {
@@ -805,16 +853,30 @@ export class UsageService {
if (usage) {
currentUsage = usage;
} else {
currentUsage = await this.getUsage(orgId, limit.featureId as FeatureId);
currentUsage = await this.getUsage(
orgId,
limit.featureId as FeatureId
);
}
const usageValue = currentUsage?.instantaneousValue || currentUsage?.latestValue || 0;
logger.debug(`Current usage for org ${orgId} on feature ${limit.featureId}: ${usageValue}`);
logger.debug(`Limit for org ${orgId} on feature ${limit.featureId}: ${limit.value}`);
if (currentUsage && limit.value !== null && usageValue > limit.value) {
const usageValue =
currentUsage?.instantaneousValue ||
currentUsage?.latestValue ||
0;
logger.debug(
`Current usage for org ${orgId} on feature ${limit.featureId}: ${usageValue}`
);
logger.debug(
`Limit for org ${orgId} on feature ${limit.featureId}: ${limit.value}`
);
if (
currentUsage &&
limit.value !== null &&
usageValue > limit.value
) {
logger.debug(
`Org ${orgId} has exceeded limit for ${limit.featureId}: ` +
`${usageValue} > ${limit.value}`
`${usageValue} > ${limit.value}`
);
hasExceededLimits = true;
break; // Exit early if any limit is exceeded
@@ -823,7 +885,9 @@ export class UsageService {
// If any limits are exceeded, disconnect all sites for this organization
if (hasExceededLimits && kickSites) {
logger.warn(`Disconnecting all sites for org ${orgId} due to exceeded limits`);
logger.warn(
`Disconnecting all sites for org ${orgId} due to exceeded limits`
);
// Get all sites for this organization
const orgSites = await db
@@ -832,7 +896,7 @@ export class UsageService {
.where(eq(sites.orgId, orgId));
// Mark all sites as offline and send termination messages
const siteUpdates = orgSites.map(site => site.siteId);
const siteUpdates = orgSites.map((site) => site.siteId);
if (siteUpdates.length > 0) {
// Send termination messages to newt sites
@@ -853,17 +917,21 @@ export class UsageService {
};
// Don't await to prevent blocking
sendToClient(newt.newtId, payload).catch((error: any) => {
logger.error(
`Failed to send termination message to newt ${newt.newtId}:`,
error
);
});
sendToClient(newt.newtId, payload).catch(
(error: any) => {
logger.error(
`Failed to send termination message to newt ${newt.newtId}:`,
error
);
}
);
}
}
}
logger.info(`Disconnected ${orgSites.length} sites for org ${orgId} due to exceeded limits`);
logger.info(
`Disconnected ${orgSites.length} sites for org ${orgId} due to exceeded limits`
);
}
}
} catch (error) {