merge: thread more imports (#187)

This commit is contained in:
Marie 2023-11-30 18:55:36 +01:00 committed by GitHub
commit acfb24517a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 39 deletions

View File

@ -278,14 +278,14 @@ export class QueueService {
} }
@bindThis @bindThis
public createImportMastoToDbJob(user: ThinUser, targets: string[]) { public createImportMastoToDbJob(user: ThinUser, targets: string[], note: MiNote['id'] | null) {
const jobs = targets.map(rel => this.generateToDbJobData('importMastoToDb', { user, target: rel })); const jobs = targets.map(rel => this.generateToDbJobData('importMastoToDb', { user, target: rel, note }));
return this.dbQueue.addBulk(jobs); return this.dbQueue.addBulk(jobs);
} }
@bindThis @bindThis
public createImportPleroToDbJob(user: ThinUser, targets: string[]) { public createImportPleroToDbJob(user: ThinUser, targets: string[], note: MiNote['id'] | null) {
const jobs = targets.map(rel => this.generateToDbJobData('importPleroToDb', { user, target: rel })); const jobs = targets.map(rel => this.generateToDbJobData('importPleroToDb', { user, target: rel, note }));
return this.dbQueue.addBulk(jobs); return this.dbQueue.addBulk(jobs);
} }

View File

@ -17,7 +17,7 @@ import { extractApHashtagObjects } from '@/core/activitypub/models/tag.js';
import { IdService } from '@/core/IdService.js'; import { IdService } from '@/core/IdService.js';
import { QueueLoggerService } from '../QueueLoggerService.js'; import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq'; import type * as Bull from 'bullmq';
import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbKeyNoteImportToDbJobData } from '../types.js'; import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbNoteWithParentImportToDbJobData } from '../types.js';
@Injectable() @Injectable()
export class ImportNotesProcessorService { export class ImportNotesProcessorService {
@ -74,7 +74,7 @@ export class ImportNotesProcessorService {
// Function was taken from Firefish and modified for our needs // Function was taken from Firefish and modified for our needs
@bindThis @bindThis
private async recreateChain(idField: string, replyField: string, arr: any[]): Promise<any[]> { private async recreateChain(idFieldPath: string[], replyFieldPath: string[], arr: any[], includeOrphans: boolean): Promise<any[]> {
type NotesMap = { type NotesMap = {
[id: string]: any; [id: string]: any;
}; };
@ -83,28 +83,42 @@ export class ImportNotesProcessorService {
const notesWaitingForParent: NotesMap = {}; const notesWaitingForParent: NotesMap = {};
for await (const note of arr) { for await (const note of arr) {
noteById[note[idField]] = note; const noteId = idFieldPath.reduce(
(obj, step) => obj[step],
note,
);
noteById[noteId] = note;
note.childNotes = []; note.childNotes = [];
const children = notesWaitingForParent[note[idField]]; const children = notesWaitingForParent[noteId];
if (children) { if (children) {
note.childNotes.push(...children); note.childNotes.push(...children);
delete notesWaitingForParent[noteId];
} }
if (note[replyField] == null) { const noteReplyId = replyFieldPath.reduce(
(obj, step) => obj[step],
note,
);
if (noteReplyId == null) {
notesTree.push(note); notesTree.push(note);
continue; continue;
} }
const parent = noteById[note[replyField]]; const parent = noteById[noteReplyId];
if (parent) { if (parent) {
parent.childNotes.push(note); parent.childNotes.push(note);
} else { } else {
notesWaitingForParent[note[replyField]] ||= []; notesWaitingForParent[noteReplyId] ||= [];
notesWaitingForParent[note[replyField]].push(note); notesWaitingForParent[noteReplyId].push(note);
} }
} }
if (includeOrphans) {
notesTree.push(...Object.values(notesWaitingForParent).flat(1));
}
return notesTree; return notesTree;
} }
@ -176,7 +190,7 @@ export class ImportNotesProcessorService {
const tweets = Object.keys(fakeWindow.window.YTD.tweets.part0).reduce((m, key, i, obj) => { const tweets = Object.keys(fakeWindow.window.YTD.tweets.part0).reduce((m, key, i, obj) => {
return m.concat(fakeWindow.window.YTD.tweets.part0[key].tweet); return m.concat(fakeWindow.window.YTD.tweets.part0[key].tweet);
}, []); }, []);
const processedTweets = await this.recreateChain('id_str', 'in_reply_to_status_id_str', tweets); const processedTweets = await this.recreateChain(['id_str'], ['in_reply_to_status_id_str'], tweets, false);
this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null); this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null);
} finally { } finally {
cleanup(); cleanup();
@ -254,7 +268,8 @@ export class ImportNotesProcessorService {
if (isPleroma) { if (isPleroma) {
const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8'); const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8');
const outbox = JSON.parse(outboxJson); const outbox = JSON.parse(outboxJson);
this.queueService.createImportPleroToDbJob(job.data.user, outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note')); const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true);
this.queueService.createImportPleroToDbJob(job.data.user, processedToots, null);
} else { } else {
const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8'); const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8');
const outbox = JSON.parse(outboxJson); const outbox = JSON.parse(outboxJson);
@ -266,7 +281,8 @@ export class ImportNotesProcessorService {
if (fs.existsSync(outputPath + '/media_attachments/files') && mastoFolder) { if (fs.existsSync(outputPath + '/media_attachments/files') && mastoFolder) {
await this.uploadFiles(outputPath + '/media_attachments/files', user, mastoFolder.id); await this.uploadFiles(outputPath + '/media_attachments/files', user, mastoFolder.id);
} }
this.queueService.createImportMastoToDbJob(job.data.user, outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note')); const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true);
this.queueService.createImportMastoToDbJob(job.data.user, processedToots, null);
} }
} }
} finally { } finally {
@ -289,7 +305,7 @@ export class ImportNotesProcessorService {
const notesJson = fs.readFileSync(path, 'utf-8'); const notesJson = fs.readFileSync(path, 'utf-8');
const notes = JSON.parse(notesJson); const notes = JSON.parse(notesJson);
const processedNotes = await this.recreateChain('id', 'replyId', notes); const processedNotes = await this.recreateChain(['id'], ['replyId'], notes, false);
this.queueService.createImportKeyNotesToDbJob(job.data.user, processedNotes, null); this.queueService.createImportKeyNotesToDbJob(job.data.user, processedNotes, null);
cleanup(); cleanup();
} }
@ -298,7 +314,7 @@ export class ImportNotesProcessorService {
} }
@bindThis @bindThis
public async processKeyNotesToDb(job: Bull.Job<DbKeyNoteImportToDbJobData>): Promise<void> { public async processKeyNotesToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
const note = job.data.target; const note = job.data.target;
const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) { if (user == null) {
@ -355,27 +371,32 @@ export class ImportNotesProcessorService {
} }
@bindThis @bindThis
public async processMastoToDb(job: Bull.Job<DbNoteImportToDbJobData>): Promise<void> { public async processMastoToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
const toot = job.data.target; const toot = job.data.target;
const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) { if (user == null) {
return; return;
} }
if (toot.directMessage) return;
const date = new Date(toot.object.published); const date = new Date(toot.object.published);
let text = undefined; let text = undefined;
const files: MiDriveFile[] = []; const files: MiDriveFile[] = [];
let reply: MiNote | null = null; let reply: MiNote | null = null;
if (toot.object.inReplyTo != null) { if (toot.object.inReplyTo != null) {
const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null;
if (parentNote) {
reply = parentNote;
} else {
try { try {
reply = await this.apNoteService.resolveNote(toot.object.inReplyTo); reply = await this.apNoteService.resolveNote(toot.object.inReplyTo);
} catch (error) { } catch (error) {
reply = null; reply = null;
} }
} }
}
if (toot.directMessage) return;
const hashtags = extractApHashtagObjects(toot.object.tag).map((x) => x.name).filter((x): x is string => x != null); const hashtags = extractApHashtagObjects(toot.object.tag).map((x) => x.name).filter((x): x is string => x != null);
@ -396,17 +417,20 @@ export class ImportNotesProcessorService {
} }
} }
await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: toot.object.sensitive ? toot.object.summary : null, reply: reply }); const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: toot.object.sensitive ? toot.object.summary : null, reply: reply });
if (toot.childNotes) this.queueService.createImportMastoToDbJob(user, toot.childNotes, createdNote.id);
} }
@bindThis @bindThis
public async processPleroToDb(job: Bull.Job<DbNoteImportToDbJobData>): Promise<void> { public async processPleroToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
const post = job.data.target; const post = job.data.target;
const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) { if (user == null) {
return; return;
} }
if (post.directMessage) return;
const date = new Date(post.object.published); const date = new Date(post.object.published);
let text = undefined; let text = undefined;
const files: MiDriveFile[] = []; const files: MiDriveFile[] = [];
@ -416,14 +440,17 @@ export class ImportNotesProcessorService {
if (folder == null) return; if (folder == null) return;
if (post.object.inReplyTo != null) { if (post.object.inReplyTo != null) {
const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null;
if (parentNote) {
reply = parentNote;
} else {
try { try {
reply = await this.apNoteService.resolveNote(post.object.inReplyTo); reply = await this.apNoteService.resolveNote(post.object.inReplyTo);
} catch (error) { } catch (error) {
reply = null; reply = null;
} }
} }
}
if (post.directMessage) return;
const hashtags = extractApHashtagObjects(post.object.tag).map((x) => x.name).filter((x): x is string => x != null); const hashtags = extractApHashtagObjects(post.object.tag).map((x) => x.name).filter((x): x is string => x != null);
@ -468,7 +495,8 @@ export class ImportNotesProcessorService {
} }
} }
await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: post.object.sensitive ? post.object.summary : null, reply: reply }); const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: post.object.sensitive ? post.object.summary : null, reply: reply });
if (post.childNotes) this.queueService.createImportPleroToDbJob(user, post.childNotes, createdNote.id);
} }
@bindThis @bindThis
@ -517,7 +545,7 @@ export class ImportNotesProcessorService {
} }
@bindThis @bindThis
public async processTwitterDb(job: Bull.Job<DbKeyNoteImportToDbJobData>): Promise<void> { public async processTwitterDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
const tweet = job.data.target; const tweet = job.data.target;
const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) { if (user == null) {

View File

@ -50,12 +50,12 @@ export type DbJobMap = {
exportUserLists: DbJobDataWithUser; exportUserLists: DbJobDataWithUser;
importAntennas: DBAntennaImportJobData; importAntennas: DBAntennaImportJobData;
importNotes: DbNoteImportJobData; importNotes: DbNoteImportJobData;
importTweetsToDb: DbKeyNoteImportToDbJobData; importTweetsToDb: DbNoteWithParentImportToDbJobData;
importIGToDb: DbNoteImportToDbJobData; importIGToDb: DbNoteImportToDbJobData;
importFBToDb: DbNoteImportToDbJobData; importFBToDb: DbNoteImportToDbJobData;
importMastoToDb: DbNoteImportToDbJobData; importMastoToDb: DbNoteWithParentImportToDbJobData;
importPleroToDb: DbNoteImportToDbJobData; importPleroToDb: DbNoteWithParentImportToDbJobData;
importKeyNotesToDb: DbKeyNoteImportToDbJobData; importKeyNotesToDb: DbNoteWithParentImportToDbJobData;
importFollowing: DbUserImportJobData; importFollowing: DbUserImportJobData;
importFollowingToDb: DbUserImportToDbJobData; importFollowingToDb: DbUserImportToDbJobData;
importMuting: DbUserImportJobData; importMuting: DbUserImportJobData;
@ -113,7 +113,7 @@ export type DbNoteImportToDbJobData = {
target: any; target: any;
}; };
export type DbKeyNoteImportToDbJobData = { export type DbNoteWithParentImportToDbJobData = {
user: ThinUser; user: ThinUser;
target: any; target: any;
note: MiNote['id'] | null; note: MiNote['id'] | null;