Introduce publishers directory

This commit is contained in:
Akihiko Odaki
2018-04-02 13:33:46 +09:00
parent 0a48ef59cd
commit ad38cd2605
27 changed files with 48 additions and 52 deletions

50
src/publishers/notify.ts Normal file
View File

@@ -0,0 +1,50 @@
import * as mongo from 'mongodb';
import Notification from '../models/notification';
import Mute from '../models/mute';
import { pack } from '../models/notification';
import stream from './stream';
export default (
notifiee: mongo.ObjectID,
notifier: mongo.ObjectID,
type: string,
content?: any
) => new Promise<any>(async (resolve, reject) => {
if (notifiee.equals(notifier)) {
return resolve();
}
// Create notification
const notification = await Notification.insert(Object.assign({
createdAt: new Date(),
notifieeId: notifiee,
notifierId: notifier,
type: type,
isRead: false
}, content));
resolve(notification);
// Publish notification event
stream(notifiee, 'notification',
await pack(notification));
// 3秒経っても(今回作成した)通知が既読にならなかったら「未読の通知がありますよ」イベントを発行する
setTimeout(async () => {
const fresh = await Notification.findOne({ _id: notification._id }, { isRead: true });
if (!fresh.isRead) {
//#region ただしミュートしているユーザーからの通知なら無視
const mute = await Mute.find({
muterId: notifiee,
deletedAt: { $exists: false }
});
const mutedUserIds = mute.map(m => m.muteeId.toString());
if (mutedUserIds.indexOf(notifier.toString()) != -1) {
return;
}
//#endregion
stream(notifiee, 'unread_notification', await pack(notification));
}
}, 3000);
});

52
src/publishers/push-sw.ts Normal file
View File

@@ -0,0 +1,52 @@
const push = require('web-push');
import * as mongo from 'mongodb';
import Subscription from '../models/sw-subscription';
import config from '../config';
if (config.sw) {
// アプリケーションの連絡先と、サーバーサイドの鍵ペアの情報を登録
push.setVapidDetails(
config.maintainer.url,
config.sw.public_key,
config.sw.private_key);
}
export default async function(userId: mongo.ObjectID | string, type, body?) {
if (!config.sw) return;
if (typeof userId === 'string') {
userId = new mongo.ObjectID(userId);
}
// Fetch
const subscriptions = await Subscription.find({
userId: userId
});
subscriptions.forEach(subscription => {
const pushSubscription = {
endpoint: subscription.endpoint,
keys: {
auth: subscription.auth,
p256dh: subscription.publickey
}
};
push.sendNotification(pushSubscription, JSON.stringify({
type, body
})).catch(err => {
//console.log(err.statusCode);
//console.log(err.headers);
//console.log(err.body);
if (err.statusCode == 410) {
Subscription.remove({
userId: userId,
endpoint: subscription.endpoint,
auth: subscription.auth,
publickey: subscription.publickey
});
}
});
});
}

73
src/publishers/stream.ts Normal file
View File

@@ -0,0 +1,73 @@
import * as mongo from 'mongodb';
import * as redis from 'redis';
import config from '../config';
type ID = string | mongo.ObjectID;
class MisskeyEvent {
private redisClient: redis.RedisClient;
constructor() {
// Connect to Redis
this.redisClient = redis.createClient(
config.redis.port, config.redis.host);
}
public publishUserStream(userId: ID, type: string, value?: any): void {
this.publish(`user-stream:${userId}`, type, typeof value === 'undefined' ? null : value);
}
public publishDriveStream(userId: ID, type: string, value?: any): void {
this.publish(`drive-stream:${userId}`, type, typeof value === 'undefined' ? null : value);
}
public publishPostStream(postId: ID, type: string, value?: any): void {
this.publish(`post-stream:${postId}`, type, typeof value === 'undefined' ? null : value);
}
public publishMessagingStream(userId: ID, otherpartyId: ID, type: string, value?: any): void {
this.publish(`messaging-stream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value);
}
public publishMessagingIndexStream(userId: ID, type: string, value?: any): void {
this.publish(`messaging-index-stream:${userId}`, type, typeof value === 'undefined' ? null : value);
}
public publishOthelloStream(userId: ID, type: string, value?: any): void {
this.publish(`othello-stream:${userId}`, type, typeof value === 'undefined' ? null : value);
}
public publishOthelloGameStream(gameId: ID, type: string, value?: any): void {
this.publish(`othello-game-stream:${gameId}`, type, typeof value === 'undefined' ? null : value);
}
public publishChannelStream(channelId: ID, type: string, value?: any): void {
this.publish(`channel-stream:${channelId}`, type, typeof value === 'undefined' ? null : value);
}
private publish(channel: string, type: string, value?: any): void {
const message = value == null ?
{ type: type } :
{ type: type, body: value };
this.redisClient.publish(`misskey:${channel}`, JSON.stringify(message));
}
}
const ev = new MisskeyEvent();
export default ev.publishUserStream.bind(ev);
export const publishDriveStream = ev.publishDriveStream.bind(ev);
export const publishPostStream = ev.publishPostStream.bind(ev);
export const publishMessagingStream = ev.publishMessagingStream.bind(ev);
export const publishMessagingIndexStream = ev.publishMessagingIndexStream.bind(ev);
export const publishOthelloStream = ev.publishOthelloStream.bind(ev);
export const publishOthelloGameStream = ev.publishOthelloGameStream.bind(ev);
export const publishChannelStream = ev.publishChannelStream.bind(ev);