mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-03 17:26:38 +00:00
Pull up downstream changes
This commit is contained in:
@@ -3,30 +3,25 @@ import logger from "@server/logger";
|
||||
import config from "@server/lib/config";
|
||||
|
||||
class RedisManager {
|
||||
private static instance: RedisManager;
|
||||
public client: Redis | null = null;
|
||||
private subscriber: Redis | null = null;
|
||||
private publisher: Redis | null = null;
|
||||
private isEnabled: boolean = false;
|
||||
private isHealthy: boolean = true;
|
||||
private lastHealthCheck: number = 0;
|
||||
private healthCheckInterval: number = 30000; // 30 seconds
|
||||
private subscribers: Map<
|
||||
string,
|
||||
Set<(channel: string, message: string) => void>
|
||||
> = new Map();
|
||||
|
||||
private constructor() {
|
||||
constructor() {
|
||||
this.isEnabled = config.getRawConfig().flags?.enable_redis || false;
|
||||
if (this.isEnabled) {
|
||||
this.initializeClients();
|
||||
}
|
||||
}
|
||||
|
||||
public static getInstance(): RedisManager {
|
||||
if (!RedisManager.instance) {
|
||||
RedisManager.instance = new RedisManager();
|
||||
}
|
||||
return RedisManager.instance;
|
||||
}
|
||||
|
||||
private getRedisConfig(): RedisOptions {
|
||||
const redisConfig = config.getRawConfig().redis!;
|
||||
const opts: RedisOptions = {
|
||||
@@ -34,38 +29,78 @@ class RedisManager {
|
||||
port: redisConfig.port!,
|
||||
password: redisConfig.password,
|
||||
db: redisConfig.db,
|
||||
tls: {
|
||||
rejectUnauthorized:
|
||||
redisConfig.tls?.reject_unauthorized || false
|
||||
}
|
||||
// tls: {
|
||||
// rejectUnauthorized:
|
||||
// redisConfig.tls?.reject_unauthorized || false
|
||||
// }
|
||||
};
|
||||
return opts;
|
||||
}
|
||||
|
||||
// Add reconnection logic in initializeClients
|
||||
private initializeClients(): void {
|
||||
const config = this.getRedisConfig();
|
||||
|
||||
try {
|
||||
// Main client for general operations
|
||||
this.client = new Redis(config);
|
||||
this.client = new Redis({
|
||||
...config,
|
||||
enableReadyCheck: false,
|
||||
maxRetriesPerRequest: 3,
|
||||
keepAlive: 30000,
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
commandTimeout: 5000, // 5 seconds
|
||||
});
|
||||
|
||||
// Dedicated publisher client
|
||||
this.publisher = new Redis(config);
|
||||
this.publisher = new Redis({
|
||||
...config,
|
||||
enableReadyCheck: false,
|
||||
maxRetriesPerRequest: 3,
|
||||
keepAlive: 30000,
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
commandTimeout: 5000, // 5 seconds
|
||||
});
|
||||
|
||||
// Dedicated subscriber client
|
||||
this.subscriber = new Redis(config);
|
||||
this.subscriber = new Redis({
|
||||
...config,
|
||||
enableReadyCheck: false,
|
||||
maxRetriesPerRequest: 3,
|
||||
keepAlive: 30000,
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
commandTimeout: 5000, // 5 seconds
|
||||
});
|
||||
|
||||
// Set up error handlers
|
||||
// Add reconnection handlers
|
||||
this.client.on("error", (err) => {
|
||||
logger.error("Redis client error:", err);
|
||||
this.isHealthy = false;
|
||||
});
|
||||
|
||||
this.client.on("reconnecting", () => {
|
||||
logger.info("Redis client reconnecting...");
|
||||
this.isHealthy = false;
|
||||
});
|
||||
|
||||
this.client.on("ready", () => {
|
||||
logger.info("Redis client ready");
|
||||
this.isHealthy = true;
|
||||
});
|
||||
|
||||
this.publisher.on("error", (err) => {
|
||||
logger.error("Redis publisher error:", err);
|
||||
this.isHealthy = false;
|
||||
});
|
||||
|
||||
this.publisher.on("ready", () => {
|
||||
logger.info("Redis publisher ready");
|
||||
});
|
||||
|
||||
this.subscriber.on("error", (err) => {
|
||||
logger.error("Redis subscriber error:", err);
|
||||
this.isHealthy = false;
|
||||
});
|
||||
|
||||
this.subscriber.on("ready", () => {
|
||||
logger.info("Redis subscriber ready");
|
||||
});
|
||||
|
||||
// Set up connection handlers
|
||||
@@ -102,18 +137,65 @@ class RedisManager {
|
||||
);
|
||||
|
||||
logger.info("Redis clients initialized successfully");
|
||||
|
||||
// Start periodic health monitoring
|
||||
this.startHealthMonitoring();
|
||||
} catch (error) {
|
||||
logger.error("Failed to initialize Redis clients:", error);
|
||||
this.isEnabled = false;
|
||||
}
|
||||
}
|
||||
|
||||
public isRedisEnabled(): boolean {
|
||||
return this.isEnabled && this.client !== null;
|
||||
private startHealthMonitoring(): void {
|
||||
if (!this.isEnabled) return;
|
||||
|
||||
// Check health every 30 seconds
|
||||
setInterval(async () => {
|
||||
try {
|
||||
await this.checkRedisHealth();
|
||||
} catch (error) {
|
||||
logger.error("Error during Redis health monitoring:", error);
|
||||
}
|
||||
}, this.healthCheckInterval);
|
||||
}
|
||||
|
||||
public getClient(): Redis | null {
|
||||
return this.client;
|
||||
public isRedisEnabled(): boolean {
|
||||
return this.isEnabled && this.client !== null && this.isHealthy;
|
||||
}
|
||||
|
||||
private async checkRedisHealth(): Promise<boolean> {
|
||||
const now = Date.now();
|
||||
|
||||
// Only check health every 30 seconds
|
||||
if (now - this.lastHealthCheck < this.healthCheckInterval) {
|
||||
return this.isHealthy;
|
||||
}
|
||||
|
||||
this.lastHealthCheck = now;
|
||||
|
||||
if (!this.client) {
|
||||
this.isHealthy = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
await Promise.race([
|
||||
this.client.ping(),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Health check timeout')), 2000)
|
||||
)
|
||||
]);
|
||||
this.isHealthy = true;
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error("Redis health check failed:", error);
|
||||
this.isHealthy = false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public getClient(): Redis {
|
||||
return this.client!;
|
||||
}
|
||||
|
||||
public async set(
|
||||
@@ -247,11 +329,25 @@ class RedisManager {
|
||||
public async publish(channel: string, message: string): Promise<boolean> {
|
||||
if (!this.isRedisEnabled() || !this.publisher) return false;
|
||||
|
||||
// Quick health check before attempting to publish
|
||||
const isHealthy = await this.checkRedisHealth();
|
||||
if (!isHealthy) {
|
||||
logger.warn("Skipping Redis publish due to unhealthy connection");
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
await this.publisher.publish(channel, message);
|
||||
// Add timeout to prevent hanging
|
||||
await Promise.race([
|
||||
this.publisher.publish(channel, message),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Redis publish timeout')), 3000)
|
||||
)
|
||||
]);
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error("Redis PUBLISH error:", error);
|
||||
this.isHealthy = false; // Mark as unhealthy on error
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -267,13 +363,19 @@ class RedisManager {
|
||||
if (!this.subscribers.has(channel)) {
|
||||
this.subscribers.set(channel, new Set());
|
||||
// Only subscribe to the channel if it's the first subscriber
|
||||
await this.subscriber.subscribe(channel);
|
||||
await Promise.race([
|
||||
this.subscriber.subscribe(channel),
|
||||
new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Redis subscribe timeout')), 5000)
|
||||
)
|
||||
]);
|
||||
}
|
||||
|
||||
this.subscribers.get(channel)!.add(callback);
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error("Redis SUBSCRIBE error:", error);
|
||||
this.isHealthy = false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -330,5 +432,6 @@ class RedisManager {
|
||||
}
|
||||
}
|
||||
|
||||
export const redisManager = RedisManager.getInstance();
|
||||
export const redisManager = new RedisManager();
|
||||
export const redis = redisManager.getClient();
|
||||
export default redisManager;
|
||||
|
||||
Reference in New Issue
Block a user