fix(backend): memory leak in memory caches (#14363)
* encapsulate `MemoryKVCache<T>` * remove infinity caches * encapsulate other caches * add missing awaits to internally synchronize caches * implement pull-through caching * tune cache lifetimes * optimize cache GC by stopping early * summarize changes in CHANGELOG.md * Fix timeout comments Co-authored-by: かっこかり <67428053+kakkokari-gtyih@users.noreply.github.com> * add comments about awaiting the redis write --------- Co-authored-by: かっこかり <67428053+kakkokari-gtyih@users.noreply.github.com>
This commit is contained in:
@@ -7,23 +7,23 @@ import * as Redis from 'ioredis';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
|
||||
export class RedisKVCache<T> {
|
||||
private redisClient: Redis.Redis;
|
||||
private name: string;
|
||||
private lifetime: number;
|
||||
private memoryCache: MemoryKVCache<T>;
|
||||
private fetcher: (key: string) => Promise<T>;
|
||||
private toRedisConverter: (value: T) => string;
|
||||
private fromRedisConverter: (value: string) => T | undefined;
|
||||
private readonly lifetime: number;
|
||||
private readonly memoryCache: MemoryKVCache<T>;
|
||||
private readonly fetcher: (key: string) => Promise<T>;
|
||||
private readonly toRedisConverter: (value: T) => string;
|
||||
private readonly fromRedisConverter: (value: string) => T | undefined;
|
||||
|
||||
constructor(redisClient: RedisKVCache<T>['redisClient'], name: RedisKVCache<T>['name'], opts: {
|
||||
lifetime: RedisKVCache<T>['lifetime'];
|
||||
memoryCacheLifetime: number;
|
||||
fetcher: RedisKVCache<T>['fetcher'];
|
||||
toRedisConverter: RedisKVCache<T>['toRedisConverter'];
|
||||
fromRedisConverter: RedisKVCache<T>['fromRedisConverter'];
|
||||
}) {
|
||||
this.redisClient = redisClient;
|
||||
this.name = name;
|
||||
constructor(
|
||||
private redisClient: Redis.Redis,
|
||||
private name: string,
|
||||
opts: {
|
||||
lifetime: RedisKVCache<T>['lifetime'];
|
||||
memoryCacheLifetime: number;
|
||||
fetcher: RedisKVCache<T>['fetcher'];
|
||||
toRedisConverter: RedisKVCache<T>['toRedisConverter'];
|
||||
fromRedisConverter: RedisKVCache<T>['fromRedisConverter'];
|
||||
},
|
||||
) {
|
||||
this.lifetime = opts.lifetime;
|
||||
this.memoryCache = new MemoryKVCache(opts.memoryCacheLifetime);
|
||||
this.fetcher = opts.fetcher;
|
||||
@@ -55,7 +55,13 @@ export class RedisKVCache<T> {
|
||||
|
||||
const cached = await this.redisClient.get(`kvcache:${this.name}:${key}`);
|
||||
if (cached == null) return undefined;
|
||||
return this.fromRedisConverter(cached);
|
||||
|
||||
const value = this.fromRedisConverter(cached);
|
||||
if (value !== undefined) {
|
||||
this.memoryCache.set(key, value);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
@@ -66,6 +72,10 @@ export class RedisKVCache<T> {
|
||||
|
||||
/**
|
||||
* キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します
|
||||
* This awaits the call to Redis to ensure that the write succeeded, which is important for a few reasons:
|
||||
* * Other code uses this to synchronize changes between worker processes. A failed write can internally de-sync the cluster.
|
||||
* * Without an `await`, consecutive calls could race. An unlucky race could result in the older write overwriting the newer value.
|
||||
* * Not awaiting here makes the entire cache non-consistent. The prevents many possible uses.
|
||||
*/
|
||||
@bindThis
|
||||
public async fetch(key: string): Promise<T> {
|
||||
@@ -77,14 +87,14 @@ export class RedisKVCache<T> {
|
||||
|
||||
// Cache MISS
|
||||
const value = await this.fetcher(key);
|
||||
this.set(key, value);
|
||||
await this.set(key, value);
|
||||
return value;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async refresh(key: string) {
|
||||
const value = await this.fetcher(key);
|
||||
this.set(key, value);
|
||||
await this.set(key, value);
|
||||
|
||||
// TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする
|
||||
}
|
||||
@@ -101,23 +111,23 @@ export class RedisKVCache<T> {
|
||||
}
|
||||
|
||||
export class RedisSingleCache<T> {
|
||||
private redisClient: Redis.Redis;
|
||||
private name: string;
|
||||
private lifetime: number;
|
||||
private memoryCache: MemorySingleCache<T>;
|
||||
private fetcher: () => Promise<T>;
|
||||
private toRedisConverter: (value: T) => string;
|
||||
private fromRedisConverter: (value: string) => T | undefined;
|
||||
private readonly lifetime: number;
|
||||
private readonly memoryCache: MemorySingleCache<T>;
|
||||
private readonly fetcher: () => Promise<T>;
|
||||
private readonly toRedisConverter: (value: T) => string;
|
||||
private readonly fromRedisConverter: (value: string) => T | undefined;
|
||||
|
||||
constructor(redisClient: RedisSingleCache<T>['redisClient'], name: RedisSingleCache<T>['name'], opts: {
|
||||
lifetime: RedisSingleCache<T>['lifetime'];
|
||||
memoryCacheLifetime: number;
|
||||
fetcher: RedisSingleCache<T>['fetcher'];
|
||||
toRedisConverter: RedisSingleCache<T>['toRedisConverter'];
|
||||
fromRedisConverter: RedisSingleCache<T>['fromRedisConverter'];
|
||||
}) {
|
||||
this.redisClient = redisClient;
|
||||
this.name = name;
|
||||
constructor(
|
||||
private redisClient: Redis.Redis,
|
||||
private name: string,
|
||||
opts: {
|
||||
lifetime: number;
|
||||
memoryCacheLifetime: number;
|
||||
fetcher: RedisSingleCache<T>['fetcher'];
|
||||
toRedisConverter: RedisSingleCache<T>['toRedisConverter'];
|
||||
fromRedisConverter: RedisSingleCache<T>['fromRedisConverter'];
|
||||
},
|
||||
) {
|
||||
this.lifetime = opts.lifetime;
|
||||
this.memoryCache = new MemorySingleCache(opts.memoryCacheLifetime);
|
||||
this.fetcher = opts.fetcher;
|
||||
@@ -149,7 +159,13 @@ export class RedisSingleCache<T> {
|
||||
|
||||
const cached = await this.redisClient.get(`singlecache:${this.name}`);
|
||||
if (cached == null) return undefined;
|
||||
return this.fromRedisConverter(cached);
|
||||
|
||||
const value = this.fromRedisConverter(cached);
|
||||
if (value !== undefined) {
|
||||
this.memoryCache.set(value);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
@@ -160,6 +176,10 @@ export class RedisSingleCache<T> {
|
||||
|
||||
/**
|
||||
* キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します
|
||||
* This awaits the call to Redis to ensure that the write succeeded, which is important for a few reasons:
|
||||
* * Other code uses this to synchronize changes between worker processes. A failed write can internally de-sync the cluster.
|
||||
* * Without an `await`, consecutive calls could race. An unlucky race could result in the older write overwriting the newer value.
|
||||
* * Not awaiting here makes the entire cache non-consistent. The prevents many possible uses.
|
||||
*/
|
||||
@bindThis
|
||||
public async fetch(): Promise<T> {
|
||||
@@ -171,14 +191,14 @@ export class RedisSingleCache<T> {
|
||||
|
||||
// Cache MISS
|
||||
const value = await this.fetcher();
|
||||
this.set(value);
|
||||
await this.set(value);
|
||||
return value;
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async refresh() {
|
||||
const value = await this.fetcher();
|
||||
this.set(value);
|
||||
await this.set(value);
|
||||
|
||||
// TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする
|
||||
}
|
||||
@@ -187,22 +207,12 @@ export class RedisSingleCache<T> {
|
||||
// TODO: メモリ節約のためあまり参照されないキーを定期的に削除できるようにする?
|
||||
|
||||
export class MemoryKVCache<T> {
|
||||
/**
|
||||
* データを持つマップ
|
||||
* @deprecated これを直接操作するべきではない
|
||||
*/
|
||||
public cache: Map<string, { date: number; value: T; }>;
|
||||
private lifetime: number;
|
||||
private gcIntervalHandle: NodeJS.Timeout;
|
||||
private readonly cache = new Map<string, { date: number; value: T; }>();
|
||||
private readonly gcIntervalHandle = setInterval(() => this.gc(), 1000 * 60 * 3); // 3m
|
||||
|
||||
constructor(lifetime: MemoryKVCache<never>['lifetime']) {
|
||||
this.cache = new Map();
|
||||
this.lifetime = lifetime;
|
||||
|
||||
this.gcIntervalHandle = setInterval(() => {
|
||||
this.gc();
|
||||
}, 1000 * 60 * 3);
|
||||
}
|
||||
constructor(
|
||||
private readonly lifetime: number,
|
||||
) {}
|
||||
|
||||
@bindThis
|
||||
/**
|
||||
@@ -287,10 +297,14 @@ export class MemoryKVCache<T> {
|
||||
@bindThis
|
||||
public gc(): void {
|
||||
const now = Date.now();
|
||||
|
||||
for (const [key, { date }] of this.cache.entries()) {
|
||||
if ((now - date) > this.lifetime) {
|
||||
this.cache.delete(key);
|
||||
}
|
||||
// The map is ordered from oldest to youngest.
|
||||
// We can stop once we find an entry that's still active, because all following entries must *also* be active.
|
||||
const age = now - date;
|
||||
if (age < this.lifetime) break;
|
||||
|
||||
this.cache.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -298,16 +312,19 @@ export class MemoryKVCache<T> {
|
||||
public dispose(): void {
|
||||
clearInterval(this.gcIntervalHandle);
|
||||
}
|
||||
|
||||
public get entries() {
|
||||
return this.cache.entries();
|
||||
}
|
||||
}
|
||||
|
||||
export class MemorySingleCache<T> {
|
||||
private cachedAt: number | null = null;
|
||||
private value: T | undefined;
|
||||
private lifetime: number;
|
||||
|
||||
constructor(lifetime: MemorySingleCache<never>['lifetime']) {
|
||||
this.lifetime = lifetime;
|
||||
}
|
||||
constructor(
|
||||
private lifetime: number,
|
||||
) {}
|
||||
|
||||
@bindThis
|
||||
public set(value: T): void {
|
||||
|
Reference in New Issue
Block a user