Merge pull request #1394 from akihikodaki/duplicate
Retry HTTP requests
This commit is contained in:
		
							
								
								
									
										42
									
								
								src/following/distribute.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								src/following/distribute.ts
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,42 @@ | |||||||
|  | import User, { pack as packUser } from '../models/user'; | ||||||
|  | import FollowingLog from '../models/following-log'; | ||||||
|  | import FollowedLog from '../models/followed-log'; | ||||||
|  | import event from '../publishers/stream'; | ||||||
|  | import notify from '../publishers/notify'; | ||||||
|  |  | ||||||
|  | export default async (follower, followee) => Promise.all([ | ||||||
|  | 	// Increment following count | ||||||
|  | 	User.update(follower._id, { | ||||||
|  | 		$inc: { | ||||||
|  | 			followingCount: 1 | ||||||
|  | 		} | ||||||
|  | 	}), | ||||||
|  |  | ||||||
|  | 	FollowingLog.insert({ | ||||||
|  | 		createdAt: new Date(), | ||||||
|  | 		userId: followee._id, | ||||||
|  | 		count: follower.followingCount + 1 | ||||||
|  | 	}), | ||||||
|  |  | ||||||
|  | 	// Increment followers count | ||||||
|  | 	User.update({ _id: followee._id }, { | ||||||
|  | 		$inc: { | ||||||
|  | 			followersCount: 1 | ||||||
|  | 		} | ||||||
|  | 	}), | ||||||
|  |  | ||||||
|  | 	FollowedLog.insert({ | ||||||
|  | 		createdAt: new Date(), | ||||||
|  | 		userId: follower._id, | ||||||
|  | 		count: followee.followersCount + 1 | ||||||
|  | 	}), | ||||||
|  |  | ||||||
|  | 	followee.host === null && Promise.all([ | ||||||
|  | 		// Notify | ||||||
|  | 		notify(followee.id, follower.id, 'follow'), | ||||||
|  |  | ||||||
|  | 		// Publish follow event | ||||||
|  | 		packUser(follower, followee) | ||||||
|  | 			.then(packed => event(followee._id, 'followed', packed)) | ||||||
|  | 	]) | ||||||
|  | ]); | ||||||
| @@ -99,7 +99,7 @@ async function workerMain(opt) { | |||||||
|  |  | ||||||
| 	if (!opt['only-server']) { | 	if (!opt['only-server']) { | ||||||
| 		// start processor | 		// start processor | ||||||
| 		require('./processor').default(); | 		require('./queue').process(); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Send a 'ready' message to parent process | 	// Send a 'ready' message to parent process | ||||||
|   | |||||||
| @@ -1,11 +1,14 @@ | |||||||
|  | import Channel from '../models/channel'; | ||||||
|  | import ChannelWatching from '../models/channel-watching'; | ||||||
|  | import Following from '../models/following'; | ||||||
| import Mute from '../models/mute'; | import Mute from '../models/mute'; | ||||||
| import Post, { pack } from '../models/post'; | import Post, { pack } from '../models/post'; | ||||||
| import Watching from '../models/post-watching'; | import Watching from '../models/post-watching'; | ||||||
| import User from '../models/user'; | import User, { isLocalUser } from '../models/user'; | ||||||
| import stream from '../publishers/stream'; | import stream, { publishChannelStream } from '../publishers/stream'; | ||||||
| import notify from '../publishers/notify'; | import notify from '../publishers/notify'; | ||||||
| import pushSw from '../publishers/push-sw'; | import pushSw from '../publishers/push-sw'; | ||||||
| import queue from '../queue'; | import { createHttp } from '../queue'; | ||||||
| import watch from './watch'; | import watch from './watch'; | ||||||
|  |  | ||||||
| export default async (user, mentions, post) => { | export default async (user, mentions, post) => { | ||||||
| @@ -21,10 +24,6 @@ export default async (user, mentions, post) => { | |||||||
| 				latestPost: post._id | 				latestPost: post._id | ||||||
| 			} | 			} | ||||||
| 		}), | 		}), | ||||||
| 		new Promise((resolve, reject) => queue.create('http', { |  | ||||||
| 			type: 'deliverPost', |  | ||||||
| 			id: post._id, |  | ||||||
| 		}).save(error => error ? reject(error) : resolve())), |  | ||||||
| 	] as Array<Promise<any>>; | 	] as Array<Promise<any>>; | ||||||
|  |  | ||||||
| 	function addMention(promisedMentionee, reason) { | 	function addMention(promisedMentionee, reason) { | ||||||
| @@ -50,6 +49,91 @@ export default async (user, mentions, post) => { | |||||||
| 		})); | 		})); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// タイムラインへの投稿 | ||||||
|  | 	if (!post.channelId) { | ||||||
|  | 		promises.push( | ||||||
|  | 			// Publish event to myself's stream | ||||||
|  | 			promisedPostObj.then(postObj => { | ||||||
|  | 				stream(post.userId, 'post', postObj); | ||||||
|  | 			}), | ||||||
|  |  | ||||||
|  | 			Promise.all([ | ||||||
|  | 				User.findOne({ _id: post.userId }), | ||||||
|  |  | ||||||
|  | 				// Fetch all followers | ||||||
|  | 				Following.aggregate([{ | ||||||
|  | 					$lookup: { | ||||||
|  | 						from: 'users', | ||||||
|  | 						localField: 'followerId', | ||||||
|  | 						foreignField: '_id', | ||||||
|  | 						as: 'follower' | ||||||
|  | 					} | ||||||
|  | 				}, { | ||||||
|  | 					$match: { | ||||||
|  | 						followeeId: post.userId | ||||||
|  | 					} | ||||||
|  | 				}], { | ||||||
|  | 					_id: false | ||||||
|  | 				}) | ||||||
|  | 			]).then(([user, followers]) => Promise.all(followers.map(following => { | ||||||
|  | 				if (isLocalUser(following.follower)) { | ||||||
|  | 					// Publish event to followers stream | ||||||
|  | 					return promisedPostObj.then(postObj => { | ||||||
|  | 						stream(following.followerId, 'post', postObj); | ||||||
|  | 					}); | ||||||
|  | 				} | ||||||
|  |  | ||||||
|  | 				return new Promise((resolve, reject) => { | ||||||
|  | 					createHttp({ | ||||||
|  | 						type: 'deliverPost', | ||||||
|  | 						fromId: user._id, | ||||||
|  | 						toId: following.followerId, | ||||||
|  | 						postId: post._id | ||||||
|  | 					}).save(error => { | ||||||
|  | 						if (error) { | ||||||
|  | 							reject(error); | ||||||
|  | 						} else { | ||||||
|  | 							resolve(); | ||||||
|  | 						} | ||||||
|  | 					}); | ||||||
|  | 				}); | ||||||
|  | 			}))) | ||||||
|  | 		); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// チャンネルへの投稿 | ||||||
|  | 	if (post.channelId) { | ||||||
|  | 		promises.push( | ||||||
|  | 			// Increment channel index(posts count) | ||||||
|  | 			Channel.update({ _id: post.channelId }, { | ||||||
|  | 				$inc: { | ||||||
|  | 					index: 1 | ||||||
|  | 				} | ||||||
|  | 			}), | ||||||
|  |  | ||||||
|  | 			// Publish event to channel | ||||||
|  | 			promisedPostObj.then(postObj => { | ||||||
|  | 				publishChannelStream(post.channelId, 'post', postObj); | ||||||
|  | 			}), | ||||||
|  |  | ||||||
|  | 			Promise.all([ | ||||||
|  | 				promisedPostObj, | ||||||
|  |  | ||||||
|  | 				// Get channel watchers | ||||||
|  | 				ChannelWatching.find({ | ||||||
|  | 					channelId: post.channelId, | ||||||
|  | 					// 削除されたドキュメントは除く | ||||||
|  | 					deletedAt: { $exists: false } | ||||||
|  | 				}) | ||||||
|  | 			]).then(([postObj, watches]) => { | ||||||
|  | 				// チャンネルの視聴者(のタイムライン)に配信 | ||||||
|  | 				watches.forEach(w => { | ||||||
|  | 					stream(w.userId, 'post', postObj); | ||||||
|  | 				}); | ||||||
|  | 			}) | ||||||
|  | 		); | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// If has in reply to post | 	// If has in reply to post | ||||||
| 	if (post.replyId) { | 	if (post.replyId) { | ||||||
| 		promises.push( | 		promises.push( | ||||||
|   | |||||||
| @@ -1,93 +0,0 @@ | |||||||
| import Channel from '../../models/channel'; |  | ||||||
| import Following from '../../models/following'; |  | ||||||
| import ChannelWatching from '../../models/channel-watching'; |  | ||||||
| import Post, { pack } from '../../models/post'; |  | ||||||
| import User, { isLocalUser } from '../../models/user'; |  | ||||||
| import stream, { publishChannelStream } from '../../publishers/stream'; |  | ||||||
| import context from '../../remote/activitypub/renderer/context'; |  | ||||||
| import renderCreate from '../../remote/activitypub/renderer/create'; |  | ||||||
| import renderNote from '../../remote/activitypub/renderer/note'; |  | ||||||
| import request from '../../remote/request'; |  | ||||||
|  |  | ||||||
| export default ({ data }) => Post.findOne({ _id: data.id }).then(post => { |  | ||||||
| 	const promisedPostObj = pack(post); |  | ||||||
| 	const promises = []; |  | ||||||
|  |  | ||||||
| 	// タイムラインへの投稿 |  | ||||||
| 	if (!post.channelId) { |  | ||||||
| 		promises.push( |  | ||||||
| 			// Publish event to myself's stream |  | ||||||
| 			promisedPostObj.then(postObj => { |  | ||||||
| 				stream(post.userId, 'post', postObj); |  | ||||||
| 			}), |  | ||||||
|  |  | ||||||
| 			Promise.all([ |  | ||||||
| 				User.findOne({ _id: post.userId }), |  | ||||||
|  |  | ||||||
| 				// Fetch all followers |  | ||||||
| 				Following.aggregate([{ |  | ||||||
| 					$lookup: { |  | ||||||
| 						from: 'users', |  | ||||||
| 						localField: 'followerId', |  | ||||||
| 						foreignField: '_id', |  | ||||||
| 						as: 'follower' |  | ||||||
| 					} |  | ||||||
| 				}, { |  | ||||||
| 					$match: { |  | ||||||
| 						followeeId: post.userId |  | ||||||
| 					} |  | ||||||
| 				}], { |  | ||||||
| 					_id: false |  | ||||||
| 				}) |  | ||||||
| 			]).then(([user, followers]) => Promise.all(followers.map(following => { |  | ||||||
| 				if (isLocalUser(following.follower)) { |  | ||||||
| 					// Publish event to followers stream |  | ||||||
| 					return promisedPostObj.then(postObj => { |  | ||||||
| 						stream(following.followerId, 'post', postObj); |  | ||||||
| 					}); |  | ||||||
| 				} |  | ||||||
|  |  | ||||||
| 				return renderNote(user, post).then(note => { |  | ||||||
| 					const create = renderCreate(note); |  | ||||||
| 					create['@context'] = context; |  | ||||||
| 					return request(user, following.follower[0].account.inbox, create); |  | ||||||
| 				}); |  | ||||||
| 			}))) |  | ||||||
| 		); |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// チャンネルへの投稿 |  | ||||||
| 	if (post.channelId) { |  | ||||||
| 		promises.push( |  | ||||||
| 			// Increment channel index(posts count) |  | ||||||
| 			Channel.update({ _id: post.channelId }, { |  | ||||||
| 				$inc: { |  | ||||||
| 					index: 1 |  | ||||||
| 				} |  | ||||||
| 			}), |  | ||||||
|  |  | ||||||
| 			// Publish event to channel |  | ||||||
| 			promisedPostObj.then(postObj => { |  | ||||||
| 				publishChannelStream(post.channelId, 'post', postObj); |  | ||||||
| 			}), |  | ||||||
|  |  | ||||||
| 			Promise.all([ |  | ||||||
| 				promisedPostObj, |  | ||||||
|  |  | ||||||
| 				// Get channel watchers |  | ||||||
| 				ChannelWatching.find({ |  | ||||||
| 					channelId: post.channelId, |  | ||||||
| 					// 削除されたドキュメントは除く |  | ||||||
| 					deletedAt: { $exists: false } |  | ||||||
| 				}) |  | ||||||
| 			]).then(([postObj, watches]) => { |  | ||||||
| 				// チャンネルの視聴者(のタイムライン)に配信 |  | ||||||
| 				watches.forEach(w => { |  | ||||||
| 					stream(w.userId, 'post', postObj); |  | ||||||
| 				}); |  | ||||||
| 			}) |  | ||||||
| 		); |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return Promise.all(promises); |  | ||||||
| }); |  | ||||||
| @@ -1,69 +0,0 @@ | |||||||
| import User, { isLocalUser, pack as packUser } from '../../models/user'; |  | ||||||
| import Following from '../../models/following'; |  | ||||||
| import FollowingLog from '../../models/following-log'; |  | ||||||
| import FollowedLog from '../../models/followed-log'; |  | ||||||
| import event from '../../publishers/stream'; |  | ||||||
| import notify from '../../publishers/notify'; |  | ||||||
| import context from '../../remote/activitypub/renderer/context'; |  | ||||||
| import render from '../../remote/activitypub/renderer/follow'; |  | ||||||
| import request from '../../remote/request'; |  | ||||||
|  |  | ||||||
| export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => { |  | ||||||
| 	const promisedFollower = User.findOne({ _id: followerId }); |  | ||||||
| 	const promisedFollowee = User.findOne({ _id: followeeId }); |  | ||||||
|  |  | ||||||
| 	return Promise.all([ |  | ||||||
| 		// Increment following count |  | ||||||
| 		User.update(followerId, { |  | ||||||
| 			$inc: { |  | ||||||
| 				followingCount: 1 |  | ||||||
| 			} |  | ||||||
| 		}), |  | ||||||
|  |  | ||||||
| 		promisedFollower.then(({ followingCount }) => FollowingLog.insert({ |  | ||||||
| 			createdAt: data.following.createdAt, |  | ||||||
| 			userId: followerId, |  | ||||||
| 			count: followingCount + 1 |  | ||||||
| 		})), |  | ||||||
|  |  | ||||||
| 		// Increment followers count |  | ||||||
| 		User.update({ _id: followeeId }, { |  | ||||||
| 			$inc: { |  | ||||||
| 				followersCount: 1 |  | ||||||
| 			} |  | ||||||
| 		}), |  | ||||||
|  |  | ||||||
| 		promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ |  | ||||||
| 			createdAt: data.following.createdAt, |  | ||||||
| 			userId: followerId, |  | ||||||
| 			count: followersCount + 1 |  | ||||||
| 		})), |  | ||||||
|  |  | ||||||
| 		// Notify |  | ||||||
| 		promisedFollowee.then(followee => followee.host === null ? |  | ||||||
| 			notify(followeeId, followerId, 'follow') : null), |  | ||||||
|  |  | ||||||
| 		// Publish follow event |  | ||||||
| 		Promise.all([promisedFollower, promisedFollowee]).then(([follower, followee]) => { |  | ||||||
| 			let followerEvent; |  | ||||||
| 			let followeeEvent; |  | ||||||
|  |  | ||||||
| 			if (isLocalUser(follower)) { |  | ||||||
| 				followerEvent = packUser(followee, follower) |  | ||||||
| 					.then(packed => event(follower._id, 'follow', packed)); |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if (isLocalUser(followee)) { |  | ||||||
| 				followeeEvent = packUser(follower, followee) |  | ||||||
| 					.then(packed => event(followee._id, 'followed', packed)); |  | ||||||
| 			} else if (isLocalUser(follower)) { |  | ||||||
| 				const rendered = render(follower, followee); |  | ||||||
| 				rendered['@context'] = context; |  | ||||||
|  |  | ||||||
| 				followeeEvent = request(follower, followee.account.inbox, rendered); |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			return Promise.all([followerEvent, followeeEvent]); |  | ||||||
| 		}) |  | ||||||
| 	]); |  | ||||||
| }); |  | ||||||
| @@ -1,7 +0,0 @@ | |||||||
| import User from '../../models/user'; |  | ||||||
| import act from '../../remote/activitypub/act'; |  | ||||||
| import Resolver from '../../remote/activitypub/resolver'; |  | ||||||
|  |  | ||||||
| export default ({ data }) => User.findOne({ _id: data.actor }) |  | ||||||
| 	.then(actor => act(new Resolver(), actor, data.outbox)) |  | ||||||
| 	.then(Promise.all); |  | ||||||
| @@ -1,56 +0,0 @@ | |||||||
| import FollowedLog from '../../models/followed-log'; |  | ||||||
| import Following from '../../models/following'; |  | ||||||
| import FollowingLog from '../../models/following-log'; |  | ||||||
| import User, { isRemoteUser, pack as packUser } from '../../models/user'; |  | ||||||
| import stream from '../../publishers/stream'; |  | ||||||
| import renderFollow from '../../remote/activitypub/renderer/follow'; |  | ||||||
| import renderUndo from '../../remote/activitypub/renderer/undo'; |  | ||||||
| import context from '../../remote/activitypub/renderer/context'; |  | ||||||
| import request from '../../remote/request'; |  | ||||||
|  |  | ||||||
| export default async ({ data }) => { |  | ||||||
| 	// Delete following |  | ||||||
| 	const following = await Following.findOneAndDelete({ _id: data.id }); |  | ||||||
| 	if (following === null) { |  | ||||||
| 		return; |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	const promisedFollower = User.findOne({ _id: following.followerId }); |  | ||||||
| 	const promisedFollowee = User.findOne({ _id: following.followeeId }); |  | ||||||
|  |  | ||||||
| 	await Promise.all([ |  | ||||||
| 		// Decrement following count |  | ||||||
| 		User.update({ _id: following.followerId }, { $inc: { followingCount: -1 } }), |  | ||||||
| 		promisedFollower.then(({ followingCount }) => FollowingLog.insert({ |  | ||||||
| 			createdAt: new Date(), |  | ||||||
| 			userId: following.followerId, |  | ||||||
| 			count: followingCount - 1 |  | ||||||
| 		})), |  | ||||||
|  |  | ||||||
| 		// Decrement followers count |  | ||||||
| 		User.update({ _id: following.followeeId }, { $inc: { followersCount: -1 } }), |  | ||||||
| 		promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ |  | ||||||
| 			createdAt: new Date(), |  | ||||||
| 			userId: following.followeeId, |  | ||||||
| 			count: followersCount - 1 |  | ||||||
| 		})), |  | ||||||
|  |  | ||||||
| 		// Publish follow event |  | ||||||
| 		Promise.all([promisedFollower, promisedFollowee]).then(async ([follower, followee]) => { |  | ||||||
| 			if (isRemoteUser(follower)) { |  | ||||||
| 				return; |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			const promisedPackedUser = packUser(followee, follower); |  | ||||||
|  |  | ||||||
| 			if (isRemoteUser(followee)) { |  | ||||||
| 				const undo = renderUndo(renderFollow(follower, followee)); |  | ||||||
| 				undo['@context'] = context; |  | ||||||
|  |  | ||||||
| 				await request(follower, followee.account.inbox, undo); |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			stream(follower._id, 'unfollow', promisedPackedUser); |  | ||||||
| 		}) |  | ||||||
| 	]); |  | ||||||
| }; |  | ||||||
| @@ -1,18 +0,0 @@ | |||||||
| import queue from '../queue'; |  | ||||||
| import db from './db'; |  | ||||||
| import http from './http'; |  | ||||||
|  |  | ||||||
| export default () => { |  | ||||||
| 	queue.process('db', db); |  | ||||||
|  |  | ||||||
| 	/* |  | ||||||
| 		256 is the default concurrency limit of Mozilla Firefox and Google |  | ||||||
| 		Chromium. |  | ||||||
|  |  | ||||||
| 		a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google |  | ||||||
| 		https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff |  | ||||||
| 		Network.http.max-connections - MozillaZine Knowledge Base |  | ||||||
| 		http://kb.mozillazine.org/Network.http.max-connections |  | ||||||
| 	*/ |  | ||||||
| 	queue.process('http', 256, http); |  | ||||||
| }; |  | ||||||
							
								
								
									
										10
									
								
								src/queue.ts
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								src/queue.ts
									
									
									
									
									
								
							| @@ -1,10 +0,0 @@ | |||||||
| import { createQueue } from 'kue'; |  | ||||||
| import config from './config'; |  | ||||||
|  |  | ||||||
| export default createQueue({ |  | ||||||
| 	redis: { |  | ||||||
| 		port: config.redis.port, |  | ||||||
| 		host: config.redis.host, |  | ||||||
| 		auth: config.redis.pass |  | ||||||
| 	} |  | ||||||
| }); |  | ||||||
							
								
								
									
										38
									
								
								src/queue/index.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								src/queue/index.ts
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,38 @@ | |||||||
|  | import { createQueue } from 'kue'; | ||||||
|  | import config from '../config'; | ||||||
|  | import db from './processors/db'; | ||||||
|  | import http from './processors/http'; | ||||||
|  |  | ||||||
|  | const queue = createQueue({ | ||||||
|  | 	redis: { | ||||||
|  | 		port: config.redis.port, | ||||||
|  | 		host: config.redis.host, | ||||||
|  | 		auth: config.redis.pass | ||||||
|  | 	} | ||||||
|  | }); | ||||||
|  |  | ||||||
|  | export function createHttp(data) { | ||||||
|  | 	return queue | ||||||
|  | 		.create('http', data) | ||||||
|  | 		.attempts(16) | ||||||
|  | 		.backoff({ delay: 16384, type: 'exponential' }); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | export function createDb(data) { | ||||||
|  | 	return queue.create('db', data); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | export function process() { | ||||||
|  | 	queue.process('db', db); | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 		256 is the default concurrency limit of Mozilla Firefox and Google | ||||||
|  | 		Chromium. | ||||||
|  |  | ||||||
|  | 		a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google | ||||||
|  | 		https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff | ||||||
|  | 		Network.http.max-connections - MozillaZine Knowledge Base | ||||||
|  | 		http://kb.mozillazine.org/Network.http.max-connections | ||||||
|  | 	*/ | ||||||
|  | 	queue.process('http', 256, http); | ||||||
|  | } | ||||||
| @@ -1,9 +1,9 @@ | |||||||
| import Favorite from '../../models/favorite'; | import Favorite from '../../../models/favorite'; | ||||||
| import Notification from '../../models/notification'; | import Notification from '../../../models/notification'; | ||||||
| import PollVote from '../../models/poll-vote'; | import PollVote from '../../../models/poll-vote'; | ||||||
| import PostReaction from '../../models/post-reaction'; | import PostReaction from '../../../models/post-reaction'; | ||||||
| import PostWatching from '../../models/post-watching'; | import PostWatching from '../../../models/post-watching'; | ||||||
| import Post from '../../models/post'; | import Post from '../../../models/post'; | ||||||
| 
 | 
 | ||||||
| export default async ({ data }) => Promise.all([ | export default async ({ data }) => Promise.all([ | ||||||
| 	Favorite.remove({ postId: data._id }), | 	Favorite.remove({ postId: data._id }), | ||||||
							
								
								
									
										21
									
								
								src/queue/processors/http/deliver-post.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								src/queue/processors/http/deliver-post.ts
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,21 @@ | |||||||
|  | import Post from '../../../models/post'; | ||||||
|  | import User, { IRemoteUser } from '../../../models/user'; | ||||||
|  | import context from '../../../remote/activitypub/renderer/context'; | ||||||
|  | import renderCreate from '../../../remote/activitypub/renderer/create'; | ||||||
|  | import renderNote from '../../../remote/activitypub/renderer/note'; | ||||||
|  | import request from '../../../remote/request'; | ||||||
|  |  | ||||||
|  | export default async ({ data }) => { | ||||||
|  | 	const promisedTo = User.findOne({ _id: data.toId }) as Promise<IRemoteUser>; | ||||||
|  | 	const [from, post] = await Promise.all([ | ||||||
|  | 		User.findOne({ _id: data.fromId }), | ||||||
|  | 		Post.findOne({ _id: data.postId }) | ||||||
|  | 	]); | ||||||
|  | 	const note = await renderNote(from, post); | ||||||
|  | 	const to = await promisedTo; | ||||||
|  | 	const create = renderCreate(note); | ||||||
|  |  | ||||||
|  | 	create['@context'] = context; | ||||||
|  |  | ||||||
|  | 	return request(from, to.account.inbox, create); | ||||||
|  | }; | ||||||
							
								
								
									
										69
									
								
								src/queue/processors/http/follow.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								src/queue/processors/http/follow.ts
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,69 @@ | |||||||
|  | import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; | ||||||
|  | import Following from '../../../models/following'; | ||||||
|  | import FollowingLog from '../../../models/following-log'; | ||||||
|  | import FollowedLog from '../../../models/followed-log'; | ||||||
|  | import event from '../../../publishers/stream'; | ||||||
|  | import notify from '../../../publishers/notify'; | ||||||
|  | import context from '../../../remote/activitypub/renderer/context'; | ||||||
|  | import render from '../../../remote/activitypub/renderer/follow'; | ||||||
|  | import request from '../../../remote/request'; | ||||||
|  | import Logger from '../../../utils/logger'; | ||||||
|  |  | ||||||
|  | export default async ({ data }) => { | ||||||
|  | 	const { followerId, followeeId } = await Following.findOne({ _id: data.following }); | ||||||
|  | 	const [follower, followee] = await Promise.all([ | ||||||
|  | 		User.findOne({ _id: followerId }), | ||||||
|  | 		User.findOne({ _id: followeeId }) | ||||||
|  | 	]); | ||||||
|  |  | ||||||
|  | 	if (isLocalUser(follower) && isRemoteUser(followee)) { | ||||||
|  | 		const rendered = render(follower, followee); | ||||||
|  | 		rendered['@context'] = context; | ||||||
|  |  | ||||||
|  | 		await request(follower, followee.account.inbox, rendered); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	try { | ||||||
|  | 		await Promise.all([ | ||||||
|  | 			// Increment following count | ||||||
|  | 			User.update(followerId, { | ||||||
|  | 				$inc: { | ||||||
|  | 					followingCount: 1 | ||||||
|  | 				} | ||||||
|  | 			}), | ||||||
|  |  | ||||||
|  | 			FollowingLog.insert({ | ||||||
|  | 				createdAt: data.following.createdAt, | ||||||
|  | 				userId: followerId, | ||||||
|  | 				count: follower.followingCount + 1 | ||||||
|  | 			}), | ||||||
|  |  | ||||||
|  | 			// Increment followers count | ||||||
|  | 			User.update({ _id: followeeId }, { | ||||||
|  | 				$inc: { | ||||||
|  | 					followersCount: 1 | ||||||
|  | 				} | ||||||
|  | 			}), | ||||||
|  |  | ||||||
|  | 			FollowedLog.insert({ | ||||||
|  | 				createdAt: data.following.createdAt, | ||||||
|  | 				userId: followerId, | ||||||
|  | 				count: followee.followersCount + 1 | ||||||
|  | 			}), | ||||||
|  |  | ||||||
|  | 			// Publish follow event | ||||||
|  | 			isLocalUser(follower) && packUser(followee, follower) | ||||||
|  | 				.then(packed => event(follower._id, 'follow', packed)), | ||||||
|  |  | ||||||
|  | 			isLocalUser(followee) && Promise.all([ | ||||||
|  | 				packUser(follower, followee) | ||||||
|  | 					.then(packed => event(followee._id, 'followed', packed)), | ||||||
|  |  | ||||||
|  | 				// Notify | ||||||
|  | 				isLocalUser(followee) && notify(followeeId, followerId, 'follow') | ||||||
|  | 			]) | ||||||
|  | 		]); | ||||||
|  | 	} catch (error) { | ||||||
|  | 		Logger.error(error.toString()); | ||||||
|  | 	} | ||||||
|  | }; | ||||||
							
								
								
									
										7
									
								
								src/queue/processors/http/perform-activitypub.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								src/queue/processors/http/perform-activitypub.ts
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,7 @@ | |||||||
|  | import User from '../../../models/user'; | ||||||
|  | import act from '../../../remote/activitypub/act'; | ||||||
|  | import Resolver from '../../../remote/activitypub/resolver'; | ||||||
|  |  | ||||||
|  | export default ({ data }) => User.findOne({ _id: data.actor }) | ||||||
|  | 	.then(actor => act(new Resolver(), actor, data.outbox)) | ||||||
|  | 	.then(Promise.all); | ||||||
| @@ -1,9 +1,9 @@ | |||||||
| import { verifySignature } from 'http-signature'; | import { verifySignature } from 'http-signature'; | ||||||
| import parseAcct from '../../acct/parse'; | import parseAcct from '../../../acct/parse'; | ||||||
| import User, { IRemoteUser } from '../../models/user'; | import User, { IRemoteUser } from '../../../models/user'; | ||||||
| import act from '../../remote/activitypub/act'; | import act from '../../../remote/activitypub/act'; | ||||||
| import resolvePerson from '../../remote/activitypub/resolve-person'; | import resolvePerson from '../../../remote/activitypub/resolve-person'; | ||||||
| import Resolver from '../../remote/activitypub/resolver'; | import Resolver from '../../../remote/activitypub/resolver'; | ||||||
| 
 | 
 | ||||||
| export default async ({ data }): Promise<void> => { | export default async ({ data }): Promise<void> => { | ||||||
| 	const keyIdLower = data.signature.keyId.toLowerCase(); | 	const keyIdLower = data.signature.keyId.toLowerCase(); | ||||||
| @@ -1,6 +1,6 @@ | |||||||
| import * as request from 'request-promise-native'; | import * as request from 'request-promise-native'; | ||||||
| import User from '../../models/user'; | import User from '../../../models/user'; | ||||||
| const createPost = require('../../server/api/endpoints/posts/create'); | const createPost = require('../../../server/api/endpoints/posts/create'); | ||||||
| 
 | 
 | ||||||
| export default async ({ data }) => { | export default async ({ data }) => { | ||||||
| 	const asyncBot = User.findOne({ _id: data.userId }); | 	const asyncBot = User.findOne({ _id: data.userId }); | ||||||
							
								
								
									
										63
									
								
								src/queue/processors/http/unfollow.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								src/queue/processors/http/unfollow.ts
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,63 @@ | |||||||
|  | import FollowedLog from '../../../models/followed-log'; | ||||||
|  | import Following from '../../../models/following'; | ||||||
|  | import FollowingLog from '../../../models/following-log'; | ||||||
|  | import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; | ||||||
|  | import stream from '../../../publishers/stream'; | ||||||
|  | import renderFollow from '../../../remote/activitypub/renderer/follow'; | ||||||
|  | import renderUndo from '../../../remote/activitypub/renderer/undo'; | ||||||
|  | import context from '../../../remote/activitypub/renderer/context'; | ||||||
|  | import request from '../../../remote/request'; | ||||||
|  | import Logger from '../../../utils/logger'; | ||||||
|  |  | ||||||
|  | export default async ({ data }) => { | ||||||
|  | 	const following = await Following.findOne({ _id: data.id }); | ||||||
|  | 	if (following === null) { | ||||||
|  | 		return; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	const [follower, followee] = await Promise.all([ | ||||||
|  | 		User.findOne({ _id: following.followerId }), | ||||||
|  | 		User.findOne({ _id: following.followeeId }) | ||||||
|  | 	]); | ||||||
|  |  | ||||||
|  | 	if (isLocalUser(follower) && isRemoteUser(followee)) { | ||||||
|  | 		const undo = renderUndo(renderFollow(follower, followee)); | ||||||
|  | 		undo['@context'] = context; | ||||||
|  |  | ||||||
|  | 		await request(follower, followee.account.inbox, undo); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	try { | ||||||
|  | 		await Promise.all([ | ||||||
|  | 			// Delete following | ||||||
|  | 			Following.findOneAndDelete({ _id: data.id }), | ||||||
|  |  | ||||||
|  | 			// Decrement following count | ||||||
|  | 			User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }), | ||||||
|  | 			FollowingLog.insert({ | ||||||
|  | 				createdAt: new Date(), | ||||||
|  | 				userId: follower._id, | ||||||
|  | 				count: follower.followingCount - 1 | ||||||
|  | 			}), | ||||||
|  |  | ||||||
|  | 			// Decrement followers count | ||||||
|  | 			User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }), | ||||||
|  | 			FollowedLog.insert({ | ||||||
|  | 				createdAt: new Date(), | ||||||
|  | 				userId: followee._id, | ||||||
|  | 				count: followee.followersCount - 1 | ||||||
|  | 			}) | ||||||
|  | 		]); | ||||||
|  |  | ||||||
|  | 		if (isLocalUser(follower)) { | ||||||
|  | 			return; | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		const promisedPackedUser = packUser(followee, follower); | ||||||
|  |  | ||||||
|  | 		// Publish follow event | ||||||
|  | 		stream(follower._id, 'unfollow', promisedPackedUser); | ||||||
|  | 	} catch (error) { | ||||||
|  | 		Logger.error(error.toString()); | ||||||
|  | 	} | ||||||
|  | }; | ||||||
| @@ -3,7 +3,7 @@ import parseAcct from '../../../acct/parse'; | |||||||
| import Following, { IFollowing } from '../../../models/following'; | import Following, { IFollowing } from '../../../models/following'; | ||||||
| import User from '../../../models/user'; | import User from '../../../models/user'; | ||||||
| import config from '../../../config'; | import config from '../../../config'; | ||||||
| import queue from '../../../queue'; | import { createHttp } from '../../../queue'; | ||||||
| import context from '../renderer/context'; | import context from '../renderer/context'; | ||||||
| import renderAccept from '../renderer/accept'; | import renderAccept from '../renderer/accept'; | ||||||
| import request from '../../request'; | import request from '../../request'; | ||||||
| @@ -44,7 +44,7 @@ export default async (resolver: Resolver, actor, activity, distribute) => { | |||||||
| 		followerId: actor._id, | 		followerId: actor._id, | ||||||
| 		followeeId: followee._id | 		followeeId: followee._id | ||||||
| 	}).then(following => new Promise((resolve, reject) => { | 	}).then(following => new Promise((resolve, reject) => { | ||||||
| 		queue.create('http', { | 		createHttp({ | ||||||
| 			type: 'follow', | 			type: 'follow', | ||||||
| 			following: following._id | 			following: following._id | ||||||
| 		}).save(error => { | 		}).save(error => { | ||||||
|   | |||||||
| @@ -1,7 +1,7 @@ | |||||||
| import queue from '../../../../queue'; | import { createHttp } from '../../../../queue'; | ||||||
|  |  | ||||||
| export default ({ $id }) => new Promise((resolve, reject) => { | export default ({ $id }) => new Promise((resolve, reject) => { | ||||||
| 	queue.create('http', { type: 'unfollow', id: $id }).save(error => { | 	createHttp({ type: 'unfollow', id: $id }).save(error => { | ||||||
| 		if (error) { | 		if (error) { | ||||||
| 			reject(error); | 			reject(error); | ||||||
| 		} else { | 		} else { | ||||||
|   | |||||||
| @@ -1,10 +1,10 @@ | |||||||
| import Post from '../../../models/post'; | import Post from '../../../models/post'; | ||||||
| import queue from '../../../queue'; | import { createDb } from '../../../queue'; | ||||||
|  |  | ||||||
| export default async ({ $id }) => { | export default async ({ $id }) => { | ||||||
| 	const promisedDeletion = Post.findOneAndDelete({ _id: $id }); | 	const promisedDeletion = Post.findOneAndDelete({ _id: $id }); | ||||||
|  |  | ||||||
| 	await new Promise((resolve, reject) => queue.create('db', { | 	await new Promise((resolve, reject) => createDb({ | ||||||
| 		type: 'deletePostDependents', | 		type: 'deletePostDependents', | ||||||
| 		id: $id | 		id: $id | ||||||
| 	}).delay(65536).save(error => error ? reject(error) : resolve())); | 	}).delay(65536).save(error => error ? reject(error) : resolve())); | ||||||
|   | |||||||
| @@ -1,7 +1,7 @@ | |||||||
| import { JSDOM } from 'jsdom'; | import { JSDOM } from 'jsdom'; | ||||||
| import { toUnicode } from 'punycode'; | import { toUnicode } from 'punycode'; | ||||||
| import User, { validateUsername, isValidName, isValidDescription } from '../../models/user'; | import User, { validateUsername, isValidName, isValidDescription } from '../../models/user'; | ||||||
| import queue from '../../queue'; | import { createHttp } from '../../queue'; | ||||||
| import webFinger from '../webfinger'; | import webFinger from '../webfinger'; | ||||||
| import create from './create'; | import create from './create'; | ||||||
| import Resolver from './resolver'; | import Resolver from './resolver'; | ||||||
| @@ -69,7 +69,7 @@ export default async (value, verifier?: string) => { | |||||||
| 		}, | 		}, | ||||||
| 	}); | 	}); | ||||||
|  |  | ||||||
| 	queue.create('http', { | 	createHttp({ | ||||||
| 		type: 'performActivityPub', | 		type: 'performActivityPub', | ||||||
| 		actor: user._id, | 		actor: user._id, | ||||||
| 		outbox | 		outbox | ||||||
|   | |||||||
| @@ -1,7 +1,7 @@ | |||||||
| import * as bodyParser from 'body-parser'; | import * as bodyParser from 'body-parser'; | ||||||
| import * as express from 'express'; | import * as express from 'express'; | ||||||
| import { parseRequest } from 'http-signature'; | import { parseRequest } from 'http-signature'; | ||||||
| import queue from '../../queue'; | import { createHttp } from '../../queue'; | ||||||
|  |  | ||||||
| const app = express(); | const app = express(); | ||||||
|  |  | ||||||
| @@ -22,7 +22,7 @@ app.post('/@:user/inbox', bodyParser.json({ | |||||||
| 		return res.sendStatus(401); | 		return res.sendStatus(401); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	queue.create('http', { | 	createHttp({ | ||||||
| 		type: 'processInbox', | 		type: 'processInbox', | ||||||
| 		inbox: req.body, | 		inbox: req.body, | ||||||
| 		signature, | 		signature, | ||||||
|   | |||||||
| @@ -4,7 +4,7 @@ | |||||||
| import $ from 'cafy'; | import $ from 'cafy'; | ||||||
| import User from '../../../../models/user'; | import User from '../../../../models/user'; | ||||||
| import Following from '../../../../models/following'; | import Following from '../../../../models/following'; | ||||||
| import queue from '../../../../queue'; | import { createHttp } from '../../../../queue'; | ||||||
|  |  | ||||||
| /** | /** | ||||||
|  * Follow a user |  * Follow a user | ||||||
| @@ -56,7 +56,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => { | |||||||
| 		followeeId: followee._id | 		followeeId: followee._id | ||||||
| 	}); | 	}); | ||||||
|  |  | ||||||
| 	queue.create('http', { type: 'follow', following: _id }).save(); | 	createHttp({ type: 'follow', following: _id }).save(); | ||||||
|  |  | ||||||
| 	// Send response | 	// Send response | ||||||
| 	res(); | 	res(); | ||||||
|   | |||||||
| @@ -4,7 +4,7 @@ | |||||||
| import $ from 'cafy'; | import $ from 'cafy'; | ||||||
| import User from '../../../../models/user'; | import User from '../../../../models/user'; | ||||||
| import Following from '../../../../models/following'; | import Following from '../../../../models/following'; | ||||||
| import queue from '../../../../queue'; | import { createHttp } from '../../../../queue'; | ||||||
|  |  | ||||||
| /** | /** | ||||||
|  * Unfollow a user |  * Unfollow a user | ||||||
| @@ -49,7 +49,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => { | |||||||
| 		return rej('already not following'); | 		return rej('already not following'); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	queue.create('http', { | 	createHttp({ | ||||||
| 		type: 'unfollow', | 		type: 'unfollow', | ||||||
| 		id: exist._id | 		id: exist._id | ||||||
| 	}).save(error => { | 	}).save(error => { | ||||||
|   | |||||||
| @@ -3,7 +3,7 @@ import * as express from 'express'; | |||||||
| //const crypto = require('crypto'); | //const crypto = require('crypto'); | ||||||
| import User from '../../../models/user'; | import User from '../../../models/user'; | ||||||
| import config from '../../../config'; | import config from '../../../config'; | ||||||
| import queue from '../../../queue'; | import { createHttp } from '../../../queue'; | ||||||
|  |  | ||||||
| module.exports = async (app: express.Application) => { | module.exports = async (app: express.Application) => { | ||||||
| 	if (config.github_bot == null) return; | 	if (config.github_bot == null) return; | ||||||
| @@ -42,7 +42,7 @@ module.exports = async (app: express.Application) => { | |||||||
| 				const commit = event.commit; | 				const commit = event.commit; | ||||||
| 				const parent = commit.parents[0]; | 				const parent = commit.parents[0]; | ||||||
|  |  | ||||||
| 				queue.create('http', { | 				createHttp({ | ||||||
| 					type: 'gitHubFailureReport', | 					type: 'gitHubFailureReport', | ||||||
| 					userId: bot._id, | 					userId: bot._id, | ||||||
| 					parentUrl: parent.url, | 					parentUrl: parent.url, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 syuilo
					syuilo