properly thread Mastodon imports
This commit is contained in:
parent
e392d523a7
commit
15503b96a0
@ -278,8 +278,8 @@ export class QueueService {
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public createImportMastoToDbJob(user: ThinUser, targets: string[]) {
|
||||
const jobs = targets.map(rel => this.generateToDbJobData('importMastoToDb', { user, target: rel }));
|
||||
public createImportMastoToDbJob(user: ThinUser, targets: string[], note: MiNote['id'] | null) {
|
||||
const jobs = targets.map(rel => this.generateToDbJobData('importMastoToDb', { user, target: rel, note }));
|
||||
return this.dbQueue.addBulk(jobs);
|
||||
}
|
||||
|
||||
|
@ -17,7 +17,7 @@ import { extractApHashtagObjects } from '@/core/activitypub/models/tag.js';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import type * as Bull from 'bullmq';
|
||||
import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbKeyNoteImportToDbJobData } from '../types.js';
|
||||
import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbNoteWithParentImportToDbJobData } from '../types.js';
|
||||
|
||||
@Injectable()
|
||||
export class ImportNotesProcessorService {
|
||||
@ -274,7 +274,8 @@ export class ImportNotesProcessorService {
|
||||
if (fs.existsSync(outputPath + '/media_attachments/files') && mastoFolder) {
|
||||
await this.uploadFiles(outputPath + '/media_attachments/files', user, mastoFolder.id);
|
||||
}
|
||||
this.queueService.createImportMastoToDbJob(job.data.user, outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'));
|
||||
const processedToots = await this.recreateChain('id', 'inReplyTo', outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true);
|
||||
this.queueService.createImportMastoToDbJob(job.data.user, processedToots, null);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@ -306,7 +307,7 @@ export class ImportNotesProcessorService {
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async processKeyNotesToDb(job: Bull.Job<DbKeyNoteImportToDbJobData>): Promise<void> {
|
||||
public async processKeyNotesToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
|
||||
const note = job.data.target;
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
@ -363,28 +364,33 @@ export class ImportNotesProcessorService {
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async processMastoToDb(job: Bull.Job<DbNoteImportToDbJobData>): Promise<void> {
|
||||
public async processMastoToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
|
||||
const toot = job.data.target;
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (toot.directMessage) return;
|
||||
|
||||
const date = new Date(toot.object.published);
|
||||
let text = undefined;
|
||||
const files: MiDriveFile[] = [];
|
||||
let reply: MiNote | null = null;
|
||||
|
||||
if (toot.object.inReplyTo != null) {
|
||||
try {
|
||||
reply = await this.apNoteService.resolveNote(toot.object.inReplyTo);
|
||||
} catch (error) {
|
||||
reply = null;
|
||||
const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null;
|
||||
if (parentNote) {
|
||||
reply = parentNote;
|
||||
} else {
|
||||
try {
|
||||
reply = await this.apNoteService.resolveNote(toot.object.inReplyTo);
|
||||
} catch (error) {
|
||||
reply = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (toot.directMessage) return;
|
||||
|
||||
const hashtags = extractApHashtagObjects(toot.object.tag).map((x) => x.name).filter((x): x is string => x != null);
|
||||
|
||||
try {
|
||||
@ -404,7 +410,8 @@ export class ImportNotesProcessorService {
|
||||
}
|
||||
}
|
||||
|
||||
await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: toot.object.sensitive ? toot.object.summary : null, reply: reply });
|
||||
const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: toot.object.sensitive ? toot.object.summary : null, reply: reply });
|
||||
if (toot.childNotes) this.queueService.createImportMastoToDbJob(user, toot.childNotes, createdNote.id);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
@ -525,7 +532,7 @@ export class ImportNotesProcessorService {
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async processTwitterDb(job: Bull.Job<DbKeyNoteImportToDbJobData>): Promise<void> {
|
||||
public async processTwitterDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
|
||||
const tweet = job.data.target;
|
||||
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
|
||||
if (user == null) {
|
||||
|
@ -50,12 +50,12 @@ export type DbJobMap = {
|
||||
exportUserLists: DbJobDataWithUser;
|
||||
importAntennas: DBAntennaImportJobData;
|
||||
importNotes: DbNoteImportJobData;
|
||||
importTweetsToDb: DbKeyNoteImportToDbJobData;
|
||||
importTweetsToDb: DbNoteWithParentImportToDbJobData;
|
||||
importIGToDb: DbNoteImportToDbJobData;
|
||||
importFBToDb: DbNoteImportToDbJobData;
|
||||
importMastoToDb: DbNoteImportToDbJobData;
|
||||
importMastoToDb: DbNoteWithParentImportToDbJobData;
|
||||
importPleroToDb: DbNoteImportToDbJobData;
|
||||
importKeyNotesToDb: DbKeyNoteImportToDbJobData;
|
||||
importKeyNotesToDb: DbNoteWithParentImportToDbJobData;
|
||||
importFollowing: DbUserImportJobData;
|
||||
importFollowingToDb: DbUserImportToDbJobData;
|
||||
importMuting: DbUserImportJobData;
|
||||
@ -113,7 +113,7 @@ export type DbNoteImportToDbJobData = {
|
||||
target: any;
|
||||
};
|
||||
|
||||
export type DbKeyNoteImportToDbJobData = {
|
||||
export type DbNoteWithParentImportToDbJobData = {
|
||||
user: ThinUser;
|
||||
target: any;
|
||||
note: MiNote['id'] | null;
|
||||
|
Loading…
Reference in New Issue
Block a user