From 5b9386b18a8144205adc4618fd0b26324f190754 Mon Sep 17 00:00:00 2001
From: Owen
Date: Thu, 20 Nov 2025 12:40:25 -0500
Subject: [PATCH] Add lock

---
 server/lib/lock.ts                            | 119 ++++++
 server/private/lib/lock.ts                    | 357 ++++++++++++++++++
 .../routers/newt/handleNewtRegisterMessage.ts |  55 +--
 3 files changed, 511 insertions(+), 20 deletions(-)
 create mode 100644 server/lib/lock.ts
 create mode 100644 server/private/lib/lock.ts

diff --git a/server/lib/lock.ts b/server/lib/lock.ts
new file mode 100644
index 00000000..7eea8908
--- /dev/null
+++ b/server/lib/lock.ts
@@ -0,0 +1,119 @@
+/**
+ * No-op lock manager.
+ *
+ * It exposes the same methods as the Redis-backed LockManager in
+ * server/private/lib/lock.ts, but performs no locking at all: every
+ * acquisition succeeds immediately and nothing is ever stored, so it gives no
+ * mutual exclusion.
+ */
+export class LockManager {
+    /**
+     * Pretend to acquire a lock; always succeeds.
+     * @param lockKey - Unique identifier for the lock (unused)
+     * @param ttlMs - Time to live in milliseconds (unused)
+     * @returns Promise<boolean> - always true
+     */
+    async acquireLock(
+        lockKey: string,
+        ttlMs: number = 30000
+    ): Promise<boolean> {
+        return true;
+    }
+
+    /**
+     * Release a lock; no-op because nothing is held.
+     * @param lockKey - Unique identifier for the lock (unused)
+     */
+    async releaseLock(lockKey: string): Promise<void> {}
+
+    /**
+     * Force release a lock; no-op because nothing is held.
+     * @param lockKey - Unique identifier for the lock (unused)
+     */
+    async forceReleaseLock(lockKey: string): Promise<void> {}
+
+    /**
+     * Report lock info; always claims the lock exists and is owned by the caller.
+     * @param lockKey - Unique identifier for the lock (unused)
+     * @returns Promise<{exists: boolean, ownedByMe: boolean, ttl: number}>
+     */
+    async getLockInfo(lockKey: string): Promise<{
+        exists: boolean;
+        ownedByMe: boolean;
+        ttl: number;
+        owner?: string;
+    }> {
+        return { exists: true, ownedByMe: true, ttl: 0 };
+    }
+
+    /**
+     * Extend a lock's TTL; always reports success.
+     * @param lockKey - Unique identifier for the lock (unused)
+     * @param ttlMs - New TTL in milliseconds (unused)
+     * @returns Promise<boolean> - always true
+     */
+    async extendLock(lockKey: string, ttlMs: number): Promise<boolean> {
+        return true;
+    }
+
+    /**
+     * Acquire with retries; always succeeds without waiting.
+     * @param lockKey - Unique identifier for the lock (unused)
+     * @param ttlMs - Time to live in milliseconds (unused)
+     * @param maxRetries - Maximum number of retry attempts (unused)
+     * @param baseDelayMs - Base delay between retries in milliseconds (unused)
+     * @returns Promise<boolean> - always true
+     */
+    async acquireLockWithRetry(
+        lockKey: string,
+        ttlMs: number = 30000,
+        maxRetries: number = 5,
+        baseDelayMs: number = 100
+    ): Promise<boolean> {
+        return true;
+    }
+
+    /**
+     * Execute a function "under" the lock. Acquisition always succeeds here,
+     * so this simply runs fn and returns (or rethrows) its outcome.
+     * @param lockKey - Unique identifier for the lock
+     * @param fn - Function to execute
+     * @param ttlMs - Lock TTL in milliseconds (unused)
+     * @returns Promise<T> - Result of the executed function
+     */
+    async withLock<T>(
+        lockKey: string,
+        fn: () => Promise<T>,
+        ttlMs: number = 30000
+    ): Promise<T> {
+        const acquired = await this.acquireLock(lockKey, ttlMs);
+
+        if (!acquired) {
+            throw new Error(`Failed to acquire lock: ${lockKey}`);
+        }
+
+        try {
+            return await fn();
+        } finally {
+            await this.releaseLock(lockKey);
+        }
+    }
+
+    /**
+     * Report lock statistics; always zero because nothing is tracked.
+     * @returns Promise<{activeLocksCount: number, locksOwnedByMe: number}>
+     */
+    async getLockStatistics(): Promise<{
+        activeLocksCount: number;
+        locksOwnedByMe: number;
+    }> {
+        return { activeLocksCount: 0, locksOwnedByMe: 0 };
+    }
+
+    /**
+     * Release backend resources; no-op because there is no connection.
+     */
+    async disconnect(): Promise<void> {}
+}
+
+export const lockManager = new LockManager();
diff --git a/server/private/lib/lock.ts b/server/private/lib/lock.ts
new file mode 100644
index 00000000..9ecf63de
--- /dev/null
+++ b/server/private/lib/lock.ts
@@ -0,0 +1,357 @@
+/*
+ * This file is part of a proprietary work.
+ *
+ * Copyright (c) 2025 Fossorial, Inc.
+ * All rights reserved.
+ *
+ * This file is licensed under the Fossorial Commercial License.
+ * You may not use this file except in compliance with the License.
+ * Unauthorized use, copying, modification, or distribution is strictly prohibited.
+ *
+ * This file is not licensed under the AGPLv3.
+ */
+
+import { config } from "@server/lib/config";
+import logger from "@server/logger";
+import { redis } from "#private/lib/redis";
+
+/**
+ * Redis-backed lock manager.
+ *
+ * Each lock is stored under `lock:<lockKey>` with the value
+ * `<exit_node_name>:<timestamp>`. Ownership is decided only by the
+ * `<exit_node_name>:` prefix, so every caller running under the same exit node
+ * name counts as the same owner: concurrent callers in one worker can all
+ * "acquire" the same lock, and the first one to release it deletes the key for
+ * the rest. The lock only excludes workers that use a different name.
+ */
+export class LockManager {
+    /**
+     * Acquire a distributed lock using Redis SET with NX and PX options
+     * @param lockKey - Unique identifier for the lock
+     * @param ttlMs - Time to live in milliseconds
+     * @returns Promise<boolean> - true if the lock was acquired or was already
+     * held by this worker (its TTL is then extended); false otherwise,
+     * including when Redis reports an error
+     */
+    async acquireLock(
+        lockKey: string,
+        ttlMs: number = 30000
+    ): Promise<boolean> {
+        const lockValue = `${
+            config.getRawConfig().gerbil.exit_node_name
+        }:${Date.now()}`;
+        const redisKey = `lock:${lockKey}`;
+
+        try {
+            // Use SET with NX (only set if not exists) and PX (expire in milliseconds)
+            // This is atomic and handles both setting and expiration
+            const result = await redis.set(
+                redisKey,
+                lockValue,
+                "PX",
+                ttlMs,
+                "NX"
+            );
+
+            if (result === "OK") {
+                logger.debug(
+                    `Lock acquired: ${lockKey} by ${
+                        config.getRawConfig().gerbil.exit_node_name
+                    }`
+                );
+                return true;
+            }
+
+            // Reentrant behavior: if this worker already holds the lock, extend
+            // its TTL. extendLock performs the ownership check and the PEXPIRE
+            // in one Lua script; a separate GET followed by PEXPIRE could
+            // refresh a lock that expired and was taken by another worker in
+            // between, and then wrongly report success.
+            return await this.extendLock(lockKey, ttlMs);
+        } catch (error) {
+            logger.error(`Failed to acquire lock ${lockKey}:`, error);
+            return false;
+        }
+    }
+
+    /**
+     * Release a lock using Lua script to ensure atomicity
+     * @param lockKey - Unique identifier for the lock
+     */
+    async releaseLock(lockKey: string): Promise<void> {
+        const redisKey = `lock:${lockKey}`;
+
+        // Lua script to ensure we only delete the lock if it belongs to this worker
+        const luaScript = `
+            local key = KEYS[1]
+            local worker_prefix = ARGV[1]
+            local current_value = redis.call('GET', key)
+
+            if current_value and string.find(current_value, worker_prefix, 1, true) == 1 then
+                return redis.call('DEL', key)
+            else
+                return 0
+            end
+        `;
+
+        try {
+            const result = (await redis.eval(
+                luaScript,
+                1,
+                redisKey,
+                `${config.getRawConfig().gerbil.exit_node_name}:`
+            )) as number;
+
+            if (result === 1) {
+                logger.debug(
+                    `Lock released: ${lockKey} by ${
+                        config.getRawConfig().gerbil.exit_node_name
+                    }`
+                );
+            } else {
+                logger.warn(
+                    `Lock not released - not owned by worker: ${lockKey} by ${
+                        config.getRawConfig().gerbil.exit_node_name
+                    }`
+                );
+            }
+        } catch (error) {
+            logger.error(`Failed to release lock ${lockKey}:`, error);
+        }
+    }
+
+    /**
+     * Force release a lock regardless of owner (use with caution)
+     * @param lockKey - Unique identifier for the lock
+     */
+    async forceReleaseLock(lockKey: string): Promise<void> {
+        const redisKey = `lock:${lockKey}`;
+
+        try {
+            const result = await redis.del(redisKey);
+            if (result === 1) {
+                logger.debug(`Lock force released: ${lockKey}`);
+            }
+        } catch (error) {
+            logger.error(`Failed to force release lock ${lockKey}:`, error);
+        }
+    }
+
+    /**
+     * Check if a lock exists and get its info
+     * @param lockKey - Unique identifier for the lock
+     * @returns Promise<{exists: boolean, ownedByMe: boolean, ttl: number}>
+     */
+    async getLockInfo(lockKey: string): Promise<{
+        exists: boolean;
+        ownedByMe: boolean;
+        ttl: number;
+        owner?: string;
+    }> {
+        const redisKey = `lock:${lockKey}`;
+
+        try {
+            const [value, ttl] = await Promise.all([
+                redis.get(redisKey),
+                redis.pttl(redisKey)
+            ]);
+
+            const exists = value !== null;
+            const ownedByMe =
+                value !== null &&
+                value.startsWith(
+                    `${config.getRawConfig().gerbil.exit_node_name}:`
+                );
+
+            // The value is `<exit_node_name>:<timestamp>`. Cut at the last
+            // colon so a node name that itself contains ":" is reported whole.
+            let owner: string | undefined;
+            if (value !== null) {
+                const separator = value.lastIndexOf(":");
+                owner = separator === -1 ? value : value.slice(0, separator);
+            }
+
+            return {
+                exists,
+                ownedByMe,
+                ttl: ttl > 0 ? ttl : 0,
+                owner
+            };
+        } catch (error) {
+            logger.error(`Failed to get lock info ${lockKey}:`, error);
+            return { exists: false, ownedByMe: false, ttl: 0 };
+        }
+    }
+
+    /**
+     * Extend the TTL of an existing lock owned by this worker
+     * @param lockKey - Unique identifier for the lock
+     * @param ttlMs - New TTL in milliseconds
+     * @returns Promise<boolean> - true if extended successfully
+     */
+    async extendLock(lockKey: string, ttlMs: number): Promise<boolean> {
+        const redisKey = `lock:${lockKey}`;
+
+        // Lua script to extend TTL only if lock is owned by this worker
+        const luaScript = `
+            local key = KEYS[1]
+            local worker_prefix = ARGV[1]
+            local ttl = tonumber(ARGV[2])
+            local current_value = redis.call('GET', key)
+
+            if current_value and string.find(current_value, worker_prefix, 1, true) == 1 then
+                return redis.call('PEXPIRE', key, ttl)
+            else
+                return 0
+            end
+        `;
+
+        try {
+            const result = (await redis.eval(
+                luaScript,
+                1,
+                redisKey,
+                `${config.getRawConfig().gerbil.exit_node_name}:`,
+                ttlMs.toString()
+            )) as number;
+
+            if (result === 1) {
+                logger.debug(
+                    `Lock extended: ${lockKey} by ${
+                        config.getRawConfig().gerbil.exit_node_name
+                    } for ${ttlMs}ms`
+                );
+                return true;
+            }
+            return false;
+        } catch (error) {
+            logger.error(`Failed to extend lock ${lockKey}:`, error);
+            return false;
+        }
+    }
+
+    /**
+     * Attempt to acquire lock with retries and exponential backoff
+     * @param lockKey - Unique identifier for the lock
+     * @param ttlMs - Time to live in milliseconds
+     * @param maxRetries - Maximum number of retry attempts
+     * @param baseDelayMs - Base delay between retries in milliseconds
+     * @returns Promise<boolean> - true if lock acquired
+     */
+    async acquireLockWithRetry(
+        lockKey: string,
+        ttlMs: number = 30000,
+        maxRetries: number = 5,
+        baseDelayMs: number = 100
+    ): Promise<boolean> {
+        for (let attempt = 0; attempt <= maxRetries; attempt++) {
+            const acquired = await this.acquireLock(lockKey, ttlMs);
+
+            if (acquired) {
+                return true;
+            }
+
+            if (attempt < maxRetries) {
+                // Exponential backoff with jitter
+                const delay =
+                    baseDelayMs * Math.pow(2, attempt) + Math.random() * 100;
+                await new Promise((resolve) => setTimeout(resolve, delay));
+            }
+        }
+
+        logger.warn(
+            `Failed to acquire lock ${lockKey} after ${maxRetries + 1} attempts`
+        );
+        return false;
+    }
+
+    /**
+     * Execute a function while holding a lock
+     * @param lockKey - Unique identifier for the lock
+     * @param fn - Function to execute while holding the lock
+     * @param ttlMs - Lock TTL in milliseconds
+     * @returns Promise<T> - Result of the executed function
+     * @throws Error if the lock cannot be acquired on the first attempt
+     */
+    async withLock<T>(
+        lockKey: string,
+        fn: () => Promise<T>,
+        ttlMs: number = 30000
+    ): Promise<T> {
+        const acquired = await this.acquireLock(lockKey, ttlMs);
+
+        if (!acquired) {
+            throw new Error(`Failed to acquire lock: ${lockKey}`);
+        }
+
+        try {
+            return await fn();
+        } finally {
+            await this.releaseLock(lockKey);
+        }
+    }
+
+    /**
+     * Count the locks currently stored in Redis and how many belong to this
+     * worker. Expired locks are removed by Redis itself.
+     * @returns Promise<{activeLocksCount: number, locksOwnedByMe: number}>
+     */
+    async getLockStatistics(): Promise<{
+        activeLocksCount: number;
+        locksOwnedByMe: number;
+    }> {
+        try {
+            // Iterate with SCAN rather than KEYS: KEYS visits the whole
+            // keyspace in one blocking command. SCAN may return a key more
+            // than once, so collect the results in a Set.
+            const lockKeys = new Set<string>();
+            let cursor = "0";
+            do {
+                const [nextCursor, batch] = await redis.scan(
+                    cursor,
+                    "MATCH",
+                    "lock:*",
+                    "COUNT",
+                    100
+                );
+                cursor = nextCursor;
+                for (const key of batch) {
+                    lockKeys.add(key);
+                }
+            } while (cursor !== "0");
+
+            const keys = Array.from(lockKeys);
+            let locksOwnedByMe = 0;
+
+            if (keys.length > 0) {
+                const values = await redis.mget(...keys);
+                locksOwnedByMe = values.filter(
+                    (value) =>
+                        value &&
+                        value.startsWith(
+                            `${config.getRawConfig().gerbil.exit_node_name}:`
+                        )
+                ).length;
+            }
+
+            return {
+                activeLocksCount: keys.length,
+                locksOwnedByMe
+            };
+        } catch (error) {
+            logger.error("Failed to get lock statistics:", error);
+            return { activeLocksCount: 0, locksOwnedByMe: 0 };
+        }
+    }
+
+    /**
+     * Close the Redis connection. This quits the client imported from
+     * #private/lib/redis, not a connection private to this class.
+     */
+    async disconnect(): Promise<void> {
+        await redis.quit();
+    }
+}
+
+export const lockManager = new LockManager();
diff --git a/server/routers/newt/handleNewtRegisterMessage.ts b/server/routers/newt/handleNewtRegisterMessage.ts
index ff89bd3d..8a29d0ac 100644
--- a/server/routers/newt/handleNewtRegisterMessage.ts
+++ b/server/routers/newt/handleNewtRegisterMessage.ts
@@ -17,6 +17,7 @@ import {
     verifyExitNodeOrgAccess
 } from "#dynamic/lib/exitNodes";
 import { fetchContainers } from "./dockerSocket";
+import { lockManager } from "#dynamic/lib/lock";
 
 export type ExitNodePingResult = {
     exitNodeId: number;
@@ -362,26 +363,40 @@ async function getUniqueSubnetForSite(
     exitNode: ExitNode,
     trx: Transaction | typeof db = db
 ): Promise<string | null> {
-    const sitesQuery = await trx
-        .select({
-            subnet: sites.subnet
-        })
-        .from(sites)
-        .where(eq(sites.exitNodeId, exitNode.exitNodeId));
+    // NOTE(review): the lock covers only this read-and-compute step. Nothing is
+    // written here, so a caller that persists the subnet after this function
+    // returns can still race with the next allocation - verify the call site.
+    // withLock() makes a single acquisition attempt and throws if another
+    // worker holds the lock, so this function can now reject; confirm that
+    // callers handle that.
+    const lockKey = `subnet-allocation:${exitNode.exitNodeId}`;
+
+    return await lockManager.withLock(
+        lockKey,
+        async () => {
+            const sitesQuery = await trx
+                .select({
+                    subnet: sites.subnet
+                })
+                .from(sites)
+                .where(eq(sites.exitNodeId, exitNode.exitNodeId));
 
-    const blockSize = config.getRawConfig().gerbil.site_block_size;
-    const subnets = sitesQuery
-        .map((site) => site.subnet)
-        .filter(
-            (subnet) =>
-                subnet && /^(\d{1,3}\.){3}\d{1,3}\/\d{1,2}$/.test(subnet)
-        )
-        .filter((subnet) => subnet !== null);
-    subnets.push(exitNode.address.replace(/\/\d+$/, `/${blockSize}`));
-    const newSubnet = findNextAvailableCidr(
-        subnets,
-        blockSize,
-        exitNode.address
+            const blockSize = config.getRawConfig().gerbil.site_block_size;
+            const subnets = sitesQuery
+                .map((site) => site.subnet)
+                .filter(
+                    (subnet) =>
+                        subnet && /^(\d{1,3}\.){3}\d{1,3}\/\d{1,2}$/.test(subnet)
+                )
+                .filter((subnet) => subnet !== null);
+            subnets.push(exitNode.address.replace(/\/\d+$/, `/${blockSize}`));
+            const newSubnet = findNextAvailableCidr(
+                subnets,
+                blockSize,
+                exitNode.address
+            );
+            return newSubnet;
+        },
+        5000 // 5 second lock TTL - subnet allocation should be quick
     );
-    return newSubnet;
 }