Merge branch 'develop' into fetch

This commit is contained in:
tamaina
2023-01-03 14:09:34 +00:00
139 changed files with 519 additions and 1012 deletions

View File

@@ -0,0 +1,11 @@
export class removeLatestRequestSentAt1672703171386 {
name = 'removeLatestRequestSentAt1672703171386'
async up(queryRunner) {
await queryRunner.query(`ALTER TABLE "instance" DROP COLUMN "latestRequestSentAt"`);
}
async down(queryRunner) {
await queryRunner.query(`ALTER TABLE "instance" ADD "latestRequestSentAt" TIMESTAMP WITH TIME ZONE`);
}
}

View File

@@ -0,0 +1,11 @@
export class removeLastCommunicatedAt1672704017999 {
name = 'removeLastCommunicatedAt1672704017999'
async up(queryRunner) {
await queryRunner.query(`ALTER TABLE "instance" DROP COLUMN "lastCommunicatedAt"`);
}
async down(queryRunner) {
await queryRunner.query(`ALTER TABLE "instance" ADD "lastCommunicatedAt" TIMESTAMP WITH TIME ZONE NOT NULL`);
}
}

View File

@@ -0,0 +1,11 @@
export class removeLatestStatus1672704136584 {
name = 'removeLatestStatus1672704136584'
async up(queryRunner) {
await queryRunner.query(`ALTER TABLE "instance" DROP COLUMN "latestStatus"`);
}
async down(queryRunner) {
await queryRunner.query(`ALTER TABLE "instance" ADD "latestStatus" integer`);
}
}

View File

@@ -37,7 +37,6 @@
"@nestjs/testing": "9.2.1",
"@peertube/http-signature": "1.7.0",
"@sinonjs/fake-timers": "10.0.2",
"@syuilo/aiscript": "0.11.1",
"accepts": "^1.3.8",
"ajv": "8.11.2",
"archiver": "5.3.1",

View File

@@ -22,7 +22,7 @@ export class FederatedInstanceService {
}
@bindThis
public async registerOrFetchInstanceDoc(host: string): Promise<Instance> {
public async fetch(host: string): Promise<Instance> {
host = this.utilityService.toPuny(host);
const cached = this.cache.get(host);
@@ -35,7 +35,6 @@ export class FederatedInstanceService {
id: this.idService.genId(),
host,
caughtAt: new Date(),
lastCommunicatedAt: new Date(),
}).then(x => this.instancesRepository.findOneByOrFail(x.identifiers[0]));
this.cache.set(host, i);
@@ -45,4 +44,17 @@ export class FederatedInstanceService {
return index;
}
}
@bindThis
public async updateCachePartial(host: string, data: Partial<Instance>): Promise<void> {
host = this.utilityService.toPuny(host);
const cached = this.cache.get(host);
if (cached == null) return;
this.cache.set(host, {
...cached,
...data,
});
}
}

View File

@@ -428,7 +428,7 @@ export class NoteCreateService {
// Register host
if (this.userEntityService.isRemoteUser(user)) {
this.federatedInstanceService.registerOrFetchInstanceDoc(user.host).then(i => {
this.federatedInstanceService.fetch(user.host).then(i => {
this.instancesRepository.increment({ id: i.id }, 'notesCount', 1);
this.instanceChart.updateNote(i.host, note, true);
});

View File

@@ -100,7 +100,7 @@ export class NoteDeleteService {
this.perUserNotesChart.update(user, note, false);
if (this.userEntityService.isRemoteUser(user)) {
this.federatedInstanceService.registerOrFetchInstanceDoc(user.host).then(i => {
this.federatedInstanceService.fetch(user.host).then(i => {
this.instancesRepository.decrement({ id: i.id }, 'notesCount', 1);
this.instanceChart.updateNote(i.host, note, false);
});

View File

@@ -205,12 +205,12 @@ export class UserFollowingService {
//#region Update instance stats
if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) {
this.federatedInstanceService.registerOrFetchInstanceDoc(follower.host).then(i => {
this.federatedInstanceService.fetch(follower.host).then(i => {
this.instancesRepository.increment({ id: i.id }, 'followingCount', 1);
this.instanceChart.updateFollowing(i.host, true);
});
} else if (this.userEntityService.isLocalUser(follower) && this.userEntityService.isRemoteUser(followee)) {
this.federatedInstanceService.registerOrFetchInstanceDoc(followee.host).then(i => {
this.federatedInstanceService.fetch(followee.host).then(i => {
this.instancesRepository.increment({ id: i.id }, 'followersCount', 1);
this.instanceChart.updateFollowers(i.host, true);
});
@@ -323,12 +323,12 @@ export class UserFollowingService {
//#region Update instance stats
if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) {
this.federatedInstanceService.registerOrFetchInstanceDoc(follower.host).then(i => {
this.federatedInstanceService.fetch(follower.host).then(i => {
this.instancesRepository.decrement({ id: i.id }, 'followingCount', 1);
this.instanceChart.updateFollowing(i.host, false);
});
} else if (this.userEntityService.isLocalUser(follower) && this.userEntityService.isRemoteUser(followee)) {
this.federatedInstanceService.registerOrFetchInstanceDoc(followee.host).then(i => {
this.federatedInstanceService.fetch(followee.host).then(i => {
this.instancesRepository.decrement({ id: i.id }, 'followersCount', 1);
this.instanceChart.updateFollowers(i.host, false);
});

View File

@@ -348,7 +348,7 @@ export class ApPersonService implements OnModuleInit {
}
// Register host
this.federatedInstanceService.registerOrFetchInstanceDoc(host).then(i => {
this.federatedInstanceService.fetch(host).then(i => {
this.instancesRepository.increment({ id: i.id }, 'usersCount', 1);
this.instanceChart.newUser(i.host);
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);

View File

@@ -86,7 +86,7 @@ export default class FederationChart extends Chart<typeof schema> {
.where(`instance.host IN (${ subInstancesQuery.getQuery() })`)
.andWhere(meta.blockedHosts.length === 0 ? '1=1' : 'instance.host NOT IN (:...blocked)', { blocked: meta.blockedHosts })
.andWhere('instance.isSuspended = false')
.andWhere('instance.lastCommunicatedAt > :gt', { gt: new Date(Date.now() - (1000 * 60 * 60 * 24 * 30)) })
.andWhere('instance.isNotResponding = false')
.getRawOne()
.then(x => parseInt(x.count, 10)),
this.instancesRepository.createQueryBuilder('instance')
@@ -94,7 +94,7 @@ export default class FederationChart extends Chart<typeof schema> {
.where(`instance.host IN (${ pubInstancesQuery.getQuery() })`)
.andWhere(meta.blockedHosts.length === 0 ? '1=1' : 'instance.host NOT IN (:...blocked)', { blocked: meta.blockedHosts })
.andWhere('instance.isSuspended = false')
.andWhere('instance.lastCommunicatedAt > :gt', { gt: new Date(Date.now() - (1000 * 60 * 60 * 24 * 30)) })
.andWhere('instance.isNotResponding = false')
.getRawOne()
.then(x => parseInt(x.count, 10)),
]);

View File

@@ -7,8 +7,8 @@ import type { } from '@/models/entities/Blocking.js';
import type { User } from '@/models/entities/User.js';
import type { Instance } from '@/models/entities/Instance.js';
import { MetaService } from '@/core/MetaService.js';
import { UserEntityService } from './UserEntityService.js';
import { bindThis } from '@/decorators.js';
import { UserEntityService } from './UserEntityService.js';
@Injectable()
export class InstanceEntityService {
@@ -33,8 +33,6 @@ export class InstanceEntityService {
notesCount: instance.notesCount,
followingCount: instance.followingCount,
followersCount: instance.followersCount,
latestRequestSentAt: instance.latestRequestSentAt ? instance.latestRequestSentAt.toISOString() : null,
lastCommunicatedAt: instance.lastCommunicatedAt.toISOString(),
isNotResponding: instance.isNotResponding,
isSuspended: instance.isSuspended,
isBlocked: meta.blockedHosts.includes(instance.host),

View File

@@ -59,22 +59,6 @@ export class Instance {
})
public followersCount: number;
/**
* 直近のリクエスト送信日時
*/
@Column('timestamp with time zone', {
nullable: true,
})
public latestRequestSentAt: Date | null;
/**
* 直近のリクエスト送信時のHTTPステータスコード
*/
@Column('integer', {
nullable: true,
})
public latestStatus: number | null;
/**
* 直近のリクエスト受信日時
*/
@@ -83,12 +67,6 @@ export class Instance {
})
public latestRequestReceivedAt: Date | null;
/**
* このインスタンスと最後にやり取りした日時
*/
@Column('timestamp with time zone')
public lastCommunicatedAt: Date;
/**
* このインスタンスと不通かどうか
*/

View File

@@ -32,16 +32,6 @@ export const packedFederationInstanceSchema = {
type: 'number',
optional: false, nullable: false,
},
latestRequestSentAt: {
type: 'string',
optional: false, nullable: true,
format: 'date-time',
},
lastCommunicatedAt: {
type: 'string',
optional: false, nullable: false,
format: 'date-time',
},
isNotResponding: {
type: 'boolean',
optional: false, nullable: false,

View File

@@ -15,10 +15,10 @@ import ApRequestChart from '@/core/chart/charts/ap-request.js';
import FederationChart from '@/core/chart/charts/federation.js';
import { StatusError } from '@/misc/status-error.js';
import { UtilityService } from '@/core/UtilityService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type Bull from 'bull';
import type { DeliverJobData } from '../types.js';
import { bindThis } from '@/decorators.js';
@Injectable()
export class DeliverProcessorService {
@@ -48,7 +48,6 @@ export class DeliverProcessorService {
) {
this.logger = this.queueLoggerService.logger.createSubLogger('deliver');
this.suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60);
this.latest = null;
}
@bindThis
@@ -76,20 +75,18 @@ export class DeliverProcessorService {
}
try {
if (this.latest !== (this.latest = JSON.stringify(job.data.content, null, 2))) {
this.logger.debug(`delivering ${this.latest}`);
}
await this.apRequestService.signedPost(job.data.user, job.data.to, job.data.content);
// Update stats
this.federatedInstanceService.registerOrFetchInstanceDoc(host).then(i => {
this.instancesRepository.update(i.id, {
latestRequestSentAt: new Date(),
latestStatus: 200,
lastCommunicatedAt: new Date(),
isNotResponding: false,
});
this.federatedInstanceService.fetch(host).then(i => {
if (i.isNotResponding) {
this.instancesRepository.update(i.id, {
isNotResponding: false,
});
this.federatedInstanceService.updateCachePartial(host, {
isNotResponding: false,
});
}
this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
@@ -100,13 +97,16 @@ export class DeliverProcessorService {
return 'Success';
} catch (res) {
// Update stats
this.federatedInstanceService.registerOrFetchInstanceDoc(host).then(i => {
this.instancesRepository.update(i.id, {
latestRequestSentAt: new Date(),
latestStatus: res instanceof StatusError ? res.statusCode : null,
isNotResponding: true,
});
// Update stats
this.federatedInstanceService.fetch(host).then(i => {
if (!i.isNotResponding) {
this.instancesRepository.update(i.id, {
isNotResponding: true,
});
this.federatedInstanceService.updateCachePartial(host, {
isNotResponding: true,
});
}
this.instanceChart.requestSent(i.host, false);
this.apRequestChart.deliverFail();
@@ -114,17 +114,17 @@ export class DeliverProcessorService {
});
if (res instanceof StatusError) {
// 4xx
// 4xx
if (res.isClientError) {
// HTTPステータスコード4xxはクライアントエラーであり、それはつまり
// 何回再送しても成功することはないということなのでエラーにはしないでおく
// HTTPステータスコード4xxはクライアントエラーであり、それはつまり
// 何回再送しても成功することはないということなのでエラーにはしないでおく
return `${res.statusCode} ${res.statusMessage}`;
}
// 5xx etc.
throw `${res.statusCode} ${res.statusMessage}`;
} else {
// DNS error, socket error, timeout ...
// DNS error, socket error, timeout ...
throw res;
}
}

View File

@@ -176,10 +176,12 @@ export class InboxProcessorService {
}
// Update stats
this.federatedInstanceService.registerOrFetchInstanceDoc(authUser.user.host).then(i => {
this.federatedInstanceService.fetch(authUser.user.host).then(i => {
this.instancesRepository.update(i.id, {
latestRequestReceivedAt: new Date(),
lastCommunicatedAt: new Date(),
isNotResponding: false,
});
this.federatedInstanceService.updateCachePartial(host, {
isNotResponding: false,
});

View File

@@ -64,8 +64,8 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
case '-followers': query.orderBy('instance.followersCount', 'ASC'); break;
case '+caughtAt': query.orderBy('instance.caughtAt', 'DESC'); break;
case '-caughtAt': query.orderBy('instance.caughtAt', 'ASC'); break;
case '+lastCommunicatedAt': query.orderBy('instance.lastCommunicatedAt', 'DESC'); break;
case '-lastCommunicatedAt': query.orderBy('instance.lastCommunicatedAt', 'ASC'); break;
case '+latestRequestReceivedAt': query.orderBy('instance.latestRequestReceivedAt', 'DESC'); break;
case '-latestRequestReceivedAt': query.orderBy('instance.latestRequestReceivedAt', 'ASC'); break;
default: query.orderBy('instance.id', 'DESC'); break;
}

View File

@@ -139,10 +139,12 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
throw new ApiError(meta.errors.noSuchUser);
}
if (me == null && ip != null) {
this.perUserPvChart.commitByVisitor(user, ip);
} else if (me && me.id !== user.id) {
this.perUserPvChart.commitByUser(user, me.id);
if (user.host == null) {
if (me == null && ip != null) {
this.perUserPvChart.commitByVisitor(user, ip);
} else if (me && me.id !== user.id) {
this.perUserPvChart.commitByUser(user, me.id);
}
}
return await this.userEntityService.pack(user, me, {