fix(backend): イベント用redis分離が上手く動かない問題を修正
This commit is contained in:
		| @@ -37,8 +37,24 @@ const $redis: Provider = { | |||||||
| 	inject: [DI.config], | 	inject: [DI.config], | ||||||
| }; | }; | ||||||
|  |  | ||||||
| const $redisForPubsub: Provider = { | const $redisForPub: Provider = { | ||||||
| 	provide: DI.redisForPubsub, | 	provide: DI.redisForPub, | ||||||
|  | 	useFactory: (config) => { | ||||||
|  | 		const redis = new Redis({ | ||||||
|  | 			port: config.redisForPubsub.port, | ||||||
|  | 			host: config.redisForPubsub.host, | ||||||
|  | 			family: config.redisForPubsub.family == null ? 0 : config.redisForPubsub.family, | ||||||
|  | 			password: config.redisForPubsub.pass, | ||||||
|  | 			keyPrefix: `${config.redisForPubsub.prefix}:`, | ||||||
|  | 			db: config.redisForPubsub.db ?? 0, | ||||||
|  | 		}); | ||||||
|  | 		return redis; | ||||||
|  | 	}, | ||||||
|  | 	inject: [DI.config], | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | const $redisForSub: Provider = { | ||||||
|  | 	provide: DI.redisForSub, | ||||||
| 	useFactory: (config) => { | 	useFactory: (config) => { | ||||||
| 		const redis = new Redis({ | 		const redis = new Redis({ | ||||||
| 			port: config.redisForPubsub.port, | 			port: config.redisForPubsub.port, | ||||||
| @@ -57,14 +73,15 @@ const $redisForPubsub: Provider = { | |||||||
| @Global() | @Global() | ||||||
| @Module({ | @Module({ | ||||||
| 	imports: [RepositoryModule], | 	imports: [RepositoryModule], | ||||||
| 	providers: [$config, $db, $redis, $redisForPubsub], | 	providers: [$config, $db, $redis, $redisForPub, $redisForSub], | ||||||
| 	exports: [$config, $db, $redis, $redisForPubsub, RepositoryModule], | 	exports: [$config, $db, $redis, $redisForPub, $redisForSub, RepositoryModule], | ||||||
| }) | }) | ||||||
| export class GlobalModule implements OnApplicationShutdown { | export class GlobalModule implements OnApplicationShutdown { | ||||||
| 	constructor( | 	constructor( | ||||||
| 		@Inject(DI.db) private db: DataSource, | 		@Inject(DI.db) private db: DataSource, | ||||||
| 		@Inject(DI.redis) private redisClient: Redis.Redis, | 		@Inject(DI.redis) private redisClient: Redis.Redis, | ||||||
| 		@Inject(DI.redisForPubsub) private redisForPubsub: Redis.Redis, | 		@Inject(DI.redisForPub) private redisForPub: Redis.Redis, | ||||||
|  | 		@Inject(DI.redisForSub) private redisForSub: Redis.Redis, | ||||||
| 	) {} | 	) {} | ||||||
|  |  | ||||||
| 	async onApplicationShutdown(signal: string): Promise<void> { | 	async onApplicationShutdown(signal: string): Promise<void> { | ||||||
| @@ -79,7 +96,8 @@ export class GlobalModule implements OnApplicationShutdown { | |||||||
| 		await Promise.all([ | 		await Promise.all([ | ||||||
| 			this.db.destroy(), | 			this.db.destroy(), | ||||||
| 			this.redisClient.disconnect(), | 			this.redisClient.disconnect(), | ||||||
| 			this.redisForPubsub.disconnect(), | 			this.redisForPub.disconnect(), | ||||||
|  | 			this.redisForSub.disconnect(), | ||||||
| 		]); | 		]); | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -27,8 +27,8 @@ export class AntennaService implements OnApplicationShutdown { | |||||||
| 		@Inject(DI.redis) | 		@Inject(DI.redis) | ||||||
| 		private redisClient: Redis.Redis, | 		private redisClient: Redis.Redis, | ||||||
|  |  | ||||||
| 		@Inject(DI.redisForPubsub) | 		@Inject(DI.redisForSub) | ||||||
| 		private redisForPubsub: Redis.Redis, | 		private redisForSub: Redis.Redis, | ||||||
|  |  | ||||||
| 		@Inject(DI.mutingsRepository) | 		@Inject(DI.mutingsRepository) | ||||||
| 		private mutingsRepository: MutingsRepository, | 		private mutingsRepository: MutingsRepository, | ||||||
| @@ -52,12 +52,12 @@ export class AntennaService implements OnApplicationShutdown { | |||||||
| 		this.antennasFetched = false; | 		this.antennasFetched = false; | ||||||
| 		this.antennas = []; | 		this.antennas = []; | ||||||
|  |  | ||||||
| 		this.redisForPubsub.on('message', this.onRedisMessage); | 		this.redisForSub.on('message', this.onRedisMessage); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	@bindThis | 	@bindThis | ||||||
| 	public onApplicationShutdown(signal?: string | undefined) { | 	public onApplicationShutdown(signal?: string | undefined) { | ||||||
| 		this.redisForPubsub.off('message', this.onRedisMessage); | 		this.redisForSub.off('message', this.onRedisMessage); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	@bindThis | 	@bindThis | ||||||
|   | |||||||
| @@ -27,8 +27,8 @@ export class CacheService implements OnApplicationShutdown { | |||||||
| 		@Inject(DI.redis) | 		@Inject(DI.redis) | ||||||
| 		private redisClient: Redis.Redis, | 		private redisClient: Redis.Redis, | ||||||
|  |  | ||||||
| 		@Inject(DI.redisForPubsub) | 		@Inject(DI.redisForSub) | ||||||
| 		private redisForPubsub: Redis.Redis, | 		private redisForSub: Redis.Redis, | ||||||
|  |  | ||||||
| 		@Inject(DI.usersRepository) | 		@Inject(DI.usersRepository) | ||||||
| 		private usersRepository: UsersRepository, | 		private usersRepository: UsersRepository, | ||||||
| @@ -116,7 +116,7 @@ export class CacheService implements OnApplicationShutdown { | |||||||
| 			fromRedisConverter: (value) => new Set(JSON.parse(value)), | 			fromRedisConverter: (value) => new Set(JSON.parse(value)), | ||||||
| 		}); | 		}); | ||||||
|  |  | ||||||
| 		this.redisForPubsub.on('message', this.onMessage); | 		this.redisForSub.on('message', this.onMessage); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	@bindThis | 	@bindThis | ||||||
| @@ -167,6 +167,6 @@ export class CacheService implements OnApplicationShutdown { | |||||||
|  |  | ||||||
| 	@bindThis | 	@bindThis | ||||||
| 	public onApplicationShutdown(signal?: string | undefined) { | 	public onApplicationShutdown(signal?: string | undefined) { | ||||||
| 		this.redisForPubsub.off('message', this.onMessage); | 		this.redisForSub.off('message', this.onMessage); | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -26,8 +26,8 @@ export class GlobalEventService { | |||||||
| 		@Inject(DI.config) | 		@Inject(DI.config) | ||||||
| 		private config: Config, | 		private config: Config, | ||||||
|  |  | ||||||
| 		@Inject(DI.redis) | 		@Inject(DI.redisForPub) | ||||||
| 		private redisClient: Redis.Redis, | 		private redisForPub: Redis.Redis, | ||||||
| 	) { | 	) { | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -37,7 +37,7 @@ export class GlobalEventService { | |||||||
| 			{ type: type, body: null } : | 			{ type: type, body: null } : | ||||||
| 			{ type: type, body: value }; | 			{ type: type, body: value }; | ||||||
|  |  | ||||||
| 		this.redisClient.publish(this.config.host, JSON.stringify({ | 		this.redisForPub.publish(this.config.host, JSON.stringify({ | ||||||
| 			channel: channel, | 			channel: channel, | ||||||
| 			message: message, | 			message: message, | ||||||
| 		})); | 		})); | ||||||
|   | |||||||
| @@ -14,8 +14,8 @@ export class MetaService implements OnApplicationShutdown { | |||||||
| 	private intervalId: NodeJS.Timer; | 	private intervalId: NodeJS.Timer; | ||||||
|  |  | ||||||
| 	constructor( | 	constructor( | ||||||
| 		@Inject(DI.redisForPubsub) | 		@Inject(DI.redisForSub) | ||||||
| 		private redisForPubsub: Redis.Redis, | 		private redisForSub: Redis.Redis, | ||||||
|  |  | ||||||
| 		@Inject(DI.db) | 		@Inject(DI.db) | ||||||
| 		private db: DataSource, | 		private db: DataSource, | ||||||
| @@ -33,7 +33,7 @@ export class MetaService implements OnApplicationShutdown { | |||||||
| 			}, 1000 * 60 * 5); | 			}, 1000 * 60 * 5); | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		this.redisForPubsub.on('message', this.onMessage); | 		this.redisForSub.on('message', this.onMessage); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	@bindThis | 	@bindThis | ||||||
| @@ -122,6 +122,6 @@ export class MetaService implements OnApplicationShutdown { | |||||||
| 	@bindThis | 	@bindThis | ||||||
| 	public onApplicationShutdown(signal?: string | undefined) { | 	public onApplicationShutdown(signal?: string | undefined) { | ||||||
| 		clearInterval(this.intervalId); | 		clearInterval(this.intervalId); | ||||||
| 		this.redisForPubsub.off('message', this.onMessage); | 		this.redisForSub.off('message', this.onMessage); | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -64,8 +64,8 @@ export class RoleService implements OnApplicationShutdown { | |||||||
| 	public static NotAssignedError = class extends Error {}; | 	public static NotAssignedError = class extends Error {}; | ||||||
|  |  | ||||||
| 	constructor( | 	constructor( | ||||||
| 		@Inject(DI.redisForPubsub) | 		@Inject(DI.redisForSub) | ||||||
| 		private redisForPubsub: Redis.Redis, | 		private redisForSub: Redis.Redis, | ||||||
|  |  | ||||||
| 		@Inject(DI.usersRepository) | 		@Inject(DI.usersRepository) | ||||||
| 		private usersRepository: UsersRepository, | 		private usersRepository: UsersRepository, | ||||||
| @@ -87,7 +87,7 @@ export class RoleService implements OnApplicationShutdown { | |||||||
| 		this.rolesCache = new MemorySingleCache<Role[]>(1000 * 60 * 60 * 1); | 		this.rolesCache = new MemorySingleCache<Role[]>(1000 * 60 * 60 * 1); | ||||||
| 		this.roleAssignmentByUserIdCache = new MemoryKVCache<RoleAssignment[]>(1000 * 60 * 60 * 1); | 		this.roleAssignmentByUserIdCache = new MemoryKVCache<RoleAssignment[]>(1000 * 60 * 60 * 1); | ||||||
|  |  | ||||||
| 		this.redisForPubsub.on('message', this.onMessage); | 		this.redisForSub.on('message', this.onMessage); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	@bindThis | 	@bindThis | ||||||
| @@ -400,6 +400,6 @@ export class RoleService implements OnApplicationShutdown { | |||||||
|  |  | ||||||
| 	@bindThis | 	@bindThis | ||||||
| 	public onApplicationShutdown(signal?: string | undefined) { | 	public onApplicationShutdown(signal?: string | undefined) { | ||||||
| 		this.redisForPubsub.off('message', this.onMessage); | 		this.redisForSub.off('message', this.onMessage); | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -13,14 +13,14 @@ export class WebhookService implements OnApplicationShutdown { | |||||||
| 	private webhooks: Webhook[] = []; | 	private webhooks: Webhook[] = []; | ||||||
|  |  | ||||||
| 	constructor( | 	constructor( | ||||||
| 		@Inject(DI.redisForPubsub) | 		@Inject(DI.redisForSub) | ||||||
| 		private redisForPubsub: Redis.Redis, | 		private redisForSub: Redis.Redis, | ||||||
|  |  | ||||||
| 		@Inject(DI.webhooksRepository) | 		@Inject(DI.webhooksRepository) | ||||||
| 		private webhooksRepository: WebhooksRepository, | 		private webhooksRepository: WebhooksRepository, | ||||||
| 	) { | 	) { | ||||||
| 		//this.onMessage = this.onMessage.bind(this); | 		//this.onMessage = this.onMessage.bind(this); | ||||||
| 		this.redisForPubsub.on('message', this.onMessage); | 		this.redisForSub.on('message', this.onMessage); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	@bindThis | 	@bindThis | ||||||
| @@ -82,6 +82,6 @@ export class WebhookService implements OnApplicationShutdown { | |||||||
|  |  | ||||||
| 	@bindThis | 	@bindThis | ||||||
| 	public onApplicationShutdown(signal?: string | undefined) { | 	public onApplicationShutdown(signal?: string | undefined) { | ||||||
| 		this.redisForPubsub.off('message', this.onMessage); | 		this.redisForSub.off('message', this.onMessage); | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -2,7 +2,8 @@ export const DI = { | |||||||
| 	config: Symbol('config'), | 	config: Symbol('config'), | ||||||
| 	db: Symbol('db'), | 	db: Symbol('db'), | ||||||
| 	redis: Symbol('redis'), | 	redis: Symbol('redis'), | ||||||
| 	redisForPubsub: Symbol('redisForPubsub'), | 	redisForPub: Symbol('redisForPub'), | ||||||
|  | 	redisForSub: Symbol('redisForSub'), | ||||||
|  |  | ||||||
| 	//#region Repositories | 	//#region Repositories | ||||||
| 	usersRepository: Symbol('usersRepository'), | 	usersRepository: Symbol('usersRepository'), | ||||||
|   | |||||||
| @@ -22,8 +22,8 @@ export class StreamingApiServerService { | |||||||
| 		@Inject(DI.config) | 		@Inject(DI.config) | ||||||
| 		private config: Config, | 		private config: Config, | ||||||
|  |  | ||||||
| 		@Inject(DI.redisForPubsub) | 		@Inject(DI.redisForSub) | ||||||
| 		private redisForPubsub: Redis.Redis, | 		private redisForSub: Redis.Redis, | ||||||
|  |  | ||||||
| 		@Inject(DI.usersRepository) | 		@Inject(DI.usersRepository) | ||||||
| 		private usersRepository: UsersRepository, | 		private usersRepository: UsersRepository, | ||||||
| @@ -81,7 +81,7 @@ export class StreamingApiServerService { | |||||||
| 				ev.emit(parsed.channel, parsed.message); | 				ev.emit(parsed.channel, parsed.message); | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			this.redisForPubsub.on('message', onRedisMessage); | 			this.redisForSub.on('message', onRedisMessage); | ||||||
|  |  | ||||||
| 			const main = new MainStreamConnection( | 			const main = new MainStreamConnection( | ||||||
| 				this.channelsService, | 				this.channelsService, | ||||||
| @@ -111,7 +111,7 @@ export class StreamingApiServerService { | |||||||
| 			connection.once('close', () => { | 			connection.once('close', () => { | ||||||
| 				ev.removeAllListeners(); | 				ev.removeAllListeners(); | ||||||
| 				main.dispose(); | 				main.dispose(); | ||||||
| 				this.redisForPubsub.off('message', onRedisMessage); | 				this.redisForSub.off('message', onRedisMessage); | ||||||
| 				if (intervalId) clearInterval(intervalId); | 				if (intervalId) clearInterval(intervalId); | ||||||
| 			}); | 			}); | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 syuilo
					syuilo