Add retry to aquire

This commit is contained in:
Owen
2026-01-11 10:39:28 -08:00
parent 192702daf9
commit 78b00a18cc

View File

@@ -24,7 +24,9 @@ export class LockManager {
*/ */
async acquireLock( async acquireLock(
lockKey: string, lockKey: string,
ttlMs: number = 30000 ttlMs: number = 30000,
maxRetries: number = 3,
retryDelayMs: number = 100
): Promise<boolean> { ): Promise<boolean> {
if (!redis || !redis.status || redis.status !== "ready") { if (!redis || !redis.status || redis.status !== "ready") {
return true; return true;
@@ -35,49 +37,67 @@ export class LockManager {
}:${Date.now()}`; }:${Date.now()}`;
const redisKey = `lock:${lockKey}`; const redisKey = `lock:${lockKey}`;
try { for (let attempt = 0; attempt < maxRetries; attempt++) {
// Use SET with NX (only set if not exists) and PX (expire in milliseconds) try {
// This is atomic and handles both setting and expiration // Use SET with NX (only set if not exists) and PX (expire in milliseconds)
const result = await redis.set( // This is atomic and handles both setting and expiration
redisKey, const result = await redis.set(
lockValue, redisKey,
"PX", lockValue,
ttlMs, "PX",
"NX" ttlMs,
); "NX"
if (result === "OK") {
logger.debug(
`Lock acquired: ${lockKey} by ${
config.getRawConfig().gerbil.exit_node_name
}`
); );
return true;
}
// Check if the existing lock is from this worker (reentrant behavior) if (result === "OK") {
const existingValue = await redis.get(redisKey); logger.debug(
if ( `Lock acquired: ${lockKey} by ${
existingValue && config.getRawConfig().gerbil.exit_node_name
existingValue.startsWith( }`
`${config.getRawConfig().gerbil.exit_node_name}:` );
) return true;
) { }
// Extend the lock TTL since it's the same worker
await redis.pexpire(redisKey, ttlMs);
logger.debug(
`Lock extended: ${lockKey} by ${
config.getRawConfig().gerbil.exit_node_name
}`
);
return true;
}
return false; // Check if the existing lock is from this worker (reentrant behavior)
} catch (error) { const existingValue = await redis.get(redisKey);
logger.error(`Failed to acquire lock ${lockKey}:`, error); if (
return false; existingValue &&
existingValue.startsWith(
`${config.getRawConfig().gerbil.exit_node_name}:`
)
) {
// Extend the lock TTL since it's the same worker
await redis.pexpire(redisKey, ttlMs);
logger.debug(
`Lock extended: ${lockKey} by ${
config.getRawConfig().gerbil.exit_node_name
}`
);
return true;
}
// If this isn't our last attempt, wait before retrying with exponential backoff
if (attempt < maxRetries - 1) {
const delay = retryDelayMs * Math.pow(2, attempt);
logger.debug(
`Lock ${lockKey} not available, retrying in ${delay}ms (attempt ${attempt + 1}/${maxRetries})`
);
await new Promise((resolve) => setTimeout(resolve, delay));
}
} catch (error) {
logger.error(`Failed to acquire lock ${lockKey} (attempt ${attempt + 1}/${maxRetries}):`, error);
// On error, still retry if we have attempts left
if (attempt < maxRetries - 1) {
const delay = retryDelayMs * Math.pow(2, attempt);
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
} }
logger.debug(
`Failed to acquire lock ${lockKey} after ${maxRetries} attempts`
);
return false;
} }
/** /**