sharkey/packages/backend/src/core/QueueService.ts
Namekuji d28866f71a
enhance: account migration (#10592)
* copy block and mute then create follow and unfollow jobs

* copy block and mute and update lists when detecting an account has moved

* no need to care about promise order

* refactor updating actor and target

* automatically accept if a locked account had accepted an old account

* fix exception format

* prevent the old account from calling some endpoints

* do not unfollow when moving

* adjust following and follower counts

* check movedToUri when receiving a follow request

* skip if no need to adjust

* Revert "disable account migration"

This reverts commit 2321214c98.

* fix translation specifier

* fix checking alsoKnownAs and uri

* fix updating account

* fix refollowing locked account

* decrease followersCount if followed by the old account

* adjust following and followers counts when unfollowing

* fix copying mutings

* prohibit moved account from moving again

* fix move service

* allow app creation after moving

* fix lint

* remove unnecessary field

* fix cache update

* add e2e test

* add e2e test of accepting the new account automatically

* force follow if any error happens

* remove unnecessary joins

* use Array.map instead of for const of

* ユーザーリストの移行は追加のみを行う

* nanka iroiro

* fix misskey-js?

* ✌️

* 移行を行ったアカウントからのフォローリクエストの自動許可を調整

* newUriを外に出す

* newUriを外に出す2

* clean up

* fix newUri

* prevent moving if the destination account has already moved

* set alsoKnownAs via /i/update

* fix database initialization

* add return type

* prohibit updating alsoKnownAs after moving

* skip adding to alsoKnownAs if toUrl is already known

* skip adding to the list if it is already in it

* use Acct.parse instead

* rename error code

* 🎨

* 制限を5から10に緩和

* movedTo(Uri), alsoKnownAsはユーザーidを返すように

* test api res

* fix

* 元アカウントはミュートし続ける

* 🎨

* unfollow

* fix

* getUserUriをUserEntityServiceに

* ?

* job!

* 🎨

* instance => server

* accountMovedShort, forbiddenBecauseYouAreMigrated

* accountMovedShort

* fix test

* import, pin禁止

* 実績を凍結する

* clean up

* ✌️

* change message

* ブロック, フォロー, ミュート, リストのインポートファイルの制限を32MiBに

* Revert "ブロック, フォロー, ミュート, リストのインポートファイルの制限を32MiBに"

This reverts commit 3bd7be35d8aa455cb01ae58f8172a71a50485db1.

* validateAlsoKnownAs

* 移行後2時間以内はインポート可能なファイルサイズを拡大

* clean up

* どうせactorをupdatePersonで更新するならupdatePersonしか移行処理を発行しないことにする

* handle error?

* リモートからの移行処理の条件を是正

* log, port

* fix

* fix

* enhance(dev): non-production環境でhttpサーバー間でもユーザー、ノートの連合が可能なように

* refactor (use checkHttps)

* MISSKEY_WEBFINGER_USE_HTTP

* Environment Variable readme

* NEVER USE IN PRODUCTION

* fix punyHost

* fix indent

* fix

* experimental

---------

Co-authored-by: tamaina <tamaina@hotmail.co.jp>
Co-authored-by: syuilo <Syuilotan@yahoo.co.jp>
2023-04-30 00:09:29 +09:00

356 lines
9.0 KiB
TypeScript

import { Inject, Injectable } from '@nestjs/common';
import { v4 as uuid } from 'uuid';
import type { IActivity } from '@/core/activitypub/type.js';
import type { DriveFile } from '@/models/entities/DriveFile.js';
import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js';
import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
import type httpSignature from '@peertube/http-signature';
import Bull from 'bull';
/**
 * Thin facade over the injected Bull queues.
 *
 * Each public `create*Job` / `deliver` / `inbox` / `webhookDeliver` method builds a
 * job payload and enqueues it on the matching queue. Every job created here is
 * enqueued with `removeOnComplete` and `removeOnFail` enabled. User arguments are
 * reduced to `{ id }` before being placed in a payload, so that only the id is
 * serialized into the queue regardless of what object the caller passed.
 */
@Injectable()
export class QueueService {
	constructor(
		@Inject(DI.config)
		private config: Config,

		@Inject('queue:system') public systemQueue: SystemQueue,
		@Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue,
		@Inject('queue:deliver') public deliverQueue: DeliverQueue,
		@Inject('queue:inbox') public inboxQueue: InboxQueue,
		@Inject('queue:db') public dbQueue: DbQueue,
		@Inject('queue:relationship') public relationshipQueue: RelationshipQueue,
		@Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue,
		@Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue,
	) {}

	/**
	 * Enqueues an activity for delivery to a single inbox.
	 *
	 * @param user Sender; only `id` is stored in the job.
	 * @param content Activity to deliver.
	 * @param to Destination inbox URL.
	 * @param isSharedInbox Stored verbatim in the job payload.
	 * @returns `null` when `content` or `to` is null/undefined (nothing is enqueued),
	 *   otherwise the result of `deliverQueue.add`.
	 */
	@bindThis
	public deliver(user: ThinUser, content: IActivity | null, to: string | null, isSharedInbox: boolean) {
		if (content == null) return null;
		if (to == null) return null;

		const data = {
			user: {
				id: user.id,
			},
			content,
			to,
			isSharedInbox,
		};

		return this.deliverQueue.add(data, {
			// Falls back to 12 attempts when the config does not set a value.
			attempts: this.config.deliverJobMaxAttempts ?? 12,
			timeout: 1 * 60 * 1000, // 1min
			backoff: {
				type: 'apBackoff',
			},
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/**
	 * Enqueues a received activity together with its parsed HTTP signature.
	 *
	 * @param activity Incoming activity.
	 * @param signature Parsed HTTP signature accompanying the activity.
	 * @returns The result of `inboxQueue.add`.
	 */
	@bindThis
	public inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
		const data = {
			activity,
			signature,
		};

		return this.inboxQueue.add(data, {
			// Falls back to 8 attempts when the config does not set a value.
			attempts: this.config.inboxJobMaxAttempts ?? 8,
			timeout: 5 * 60 * 1000, // 5min
			backoff: {
				type: 'apBackoff',
			},
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/** Enqueues a `deleteDriveFiles` job on the db queue for the given user. */
	@bindThis
	public createDeleteDriveFilesJob(user: ThinUser) {
		return this.dbQueue.add('deleteDriveFiles', {
			user: { id: user.id },
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/** Enqueues an `exportCustomEmojis` job on the db queue for the given user. */
	@bindThis
	public createExportCustomEmojisJob(user: ThinUser) {
		return this.dbQueue.add('exportCustomEmojis', {
			user: { id: user.id },
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/** Enqueues an `exportNotes` job on the db queue for the given user. */
	@bindThis
	public createExportNotesJob(user: ThinUser) {
		return this.dbQueue.add('exportNotes', {
			user: { id: user.id },
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/** Enqueues an `exportFavorites` job on the db queue for the given user. */
	@bindThis
	public createExportFavoritesJob(user: ThinUser) {
		return this.dbQueue.add('exportFavorites', {
			user: { id: user.id },
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/**
	 * Enqueues an `exportFollowing` job on the db queue.
	 *
	 * @param user Owner of the export; only `id` is stored.
	 * @param excludeMuting Forwarded in the payload (defaults to `false`).
	 * @param excludeInactive Forwarded in the payload (defaults to `false`).
	 */
	@bindThis
	public createExportFollowingJob(user: ThinUser, excludeMuting = false, excludeInactive = false) {
		return this.dbQueue.add('exportFollowing', {
			user: { id: user.id },
			excludeMuting,
			excludeInactive,
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/** Enqueues an `exportMuting` job on the db queue for the given user. */
	@bindThis
	public createExportMuteJob(user: ThinUser) {
		return this.dbQueue.add('exportMuting', {
			user: { id: user.id },
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/** Enqueues an `exportBlocking` job on the db queue for the given user. */
	@bindThis
	public createExportBlockingJob(user: ThinUser) {
		return this.dbQueue.add('exportBlocking', {
			user: { id: user.id },
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/** Enqueues an `exportUserLists` job on the db queue for the given user. */
	@bindThis
	public createExportUserListsJob(user: ThinUser) {
		return this.dbQueue.add('exportUserLists', {
			user: { id: user.id },
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/**
	 * Enqueues an `importFollowing` job on the db queue.
	 *
	 * @param user Importing user; only `id` is stored.
	 * @param fileId Id of the drive file to import from.
	 */
	@bindThis
	public createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) {
		return this.dbQueue.add('importFollowing', {
			user: { id: user.id },
			fileId,
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/**
	 * Bulk-enqueues one `importFollowingToDb` job per entry of `targets`.
	 *
	 * @param user Importing user; only `id` is stored in each job.
	 * @param targets One job is created per element, stored as `target`.
	 * @returns The result of `dbQueue.addBulk`.
	 */
	@bindThis
	public createImportFollowingToDbJob(user: ThinUser, targets: string[]) {
		// Narrow to the id so a richer user object passed by a caller is never serialized into the queue.
		const thinUser = { id: user.id };
		const jobs = targets.map(rel => this.generateToDbJobData('importFollowingToDb', { user: thinUser, target: rel }));
		return this.dbQueue.addBulk(jobs);
	}

	/**
	 * Enqueues an `importMuting` job on the db queue.
	 *
	 * @param user Importing user; only `id` is stored.
	 * @param fileId Id of the drive file to import from.
	 */
	@bindThis
	public createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) {
		return this.dbQueue.add('importMuting', {
			user: { id: user.id },
			fileId,
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/**
	 * Enqueues an `importBlocking` job on the db queue.
	 *
	 * @param user Importing user; only `id` is stored.
	 * @param fileId Id of the drive file to import from.
	 */
	@bindThis
	public createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) {
		return this.dbQueue.add('importBlocking', {
			user: { id: user.id },
			fileId,
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/**
	 * Bulk-enqueues one `importBlockingToDb` job per entry of `targets`.
	 *
	 * @param user Importing user; only `id` is stored in each job.
	 * @param targets One job is created per element, stored as `target`.
	 * @returns The result of `dbQueue.addBulk`.
	 */
	@bindThis
	public createImportBlockingToDbJob(user: ThinUser, targets: string[]) {
		// Narrow to the id so a richer user object passed by a caller is never serialized into the queue.
		const thinUser = { id: user.id };
		const jobs = targets.map(rel => this.generateToDbJobData('importBlockingToDb', { user: thinUser, target: rel }));
		return this.dbQueue.addBulk(jobs);
	}

	/**
	 * Builds one `addBulk` entry for a `*ToDb` import job.
	 *
	 * @param name Job name.
	 * @param data Payload, passed through unchanged.
	 * @returns `{ name, data, opts }` with both remove-on-finish flags enabled.
	 */
	@bindThis
	private generateToDbJobData<T extends 'importFollowingToDb' | 'importBlockingToDb', D extends DbJobData<T>>(name: T, data: D): {
		name: string,
		data: D,
		opts: Bull.JobOptions,
	} {
		return {
			name,
			data,
			opts: {
				removeOnComplete: true,
				removeOnFail: true,
			},
		};
	}

	/**
	 * Enqueues an `importUserLists` job on the db queue.
	 *
	 * @param user Importing user; only `id` is stored.
	 * @param fileId Id of the drive file to import from.
	 */
	@bindThis
	public createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) {
		return this.dbQueue.add('importUserLists', {
			user: { id: user.id },
			fileId,
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/**
	 * Enqueues an `importCustomEmojis` job on the db queue.
	 *
	 * @param user Importing user; only `id` is stored.
	 * @param fileId Id of the drive file to import from.
	 */
	@bindThis
	public createImportCustomEmojisJob(user: ThinUser, fileId: DriveFile['id']) {
		return this.dbQueue.add('importCustomEmojis', {
			user: { id: user.id },
			fileId,
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/**
	 * Enqueues a `deleteAccount` job on the db queue.
	 *
	 * @param user Account to delete; only `id` is stored.
	 * @param opts `soft` is forwarded in the payload as-is (may be `undefined`).
	 */
	@bindThis
	public createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) {
		return this.dbQueue.add('deleteAccount', {
			user: { id: user.id },
			soft: opts.soft,
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/** Bulk-enqueues one `follow` job per entry on the relationship queue. */
	@bindThis
	public createFollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string, silent?: boolean }[]) {
		const jobs = followings.map(rel => this.generateRelationshipJobData('follow', rel));
		return this.relationshipQueue.addBulk(jobs);
	}

	/** Bulk-enqueues one `unfollow` job per entry on the relationship queue. */
	@bindThis
	public createUnfollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string }[]) {
		const jobs = followings.map(rel => this.generateRelationshipJobData('unfollow', rel));
		return this.relationshipQueue.addBulk(jobs);
	}

	/**
	 * Bulk-enqueues one `unfollow` job per entry, each carrying the Bull `delay` job option.
	 *
	 * @param followings Relationships to unfollow.
	 * @param delay Value passed as the `delay` job option of every job.
	 */
	@bindThis
	public createDelayedUnfollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string }[], delay: number) {
		const jobs = followings.map(rel => this.generateRelationshipJobData('unfollow', rel, { delay }));
		return this.relationshipQueue.addBulk(jobs);
	}

	/** Bulk-enqueues one `block` job per entry on the relationship queue. */
	@bindThis
	public createBlockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) {
		const jobs = blockings.map(rel => this.generateRelationshipJobData('block', rel));
		return this.relationshipQueue.addBulk(jobs);
	}

	/** Bulk-enqueues one `unblock` job per entry on the relationship queue. */
	@bindThis
	public createUnblockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) {
		const jobs = blockings.map(rel => this.generateRelationshipJobData('unblock', rel));
		return this.relationshipQueue.addBulk(jobs);
	}

	/**
	 * Builds one `addBulk` entry for a relationship job.
	 *
	 * Only `from.id`, `to.id`, `silent` and `requestId` are copied into the payload;
	 * any other property on `data` is dropped.
	 *
	 * @param name Job name.
	 * @param data Relationship description.
	 * @param opts Extra job options; these override the default remove-on-finish flags
	 *   because they are spread last.
	 */
	@bindThis
	private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts: Bull.JobOptions = {}): {
		name: string,
		data: RelationshipJobData,
		opts: Bull.JobOptions,
	} {
		return {
			name,
			data: {
				from: { id: data.from.id },
				to: { id: data.to.id },
				silent: data.silent,
				requestId: data.requestId,
			},
			opts: {
				removeOnComplete: true,
				removeOnFail: true,
				...opts,
			},
		};
	}

	/**
	 * Enqueues a `deleteFile` job on the object storage queue.
	 *
	 * @param key Stored in the payload as `key`.
	 */
	@bindThis
	public createDeleteObjectStorageFileJob(key: string) {
		return this.objectStorageQueue.add('deleteFile', {
			key,
		}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/** Enqueues a `cleanRemoteFiles` job (empty payload) on the object storage queue. */
	@bindThis
	public createCleanRemoteFilesJob() {
		return this.objectStorageQueue.add('cleanRemoteFiles', {}, {
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/**
	 * Enqueues a webhook delivery.
	 *
	 * The payload carries the webhook's id, owner id, URL and secret, plus a
	 * `createdAt` timestamp (`Date.now()`) and a freshly generated `eventId`.
	 *
	 * @param webhook Webhook to deliver to.
	 * @param type Event type being reported.
	 * @param content Event body, stored as-is.
	 * @returns The result of `webhookDeliverQueue.add`.
	 */
	@bindThis
	public webhookDeliver(webhook: Webhook, type: typeof webhookEventTypes[number], content: unknown) {
		const data = {
			type,
			content,
			webhookId: webhook.id,
			userId: webhook.userId,
			to: webhook.url,
			secret: webhook.secret,
			createdAt: Date.now(),
			eventId: uuid(),
		};

		return this.webhookDeliverQueue.add(data, {
			attempts: 4,
			timeout: 1 * 60 * 1000, // 1min
			backoff: {
				type: 'apBackoff',
			},
			removeOnComplete: true,
			removeOnFail: true,
		});
	}

	/**
	 * Requests a clean-up of `delayed` jobs (grace period 0) on the deliver and inbox queues.
	 *
	 * NOTE(review): the promises returned by `clean()` are neither awaited nor given a
	 * rejection handler, so this method returns before the clean-up has finished.
	 */
	@bindThis
	public destroy() {
		this.deliverQueue.once('cleaned', (jobs, status) => {
			//deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
		});
		this.deliverQueue.clean(0, 'delayed');

		this.inboxQueue.once('cleaned', (jobs, status) => {
			//inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
		});
		this.inboxQueue.clean(0, 'delayed');
	}
}