From e392d523a7df4c22575b412b4ec0c632fe4ef3a4 Mon Sep 17 00:00:00 2001 From: Gianni Ceccarelli Date: Tue, 28 Nov 2023 09:45:51 +0000 Subject: [PATCH 1/4] prepare to import more notes `recreateChain` converts a list of notes into a forest of notes, using notes that are not replies as roots, and replies as child nodes, recursively. Previously, notes that are replies to notes not included in the export, and their children, were never put in the forest, and therefore weren't imported. This can be fine when importing from Twitter, since we can't really link a note to a tweet. And, for the moment, it's acceptable when importing from *key, because the export doesn't contain the instance URL, so we can't resolve ids to remote notes. It's less fine when importing from Mastodon / Pleroma / Akkoma, because in those cases we _can_ link to the remote note that the user was replying to. This commit makes `recreateChain` optionally return "orphaned" note trees, so in the (near) future we can use it to properly thread imported notes from those services. 
--- .../processors/ImportNotesProcessorService.ts | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts index cbe1d41e35..5b167e46b2 100644 --- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -74,7 +74,7 @@ export class ImportNotesProcessorService { // Function was taken from Firefish and modified for our needs @bindThis - private async recreateChain(idField: string, replyField: string, arr: any[]): Promise { + private async recreateChain(idField: string, replyField: string, arr: any[], includeOrphans: boolean): Promise { type NotesMap = { [id: string]: any; }; @@ -83,28 +83,36 @@ export class ImportNotesProcessorService { const notesWaitingForParent: NotesMap = {}; for await (const note of arr) { - noteById[note[idField]] = note; + const noteId = note[idField]; + + noteById[noteId] = note; note.childNotes = []; - const children = notesWaitingForParent[note[idField]]; + const children = notesWaitingForParent[noteId]; if (children) { note.childNotes.push(...children); + delete notesWaitingForParent[noteId]; } - if (note[replyField] == null) { + const noteReplyId = note[replyField]; + if (noteReplyId == null) { notesTree.push(note); continue; } - const parent = noteById[note[replyField]]; + const parent = noteById[noteReplyId]; if (parent) { parent.childNotes.push(note); } else { - notesWaitingForParent[note[replyField]] ||= []; - notesWaitingForParent[note[replyField]].push(note); + notesWaitingForParent[noteReplyId] ||= []; + notesWaitingForParent[noteReplyId].push(note); } } + if (includeOrphans) { + notesTree.push(...Object.values(notesWaitingForParent).flat(1)); + } + return notesTree; } @@ -176,7 +184,7 @@ export class ImportNotesProcessorService { const tweets = 
Object.keys(fakeWindow.window.YTD.tweets.part0).reduce((m, key, i, obj) => { return m.concat(fakeWindow.window.YTD.tweets.part0[key].tweet); }, []); - const processedTweets = await this.recreateChain('id_str', 'in_reply_to_status_id_str', tweets); + const processedTweets = await this.recreateChain('id_str', 'in_reply_to_status_id_str', tweets, false); this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null); } finally { cleanup(); @@ -289,7 +297,7 @@ export class ImportNotesProcessorService { const notesJson = fs.readFileSync(path, 'utf-8'); const notes = JSON.parse(notesJson); - const processedNotes = await this.recreateChain('id', 'replyId', notes); + const processedNotes = await this.recreateChain('id', 'replyId', notes, false); this.queueService.createImportKeyNotesToDbJob(job.data.user, processedNotes, null); cleanup(); } From 15503b96a07f1c522197ced44391ed227f3b4a7f Mon Sep 17 00:00:00 2001 From: dakkar Date: Thu, 30 Nov 2023 12:23:09 +0000 Subject: [PATCH 2/4] properly thread Mastodon imports --- packages/backend/src/core/QueueService.ts | 4 +-- .../processors/ImportNotesProcessorService.ts | 31 ++++++++++++------- packages/backend/src/queue/types.ts | 8 ++--- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 0230c9a7b8..6acdc0ad89 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -278,8 +278,8 @@ export class QueueService { } @bindThis - public createImportMastoToDbJob(user: ThinUser, targets: string[]) { - const jobs = targets.map(rel => this.generateToDbJobData('importMastoToDb', { user, target: rel })); + public createImportMastoToDbJob(user: ThinUser, targets: string[], note: MiNote['id'] | null) { + const jobs = targets.map(rel => this.generateToDbJobData('importMastoToDb', { user, target: rel, note })); return this.dbQueue.addBulk(jobs); } diff --git 
a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts index 5b167e46b2..49c8530b39 100644 --- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -17,7 +17,7 @@ import { extractApHashtagObjects } from '@/core/activitypub/models/tag.js'; import { IdService } from '@/core/IdService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type * as Bull from 'bullmq'; -import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbKeyNoteImportToDbJobData } from '../types.js'; +import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbNoteWithParentImportToDbJobData } from '../types.js'; @Injectable() export class ImportNotesProcessorService { @@ -274,7 +274,8 @@ export class ImportNotesProcessorService { if (fs.existsSync(outputPath + '/media_attachments/files') && mastoFolder) { await this.uploadFiles(outputPath + '/media_attachments/files', user, mastoFolder.id); } - this.queueService.createImportMastoToDbJob(job.data.user, outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note')); + const processedToots = await this.recreateChain('id', 'inReplyTo', outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true); + this.queueService.createImportMastoToDbJob(job.data.user, processedToots, null); } } } finally { @@ -306,7 +307,7 @@ export class ImportNotesProcessorService { } @bindThis - public async processKeyNotesToDb(job: Bull.Job): Promise { + public async processKeyNotesToDb(job: Bull.Job): Promise { const note = job.data.target; const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { @@ -363,28 +364,33 @@ export class ImportNotesProcessorService { } @bindThis - public async processMastoToDb(job: Bull.Job): Promise { + public async processMastoToDb(job: 
Bull.Job): Promise { const toot = job.data.target; const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { return; } + if (toot.directMessage) return; + const date = new Date(toot.object.published); let text = undefined; const files: MiDriveFile[] = []; let reply: MiNote | null = null; if (toot.object.inReplyTo != null) { - try { - reply = await this.apNoteService.resolveNote(toot.object.inReplyTo); - } catch (error) { - reply = null; + const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null; + if (parentNote) { + reply = parentNote; + } else { + try { + reply = await this.apNoteService.resolveNote(toot.object.inReplyTo); + } catch (error) { + reply = null; + } } } - if (toot.directMessage) return; - const hashtags = extractApHashtagObjects(toot.object.tag).map((x) => x.name).filter((x): x is string => x != null); try { @@ -404,7 +410,8 @@ export class ImportNotesProcessorService { } } - await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: toot.object.sensitive ? toot.object.summary : null, reply: reply }); + const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: toot.object.sensitive ? 
toot.object.summary : null, reply: reply }); + if (toot.childNotes) this.queueService.createImportMastoToDbJob(user, toot.childNotes, createdNote.id); } @bindThis @@ -525,7 +532,7 @@ export class ImportNotesProcessorService { } @bindThis - public async processTwitterDb(job: Bull.Job): Promise { + public async processTwitterDb(job: Bull.Job): Promise { const tweet = job.data.target; const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 8d09e4e197..4404357549 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -50,12 +50,12 @@ export type DbJobMap = { exportUserLists: DbJobDataWithUser; importAntennas: DBAntennaImportJobData; importNotes: DbNoteImportJobData; - importTweetsToDb: DbKeyNoteImportToDbJobData; + importTweetsToDb: DbNoteWithParentImportToDbJobData; importIGToDb: DbNoteImportToDbJobData; importFBToDb: DbNoteImportToDbJobData; - importMastoToDb: DbNoteImportToDbJobData; + importMastoToDb: DbNoteWithParentImportToDbJobData; importPleroToDb: DbNoteImportToDbJobData; - importKeyNotesToDb: DbKeyNoteImportToDbJobData; + importKeyNotesToDb: DbNoteWithParentImportToDbJobData; importFollowing: DbUserImportJobData; importFollowingToDb: DbUserImportToDbJobData; importMuting: DbUserImportJobData; @@ -113,7 +113,7 @@ export type DbNoteImportToDbJobData = { target: any; }; -export type DbKeyNoteImportToDbJobData = { +export type DbNoteWithParentImportToDbJobData = { user: ThinUser; target: any; note: MiNote['id'] | null; From c59e74dfd5081603b881f6c0452596af34d7e04e Mon Sep 17 00:00:00 2001 From: dakkar Date: Thu, 30 Nov 2023 13:13:41 +0000 Subject: [PATCH 3/4] fix chaining for Mastodon notes the id / replyId are not at the top level, so now `recreateChain` takes a list of keys to walk, not just a single key --- .../processors/ImportNotesProcessorService.ts | 18 ++++++++++++------ 1 file 
changed, 12 insertions(+), 6 deletions(-) diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts index 49c8530b39..b9a3645f6d 100644 --- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -74,7 +74,7 @@ export class ImportNotesProcessorService { // Function was taken from Firefish and modified for our needs @bindThis - private async recreateChain(idField: string, replyField: string, arr: any[], includeOrphans: boolean): Promise { + private async recreateChain(idFieldPath: string[], replyFieldPath: string[], arr: any[], includeOrphans: boolean): Promise { type NotesMap = { [id: string]: any; }; @@ -83,7 +83,10 @@ export class ImportNotesProcessorService { const notesWaitingForParent: NotesMap = {}; for await (const note of arr) { - const noteId = note[idField]; + const noteId = idFieldPath.reduce( + (obj, step) => obj[step], + note, + ); noteById[noteId] = note; note.childNotes = []; @@ -94,7 +97,10 @@ export class ImportNotesProcessorService { delete notesWaitingForParent[noteId]; } - const noteReplyId = note[replyField]; + const noteReplyId = replyFieldPath.reduce( + (obj, step) => obj[step], + note, + ); if (noteReplyId == null) { notesTree.push(note); continue; @@ -184,7 +190,7 @@ export class ImportNotesProcessorService { const tweets = Object.keys(fakeWindow.window.YTD.tweets.part0).reduce((m, key, i, obj) => { return m.concat(fakeWindow.window.YTD.tweets.part0[key].tweet); }, []); - const processedTweets = await this.recreateChain('id_str', 'in_reply_to_status_id_str', tweets, false); + const processedTweets = await this.recreateChain(['id_str'], ['in_reply_to_status_id_str'], tweets, false); this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null); } finally { cleanup(); @@ -274,7 +280,7 @@ export class ImportNotesProcessorService { if 
(fs.existsSync(outputPath + '/media_attachments/files') && mastoFolder) { await this.uploadFiles(outputPath + '/media_attachments/files', user, mastoFolder.id); } - const processedToots = await this.recreateChain('id', 'inReplyTo', outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true); + const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true); this.queueService.createImportMastoToDbJob(job.data.user, processedToots, null); } } @@ -298,7 +304,7 @@ export class ImportNotesProcessorService { const notesJson = fs.readFileSync(path, 'utf-8'); const notes = JSON.parse(notesJson); - const processedNotes = await this.recreateChain('id', 'replyId', notes, false); + const processedNotes = await this.recreateChain(['id'], ['replyId'], notes, false); this.queueService.createImportKeyNotesToDbJob(job.data.user, processedNotes, null); cleanup(); } From c958d935e4e5b33b28c242a0de2c4319859f8541 Mon Sep 17 00:00:00 2001 From: dakkar Date: Thu, 30 Nov 2023 13:24:57 +0000 Subject: [PATCH 4/4] thread Pleroma imports as well I have _not_ tested this, but it should work fine, those exports are the same shape as Mastodon's --- packages/backend/src/core/QueueService.ts | 4 +-- .../processors/ImportNotesProcessorService.ts | 25 ++++++++++++------- packages/backend/src/queue/types.ts | 2 +- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/packages/backend/src/core/QueueService.ts b/packages/backend/src/core/QueueService.ts index 6acdc0ad89..2ee61eb549 100644 --- a/packages/backend/src/core/QueueService.ts +++ b/packages/backend/src/core/QueueService.ts @@ -284,8 +284,8 @@ export class QueueService { } @bindThis - public createImportPleroToDbJob(user: ThinUser, targets: string[]) { - const jobs = targets.map(rel => this.generateToDbJobData('importPleroToDb', { user, target: rel })); + public 
createImportPleroToDbJob(user: ThinUser, targets: string[], note: MiNote['id'] | null) { + const jobs = targets.map(rel => this.generateToDbJobData('importPleroToDb', { user, target: rel, note })); return this.dbQueue.addBulk(jobs); } diff --git a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts index b9a3645f6d..552b69d92d 100644 --- a/packages/backend/src/queue/processors/ImportNotesProcessorService.ts +++ b/packages/backend/src/queue/processors/ImportNotesProcessorService.ts @@ -268,7 +268,8 @@ export class ImportNotesProcessorService { if (isPleroma) { const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8'); const outbox = JSON.parse(outboxJson); - this.queueService.createImportPleroToDbJob(job.data.user, outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note')); + const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true); + this.queueService.createImportPleroToDbJob(job.data.user, processedToots, null); } else { const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8'); const outbox = JSON.parse(outboxJson); @@ -421,13 +422,15 @@ export class ImportNotesProcessorService { } @bindThis - public async processPleroToDb(job: Bull.Job): Promise { + public async processPleroToDb(job: Bull.Job): Promise { const post = job.data.target; const user = await this.usersRepository.findOneBy({ id: job.data.user.id }); if (user == null) { return; } + if (post.directMessage) return; + const date = new Date(post.object.published); let text = undefined; const files: MiDriveFile[] = []; @@ -437,15 +440,18 @@ export class ImportNotesProcessorService { if (folder == null) return; if (post.object.inReplyTo != null) { - try { - reply = await this.apNoteService.resolveNote(post.object.inReplyTo); - } catch 
(error) { - reply = null; + const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null; + if (parentNote) { + reply = parentNote; + } else { + try { + reply = await this.apNoteService.resolveNote(post.object.inReplyTo); + } catch (error) { + reply = null; + } } } - if (post.directMessage) return; - const hashtags = extractApHashtagObjects(post.object.tag).map((x) => x.name).filter((x): x is string => x != null); try { @@ -489,7 +495,8 @@ export class ImportNotesProcessorService { } } - await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: post.object.sensitive ? post.object.summary : null, reply: reply }); + const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: post.object.sensitive ? post.object.summary : null, reply: reply }); + if (post.childNotes) this.queueService.createImportPleroToDbJob(user, post.childNotes, createdNote.id); } @bindThis diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 4404357549..432b3d364f 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -54,7 +54,7 @@ export type DbJobMap = { importIGToDb: DbNoteImportToDbJobData; importFBToDb: DbNoteImportToDbJobData; importMastoToDb: DbNoteWithParentImportToDbJobData; - importPleroToDb: DbNoteImportToDbJobData; + importPleroToDb: DbNoteWithParentImportToDbJobData; importKeyNotesToDb: DbNoteWithParentImportToDbJobData; importFollowing: DbUserImportJobData; importFollowingToDb: DbUserImportToDbJobData;