151 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
			
		
		
	
	
			151 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
| /*
 | |
|  * SPDX-FileCopyrightText: syuilo and misskey-project
 | |
|  * SPDX-License-Identifier: AGPL-3.0-only
 | |
|  */
 | |
| 
 | |
| import { Inject, Injectable } from '@nestjs/common';
 | |
| import * as Bull from 'bullmq';
 | |
| import { Not } from 'typeorm';
 | |
| import { DI } from '@/di-symbols.js';
 | |
| import type { InstancesRepository, MiMeta } from '@/models/_.js';
 | |
| import type Logger from '@/logger.js';
 | |
| import { ApRequestService } from '@/core/activitypub/ApRequestService.js';
 | |
| import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
 | |
| import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
 | |
| import { MemorySingleCache } from '@/misc/cache.js';
 | |
| import type { MiInstance } from '@/models/Instance.js';
 | |
| import InstanceChart from '@/core/chart/charts/instance.js';
 | |
| import ApRequestChart from '@/core/chart/charts/ap-request.js';
 | |
| import FederationChart from '@/core/chart/charts/federation.js';
 | |
| import { StatusError } from '@/misc/status-error.js';
 | |
| import { UtilityService } from '@/core/UtilityService.js';
 | |
| import { bindThis } from '@/decorators.js';
 | |
| import { QueueLoggerService } from '../QueueLoggerService.js';
 | |
| import type { DeliverJobData } from '../types.js';
 | |
| 
 | |
| @Injectable()
 | |
| export class DeliverProcessorService {
 | |
| 	private logger: Logger;
 | |
| 	private suspendedHostsCache: MemorySingleCache<MiInstance[]>;
 | |
| 	private latest: string | null;
 | |
| 
 | |
| 	constructor(
 | |
| 		@Inject(DI.meta)
 | |
| 		private meta: MiMeta,
 | |
| 
 | |
| 		@Inject(DI.instancesRepository)
 | |
| 		private instancesRepository: InstancesRepository,
 | |
| 
 | |
| 		private utilityService: UtilityService,
 | |
| 		private federatedInstanceService: FederatedInstanceService,
 | |
| 		private fetchInstanceMetadataService: FetchInstanceMetadataService,
 | |
| 		private apRequestService: ApRequestService,
 | |
| 		private instanceChart: InstanceChart,
 | |
| 		private apRequestChart: ApRequestChart,
 | |
| 		private federationChart: FederationChart,
 | |
| 		private queueLoggerService: QueueLoggerService,
 | |
| 	) {
 | |
| 		this.logger = this.queueLoggerService.logger.createSubLogger('deliver');
 | |
| 		this.suspendedHostsCache = new MemorySingleCache<MiInstance[]>(1000 * 60 * 60); // 1h
 | |
| 	}
 | |
| 
 | |
| 	@bindThis
 | |
| 	public async process(job: Bull.Job<DeliverJobData>): Promise<string> {
 | |
| 		const { host } = new URL(job.data.to);
 | |
| 
 | |
| 		if (!this.utilityService.isFederationAllowedUri(job.data.to)) {
 | |
| 			return 'skip (blocked)';
 | |
| 		}
 | |
| 
 | |
| 		// isSuspendedなら中断
 | |
| 		let suspendedHosts = this.suspendedHostsCache.get();
 | |
| 		if (suspendedHosts == null) {
 | |
| 			suspendedHosts = await this.instancesRepository.find({
 | |
| 				where: {
 | |
| 					suspensionState: Not('none'),
 | |
| 				},
 | |
| 			});
 | |
| 			this.suspendedHostsCache.set(suspendedHosts);
 | |
| 		}
 | |
| 		if (suspendedHosts.map(x => x.host).includes(this.utilityService.toPuny(host))) {
 | |
| 			return 'skip (suspended)';
 | |
| 		}
 | |
| 
 | |
| 		try {
 | |
| 			await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content, job.data.digest);
 | |
| 
 | |
| 			// Update stats
 | |
| 			this.federatedInstanceService.fetch(host).then(i => {
 | |
| 				if (i.isNotResponding) {
 | |
| 					this.federatedInstanceService.update(i.id, {
 | |
| 						isNotResponding: false,
 | |
| 						notRespondingSince: null,
 | |
| 					});
 | |
| 				}
 | |
| 
 | |
| 				this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
 | |
| 				this.apRequestChart.deliverSucc();
 | |
| 				this.federationChart.deliverd(i.host, true);
 | |
| 
 | |
| 				if (this.meta.enableChartsForFederatedInstances) {
 | |
| 					this.instanceChart.requestSent(i.host, true);
 | |
| 				}
 | |
| 			});
 | |
| 
 | |
| 			return 'Success';
 | |
| 		} catch (res) {
 | |
| 			// Update stats
 | |
| 			this.federatedInstanceService.fetch(host).then(i => {
 | |
| 				if (!i.isNotResponding) {
 | |
| 					this.federatedInstanceService.update(i.id, {
 | |
| 						isNotResponding: true,
 | |
| 						notRespondingSince: new Date(),
 | |
| 					});
 | |
| 				} else if (i.notRespondingSince) {
 | |
| 					// 1週間以上不通ならサスペンド
 | |
| 					if (i.suspensionState === 'none' && i.notRespondingSince.getTime() <= Date.now() - 1000 * 60 * 60 * 24 * 7) {
 | |
| 						this.federatedInstanceService.update(i.id, {
 | |
| 							suspensionState: 'autoSuspendedForNotResponding',
 | |
| 						});
 | |
| 					}
 | |
| 				} else {
 | |
| 					// isNotRespondingがtrueでnotRespondingSinceがnullの場合はnotRespondingSinceをセット
 | |
| 					// notRespondingSinceは新たな機能なので、それ以前のデータにはnotRespondingSinceがない場合がある
 | |
| 					this.federatedInstanceService.update(i.id, {
 | |
| 						notRespondingSince: new Date(),
 | |
| 					});
 | |
| 				}
 | |
| 
 | |
| 				this.apRequestChart.deliverFail();
 | |
| 				this.federationChart.deliverd(i.host, false);
 | |
| 
 | |
| 				if (this.meta.enableChartsForFederatedInstances) {
 | |
| 					this.instanceChart.requestSent(i.host, false);
 | |
| 				}
 | |
| 			});
 | |
| 
 | |
| 			if (res instanceof StatusError) {
 | |
| 				// 4xx
 | |
| 				if (!res.isRetryable) {
 | |
| 					// 相手が閉鎖していることを明示しているため、配送停止する
 | |
| 					if (job.data.isSharedInbox && res.statusCode === 410) {
 | |
| 						this.federatedInstanceService.fetch(host).then(i => {
 | |
| 							this.federatedInstanceService.update(i.id, {
 | |
| 								suspensionState: 'goneSuspended',
 | |
| 							});
 | |
| 						});
 | |
| 						throw new Bull.UnrecoverableError(`${host} is gone`);
 | |
| 					}
 | |
| 					throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
 | |
| 				}
 | |
| 
 | |
| 				// 5xx etc.
 | |
| 				throw new Error(`${res.statusCode} ${res.statusMessage}`);
 | |
| 			} else {
 | |
| 				// DNS error, socket error, timeout ...
 | |
| 				throw res;
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | 
