feat: 通報を受けた際にメールまたはWebhookで通知を送出出来るようにする (#13758)

* feat: 通報を受けた際にメールまたはWebhookで通知を送出出来るようにする

* モデログに対応&エンドポイントを単一オブジェクトでのサポートに変更(API経由で大量に作るシチュエーションもないと思うので)

* fix spdx

* fix migration

* fix migration

* fix models

* add e2e webhook

* tweak

* fix modlog

* fix bugs

* add tests and fix bugs

* add tests and fix bugs

* add tests

* fix path

* regenerate locale

* 混入除去

* 混入除去

* add abuseReportResolved

* fix pnpm-lock.yaml

* add abuseReportResolved test

* fix bugs

* fix ui

* add tests

* fix CHANGELOG.md

* add tests

* add RoleService.getModeratorIds tests

* WebhookServiceをUserとSystemに分割

* fix CHANGELOG.md

* fix test

* insertOneを使う用に

* fix

* regenerate locales

* revert version

* separate webhook job queue

* fix

* 🎨

* Update QueueProcessorService.ts

---------

Co-authored-by: osamu <46447427+sam-osamu@users.noreply.github.com>
Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com>
This commit is contained in:
おさむのひと
2024-06-08 15:34:19 +09:00
committed by GitHub
parent e0cf5b2402
commit 61fae45390
79 changed files with 6527 additions and 369 deletions

View File

@@ -11,7 +11,8 @@ import { QueueProcessorService } from './QueueProcessorService.js';
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
import { InboxProcessorService } from './processors/InboxProcessorService.js';
import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js';
import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js';
import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
import { CleanProcessorService } from './processors/CleanProcessorService.js';
@@ -71,7 +72,8 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor
DeleteFileProcessorService,
CleanRemoteFilesProcessorService,
RelationshipProcessorService,
WebhookDeliverProcessorService,
UserWebhookDeliverProcessorService,
SystemWebhookDeliverProcessorService,
EndedPollNotificationProcessorService,
DeliverProcessorService,
InboxProcessorService,

View File

@@ -10,7 +10,8 @@ import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js';
import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
import { InboxProcessorService } from './processors/InboxProcessorService.js';
@@ -76,7 +77,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
private dbQueueWorker: Bull.Worker;
private deliverQueueWorker: Bull.Worker;
private inboxQueueWorker: Bull.Worker;
private webhookDeliverQueueWorker: Bull.Worker;
private userWebhookDeliverQueueWorker: Bull.Worker;
private systemWebhookDeliverQueueWorker: Bull.Worker;
private relationshipQueueWorker: Bull.Worker;
private objectStorageQueueWorker: Bull.Worker;
private endedPollNotificationQueueWorker: Bull.Worker;
@@ -86,7 +88,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
private config: Config,
private queueLoggerService: QueueLoggerService,
private webhookDeliverProcessorService: WebhookDeliverProcessorService,
private userWebhookDeliverProcessorService: UserWebhookDeliverProcessorService,
private systemWebhookDeliverProcessorService: SystemWebhookDeliverProcessorService,
private endedPollNotificationProcessorService: EndedPollNotificationProcessorService,
private deliverProcessorService: DeliverProcessorService,
private inboxProcessorService: InboxProcessorService,
@@ -160,13 +163,13 @@ export class QueueProcessorService implements OnApplicationShutdown {
autorun: false,
});
const systemLogger = this.logger.createSubLogger('system');
const logger = this.logger.createSubLogger('system');
this.systemQueueWorker
.on('active', (job) => systemLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`))
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err: Error) => {
systemLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.message}`, {
level: 'error',
@@ -174,8 +177,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => systemLogger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => systemLogger.warn(`stalled id=${jobId}`));
.on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -217,13 +220,13 @@ export class QueueProcessorService implements OnApplicationShutdown {
autorun: false,
});
const dbLogger = this.logger.createSubLogger('db');
const logger = this.logger.createSubLogger('db');
this.dbQueueWorker
.on('active', (job) => dbLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`))
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
dbLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.message}`, {
level: 'error',
@@ -231,8 +234,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => dbLogger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => dbLogger.warn(`stalled id=${jobId}`));
.on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -257,13 +260,13 @@ export class QueueProcessorService implements OnApplicationShutdown {
},
});
const deliverLogger = this.logger.createSubLogger('deliver');
const logger = this.logger.createSubLogger('deliver');
this.deliverQueueWorker
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
deliverLogger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: Deliver: ${err.message}`, {
level: 'error',
@@ -271,8 +274,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`));
.on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -297,13 +300,13 @@ export class QueueProcessorService implements OnApplicationShutdown {
},
});
const inboxLogger = this.logger.createSubLogger('inbox');
const logger = this.logger.createSubLogger('inbox');
this.inboxQueueWorker
.on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
.on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
.on('failed', (job, err) => {
inboxLogger.error(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) });
logger.error(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) });
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: Inbox: ${err.message}`, {
level: 'error',
@@ -311,21 +314,21 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => inboxLogger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`));
.on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
//#region webhook deliver
//#region user-webhook deliver
{
this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => {
this.userWebhookDeliverQueueWorker = new Bull.Worker(QUEUE.USER_WEBHOOK_DELIVER, (job) => {
if (this.config.sentryForBackend) {
return Sentry.startSpan({ name: 'Queue: WebhookDeliver' }, () => this.webhookDeliverProcessorService.process(job));
return Sentry.startSpan({ name: 'Queue: UserWebhookDeliver' }, () => this.userWebhookDeliverProcessorService.process(job));
} else {
return this.webhookDeliverProcessorService.process(job);
return this.userWebhookDeliverProcessorService.process(job);
}
}, {
...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER),
...baseQueueOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER),
autorun: false,
concurrency: 64,
limiter: {
@@ -337,22 +340,62 @@ export class QueueProcessorService implements OnApplicationShutdown {
},
});
const webhookLogger = this.logger.createSubLogger('webhook');
const logger = this.logger.createSubLogger('user-webhook');
this.webhookDeliverQueueWorker
.on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
this.userWebhookDeliverQueueWorker
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
webhookLogger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: WebhookDeliver: ${err.message}`, {
Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
.on('error', (err: Error) => webhookLogger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => webhookLogger.warn(`stalled id=${jobId}`));
.on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
//#region system-webhook deliver
{
this.systemWebhookDeliverQueueWorker = new Bull.Worker(QUEUE.SYSTEM_WEBHOOK_DELIVER, (job) => {
if (this.config.sentryForBackend) {
return Sentry.startSpan({ name: 'Queue: SystemWebhookDeliver' }, () => this.systemWebhookDeliverProcessorService.process(job));
} else {
return this.systemWebhookDeliverProcessorService.process(job);
}
}, {
...baseQueueOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER),
autorun: false,
concurrency: 16,
limiter: {
max: 16,
duration: 1000,
},
settings: {
backoffStrategy: httpRelatedBackoff,
},
});
const logger = this.logger.createSubLogger('system-webhook');
this.systemWebhookDeliverQueueWorker
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
.on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -384,13 +427,13 @@ export class QueueProcessorService implements OnApplicationShutdown {
},
});
const relationshipLogger = this.logger.createSubLogger('relationship');
const logger = this.logger.createSubLogger('relationship');
this.relationshipQueueWorker
.on('active', (job) => relationshipLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`))
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
relationshipLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.message}`, {
level: 'error',
@@ -398,8 +441,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => relationshipLogger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`));
.on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -425,13 +468,13 @@ export class QueueProcessorService implements OnApplicationShutdown {
concurrency: 16,
});
const objectStorageLogger = this.logger.createSubLogger('objectStorage');
const logger = this.logger.createSubLogger('objectStorage');
this.objectStorageQueueWorker
.on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
objectStorageLogger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
logger.error(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) });
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.message}`, {
level: 'error',
@@ -439,8 +482,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => objectStorageLogger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`));
.on('error', (err: Error) => logger.error(`error ${err.stack}`, { e: renderError(err) }))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -467,7 +510,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
this.dbQueueWorker.run(),
this.deliverQueueWorker.run(),
this.inboxQueueWorker.run(),
this.webhookDeliverQueueWorker.run(),
this.userWebhookDeliverQueueWorker.run(),
this.systemWebhookDeliverQueueWorker.run(),
this.relationshipQueueWorker.run(),
this.objectStorageQueueWorker.run(),
this.endedPollNotificationQueueWorker.run(),
@@ -481,7 +525,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
this.dbQueueWorker.close(),
this.deliverQueueWorker.close(),
this.inboxQueueWorker.close(),
this.webhookDeliverQueueWorker.close(),
this.userWebhookDeliverQueueWorker.close(),
this.systemWebhookDeliverQueueWorker.close(),
this.relationshipQueueWorker.close(),
this.objectStorageQueueWorker.close(),
this.endedPollNotificationQueueWorker.close(),

View File

@@ -14,7 +14,8 @@ export const QUEUE = {
DB: 'db',
RELATIONSHIP: 'relationship',
OBJECT_STORAGE: 'objectStorage',
WEBHOOK_DELIVER: 'webhookDeliver',
USER_WEBHOOK_DELIVER: 'userWebhookDeliver',
SYSTEM_WEBHOOK_DELIVER: 'systemWebhookDeliver',
};
export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions {

View File

@@ -0,0 +1,87 @@
/*
* SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { Inject, Injectable } from '@nestjs/common';
import * as Bull from 'bullmq';
import { DI } from '@/di-symbols.js';
import type { SystemWebhooksRepository } from '@/models/_.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { HttpRequestService } from '@/core/HttpRequestService.js';
import { StatusError } from '@/misc/status-error.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import { SystemWebhookDeliverJobData } from '../types.js';
@Injectable()
export class SystemWebhookDeliverProcessorService {
private logger: Logger;
constructor(
@Inject(DI.config)
private config: Config,
@Inject(DI.systemWebhooksRepository)
private systemWebhooksRepository: SystemWebhooksRepository,
private httpRequestService: HttpRequestService,
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('webhook');
}
@bindThis
public async process(job: Bull.Job<SystemWebhookDeliverJobData>): Promise<string> {
try {
this.logger.debug(`delivering ${job.data.webhookId}`);
const res = await this.httpRequestService.send(job.data.to, {
method: 'POST',
headers: {
'User-Agent': 'Misskey-Hooks',
'X-Misskey-Host': this.config.host,
'X-Misskey-Hook-Id': job.data.webhookId,
'X-Misskey-Hook-Secret': job.data.secret,
'Content-Type': 'application/json',
},
body: JSON.stringify({
server: this.config.url,
hookId: job.data.webhookId,
eventId: job.data.eventId,
createdAt: job.data.createdAt,
type: job.data.type,
body: job.data.content,
}),
});
this.systemWebhooksRepository.update({ id: job.data.webhookId }, {
latestSentAt: new Date(),
latestStatus: res.status,
});
return 'Success';
} catch (res) {
this.logger.error(res as Error);
this.systemWebhooksRepository.update({ id: job.data.webhookId }, {
latestSentAt: new Date(),
latestStatus: res instanceof StatusError ? res.statusCode : 1,
});
if (res instanceof StatusError) {
// 4xx
if (!res.isRetryable) {
throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
}
// 5xx etc.
throw new Error(`${res.statusCode} ${res.statusMessage}`);
} else {
// DNS error, socket error, timeout ...
throw res;
}
}
}
}

View File

@@ -13,10 +13,10 @@ import { HttpRequestService } from '@/core/HttpRequestService.js';
import { StatusError } from '@/misc/status-error.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { WebhookDeliverJobData } from '../types.js';
import { UserWebhookDeliverJobData } from '../types.js';
@Injectable()
export class WebhookDeliverProcessorService {
export class UserWebhookDeliverProcessorService {
private logger: Logger;
constructor(
@@ -33,7 +33,7 @@ export class WebhookDeliverProcessorService {
}
@bindThis
public async process(job: Bull.Job<WebhookDeliverJobData>): Promise<string> {
public async process(job: Bull.Job<UserWebhookDeliverJobData>): Promise<string> {
try {
this.logger.debug(`delivering ${job.data.webhookId}`);

View File

@@ -106,7 +106,17 @@ export type EndedPollNotificationJobData = {
noteId: MiNote['id'];
};
export type WebhookDeliverJobData = {
export type SystemWebhookDeliverJobData = {
type: string;
content: unknown;
webhookId: MiWebhook['id'];
to: string;
secret: string;
createdAt: number;
eventId: string;
};
export type UserWebhookDeliverJobData = {
type: string;
content: unknown;
webhookId: MiWebhook['id'];