feat(backend): イベント用Redisを別サーバーに分離できるように
This commit is contained in:
		| @@ -1,8 +1,15 @@ | ||||
| import Redis from 'ioredis'; | ||||
| import { loadConfig } from './built/config.js'; | ||||
| import { createRedisConnection } from './built/redis.js'; | ||||
|  | ||||
| const config = loadConfig(); | ||||
| const redis = createRedisConnection(config); | ||||
| const redis = new Redis({ | ||||
| 	port: config.redis.port, | ||||
| 	host: config.redis.host, | ||||
| 	family: config.redis.family == null ? 0 : config.redis.family, | ||||
| 	password: config.redis.pass, | ||||
| 	keyPrefix: `${config.redis.prefix}:`, | ||||
| 	db: config.redis.db ?? 0, | ||||
| }); | ||||
|  | ||||
| redis.on('connect', () => redis.disconnect()); | ||||
| redis.on('error', (e) => { | ||||
|   | ||||
| @@ -2,18 +2,15 @@ import { setTimeout } from 'node:timers/promises'; | ||||
| import { Global, Inject, Module } from '@nestjs/common'; | ||||
| import Redis from 'ioredis'; | ||||
| import { DataSource } from 'typeorm'; | ||||
| import { createRedisConnection } from '@/redis.js'; | ||||
| import { DI } from './di-symbols.js'; | ||||
| import { loadConfig } from './config.js'; | ||||
| import { createPostgresDataSource } from './postgres.js'; | ||||
| import { RepositoryModule } from './models/RepositoryModule.js'; | ||||
| import type { Provider, OnApplicationShutdown } from '@nestjs/common'; | ||||
|  | ||||
| const config = loadConfig(); | ||||
|  | ||||
| const $config: Provider = { | ||||
| 	provide: DI.config, | ||||
| 	useValue: config, | ||||
| 	useValue: loadConfig(), | ||||
| }; | ||||
|  | ||||
| const $db: Provider = { | ||||
| @@ -28,18 +25,31 @@ const $db: Provider = { | ||||
| const $redis: Provider = { | ||||
| 	provide: DI.redis, | ||||
| 	useFactory: (config) => { | ||||
| 		const redisClient = createRedisConnection(config); | ||||
| 		return redisClient; | ||||
| 		return new Redis({ | ||||
| 			port: config.redis.port, | ||||
| 			host: config.redis.host, | ||||
| 			family: config.redis.family == null ? 0 : config.redis.family, | ||||
| 			password: config.redis.pass, | ||||
| 			keyPrefix: `${config.redis.prefix}:`, | ||||
| 			db: config.redis.db ?? 0, | ||||
| 		}); | ||||
| 	}, | ||||
| 	inject: [DI.config], | ||||
| }; | ||||
|  | ||||
| const $redisSubscriber: Provider = { | ||||
| 	provide: DI.redisSubscriber, | ||||
| const $redisForPubsub: Provider = { | ||||
| 	provide: DI.redisForPubsub, | ||||
| 	useFactory: (config) => { | ||||
| 		const redisSubscriber = createRedisConnection(config); | ||||
| 		redisSubscriber.subscribe(config.host); | ||||
| 		return redisSubscriber; | ||||
| 		const redis = new Redis({ | ||||
| 			port: config.redisForPubsub.port, | ||||
| 			host: config.redisForPubsub.host, | ||||
| 			family: config.redisForPubsub.family == null ? 0 : config.redisForPubsub.family, | ||||
| 			password: config.redisForPubsub.pass, | ||||
| 			keyPrefix: `${config.redisForPubsub.prefix}:`, | ||||
| 			db: config.redisForPubsub.db ?? 0, | ||||
| 		}); | ||||
| 		redis.subscribe(config.host); | ||||
| 		return redis; | ||||
| 	}, | ||||
| 	inject: [DI.config], | ||||
| }; | ||||
| @@ -47,14 +57,14 @@ const $redisSubscriber: Provider = { | ||||
| @Global() | ||||
| @Module({ | ||||
| 	imports: [RepositoryModule], | ||||
| 	providers: [$config, $db, $redis, $redisSubscriber], | ||||
| 	exports: [$config, $db, $redis, $redisSubscriber, RepositoryModule], | ||||
| 	providers: [$config, $db, $redis, $redisForPubsub], | ||||
| 	exports: [$config, $db, $redis, $redisForPubsub, RepositoryModule], | ||||
| }) | ||||
| export class GlobalModule implements OnApplicationShutdown { | ||||
| 	constructor( | ||||
| 		@Inject(DI.db) private db: DataSource, | ||||
| 		@Inject(DI.redis) private redisClient: Redis.Redis, | ||||
| 		@Inject(DI.redisSubscriber) private redisSubscriber: Redis.Redis, | ||||
| 		@Inject(DI.redisForPubsub) private redisForPubsub: Redis.Redis, | ||||
| 	) {} | ||||
|  | ||||
| 	async onApplicationShutdown(signal: string): Promise<void> { | ||||
| @@ -69,7 +79,7 @@ export class GlobalModule implements OnApplicationShutdown { | ||||
| 		await Promise.all([ | ||||
| 			this.db.destroy(), | ||||
| 			this.redisClient.disconnect(), | ||||
| 			this.redisSubscriber.disconnect(), | ||||
| 			this.redisForPubsub.disconnect(), | ||||
| 		]); | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -33,6 +33,14 @@ export type Source = { | ||||
| 		db?: number; | ||||
| 		prefix?: string; | ||||
| 	}; | ||||
| 	redisForPubsub?: { | ||||
| 		host: string; | ||||
| 		port: number; | ||||
| 		family?: number; | ||||
| 		pass: string; | ||||
| 		db?: number; | ||||
| 		prefix?: string; | ||||
| 	}; | ||||
| 	elasticsearch: { | ||||
| 		host: string; | ||||
| 		port: number; | ||||
| @@ -151,6 +159,7 @@ export function loadConfig() { | ||||
| 		: null; | ||||
|  | ||||
| 	if (!config.redis.prefix) config.redis.prefix = mixin.host; | ||||
| 	if (config.redisForPubsub == null) config.redisForPubsub = config.redis; | ||||
|  | ||||
| 	return Object.assign(config, mixin); | ||||
| } | ||||
|   | ||||
| @@ -27,8 +27,8 @@ export class AntennaService implements OnApplicationShutdown { | ||||
| 		@Inject(DI.redis) | ||||
| 		private redisClient: Redis.Redis, | ||||
|  | ||||
| 		@Inject(DI.redisSubscriber) | ||||
| 		private redisSubscriber: Redis.Redis, | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
|  | ||||
| 		@Inject(DI.mutingsRepository) | ||||
| 		private mutingsRepository: MutingsRepository, | ||||
| @@ -52,12 +52,12 @@ export class AntennaService implements OnApplicationShutdown { | ||||
| 		this.antennasFetched = false; | ||||
| 		this.antennas = []; | ||||
|  | ||||
| 		this.redisSubscriber.on('message', this.onRedisMessage); | ||||
| 		this.redisForPubsub.on('message', this.onRedisMessage); | ||||
| 	} | ||||
|  | ||||
| 	@bindThis | ||||
| 	public onApplicationShutdown(signal?: string | undefined) { | ||||
| 		this.redisSubscriber.off('message', this.onRedisMessage); | ||||
| 		this.redisForPubsub.off('message', this.onRedisMessage); | ||||
| 	} | ||||
|  | ||||
| 	@bindThis | ||||
|   | ||||
| @@ -14,8 +14,8 @@ export class MetaService implements OnApplicationShutdown { | ||||
| 	private intervalId: NodeJS.Timer; | ||||
|  | ||||
| 	constructor( | ||||
| 		@Inject(DI.redisSubscriber) | ||||
| 		private redisSubscriber: Redis.Redis, | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
|  | ||||
| 		@Inject(DI.db) | ||||
| 		private db: DataSource, | ||||
| @@ -33,7 +33,7 @@ export class MetaService implements OnApplicationShutdown { | ||||
| 			}, 1000 * 60 * 5); | ||||
| 		} | ||||
|  | ||||
| 		this.redisSubscriber.on('message', this.onMessage); | ||||
| 		this.redisForPubsub.on('message', this.onMessage); | ||||
| 	} | ||||
|  | ||||
| 	@bindThis | ||||
| @@ -122,6 +122,6 @@ export class MetaService implements OnApplicationShutdown { | ||||
| 	@bindThis | ||||
| 	public onApplicationShutdown(signal?: string | undefined) { | ||||
| 		clearInterval(this.intervalId); | ||||
| 		this.redisSubscriber.off('message', this.onMessage); | ||||
| 		this.redisForPubsub.off('message', this.onMessage); | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -64,8 +64,8 @@ export class RoleService implements OnApplicationShutdown { | ||||
| 	public static NotAssignedError = class extends Error {}; | ||||
|  | ||||
| 	constructor( | ||||
| 		@Inject(DI.redisSubscriber) | ||||
| 		private redisSubscriber: Redis.Redis, | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
|  | ||||
| 		@Inject(DI.usersRepository) | ||||
| 		private usersRepository: UsersRepository, | ||||
| @@ -87,7 +87,7 @@ export class RoleService implements OnApplicationShutdown { | ||||
| 		this.rolesCache = new MemorySingleCache<Role[]>(Infinity); | ||||
| 		this.roleAssignmentByUserIdCache = new MemoryKVCache<RoleAssignment[]>(Infinity); | ||||
|  | ||||
| 		this.redisSubscriber.on('message', this.onMessage); | ||||
| 		this.redisForPubsub.on('message', this.onMessage); | ||||
| 	} | ||||
|  | ||||
| 	@bindThis | ||||
| @@ -400,6 +400,6 @@ export class RoleService implements OnApplicationShutdown { | ||||
|  | ||||
| 	@bindThis | ||||
| 	public onApplicationShutdown(signal?: string | undefined) { | ||||
| 		this.redisSubscriber.off('message', this.onMessage); | ||||
| 		this.redisForPubsub.off('message', this.onMessage); | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -13,14 +13,14 @@ export class WebhookService implements OnApplicationShutdown { | ||||
| 	private webhooks: Webhook[] = []; | ||||
|  | ||||
| 	constructor( | ||||
| 		@Inject(DI.redisSubscriber) | ||||
| 		private redisSubscriber: Redis.Redis, | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
|  | ||||
| 		@Inject(DI.webhooksRepository) | ||||
| 		private webhooksRepository: WebhooksRepository, | ||||
| 	) { | ||||
| 		//this.onMessage = this.onMessage.bind(this); | ||||
| 		this.redisSubscriber.on('message', this.onMessage); | ||||
| 		this.redisForPubsub.on('message', this.onMessage); | ||||
| 	} | ||||
|  | ||||
| 	@bindThis | ||||
| @@ -82,6 +82,6 @@ export class WebhookService implements OnApplicationShutdown { | ||||
|  | ||||
| 	@bindThis | ||||
| 	public onApplicationShutdown(signal?: string | undefined) { | ||||
| 		this.redisSubscriber.off('message', this.onMessage); | ||||
| 		this.redisForPubsub.off('message', this.onMessage); | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -2,7 +2,7 @@ export const DI = { | ||||
| 	config: Symbol('config'), | ||||
| 	db: Symbol('db'), | ||||
| 	redis: Symbol('redis'), | ||||
| 	redisSubscriber: Symbol('redisSubscriber'), | ||||
| 	redisForPubsub: Symbol('redisForPubsub'), | ||||
|  | ||||
| 	//#region Repositories | ||||
| 	usersRepository: Symbol('usersRepository'), | ||||
|   | ||||
| @@ -1,13 +0,0 @@ | ||||
| import Redis from 'ioredis'; | ||||
| import { Config } from '@/config.js'; | ||||
|  | ||||
| export function createRedisConnection(config: Config): Redis.Redis { | ||||
| 	return new Redis({ | ||||
| 		port: config.redis.port, | ||||
| 		host: config.redis.host, | ||||
| 		family: config.redis.family == null ? 0 : config.redis.family, | ||||
| 		password: config.redis.pass, | ||||
| 		keyPrefix: `${config.redis.prefix}:`, | ||||
| 		db: config.redis.db ?? 0, | ||||
| 	}); | ||||
| } | ||||
| @@ -22,8 +22,8 @@ export class StreamingApiServerService { | ||||
| 		@Inject(DI.config) | ||||
| 		private config: Config, | ||||
|  | ||||
| 		@Inject(DI.redisSubscriber) | ||||
| 		private redisSubscriber: Redis.Redis, | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
|  | ||||
| 		@Inject(DI.usersRepository) | ||||
| 		private usersRepository: UsersRepository, | ||||
| @@ -81,7 +81,7 @@ export class StreamingApiServerService { | ||||
| 				ev.emit(parsed.channel, parsed.message); | ||||
| 			} | ||||
|  | ||||
| 			this.redisSubscriber.on('message', onRedisMessage); | ||||
| 			this.redisForPubsub.on('message', onRedisMessage); | ||||
|  | ||||
| 			const main = new MainStreamConnection( | ||||
| 				this.channelsService, | ||||
| @@ -111,7 +111,7 @@ export class StreamingApiServerService { | ||||
| 			connection.once('close', () => { | ||||
| 				ev.removeAllListeners(); | ||||
| 				main.dispose(); | ||||
| 				this.redisSubscriber.off('message', onRedisMessage); | ||||
| 				this.redisForPubsub.off('message', onRedisMessage); | ||||
| 				if (intervalId) clearInterval(intervalId); | ||||
| 			}); | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 syuilo
					syuilo