perf(backend): 通知をRedisに保存するように

Resolve #10168
This commit is contained in:
syuilo
2023-04-04 14:06:57 +09:00
parent 38d0b62167
commit 30d6992684
29 changed files with 185 additions and 613 deletions

View File

@@ -169,10 +169,6 @@ export class NoteReadService implements OnApplicationShutdown {
this.globalEventService.publishMainStream(userId, 'readAllChannels');
}
});
this.notificationService.readNotificationByQuery(userId, {
noteId: In([...readMentions.map(n => n.id), ...readSpecifiedNotes.map(n => n.id)]),
});
}
}

View File

@@ -1,8 +1,9 @@
import { setTimeout } from 'node:timers/promises';
import Redis from 'ioredis';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { In } from 'typeorm';
import { DI } from '@/di-symbols.js';
import type { MutingsRepository, NotificationsRepository, UserProfilesRepository, UsersRepository } from '@/models/index.js';
import type { MutingsRepository, UserProfilesRepository, UsersRepository } from '@/models/index.js';
import type { User } from '@/models/entities/User.js';
import type { Notification } from '@/models/entities/Notification.js';
import { UserEntityService } from '@/core/entities/UserEntityService.js';
@@ -17,15 +18,15 @@ export class NotificationService implements OnApplicationShutdown {
#shutdownController = new AbortController();
constructor(
@Inject(DI.redis)
private redisClient: Redis.Redis,
@Inject(DI.usersRepository)
private usersRepository: UsersRepository,
@Inject(DI.userProfilesRepository)
private userProfilesRepository: UserProfilesRepository,
@Inject(DI.notificationsRepository)
private notificationsRepository: NotificationsRepository,
@Inject(DI.mutingsRepository)
private mutingsRepository: MutingsRepository,
@@ -38,50 +39,31 @@ export class NotificationService implements OnApplicationShutdown {
}
@bindThis
public async readNotification(
public async readAllNotification(
userId: User['id'],
notificationIds: Notification['id'][],
) {
if (notificationIds.length === 0) return;
const latestReadNotificationId = await this.redisClient.get(`latestReadNotification:${userId}`);
const latestNotificationIdsRes = await this.redisClient.xrevrange(
`notificationTimeline:${userId}`,
'+',
'-',
'COUNT', 1);
console.log('latestNotificationIdsRes', latestNotificationIdsRes);
const latestNotificationId = latestNotificationIdsRes[0]?.[0];
// Update documents
const result = await this.notificationsRepository.update({
notifieeId: userId,
id: In(notificationIds),
isRead: false,
}, {
isRead: true,
});
if (latestNotificationId == null) return;
if (result.affected === 0) return;
this.redisClient.set(`latestReadNotification:${userId}`, latestNotificationId);
if (!await this.userEntityService.getHasUnreadNotification(userId)) return this.postReadAllNotifications(userId);
else return this.postReadNotifications(userId, notificationIds);
}
@bindThis
public async readNotificationByQuery(
userId: User['id'],
query: Record<string, any>,
) {
const notificationIds = await this.notificationsRepository.findBy({
...query,
notifieeId: userId,
isRead: false,
}).then(notifications => notifications.map(notification => notification.id));
return this.readNotification(userId, notificationIds);
if (latestReadNotificationId == null || (latestReadNotificationId < latestNotificationId)) {
return this.postReadAllNotifications(userId);
}
}
@bindThis
private postReadAllNotifications(userId: User['id']) {
this.globalEventService.publishMainStream(userId, 'readAllNotifications');
return this.pushNotificationService.pushNotification(userId, 'readAllNotifications', undefined);
}
@bindThis
private postReadNotifications(userId: User['id'], notificationIds: Notification['id'][]) {
return this.pushNotificationService.pushNotification(userId, 'readNotifications', { notificationIds });
}
@bindThis
@@ -90,47 +72,48 @@ export class NotificationService implements OnApplicationShutdown {
type: Notification['type'],
data: Partial<Notification>,
): Promise<Notification | null> {
if (data.notifierId && (notifieeId === data.notifierId)) {
return null;
// TODO: Cache
const profile = await this.userProfilesRepository.findOneBy({ userId: notifieeId });
const isMuted = profile?.mutingNotificationTypes.includes(type);
if (isMuted) return null;
if (data.notifierId) {
if (notifieeId === data.notifierId) {
return null;
}
// TODO: cache
const mutings = await this.mutingsRepository.findOneBy({
muterId: notifieeId,
muteeId: data.notifierId,
});
if (mutings) {
return null;
}
}
const profile = await this.userProfilesRepository.findOneBy({ userId: notifieeId });
// TODO: Cache
const isMuted = profile?.mutingNotificationTypes.includes(type);
// Create notification
const notification = await this.notificationsRepository.insert({
const notification = {
id: this.idService.genId(),
createdAt: new Date(),
notifieeId: notifieeId,
type: type,
// 相手がこの通知をミュートしているようなら、既読を予めつけておく
isRead: isMuted,
...data,
} as Partial<Notification>)
.then(x => this.notificationsRepository.findOneByOrFail(x.identifiers[0]));
} as Notification;
const packed = await this.notificationEntityService.pack(notification, {});
this.redisClient.xadd(
`notificationTimeline:${notifieeId}`,
'MAXLEN', '~', '300',
`${this.idService.parse(notification.id).date.getTime()}-*`,
'data', JSON.stringify(notification));
const packed = await this.notificationEntityService.pack(notification, notifieeId, {});
// Publish notification event
this.globalEventService.publishMainStream(notifieeId, 'notification', packed);
// 2秒経っても(今回作成した)通知が既読にならなかったら「未読の通知がありますよ」イベントを発行する
setTimeout(2000, 'unread note', { signal: this.#shutdownController.signal }).then(async () => {
const fresh = await this.notificationsRepository.findOneBy({ id: notification.id });
if (fresh == null) return; // 既に削除されているかもしれない
if (fresh.isRead) return;
//#region ただしミュートしているユーザーからの通知なら無視
// TODO: Cache
const mutings = await this.mutingsRepository.findBy({
muterId: notifieeId,
});
if (data.notifierId && mutings.map(m => m.muteeId).includes(data.notifierId)) {
return;
}
//#endregion
setTimeout(2000, 'unread notification', { signal: this.#shutdownController.signal }).then(async () => {
const latestReadNotificationId = await this.redisClient.get(`latestReadNotification:${notifieeId}`);
if (latestReadNotificationId && (latestReadNotificationId >= notification.id)) return;
this.globalEventService.publishMainStream(notifieeId, 'unreadNotification', packed);
this.pushNotificationService.pushNotification(notifieeId, 'notification', packed);

View File

@@ -15,10 +15,6 @@ type PushNotificationsTypes = {
antenna: { id: string, name: string };
note: Packed<'Note'>;
};
'readNotifications': { notificationIds: string[] };
'readAllNotifications': undefined;
'readAntenna': { antennaId: string };
'readAllAntennas': undefined;
};
// Reduce length because push message servers have character limits
@@ -72,14 +68,6 @@ export class PushNotificationService {
});
for (const subscription of subscriptions) {
// Continue if sendReadMessage is false
if ([
'readNotifications',
'readAllNotifications',
'readAntenna',
'readAllAntennas',
].includes(type) && !subscription.sendReadMessage) continue;
const pushSubscription = {
endpoint: subscription.endpoint,
keys: {

View File

@@ -1,7 +1,8 @@
import { Inject, Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { In } from 'typeorm';
import { DI } from '@/di-symbols.js';
import type { AccessTokensRepository, NoteReactionsRepository, NotificationsRepository, User } from '@/models/index.js';
import type { AccessTokensRepository, NoteReactionsRepository, NotesRepository, User, UsersRepository } from '@/models/index.js';
import { awaitAll } from '@/misc/prelude/await-all.js';
import type { Notification } from '@/models/entities/Notification.js';
import type { Note } from '@/models/entities/Note.js';
@@ -25,8 +26,11 @@ export class NotificationEntityService implements OnModuleInit {
constructor(
private moduleRef: ModuleRef,
@Inject(DI.notificationsRepository)
private notificationsRepository: NotificationsRepository,
@Inject(DI.notesRepository)
private notesRepository: NotesRepository,
@Inject(DI.usersRepository)
private usersRepository: UsersRepository,
@Inject(DI.noteReactionsRepository)
private noteReactionsRepository: NoteReactionsRepository,
@@ -48,30 +52,39 @@ export class NotificationEntityService implements OnModuleInit {
@bindThis
public async pack(
src: Notification['id'] | Notification,
src: Notification,
meId: User['id'],
options: {
_hint_?: {
packedNotes: Map<Note['id'], Packed<'Note'>>;
};
},
hint?: {
packedNotes: Map<Note['id'], Packed<'Note'>>;
packedUsers: Map<User['id'], Packed<'User'>>;
},
): Promise<Packed<'Notification'>> {
const notification = typeof src === 'object' ? src : await this.notificationsRepository.findOneByOrFail({ id: src });
const notification = src;
const token = notification.appAccessTokenId ? await this.accessTokensRepository.findOneByOrFail({ id: notification.appAccessTokenId }) : null;
const noteIfNeed = NOTE_REQUIRED_NOTIFICATION_TYPES.has(notification.type) && notification.noteId != null ? (
options._hint_?.packedNotes != null
? options._hint_.packedNotes.get(notification.noteId)
: this.noteEntityService.pack(notification.note ?? notification.noteId!, { id: notification.notifieeId }, {
hint?.packedNotes != null
? hint.packedNotes.get(notification.noteId)
: this.noteEntityService.pack(notification.noteId!, { id: meId }, {
detail: true,
})
) : undefined;
const userIfNeed = notification.notifierId != null ? (
hint?.packedUsers != null
? hint.packedUsers.get(notification.notifierId)
: this.userEntityService.pack(notification.notifierId!, { id: meId }, {
detail: false,
})
) : undefined;
return await awaitAll({
id: notification.id,
createdAt: notification.createdAt.toISOString(),
createdAt: new Date(notification.createdAt).toISOString(),
type: notification.type,
isRead: notification.isRead,
userId: notification.notifierId,
user: notification.notifierId ? this.userEntityService.pack(notification.notifier ?? notification.notifierId) : null,
...(userIfNeed != null ? { user: userIfNeed } : {}),
...(noteIfNeed != null ? { note: noteIfNeed } : {}),
...(notification.type === 'reaction' ? {
reaction: notification.reaction,
@@ -87,33 +100,36 @@ export class NotificationEntityService implements OnModuleInit {
});
}
/**
* @param notifications you should join "note" property when fetch from DB, and all notifieeId should be same as meId
*/
@bindThis
public async packMany(
notifications: Notification[],
meId: User['id'],
) {
if (notifications.length === 0) return [];
for (const notification of notifications) {
if (meId !== notification.notifieeId) {
// because we call note packMany with meId, all notifieeId should be same as meId
throw new Error('TRY_TO_PACK_ANOTHER_USER_NOTIFICATION');
}
}
const notes = notifications.map(x => x.note).filter(isNotNull);
const noteIds = notifications.map(x => x.noteId).filter(isNotNull);
const notes = noteIds.length > 0 ? await this.notesRepository.find({
where: { id: In(noteIds) },
relations: ['user', 'user.avatar', 'user.banner', 'reply', 'reply.user', 'reply.user.avatar', 'reply.user.banner', 'renote', 'renote.user', 'renote.user.avatar', 'renote.user.banner'],
}) : [];
const packedNotesArray = await this.noteEntityService.packMany(notes, { id: meId }, {
detail: true,
});
const packedNotes = new Map(packedNotesArray.map(p => [p.id, p]));
return await Promise.all(notifications.map(x => this.pack(x, {
_hint_: {
packedNotes,
},
const userIds = notifications.map(x => x.notifierId).filter(isNotNull);
const users = userIds.length > 0 ? await this.usersRepository.find({
where: { id: In(userIds) },
relations: ['avatar', 'banner'],
}) : [];
const packedUsersArray = await this.userEntityService.packMany(users, { id: meId }, {
detail: false,
});
const packedUsers = new Map(packedUsersArray.map(p => [p.id, p]));
return await Promise.all(notifications.map(x => this.pack(x, meId, {}, {
packedNotes,
packedUsers,
})));
}
}

View File

@@ -1,5 +1,6 @@
import { Inject, Injectable } from '@nestjs/common';
import { In, Not } from 'typeorm';
import Redis from 'ioredis';
import Ajv from 'ajv';
import { ModuleRef } from '@nestjs/core';
import { DI } from '@/di-symbols.js';
@@ -12,7 +13,7 @@ import { KVCache } from '@/misc/cache.js';
import type { Instance } from '@/models/entities/Instance.js';
import type { LocalUser, RemoteUser, User } from '@/models/entities/User.js';
import { birthdaySchema, descriptionSchema, localUsernameSchema, locationSchema, nameSchema, passwordSchema } from '@/models/entities/User.js';
import type { UsersRepository, UserSecurityKeysRepository, FollowingsRepository, FollowRequestsRepository, BlockingsRepository, MutingsRepository, DriveFilesRepository, NoteUnreadsRepository, ChannelFollowingsRepository, NotificationsRepository, UserNotePiningsRepository, UserProfilesRepository, InstancesRepository, AnnouncementReadsRepository, AnnouncementsRepository, PagesRepository, UserProfile, RenoteMutingsRepository } from '@/models/index.js';
import type { UsersRepository, UserSecurityKeysRepository, FollowingsRepository, FollowRequestsRepository, BlockingsRepository, MutingsRepository, DriveFilesRepository, NoteUnreadsRepository, ChannelFollowingsRepository, UserNotePiningsRepository, UserProfilesRepository, InstancesRepository, AnnouncementReadsRepository, AnnouncementsRepository, PagesRepository, UserProfile, RenoteMutingsRepository } from '@/models/index.js';
import { bindThis } from '@/decorators.js';
import { RoleService } from '@/core/RoleService.js';
import type { OnModuleInit } from '@nestjs/common';
@@ -60,6 +61,9 @@ export class UserEntityService implements OnModuleInit {
@Inject(DI.config)
private config: Config,
@Inject(DI.redis)
private redisClient: Redis.Redis,
@Inject(DI.usersRepository)
private usersRepository: UsersRepository,
@@ -90,9 +94,6 @@ export class UserEntityService implements OnModuleInit {
@Inject(DI.channelFollowingsRepository)
private channelFollowingsRepository: ChannelFollowingsRepository,
@Inject(DI.notificationsRepository)
private notificationsRepository: NotificationsRepository,
@Inject(DI.userNotePiningsRepository)
private userNotePiningsRepository: UserNotePiningsRepository,
@@ -247,21 +248,17 @@ export class UserEntityService implements OnModuleInit {
@bindThis
public async getHasUnreadNotification(userId: User['id']): Promise<boolean> {
const mute = await this.mutingsRepository.findBy({
muterId: userId,
});
const mutedUserIds = mute.map(m => m.muteeId);
const latestReadNotificationId = await this.redisClient.get(`latestReadNotification:${userId}`);
const latestNotificationIdsRes = await this.redisClient.xrevrange(
`notificationTimeline:${userId}`,
'+',
'-',
'COUNT', 1);
console.log('latestNotificationIdsRes', latestNotificationIdsRes);
const latestNotificationId = latestNotificationIdsRes[0]?.[0];
const count = await this.notificationsRepository.count({
where: {
notifieeId: userId,
...(mutedUserIds.length > 0 ? { notifierId: Not(In(mutedUserIds)) } : {}),
isRead: false,
},
take: 1,
});
return count > 0;
return latestNotificationId != null && (latestReadNotificationId == null || latestReadNotificationId < latestNotificationId);
}
@bindThis