From 311a31da58ebfc079653f860ea4cf4ed9a051d42 Mon Sep 17 00:00:00 2001 From: dakkar Date: Thu, 15 Aug 2024 11:35:51 +0100 Subject: [PATCH 1/8] rough rate limiting for websockets --- .../server/api/StreamingApiServerService.ts | 48 +++++++++++++++++++ .../src/server/api/stream/Connection.ts | 7 +++ 2 files changed, 55 insertions(+) diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index b8f448477b..7ac1bcf469 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -19,7 +19,12 @@ import { ChannelFollowingService } from '@/core/ChannelFollowingService.js'; import { AuthenticateService, AuthenticationError } from './AuthenticateService.js'; import MainStreamConnection from './stream/Connection.js'; import { ChannelsService } from './stream/ChannelsService.js'; +import { RateLimiterService } from './RateLimiterService.js'; +import { RoleService } from '@/core/RoleService.js'; +import { getIpHash } from '@/misc/get-ip-hash.js'; +import ms from 'ms'; import type * as http from 'node:http'; +import type { IEndpointMeta } from './endpoints.js'; @Injectable() export class StreamingApiServerService { @@ -41,9 +46,32 @@ export class StreamingApiServerService { private notificationService: NotificationService, private usersService: UserService, private channelFollowingService: ChannelFollowingService, + private rateLimiterService: RateLimiterService, + private roleService: RoleService, ) { } + @bindThis + private async rateLimitThis( + user: MiLocalUser | null | undefined, + requestIp: string | undefined, + limit: IEndpointMeta['limit'] & { key: NonNullable }, + ) : Promise { + let limitActor: string; + if (user) { + limitActor = user.id; + } else { + limitActor = getIpHash(requestIp || 'wtf'); + } + + const factor = user ? (await this.roleService.getUserPolicies(user.id)).rateLimitFactor : 1; + + if (factor <= 0) return false; + + // Rate limit + return await this.rateLimiterService.limit(limit, limitActor, factor).then(() => { return false }).catch(err => { return true }); + } + @bindThis public attach(server: http.Server): void { this.#wss = new WebSocket.WebSocketServer({ @@ -57,6 +85,17 @@ export class StreamingApiServerService { return; } + if (await this.rateLimitThis(null, request.socket.remoteAddress, { + key: 'wsconnect', + duration: ms('1min'), + max: 20, + minInterval: ms('1sec'), + })) { + socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); + socket.destroy(); + return; + } + const q = new URL(request.url, `http://${request.headers.host}`).searchParams; let user: MiLocalUser | null = null; @@ -94,6 +133,14 @@ export class StreamingApiServerService { return; } + const rateLimiter = () => { + return this.rateLimitThis(user, request.socket.remoteAddress, { + key: 'wsmessage', + duration: ms('1sec'), + max: 100, + }); + }; + const stream = new MainStreamConnection( this.channelsService, this.noteReadService, @@ -101,6 +148,7 @@ export class StreamingApiServerService { this.cacheService, this.channelFollowingService, user, app, + rateLimiter, ); await stream.init(); diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index 7dd7db24e5..dfc6f0d298 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -25,6 +25,7 @@ import type Channel from './channel.js'; export default class Connection { public user?: MiUser; public token?: MiAccessToken; + private rateLimiter?: () => Promise; private wsConnection: WebSocket.WebSocket; public subscriber: StreamEventEmitter; private channels: Channel[] = []; @@ -48,9 +49,11 @@ export default class Connection { user: MiUser | null | undefined, token: MiAccessToken | null | undefined, + rateLimiter: () => Promise, ) { if (user) this.user = user; if (token) this.token = token; + if (rateLimiter) this.rateLimiter = rateLimiter; } @bindThis @@ -103,6 +106,10 @@ export default class Connection { private async onWsConnectionMessage(data: WebSocket.RawData) { let obj: Record; + if (this.rateLimiter && await this.rateLimiter()) { + return; + } + try { obj = JSON.parse(data.toString()); } catch (e) { From 4cd44130e0abd47f1f9c4b7fd74c5c49c16bd79c Mon Sep 17 00:00:00 2001 From: dakkar Date: Fri, 16 Aug 2024 18:00:50 +0100 Subject: [PATCH 2/8] use the correct remote address we're doing the same thing that Fastify does in the non-streaming ServerService --- packages/backend/package.json | 1 + .../src/server/api/StreamingApiServerService.ts | 14 +++++++++++--- pnpm-lock.yaml | 3 +++ 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/packages/backend/package.json b/packages/backend/package.json index 8e8d76bf23..65eda6153c 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -156,6 +156,7 @@ "pkce-challenge": "4.1.0", "probe-image-size": "7.2.3", "promise-limit": "2.7.0", + "proxy-addr": "^2.0.7", "pug": "3.0.2", "punycode": "2.3.1", "qrcode": "1.5.3", diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 7ac1bcf469..1435169812 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -22,6 +22,7 @@ import { ChannelsService } from './stream/ChannelsService.js'; import { RateLimiterService } from './RateLimiterService.js'; import { RoleService } from '@/core/RoleService.js'; import { getIpHash } from '@/misc/get-ip-hash.js'; +import proxyAddr from 'proxy-addr'; import ms from 'ms'; import type * as http from 'node:http'; import type { IEndpointMeta } from './endpoints.js'; @@ -69,7 +70,9 @@ export class StreamingApiServerService { if (factor <= 0) return false; // Rate limit - return await this.rateLimiterService.limit(limit, limitActor, factor).then(() => { return false }).catch(err => { return true }); + return await this.rateLimiterService.limit(limit, limitActor, factor) + .then(() => { return false; }) + .catch(err => { return true; }); } @bindThis @@ -85,7 +88,12 @@ export class StreamingApiServerService { return; } - if (await this.rateLimitThis(null, request.socket.remoteAddress, { + // ServerServices sets `trustProxy: true`, which inside + // fastify/request.js ends up calling `proxyAddr` in this way, + // so we do the same + const requestIp = proxyAddr(request, () => { return true; } ); + + if (await this.rateLimitThis(null, requestIp, { key: 'wsconnect', duration: ms('1min'), max: 20, @@ -134,7 +142,7 @@ export class StreamingApiServerService { } const rateLimiter = () => { - return this.rateLimitThis(user, request.socket.remoteAddress, { + return this.rateLimitThis(user, requestIp, { key: 'wsmessage', duration: ms('1sec'), max: 100, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1f3cd8216f..d6e9f1196a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -352,6 +352,9 @@ importers: promise-limit: specifier: 2.7.0 version: 2.7.0 + proxy-addr: + specifier: ^2.0.7 + version: 2.0.7 pug: specifier: 3.0.2 version: 3.0.2 From 6d3f9503ed1fd04718396b248cc5a753245c0f67 Mon Sep 17 00:00:00 2001 From: Julia Johannesen Date: Fri, 16 Aug 2024 17:13:20 -0400 Subject: [PATCH 3/8] Limit number of rate limit requests --- .../server/api/StreamingApiServerService.ts | 5 ++++ .../src/server/api/stream/Connection.ts | 26 +++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 1435169812..f48af45fb1 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -26,12 +26,15 @@ import proxyAddr from 'proxy-addr'; import ms from 'ms'; import type * as http from 'node:http'; import type { IEndpointMeta } from './endpoints.js'; +import { LoggerService } from '@/core/LoggerService.js'; +import type Logger from '@/logger.js'; @Injectable() export class StreamingApiServerService { #wss: WebSocket.WebSocketServer; #connections = new Map(); #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; + #logger: Logger; constructor( @Inject(DI.redisForSub) @@ -49,6 +52,7 @@ export class StreamingApiServerService { private channelFollowingService: ChannelFollowingService, private rateLimiterService: RateLimiterService, private roleService: RoleService, + private loggerService: LoggerService, ) { } @@ -155,6 +159,7 @@ export class StreamingApiServerService { this.notificationService, this.cacheService, this.channelFollowingService, + this.loggerService, user, app, rateLimiter, ); diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index dfc6f0d298..0a7828d163 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -17,6 +17,8 @@ import { ChannelFollowingService } from '@/core/ChannelFollowingService.js'; import type { ChannelsService } from './ChannelsService.js'; import type { EventEmitter } from 'events'; import type Channel from './channel.js'; +import { LoggerService } from '@/core/LoggerService.js'; +import type Logger from '@/logger.js'; /** * Main stream connection @@ -39,6 +41,9 @@ export default class Connection { public userIdsWhoMeMutingRenotes: Set = new Set(); public userMutedInstances: Set = new Set(); private fetchIntervalId: NodeJS.Timeout | null = null; + private activeRateLimitRequests: number = 0; + private closingConnection: boolean = false; + private logger: Logger; constructor( private channelsService: ChannelsService, @@ -46,6 +51,7 @@ export default class Connection { private notificationService: NotificationService, private cacheService: CacheService, private channelFollowingService: ChannelFollowingService, + private loggerService: LoggerService, user: MiUser | null | undefined, token: MiAccessToken | null | undefined, @@ -54,6 +60,8 @@ export default class Connection { if (user) this.user = user; if (token) this.token = token; if (rateLimiter) this.rateLimiter = rateLimiter; + + this.logger = loggerService.getLogger('streaming', 'coral', false); } @bindThis @@ -106,8 +114,22 @@ export default class Connection { private async onWsConnectionMessage(data: WebSocket.RawData) { let obj: Record; - if (this.rateLimiter && await this.rateLimiter()) { - return; + if (this.closingConnection) return; + + if (this.rateLimiter) { + if (this.activeRateLimitRequests <= 128) { + this.activeRateLimitRequests++; + const shouldRateLimit = await this.rateLimiter(); + this.activeRateLimitRequests--; + + if (shouldRateLimit) return; + if (this.closingConnection) return; + } else { + this.logger.warn('Closing a connection due to an excessive influx of messages.'); + this.closingConnection = true; + this.wsConnection.close(1008, 'Please stop spamming the streaming API.'); + return; + } } try { From 76bbc10aa1305ce88c61783a6ac8aec4371eaa2b Mon Sep 17 00:00:00 2001 From: syuilo <4439005+syuilo@users.noreply.github.com> Date: Sat, 17 Aug 2024 09:30:03 +0900 Subject: [PATCH 4/8] =?UTF-8?q?fix(backend):=20=E7=84=A1=E5=88=B6=E9=99=90?= =?UTF-8?q?=E3=81=AB=E3=82=B9=E3=83=88=E3=83=AA=E3=83=BC=E3=83=9F=E3=83=B3?= =?UTF-8?q?=E3=82=B0=E3=81=AE=E3=83=81=E3=83=A3=E3=83=B3=E3=83=8D=E3=83=AB?= =?UTF-8?q?=E3=81=AB=E6=8E=A5=E7=B6=9A=E3=81=A7=E3=81=8D=E3=82=8B=E5=95=8F?= =?UTF-8?q?=E9=A1=8C=E3=82=92=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/server/api/stream/Connection.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index 0a7828d163..e8cd557c1c 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -20,6 +20,8 @@ import type Channel from './channel.js'; import { LoggerService } from '@/core/LoggerService.js'; import type Logger from '@/logger.js'; +const MAX_CHANNELS_PER_CONNECTION = 32; + /** * Main stream connection */ @@ -283,6 +285,10 @@ export default class Connection { */ @bindThis public connectChannel(id: string, params: any, channel: string, pong = false) { + if (this.channels.length >= MAX_CHANNELS_PER_CONNECTION) { + return; + } + const channelService = this.channelsService.getChannelService(channel); if (channelService.requireCredential && this.user == null) { From 9c1c1e9f099c25cf33f5804fdf3fba547ba73c92 Mon Sep 17 00:00:00 2001 From: Julia Johannesen Date: Sat, 17 Aug 2024 13:08:46 -0400 Subject: [PATCH 5/8] Fix logging stuff --- packages/backend/src/server/api/StreamingApiServerService.ts | 1 - packages/backend/src/server/api/stream/Connection.ts | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index f48af45fb1..db948122bf 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -34,7 +34,6 @@ export class StreamingApiServerService { #wss: WebSocket.WebSocketServer; #connections = new Map(); #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; - #logger: Logger; constructor( @Inject(DI.redisForSub) diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index e8cd557c1c..0914cdbb22 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -53,7 +53,7 @@ export default class Connection { private notificationService: NotificationService, private cacheService: CacheService, private channelFollowingService: ChannelFollowingService, - private loggerService: LoggerService, + loggerService: LoggerService, user: MiUser | null | undefined, token: MiAccessToken | null | undefined, From aff57333d54e449c7b0096b5bbadd70268ab24af Mon Sep 17 00:00:00 2001 From: Julia Johannesen Date: Sat, 17 Aug 2024 13:12:16 -0400 Subject: [PATCH 6/8] Add @types/proxy-addr --- packages/backend/package.json | 1 + pnpm-lock.yaml | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/packages/backend/package.json b/packages/backend/package.json index 65eda6153c..d7235de63e 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -217,6 +217,7 @@ "@types/oauth2orize": "1.11.5", "@types/oauth2orize-pkce": "0.1.2", "@types/pg": "8.11.5", + "@types/proxy-addr": "^2.0.3", "@types/pug": "2.0.10", "@types/punycode": "2.1.4", "@types/qrcode": "1.5.5", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d6e9f1196a..8a538b4f3d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -612,6 +612,9 @@ importers: '@types/pg': specifier: 8.11.5 version: 8.11.5 + '@types/proxy-addr': + specifier: ^2.0.3 + version: 2.0.3 '@types/pug': specifier: 2.0.10 version: 2.0.10 @@ -4731,6 +4734,9 @@ packages: '@types/prop-types@15.7.5': resolution: {integrity: sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==} + '@types/proxy-addr@2.0.3': + resolution: {integrity: sha512-TgAHHO4tNG3HgLTUhB+hM4iwW6JUNeQHCLnF1DjaDA9c69PN+IasoFu2MYDhubFc+ZIw5c5t9DMtjvrD6R3Egg==} + '@types/pug@2.0.10': resolution: {integrity: sha512-Sk/uYFOBAB7mb74XcpizmH0KOR2Pv3D2Hmrh1Dmy5BmK3MpdSa5kqZcg6EKBdklU0bFXX9gCfzvpnyUehrPIuA==} @@ -11115,8 +11121,8 @@ packages: vue-component-type-helpers@2.0.16: resolution: {integrity: sha512-qisL/iAfdO++7w+SsfYQJVPj6QKvxp4i1MMxvsNO41z/8zu3KuAw9LkhKUfP/kcOWGDxESp+pQObWppXusejCA==} - vue-component-type-helpers@2.0.26: - resolution: {integrity: sha512-sO9qQ8oC520SW6kqlls0iqDak53gsTVSrYylajgjmkt1c0vcgjsGSy1KzlDrbEx8pm02IEYhlUkU5hCYf8rwtg==} + vue-component-type-helpers@2.0.29: + resolution: {integrity: sha512-58i+ZhUAUpwQ+9h5Hck0D+jr1qbYl4voRt5KffBx8qzELViQ4XdT/Tuo+mzq8u63teAG8K0lLaOiL5ofqW38rg==} vue-demi@0.14.7: resolution: {integrity: sha512-EOG8KXDQNwkJILkx/gPcoL/7vH+hORoBaKgGe+6W7VFMvCYJfmF2dGbvgDroVnI8LU7/kTu8mbjRZGBU1z9NTA==} @@ -15459,7 +15465,7 @@ snapshots: ts-dedent: 2.2.0 type-fest: 2.19.0 vue: 3.4.26(typescript@5.4.5) - vue-component-type-helpers: 2.0.26 + vue-component-type-helpers: 2.0.29 transitivePeerDependencies: - encoding - supports-color @@ -15974,6 +15980,10 @@ snapshots: '@types/prop-types@15.7.5': {} + '@types/proxy-addr@2.0.3': + dependencies: + '@types/node': 20.12.7 + '@types/pug@2.0.10': {} '@types/punycode@2.1.4': {} @@ -23658,7 +23668,7 @@ snapshots: vue-component-type-helpers@2.0.16: {} - vue-component-type-helpers@2.0.26: {} + vue-component-type-helpers@2.0.29: {} vue-demi@0.14.7(vue@3.4.26(typescript@5.4.5)): dependencies: From c5f7dcbb7e4be0c8d73e69df56df1c05e3413e24 Mon Sep 17 00:00:00 2001 From: Julia Johannesen Date: Sat, 17 Aug 2024 14:17:58 -0400 Subject: [PATCH 7/8] Come up with better limits --- .../backend/src/server/api/StreamingApiServerService.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index db948122bf..19c78fd4d1 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -98,8 +98,8 @@ export class StreamingApiServerService { if (await this.rateLimitThis(null, requestIp, { key: 'wsconnect', - duration: ms('1min'), - max: 20, + duration: ms('5min'), + max: 32, minInterval: ms('1sec'), })) { socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); @@ -147,8 +147,8 @@ export class StreamingApiServerService { const rateLimiter = () => { return this.rateLimitThis(user, requestIp, { key: 'wsmessage', - duration: ms('1sec'), - max: 100, + duration: ms('5sec'), + max: 256, }); }; From 3dd993a76a5e5d87a0b31e1eff5093958f239021 Mon Sep 17 00:00:00 2001 From: Julia Johannesen Date: Sat, 17 Aug 2024 14:27:43 -0400 Subject: [PATCH 8/8] Add IP and user ID to connection close message --- .../backend/src/server/api/StreamingApiServerService.ts | 2 +- packages/backend/src/server/api/stream/Connection.ts | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 19c78fd4d1..2070ab6106 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -159,7 +159,7 @@ export class StreamingApiServerService { this.cacheService, this.channelFollowingService, this.loggerService, - user, app, + user, app, requestIp, rateLimiter, ); diff --git a/packages/backend/src/server/api/stream/Connection.ts b/packages/backend/src/server/api/stream/Connection.ts index 0914cdbb22..b71a99b89e 100644 --- a/packages/backend/src/server/api/stream/Connection.ts +++ b/packages/backend/src/server/api/stream/Connection.ts @@ -57,6 +57,7 @@ export default class Connection { user: MiUser | null | undefined, token: MiAccessToken | null | undefined, + private ip: string, rateLimiter: () => Promise, ) { if (user) this.user = user; @@ -127,7 +128,10 @@ export default class Connection { if (shouldRateLimit) return; if (this.closingConnection) return; } else { - this.logger.warn('Closing a connection due to an excessive influx of messages.'); + let connectionInfo = `IP ${this.ip}`; + if (this.user) connectionInfo += `, user ID ${this.user.id}`; + + this.logger.warn(`Closing a connection (${connectionInfo}) due to an excessive influx of messages.`); this.closingConnection = true; this.wsConnection.close(1008, 'Please stop spamming the streaming API.'); return;