Misskey® Reactions Buffering Technology™ (#14579)
* wip * wip * Update ReactionsBufferingService.ts * Update ReactionsBufferingService.ts * wip * wip * wip * Update ReactionsBufferingService.ts * wip * wip * wip * Update NoteEntityService.ts * wip * wip * wip * wip * Update CHANGELOG.md
This commit is contained in:
162
packages/backend/src/core/ReactionsBufferingService.ts
Normal file
162
packages/backend/src/core/ReactionsBufferingService.ts
Normal file
@@ -0,0 +1,162 @@
|
||||
/*
|
||||
* SPDX-FileCopyrightText: syuilo and misskey-project
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import * as Redis from 'ioredis';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import type { MiNote } from '@/models/Note.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import type { MiUser, NotesRepository } from '@/models/_.js';
|
||||
import type { Config } from '@/config.js';
|
||||
import { PER_NOTE_REACTION_USER_PAIR_CACHE_MAX } from '@/const.js';
|
||||
|
||||
const REDIS_DELTA_PREFIX = 'reactionsBufferDeltas';
|
||||
const REDIS_PAIR_PREFIX = 'reactionsBufferPairs';
|
||||
|
||||
@Injectable()
|
||||
export class ReactionsBufferingService {
|
||||
constructor(
|
||||
@Inject(DI.config)
|
||||
private config: Config,
|
||||
|
||||
@Inject(DI.redisForReactions)
|
||||
private redisForReactions: Redis.Redis, // TODO: 専用のRedisインスタンスにする
|
||||
|
||||
@Inject(DI.notesRepository)
|
||||
private notesRepository: NotesRepository,
|
||||
) {
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async create(noteId: MiNote['id'], userId: MiUser['id'], reaction: string, currentPairs: string[]): Promise<void> {
|
||||
const pipeline = this.redisForReactions.pipeline();
|
||||
pipeline.hincrby(`${REDIS_DELTA_PREFIX}:${noteId}`, reaction, 1);
|
||||
for (let i = 0; i < currentPairs.length; i++) {
|
||||
pipeline.zadd(`${REDIS_PAIR_PREFIX}:${noteId}`, i, currentPairs[i]);
|
||||
}
|
||||
pipeline.zadd(`${REDIS_PAIR_PREFIX}:${noteId}`, Date.now(), `${userId}/${reaction}`);
|
||||
pipeline.zremrangebyrank(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -(PER_NOTE_REACTION_USER_PAIR_CACHE_MAX + 1));
|
||||
await pipeline.exec();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async delete(noteId: MiNote['id'], userId: MiUser['id'], reaction: string): Promise<void> {
|
||||
const pipeline = this.redisForReactions.pipeline();
|
||||
pipeline.hincrby(`${REDIS_DELTA_PREFIX}:${noteId}`, reaction, -1);
|
||||
pipeline.zrem(`${REDIS_PAIR_PREFIX}:${noteId}`, `${userId}/${reaction}`);
|
||||
// TODO: 「消した要素一覧」も持っておかないとcreateされた時に上書きされて復活する
|
||||
await pipeline.exec();
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async get(noteId: MiNote['id']): Promise<{
|
||||
deltas: Record<string, number>;
|
||||
pairs: ([MiUser['id'], string])[];
|
||||
}> {
|
||||
const pipeline = this.redisForReactions.pipeline();
|
||||
pipeline.hgetall(`${REDIS_DELTA_PREFIX}:${noteId}`);
|
||||
pipeline.zrange(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -1);
|
||||
const results = await pipeline.exec();
|
||||
|
||||
const resultDeltas = results![0][1] as Record<string, string>;
|
||||
const resultPairs = results![1][1] as string[];
|
||||
|
||||
const deltas = {} as Record<string, number>;
|
||||
for (const [name, count] of Object.entries(resultDeltas)) {
|
||||
deltas[name] = parseInt(count);
|
||||
}
|
||||
|
||||
const pairs = resultPairs.map(x => x.split('/') as [MiUser['id'], string]);
|
||||
|
||||
return {
|
||||
deltas,
|
||||
pairs,
|
||||
};
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async getMany(noteIds: MiNote['id'][]): Promise<Map<MiNote['id'], {
|
||||
deltas: Record<string, number>;
|
||||
pairs: ([MiUser['id'], string])[];
|
||||
}>> {
|
||||
const map = new Map<MiNote['id'], {
|
||||
deltas: Record<string, number>;
|
||||
pairs: ([MiUser['id'], string])[];
|
||||
}>();
|
||||
|
||||
const pipeline = this.redisForReactions.pipeline();
|
||||
for (const noteId of noteIds) {
|
||||
pipeline.hgetall(`${REDIS_DELTA_PREFIX}:${noteId}`);
|
||||
pipeline.zrange(`${REDIS_PAIR_PREFIX}:${noteId}`, 0, -1);
|
||||
}
|
||||
const results = await pipeline.exec();
|
||||
|
||||
const opsForEachNotes = 2;
|
||||
for (let i = 0; i < noteIds.length; i++) {
|
||||
const noteId = noteIds[i];
|
||||
const resultDeltas = results![i * opsForEachNotes][1] as Record<string, string>;
|
||||
const resultPairs = results![i * opsForEachNotes + 1][1] as string[];
|
||||
|
||||
const deltas = {} as Record<string, number>;
|
||||
for (const [name, count] of Object.entries(resultDeltas)) {
|
||||
deltas[name] = parseInt(count);
|
||||
}
|
||||
|
||||
const pairs = resultPairs.map(x => x.split('/') as [MiUser['id'], string]);
|
||||
|
||||
map.set(noteId, {
|
||||
deltas,
|
||||
pairs,
|
||||
});
|
||||
}
|
||||
|
||||
return map;
|
||||
}
|
||||
|
||||
// TODO: scanは重い可能性があるので、別途 bufferedNoteIds を直接Redis上に持っておいてもいいかもしれない
|
||||
@bindThis
|
||||
public async bake(): Promise<void> {
|
||||
const bufferedNoteIds = [];
|
||||
let cursor = '0';
|
||||
do {
|
||||
// https://github.com/redis/ioredis#transparent-key-prefixing
|
||||
const result = await this.redisForReactions.scan(
|
||||
cursor,
|
||||
'MATCH',
|
||||
`${this.config.redis.prefix}:${REDIS_DELTA_PREFIX}:*`,
|
||||
'COUNT',
|
||||
'1000');
|
||||
|
||||
cursor = result[0];
|
||||
bufferedNoteIds.push(...result[1].map(x => x.replace(`${this.config.redis.prefix}:${REDIS_DELTA_PREFIX}:`, '')));
|
||||
} while (cursor !== '0');
|
||||
|
||||
const bufferedMap = await this.getMany(bufferedNoteIds);
|
||||
|
||||
// clear
|
||||
const pipeline = this.redisForReactions.pipeline();
|
||||
for (const noteId of bufferedNoteIds) {
|
||||
pipeline.del(`${REDIS_DELTA_PREFIX}:${noteId}`);
|
||||
pipeline.del(`${REDIS_PAIR_PREFIX}:${noteId}`);
|
||||
}
|
||||
await pipeline.exec();
|
||||
|
||||
// TODO: SQL一個にまとめたい
|
||||
for (const [noteId, buffered] of bufferedMap) {
|
||||
const sql = Object.entries(buffered.deltas)
|
||||
.map(([reaction, count]) =>
|
||||
`jsonb_set("reactions", '{${reaction}}', (COALESCE("reactions"->>'${reaction}', '0')::int + ${count})::text::jsonb)`)
|
||||
.join(' || ');
|
||||
|
||||
this.notesRepository.createQueryBuilder().update()
|
||||
.set({
|
||||
reactions: () => sql,
|
||||
reactionAndUserPairCache: buffered.pairs.map(x => x.join('/')),
|
||||
})
|
||||
.where('id = :id', { id: noteId })
|
||||
.execute();
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user