Merge branch 'dev' into distribution

This commit is contained in:
miloschwartz
2025-10-13 11:06:14 -07:00
85 changed files with 906 additions and 1639 deletions

View File

@@ -31,6 +31,17 @@ interface StripeEvent {
};
}
export function noop() {
if (
build !== "saas" ||
!process.env.S3_BUCKET ||
!process.env.LOCAL_FILE_PATH
) {
return true;
}
return false;
}
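Note: `noop()` centralizes the `build !== "saas"` gate that each method below previously performed inline; usage tracking only runs on the "saas" build with both `S3_BUCKET` and `LOCAL_FILE_PATH` set. A minimal sketch of the call-site pattern (the method name and body here are illustrative, not part of the diff):

```typescript
// Illustrative call site: every public UsageService method bails out first.
async function addUsageSketch(orgId: string, value: number) {
    if (noop()) {
        // Not a SaaS build, or S3_BUCKET / LOCAL_FILE_PATH unset: track nothing.
        return null;
    }
    // ...meter the usage as the methods below do...
}
```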
export class UsageService {
private cache: NodeCache;
private bucketName: string | undefined;
@@ -41,7 +52,7 @@ export class UsageService {
constructor() {
this.cache = new NodeCache({ stdTTL: 300 }); // 5 minute TTL
if (build !== "saas") {
if (noop()) {
return;
}
// this.bucketName = privateConfig.getRawPrivateConfig().stripe?.s3Bucket;
@@ -71,7 +82,9 @@ export class UsageService {
private async initializeEventsDirectory(): Promise<void> {
if (!this.eventsDir) {
logger.warn("Stripe local file path is not configured, skipping events directory initialization.");
logger.warn(
"Stripe local file path is not configured, skipping events directory initialization."
);
return;
}
try {
@@ -83,7 +96,9 @@ export class UsageService {
private async uploadPendingEventFilesOnStartup(): Promise<void> {
if (!this.eventsDir || !this.bucketName) {
logger.warn("Stripe local file path or bucket name is not configured, skipping leftover event file upload.");
logger.warn(
"Stripe local file path or bucket name is not configured, skipping leftover event file upload."
);
return;
}
try {
@@ -106,15 +121,17 @@ export class UsageService {
ContentType: "application/json"
});
await s3Client.send(uploadCommand);
// Check if file still exists before unlinking
try {
await fs.access(filePath);
await fs.unlink(filePath);
} catch (unlinkError) {
logger.debug(
`Startup file ${file} was already deleted`
);
}
logger.info(
`Uploaded leftover event file ${file} to S3 with ${events.length} events`
);
@@ -124,7 +141,9 @@ export class UsageService {
await fs.access(filePath);
await fs.unlink(filePath);
} catch (unlinkError) {
logger.debug(
`Empty startup file ${file} was already deleted`
);
}
}
} catch (err) {
@@ -135,8 +154,8 @@ export class UsageService {
}
}
}
} catch (error) {
logger.error("Failed to scan for leftover event files:", error);
}
}
@@ -146,17 +165,17 @@ export class UsageService {
value: number,
transaction: any = null
): Promise<Usage | null> {
if (build !== "saas") {
if (noop()) {
return null;
}
// Truncate value to 11 decimal places
value = this.truncateValue(value);
// Implement retry logic for deadlock handling
const maxRetries = 3;
let attempt = 0;
while (attempt <= maxRetries) {
try {
// Get subscription data for this org (with caching)
@@ -179,7 +198,12 @@ export class UsageService {
);
} else {
await db.transaction(async (trx) => {
usage = await this.internalAddUsage(
orgId,
featureId,
value,
trx
);
});
}
@@ -189,25 +213,26 @@ export class UsageService {
return usage || null;
} catch (error: any) {
// Check if this is a deadlock error
const isDeadlock =
error?.code === "40P01" ||
error?.cause?.code === "40P01" ||
(error?.message && error.message.includes("deadlock"));
if (isDeadlock && attempt < maxRetries) {
attempt++;
// Exponential backoff with jitter: 50-100ms, 100-200ms, 200-400ms
const baseDelay = Math.pow(2, attempt - 1) * 50;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Deadlock detected for ${orgId}/${featureId}, retrying attempt ${attempt}/${maxRetries} after ${delay.toFixed(0)}ms`
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
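The delay schedule the branch above produces can be isolated as follows (`backoffDelay` is a name introduced here for illustration, not part of the diff):

```typescript
// Sketch of the backoff computed above: the base doubles per attempt and
// uniform jitter in [0, base) is added, so retries wait 50-100ms,
// 100-200ms, and 200-400ms respectively.
function backoffDelay(attempt: number): number {
    const baseDelay = Math.pow(2, attempt - 1) * 50; // 50, 100, 200
    const jitter = Math.random() * baseDelay;
    return baseDelay + jitter;
}
```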
logger.error(
`Failed to add usage for ${orgId}/${featureId} after ${attempt} attempts:`,
error
@@ -227,10 +252,10 @@ export class UsageService {
): Promise<Usage> {
// Truncate value to 11 decimal places
value = this.truncateValue(value);
const usageId = `${orgId}-${featureId}`;
const meterId = getFeatureMeterId(featureId);
// Use upsert: insert if not exists, otherwise increment
const [returnUsage] = await trx
.insert(usage)
@@ -247,7 +272,8 @@ export class UsageService {
set: {
latestValue: sql`${usage.latestValue} + ${value}`
}
})
.returning();
return returnUsage;
}
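`truncateValue` itself is not shown in this diff; given the repeated "truncate value to 11 decimal places" comments, it presumably behaves roughly like this sketch (assumed implementation):

```typescript
// Assumed behavior: drop (not round) digits past the 11th decimal place.
function truncateValue(value: number): number {
    const factor = 1e11;
    // e.g. 1.234567890123 -> 1.23456789012 (subject to float precision)
    return Math.trunc(value * factor) / factor;
}
```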
@@ -268,7 +294,7 @@ export class UsageService {
value?: number,
customerId?: string
): Promise<void> {
if (build !== "saas") {
if (noop()) {
return;
}
try {
@@ -339,7 +365,7 @@ export class UsageService {
.set({
latestValue: newRunningTotal,
instantaneousValue: value,
updatedAt: Math.floor(Date.now() / 1000)
})
.where(eq(usage.usageId, usageId));
}
@@ -354,7 +380,7 @@ export class UsageService {
meterId,
instantaneousValue: truncatedValue,
latestValue: truncatedValue,
updatedAt: Math.floor(Date.now() / 1000)
});
}
});
@@ -415,7 +441,7 @@ export class UsageService {
): Promise<void> {
// Truncate value to 11 decimal places before sending to Stripe
const truncatedValue = this.truncateValue(value);
const event: StripeEvent = {
identifier: uuidv4(),
timestamp: Math.floor(new Date().getTime() / 1000),
@@ -432,7 +458,9 @@ export class UsageService {
private async writeEventToFile(event: StripeEvent): Promise<void> {
if (!this.eventsDir || !this.bucketName) {
logger.warn("Stripe local file path or bucket name is not configured, skipping event file write.");
logger.warn(
"Stripe local file path or bucket name is not configured, skipping event file write."
);
return;
}
if (!this.currentEventFile) {
@@ -481,7 +509,9 @@ export class UsageService {
private async uploadFileToS3(): Promise<void> {
if (!this.bucketName || !this.eventsDir) {
logger.warn("Stripe local file path or bucket name is not configured, skipping S3 upload.");
logger.warn(
"Stripe local file path or bucket name is not configured, skipping S3 upload."
);
return;
}
if (!this.currentEventFile) {
@@ -493,7 +523,9 @@ export class UsageService {
// Check if this file is already being uploaded
if (this.uploadingFiles.has(fileName)) {
logger.debug(
`File ${fileName} is already being uploaded, skipping`
);
return;
}
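The `uploadingFiles` set is the concurrency guard used throughout this file: the startup sweep, the rotation-triggered upload, and the interval uploader can all race on the same file. Reduced to a sketch (`uploadOnce` is illustrative, not part of the diff):

```typescript
const uploadingFiles = new Set<string>();

// Illustrative guard: at most one in-flight upload per file name.
async function uploadOnce(fileName: string, upload: () => Promise<void>) {
    if (uploadingFiles.has(fileName)) return; // another path is uploading it
    uploadingFiles.add(fileName);
    try {
        await upload();
    } finally {
        uploadingFiles.delete(fileName); // always release the guard
    }
}
```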
@@ -505,7 +537,9 @@ export class UsageService {
try {
await fs.access(filePath);
} catch (error) {
logger.debug(
`File ${fileName} does not exist, may have been already processed`
);
this.uploadingFiles.delete(fileName);
// Reset current file if it was this file
if (this.currentEventFile === fileName) {
@@ -525,7 +559,9 @@ export class UsageService {
await fs.unlink(filePath);
} catch (unlinkError) {
// File may have been already deleted
logger.debug(
`File ${fileName} was already deleted during cleanup`
);
}
this.currentEventFile = null;
this.uploadingFiles.delete(fileName);
@@ -548,7 +584,9 @@ export class UsageService {
await fs.unlink(filePath);
} catch (unlinkError) {
// File may have been already deleted by another process
logger.debug(
`File ${fileName} was already deleted during upload`
);
}
logger.info(
@@ -559,10 +597,7 @@ export class UsageService {
this.currentEventFile = null;
this.currentFileStartTime = 0;
} catch (error) {
logger.error(`Failed to upload ${fileName} to S3:`, error);
} finally {
// Always remove from uploading set
this.uploadingFiles.delete(fileName);
@@ -579,7 +614,7 @@ export class UsageService {
orgId: string,
featureId: FeatureId
): Promise<Usage | null> {
if (build !== "saas") {
if (noop()) {
return null;
}
@@ -598,7 +633,7 @@ export class UsageService {
`Creating new usage record for ${orgId}/${featureId}`
);
const meterId = getFeatureMeterId(featureId);
try {
const [newUsage] = await db
.insert(usage)
@@ -653,7 +688,7 @@ export class UsageService {
orgId: string,
featureId: FeatureId
): Promise<Usage | null> {
if (build !== "saas") {
if (noop()) {
return null;
}
await this.updateDaily(orgId, featureId); // Ensure daily usage is updated
@@ -673,7 +708,9 @@ export class UsageService {
*/
private async uploadOldEventFiles(): Promise<void> {
if (!this.eventsDir || !this.bucketName) {
logger.warn("Stripe local file path or bucket name is not configured, skipping old event file upload.");
logger.warn(
"Stripe local file path or bucket name is not configured, skipping old event file upload."
);
return;
}
try {
@@ -681,15 +718,17 @@ export class UsageService {
const now = Date.now();
for (const file of files) {
if (!file.endsWith(".json")) continue;
// Skip files that are already being uploaded
if (this.uploadingFiles.has(file)) {
logger.debug(
`Skipping file ${file} as it's already being uploaded`
);
continue;
}
const filePath = path.join(this.eventsDir, file);
try {
// Check if file still exists before processing
try {
@@ -704,7 +743,7 @@ export class UsageService {
if (age >= 90000) {
// 1.5 minutes - Mark as being uploaded
this.uploadingFiles.add(file);
try {
const fileContent = await fs.readFile(
filePath,
@@ -720,15 +759,17 @@ export class UsageService {
ContentType: "application/json"
});
await s3Client.send(uploadCommand);
// Check if file still exists before unlinking
try {
await fs.access(filePath);
await fs.unlink(filePath);
} catch (unlinkError) {
logger.debug(
`File ${file} was already deleted during interval upload`
);
}
logger.info(
`Interval: Uploaded event file ${file} to S3 with ${events.length} events`
);
@@ -743,7 +784,9 @@ export class UsageService {
await fs.access(filePath);
await fs.unlink(filePath);
} catch (unlinkError) {
logger.debug(
`Empty file ${file} was already deleted`
);
}
}
} finally {
@@ -765,12 +808,17 @@ export class UsageService {
}
}
public async checkLimitSet(
orgId: string,
kickSites = false,
featureId?: FeatureId,
usage?: Usage
): Promise<boolean> {
if (noop()) {
return false;
}
// This method checks the current usage against the limits set for the organization
// and, if any limit is exceeded, kicks all of the sites off the org
let hasExceededLimits = false;
try {
@@ -805,16 +853,30 @@ export class UsageService {
if (usage) {
currentUsage = usage;
} else {
currentUsage = await this.getUsage(
orgId,
limit.featureId as FeatureId
);
}
const usageValue =
currentUsage?.instantaneousValue ||
currentUsage?.latestValue ||
0;
logger.debug(
`Current usage for org ${orgId} on feature ${limit.featureId}: ${usageValue}`
);
logger.debug(
`Limit for org ${orgId} on feature ${limit.featureId}: ${limit.value}`
);
if (
currentUsage &&
limit.value !== null &&
usageValue > limit.value
) {
logger.debug(
`Org ${orgId} has exceeded limit for ${limit.featureId}: ` +
`${usageValue} > ${limit.value}`
);
hasExceededLimits = true;
break; // Exit early if any limit is exceeded
@@ -823,7 +885,9 @@ export class UsageService {
// If any limits are exceeded, disconnect all sites for this organization
if (hasExceededLimits && kickSites) {
logger.warn(
`Disconnecting all sites for org ${orgId} due to exceeded limits`
);
// Get all sites for this organization
const orgSites = await db
@@ -832,7 +896,7 @@ export class UsageService {
.where(eq(sites.orgId, orgId));
// Mark all sites as offline and send termination messages
const siteUpdates = orgSites.map((site) => site.siteId);
if (siteUpdates.length > 0) {
// Send termination messages to newt sites
@@ -853,17 +917,21 @@ export class UsageService {
};
// Don't await to prevent blocking
sendToClient(newt.newtId, payload).catch(
(error: any) => {
logger.error(
`Failed to send termination message to newt ${newt.newtId}:`,
error
);
}
);
}
}
}
logger.info(
`Disconnected ${orgSites.length} sites for org ${orgId} due to exceeded limits`
);
}
}
} catch (error) {

View File

@@ -1,70 +1,3 @@
import axios from "axios";
import { tokenManager } from "./tokenManager";
import logger from "@server/logger";
import config from "./config";
/**
* Get valid certificates for the specified domains
*/
export async function getValidCertificatesForDomainsHybrid(domains: Set<string>): Promise<
Array<{
id: number;
domain: string;
wildcard: boolean | null;
certFile: string | null;
keyFile: string | null;
expiresAt: number | null;
updatedAt?: number | null;
}>
> {
if (domains.size === 0) {
return [];
}
const domainArray = Array.from(domains);
try {
const response = await axios.get(
`${config.getRawConfig().managed?.endpoint}/api/v1/hybrid/certificates/domains`,
{
params: {
domains: domainArray
},
headers: (await tokenManager.getAuthHeader()).headers
}
);
if (response.status !== 200) {
logger.error(
`Failed to fetch certificates for domains: ${response.status} ${response.statusText}`,
{ responseData: response.data, domains: domainArray }
);
return [];
}
// logger.debug(
// `Successfully retrieved ${response.data.data?.length || 0} certificates for ${domainArray.length} domains`
// );
return response.data.data;
} catch (error) {
// pull data out of the axios error to log
if (axios.isAxiosError(error)) {
logger.error("Error getting certificates:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error getting certificates:", error);
}
return [];
}
}
export async function getValidCertificatesForDomains(domains: Set<string>): Promise<
Array<{
id: number;

View File

@@ -102,10 +102,7 @@ export class Config {
if (!this.rawConfig) {
throw new Error("Config not loaded. Call load() first.");
}
if (this.rawConfig.managed) {
// LETS NOT WORRY ABOUT THE SERVER SECRET WHEN MANAGED
return;
}
license.setServerSecret(this.rawConfig.server.secret!);
await this.checkKeyStatus();
@@ -158,10 +155,6 @@ export class Config {
return false;
}
public isManagedMode() {
return typeof this.rawConfig?.managed === "object";
}
public async checkSupporterKey() {
const [key] = await db.select().from(supporterKey).limit(1);

View File

@@ -15,8 +15,7 @@ import {
} from "@server/db";
import { eq } from "drizzle-orm";
import { defaultRoleAllowedActions } from "@server/routers/role";
import { FeatureId, limitsService, sandboxLimitSet, createCustomer } from "@server/lib/billing";
import { usageService } from "@server/lib/billing/usageService";
export async function createUserAccountOrg(

View File

@@ -1,8 +1,5 @@
import logger from "@server/logger";
import { maxmindLookup } from "@server/db/maxmind";
import axios from "axios";
import config from "./config";
import { tokenManager } from "./tokenManager";
export async function getCountryCodeForIp(
ip: string
@@ -33,32 +30,4 @@ export async function getCountryCodeForIp(
}
return;
}
export async function remoteGetCountryCodeForIp(
ip: string
): Promise<string | undefined> {
try {
const response = await axios.get(
`${config.getRawConfig().managed?.endpoint}/api/v1/hybrid/geoip/${ip}`,
await tokenManager.getAuthHeader()
);
return response.data.data.countryCode;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error("Error fetching config in verify session:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error fetching config in verify session:", error);
}
}
return;
}

View File

@@ -42,18 +42,6 @@ export const configSchema = z
anonymous_usage: true
}
}),
managed: z
.object({
name: z.string().optional(),
id: z.string().optional(),
secret: z.string().optional(),
endpoint: z
.string()
.optional()
.default("https://pangolin.fossorial.io"),
redirect_endpoint: z.string().optional()
})
.optional(),
domains: z
.record(
z.string(),
@@ -346,10 +334,7 @@ export const configSchema = z
if (data.flags?.disable_config_managed_domains) {
return true;
}
// If hybrid is defined, domains are not required
if (data.managed) {
return true;
}
if (keys.length === 0) {
return false;
}
@@ -361,10 +346,6 @@ export const configSchema = z
)
.refine(
(data) => {
// If hybrid is defined, server secret is not required
if (data.managed) {
return true;
}
// The server secret must be defined; if it's not set already, pull it from the env
if (data.server?.secret === undefined) {
data.server.secret = process.env.SERVER_SECRET;
@@ -380,10 +361,6 @@ export const configSchema = z
)
.refine(
(data) => {
// If hybrid is defined, dashboard_url is not required
if (data.managed) {
return true;
}
// If hybrid is not defined, dashboard_url must be defined
return (
data.app.dashboard_url !== undefined &&

View File

@@ -1,73 +0,0 @@
import { Request, Response, NextFunction } from "express";
import { Router } from "express";
import axios from "axios";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import config from "@server/lib/config";
import { tokenManager } from "./tokenManager";
/**
* Proxy function that forwards requests to the remote cloud server
*/
export const proxyToRemote = async (
req: Request,
res: Response,
next: NextFunction,
endpoint: string
): Promise<any> => {
try {
const remoteUrl = `${config.getRawConfig().managed?.endpoint?.replace(/\/$/, '')}/api/v1/${endpoint}`;
logger.debug(`Proxying request to remote server: ${remoteUrl}`);
// Forward the request to the remote server
const response = await axios({
method: req.method as any,
url: remoteUrl,
data: req.body,
headers: {
'Content-Type': 'application/json',
...(await tokenManager.getAuthHeader()).headers
},
params: req.query,
timeout: 30000, // 30 second timeout
validateStatus: () => true // Don't throw on non-2xx status codes
});
logger.debug(`Proxy response: ${JSON.stringify(response.data)}`);
// Forward the response status and data
return res.status(response.status).json(response.data);
} catch (error) {
logger.error("Error proxying request to remote server:", error);
if (axios.isAxiosError(error)) {
if (error.code === 'ECONNREFUSED' || error.code === 'ENOTFOUND') {
return next(
createHttpError(
HttpCode.SERVICE_UNAVAILABLE,
"Remote server is unavailable"
)
);
}
if (error.code === 'ECONNABORTED') {
return next(
createHttpError(
HttpCode.REQUEST_TIMEOUT,
"Request to remote server timed out"
)
);
}
}
return next(
createHttpError(
HttpCode.INTERNAL_SERVER_ERROR,
"Error communicating with remote server"
)
);
}
};
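For context on what is being removed: a helper like this would have been mounted on an Express router roughly as below (the route and endpoint are illustrative):

```typescript
import { Router } from "express";

const router = Router();

// Hypothetical mount: forward a local API call to the remote cloud server.
router.post("/some/endpoint", (req, res, next) =>
    proxyToRemote(req, res, next, "some/endpoint")
);
```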

View File

@@ -1,274 +0,0 @@
import axios from "axios";
import config from "@server/lib/config";
import logger from "@server/logger";
export interface TokenResponse {
success: boolean;
message?: string;
data: {
token: string;
};
}
/**
* Token Manager - Handles automatic token refresh for hybrid server authentication
*
* Usage throughout the application:
* ```typescript
* import { tokenManager } from "@server/lib/tokenManager";
*
* // Get the current valid token
* const token = await tokenManager.getToken();
*
* // Force refresh if needed
* await tokenManager.refreshToken();
* ```
*
* The token manager automatically refreshes tokens every 24 hours by default
* and is started once in the privateHybridServer.ts file.
*/
export class TokenManager {
private token: string | null = null;
private refreshInterval: NodeJS.Timeout | null = null;
private isRefreshing: boolean = false;
private refreshIntervalMs: number;
private retryInterval: NodeJS.Timeout | null = null;
private retryIntervalMs: number;
private tokenAvailablePromise: Promise<void> | null = null;
private tokenAvailableResolve: (() => void) | null = null;
constructor(refreshIntervalMs: number = 24 * 60 * 60 * 1000, retryIntervalMs: number = 5000) {
// Default to 24 hours for refresh, 5 seconds for retry
this.refreshIntervalMs = refreshIntervalMs;
this.retryIntervalMs = retryIntervalMs;
this.setupTokenAvailablePromise();
}
/**
* Set up promise that resolves when token becomes available
*/
private setupTokenAvailablePromise(): void {
this.tokenAvailablePromise = new Promise((resolve) => {
this.tokenAvailableResolve = resolve;
});
}
/**
* Resolve the token available promise
*/
private resolveTokenAvailable(): void {
if (this.tokenAvailableResolve) {
this.tokenAvailableResolve();
this.tokenAvailableResolve = null;
}
}
/**
* Start the token manager - gets initial token and sets up refresh interval
* If initial token fetch fails, keeps retrying every few seconds until successful
*/
async start(): Promise<void> {
logger.info("Starting token manager...");
try {
await this.refreshToken();
this.setupRefreshInterval();
this.resolveTokenAvailable();
logger.info("Token manager started successfully");
} catch (error) {
logger.warn(`Failed to get initial token, will retry in ${this.retryIntervalMs / 1000} seconds:`, error);
this.setupRetryInterval();
}
}
/**
* Set up retry interval for initial token acquisition
*/
private setupRetryInterval(): void {
if (this.retryInterval) {
clearInterval(this.retryInterval);
}
this.retryInterval = setInterval(async () => {
try {
logger.debug("Retrying initial token acquisition");
await this.refreshToken();
this.setupRefreshInterval();
this.clearRetryInterval();
this.resolveTokenAvailable();
logger.info("Token manager started successfully after retry");
} catch (error) {
logger.debug("Token acquisition retry failed, will try again");
}
}, this.retryIntervalMs);
}
/**
* Clear retry interval
*/
private clearRetryInterval(): void {
if (this.retryInterval) {
clearInterval(this.retryInterval);
this.retryInterval = null;
}
}
/**
* Stop the token manager and clear all intervals
*/
stop(): void {
if (this.refreshInterval) {
clearInterval(this.refreshInterval);
this.refreshInterval = null;
}
this.clearRetryInterval();
logger.info("Token manager stopped");
}
/**
* Get the current valid token
*/
// TODO: WE SHOULD NOT BE GETTING A TOKEN EVERY TIME WE REQUEST IT
async getToken(): Promise<string> {
// If we don't have a token yet, wait for it to become available
if (!this.token && this.tokenAvailablePromise) {
await this.tokenAvailablePromise;
}
if (!this.token) {
if (this.isRefreshing) {
// Wait for current refresh to complete
await this.waitForRefresh();
} else {
throw new Error("No valid token available");
}
}
if (!this.token) {
throw new Error("No valid token available");
}
return this.token;
}
async getAuthHeader() {
return {
headers: {
Authorization: `Bearer ${await this.getToken()}`,
"X-CSRF-Token": "x-csrf-protection",
}
};
}
/**
* Force refresh the token
*/
async refreshToken(): Promise<void> {
if (this.isRefreshing) {
await this.waitForRefresh();
return;
}
this.isRefreshing = true;
try {
const hybridConfig = config.getRawConfig().managed;
if (
!hybridConfig?.id ||
!hybridConfig?.secret ||
!hybridConfig?.endpoint
) {
throw new Error("Hybrid configuration is not defined");
}
const tokenEndpoint = `${hybridConfig.endpoint}/api/v1/auth/remoteExitNode/get-token`;
const tokenData = {
remoteExitNodeId: hybridConfig.id,
secret: hybridConfig.secret
};
logger.debug("Requesting new token from server");
const response = await axios.post<TokenResponse>(
tokenEndpoint,
tokenData,
{
headers: {
"Content-Type": "application/json",
"X-CSRF-Token": "x-csrf-protection"
},
timeout: 10000 // 10 second timeout
}
);
if (!response.data.success) {
throw new Error(
`Failed to get token: ${response.data.message}`
);
}
if (!response.data.data.token) {
throw new Error("Received empty token from server");
}
this.token = response.data.data.token;
logger.debug("Token refreshed successfully");
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error("Error updating proxy mapping:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error updating proxy mapping:", error);
}
throw new Error("Failed to refresh token");
} finally {
this.isRefreshing = false;
}
}
/**
* Set up automatic token refresh interval
*/
private setupRefreshInterval(): void {
if (this.refreshInterval) {
clearInterval(this.refreshInterval);
}
this.refreshInterval = setInterval(async () => {
try {
logger.debug("Auto-refreshing token");
await this.refreshToken();
} catch (error) {
logger.error("Failed to auto-refresh token:", error);
}
}, this.refreshIntervalMs);
}
/**
* Wait for current refresh operation to complete
*/
private async waitForRefresh(): Promise<void> {
return new Promise((resolve) => {
const checkInterval = setInterval(() => {
if (!this.isRefreshing) {
clearInterval(checkInterval);
resolve();
}
}, 100);
});
}
}
// Export a singleton instance for use throughout the application
export const tokenManager = new TokenManager();
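Call sites elsewhere in this diff consumed the singleton via `getAuthHeader()`, roughly as follows (the URL is illustrative):

```typescript
import axios from "axios";

// Hypothetical consumer: attach the managed-mode bearer token to a request.
async function fetchHybridTraefikConfig(endpoint: string) {
    return axios.get(
        `${endpoint}/api/v1/hybrid/traefik-config`,
        await tokenManager.getAuthHeader() // -> { headers: { Authorization: "Bearer ...", ... } }
    );
}
```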

View File

@@ -6,13 +6,9 @@ import * as yaml from "js-yaml";
import axios from "axios";
import { db, exitNodes } from "@server/db";
import { eq } from "drizzle-orm";
import { tokenManager } from "../tokenManager";
import { getCurrentExitNodeId } from "@server/lib/exitNodes";
import { getTraefikConfig } from "#dynamic/lib/traefik";
import { getValidCertificatesForDomains } from "#dynamic/lib/certificates";
import { sendToExitNode } from "#dynamic/lib/exitNodes";
import { build } from "@server/build";
@@ -313,93 +309,92 @@ export class TraefikConfigManager {
this.lastActiveDomains = new Set(domains);
}
            if (
                process.env.GENERATE_OWN_CERTIFICATES === "true" &&
                build != "oss"
            ) {
                // Scan current local certificate state
                this.lastLocalCertificateState =
                    await this.scanLocalCertificateState();
                // Only fetch certificates if needed (domain changes, missing certs, or daily renewal check)
                let validCertificates: Array<{
                    id: number;
                    domain: string;
                    wildcard: boolean | null;
                    certFile: string | null;
                    keyFile: string | null;
                    expiresAt: number | null;
                    updatedAt?: number | null;
                }> = [];
                if (this.shouldFetchCertificates(domains)) {
                    // Filter out domains that are already covered by wildcard certificates
                    const domainsToFetch = new Set<string>();
                    for (const domain of domains) {
                        if (
                            !isDomainCoveredByWildcard(
                                domain,
                                this.lastLocalCertificateState
                            )
                        ) {
                            domainsToFetch.add(domain);
                        } else {
                            logger.debug(
                                `Domain ${domain} is covered by existing wildcard certificate, skipping fetch`
                            );
                        }
                    }
                    if (domainsToFetch.size > 0) {
                        // Get valid certificates for domains not covered by wildcards
                        validCertificates =
                            await getValidCertificatesForDomains(
                                domainsToFetch
                            );
                        this.lastCertificateFetch = new Date();
                        this.lastKnownDomains = new Set(domains);
                        logger.info(
                            `Fetched ${validCertificates.length} certificates from remote (${domains.size - domainsToFetch.size} domains covered by wildcards)`
                        );
                        // Download and decrypt new certificates
                        await this.processValidCertificates(validCertificates);
                    } else {
                        logger.info(
                            "All domains are covered by existing wildcard certificates, no fetch needed"
                        );
                        this.lastCertificateFetch = new Date();
                        this.lastKnownDomains = new Set(domains);
                    }
                    // Always ensure all existing certificates (including wildcards) are in the config
                    await this.updateDynamicConfigFromLocalCerts(domains);
                } else {
                    const timeSinceLastFetch = this.lastCertificateFetch
                        ? Math.round(
                              (Date.now() -
                                  this.lastCertificateFetch.getTime()) /
                                  (1000 * 60)
                          )
                        : 0;
                    // logger.debug(
                    //     `Skipping certificate fetch - no changes detected and within 24-hour window (last fetch: ${timeSinceLastFetch} minutes ago)`
                    // );
                    // Still need to ensure config is up to date with existing certificates
                    await this.updateDynamicConfigFromLocalCerts(domains);
                }
                // Clean up certificates for domains no longer in use
                await this.cleanupUnusedCertificates(domains);
                // wait 500ms for traefik to pick up the new certificates
                await new Promise((resolve) => setTimeout(resolve, 500));
            }
// Write traefik config as YAML to a second dynamic config file if changed
await this.writeTraefikDynamicConfig(traefikConfig);
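`isDomainCoveredByWildcard` is not defined in this diff; from its usage it presumably checks whether a local wildcard certificate for the parent domain exists, along these lines (shape and logic are assumptions):

```typescript
// Assumed shape of an entry in lastLocalCertificateState.
type LocalCertState = { domain: string; wildcard: boolean | null };

// Sketch: "app.example.com" is covered by a wildcard cert for "example.com".
function isDomainCoveredByWildcardSketch(
    domain: string,
    certs: LocalCertState[]
): boolean {
    const parent = domain.split(".").slice(1).join(".");
    return certs.some((c) => c.wildcard === true && c.domain === parent);
}
```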
@@ -448,32 +443,15 @@ export class TraefikConfigManager {
} | null> {
let traefikConfig;
try {
const currentExitNode = await getCurrentExitNodeId();
// logger.debug(`Fetching traefik config for exit node: ${currentExitNode}`);
traefikConfig = await getTraefikConfig(
// this is called by the local exit node to get its own config
currentExitNode,
config.getRawConfig().traefik.site_types,
build == "oss", // filter out the namespace domains in open source
build != "oss" // generate the login pages on the cloud and hybrid
);
const domains = new Set<string>();
@@ -718,7 +696,12 @@ export class TraefikConfigManager {
for (const cert of validCertificates) {
try {
if (
!cert.certFile ||
!cert.keyFile ||
cert.certFile.length === 0 ||
cert.keyFile.length === 0
) {
logger.warn(
`Certificate for domain ${cert.domain} is missing cert or key file`
);
@@ -842,7 +825,9 @@ export class TraefikConfigManager {
const lastUpdateStr = fs
.readFileSync(lastUpdatePath, "utf8")
.trim();
lastUpdateTime = Math.floor(
new Date(lastUpdateStr).getTime() / 1000
);
} catch {
lastUpdateTime = null;
}

View File

@@ -105,7 +105,12 @@ export async function getTraefikConfig(
const priority = row.priority ?? 100;
// Create a unique key combining resourceId, path config, and rewrite config
const pathKey = [
targetPath,
pathMatchType,
rewritePath,
rewritePathType
]
.filter(Boolean)
.join("-");
const mapKey = [resourceId, pathKey].filter(Boolean).join("-");
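For example, a resource with a prefix rewrite produces keys like (values illustrative):

```typescript
const pathKey = ["/api", "prefix", "/", "prefix"].filter(Boolean).join("-");
// -> "/api-prefix-/-prefix"
const mapKey = [123, pathKey].filter(Boolean).join("-");
// -> "123-/api-prefix-/-prefix"
```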
@@ -120,13 +125,15 @@ export async function getTraefikConfig(
);
if (!validation.isValid) {
logger.error(
`Invalid path rewrite configuration for resource ${resourceId}: ${validation.error}`
);
return;
}
resourcesMap.set(key, {
resourceId: row.resourceId,
name: resourceName,
fullDomain: row.fullDomain,
ssl: row.ssl,
http: row.http,
@@ -239,21 +246,18 @@ export async function getTraefikConfig(
preferWildcardCert = configDomain.prefer_wildcard_cert;
}
const tls = {
certResolver: certResolver,
...(preferWildcardCert
? {
domains: [
{
main: wildCard
}
]
}
: {})
};
const additionalMiddlewares =
config.getRawConfig().traefik.additional_middlewares || [];
@@ -264,11 +268,12 @@ export async function getTraefikConfig(
];
// Handle path rewriting middleware
if (
    resource.rewritePath &&
    resource.path &&
    resource.pathMatchType &&
    resource.rewritePathType
) {
// Create a unique middleware name
const rewriteMiddlewareName = `rewrite-r${resource.resourceId}-${key}`;
@@ -287,7 +292,10 @@ export async function getTraefikConfig(
}
// Add the middleware to the config
Object.assign(
config_output.http.middlewares,
rewriteResult.middlewares
);
// Add the middlewares to the router middleware chain
if (rewriteResult.chain) {
@@ -298,9 +306,13 @@ export async function getTraefikConfig(
routerMiddlewares.push(rewriteMiddlewareName);
}
logger.debug(
`Created path rewrite middleware ${rewriteMiddlewareName}: ${resource.pathMatchType}(${resource.path}) -> ${resource.rewritePathType}(${resource.rewritePath})`
);
} catch (error) {
logger.error(
`Failed to create path rewrite middleware for resource ${resource.resourceId}: ${error}`
);
}
}
@@ -316,7 +328,9 @@ export async function getTraefikConfig(
value: string;
}[];
} catch (e) {
logger.warn(
`Failed to parse headers for resource ${resource.resourceId}: ${e}`
);
}
headersArr.forEach((header) => {
@@ -482,14 +496,14 @@ export async function getTraefikConfig(
})(),
...(resource.stickySession
? {
sticky: {
cookie: {
name: "p_sticky", // TODO: make this configurable via config.yml like other cookies
secure: resource.ssl,
httpOnly: true
}
}
}
: {})
}
};
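With `stickySession` enabled on an HTTP resource, the service above serializes to roughly this shape (the server URL is illustrative):

```typescript
// Illustrative result: cookie-based sticky sessions on the load balancer.
const service = {
    loadBalancer: {
        servers: [{ url: "http://10.0.0.2:8080" }],
        sticky: {
            cookie: {
                name: "p_sticky",
                secure: true, // mirrors resource.ssl
                httpOnly: true
            }
        }
    }
};
```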
@@ -590,13 +604,13 @@ export async function getTraefikConfig(
})(),
...(resource.stickySession
? {
sticky: {
ipStrategy: {
depth: 0,
sourcePort: true
}
}
}
: {})
}
};