3fd2b55406
This reverts commit 89eea5df52
.
414 lines
11 KiB
TypeScript
414 lines
11 KiB
TypeScript
import WS from 'ws'
|
|
import dayjs, { Dayjs } from 'dayjs'
|
|
import { v4 as uuid } from 'uuid'
|
|
import { EventEmitter } from 'events'
|
|
import { WebSocketInterface } from '../megalodon'
|
|
import proxyAgent, { ProxyConfig } from '../proxy_config'
|
|
import MisskeyAPI from './api_client'
|
|
import { UnknownNotificationTypeError } from '../notification'
|
|
|
|
/**
|
|
* WebSocket
|
|
* Misskey does not support HTTP streaming; it provides a WebSocket API instead.
|
|
* So this class connects to the Misskey server over WebSocket.
|
|
*/
|
|
export default class WebSocket extends EventEmitter implements WebSocketInterface {
  /** Full URL of the streaming endpoint, e.g. wss://misskey.io/streaming. */
  public url: string
  /** Logical channel requested by the caller; mapped to Misskey channel names in `_channel()`. */
  public channel: 'user' | 'localTimeline' | 'hybridTimeline' | 'globalTimeline' | 'conversation' | 'list'
  /** Parser that turns raw websocket frames into `update` / `notification` / `conversation` events. */
  public parser: Parser
  /** HTTP headers sent with the websocket handshake. */
  public headers: { [key: string]: string }
  /** Proxy settings, or `false` when no proxy is used. */
  public proxyConfig: ProxyConfig | false = false
  /** List ID sent as a parameter when `channel` is `list`; `null` otherwise. */
  public listId: string | null = null
  private _accessToken: string
  /** Delay in milliseconds before a reconnect attempt is made. */
  private _reconnectInterval: number
  private _reconnectMaxAttempts: number
  private _reconnectCurrentAttempts: number
  /** `true` once `stop()` has been called; cleared again by `start()`. */
  private _connectionClosed: boolean
  private _client: WS | null = null
  /** ID sent with every `connect` message and used to filter incoming messages. */
  private _channelID: string
  private _pongReceivedTimestamp: Dayjs
  /** Delay in milliseconds between receiving a pong and sending the next ping. */
  private _heartbeatInterval: number = 60000
  /** `true` while a ping has been sent and its pong has not arrived yet. */
  private _pongWaiting: boolean = false

  /**
   * @param url Full url of websocket: e.g. wss://misskey.io/streaming
   * @param channel Channel name is user, localTimeline, hybridTimeline, globalTimeline, conversation or list.
   * @param accessToken The access token.
   * @param listId This parameter is required when you specify list as channel.
   * @param userAgent Value of the User-Agent header sent with the handshake.
   * @param proxyConfig Proxy settings, or false to connect directly.
   */
  constructor(
    url: string,
    channel: 'user' | 'localTimeline' | 'hybridTimeline' | 'globalTimeline' | 'conversation' | 'list',
    accessToken: string,
    listId: string | undefined,
    userAgent: string,
    proxyConfig: ProxyConfig | false = false
  ) {
    super()
    this.url = url
    this.parser = new Parser()
    this.channel = channel
    this.headers = {
      'User-Agent': userAgent
    }
    this.listId = listId === undefined ? null : listId
    this.proxyConfig = proxyConfig
    this._accessToken = accessToken
    this._reconnectInterval = 10000
    this._reconnectMaxAttempts = Infinity
    this._reconnectCurrentAttempts = 0
    this._connectionClosed = false
    this._channelID = uuid()
    this._pongReceivedTimestamp = dayjs()
  }

  /**
   * Start websocket connection.
   */
  public start() {
    this._connectionClosed = false
    this._resetRetryParams()
    this._startWebSocketConnection()
  }

  /**
   * Reset connection and start new websocket connection.
   */
  private _startWebSocketConnection() {
    this._resetConnection()
    this._setupParser()
    this._client = this._connect()
    this._bindSocket(this._client)
  }

  /**
   * Stop current connection.
   * Pending reconnect timers become no-ops because `_reconnect()` checks `_connectionClosed`.
   */
  public stop() {
    this._connectionClosed = true
    this._resetConnection()
    this._resetRetryParams()
  }

  /**
   * Clean up current connection, and listeners.
   */
  private _resetConnection() {
    if (this._client) {
      const client = this._client
      client.close(1000)
      client.removeAllListeners()
      // The discarded socket may still emit 'error' asynchronously (for example when it is
      // closed before the handshake finished). Without any listener that event would be
      // thrown as an uncaught exception, so keep a no-op handler attached.
      client.on('error', () => {
        /* ignore errors from a socket we no longer use */
      })
      this._client = null
    }

    // A pong for the old socket can never arrive; do not let its timeout trigger a reconnect.
    this._pongWaiting = false

    if (this.parser) {
      this.parser.removeAllListeners()
    }
  }

  /**
   * Resets the parameters used in reconnect.
   */
  private _resetRetryParams() {
    this._reconnectCurrentAttempts = 0
  }

  /**
   * Connect to the endpoint.
   * The access token is passed as the `i` query parameter.
   * @returns The newly created websocket client.
   */
  private _connect(): WS {
    let options: WS.ClientOptions = {
      headers: this.headers
    }
    if (this.proxyConfig) {
      options = Object.assign(options, {
        agent: proxyAgent(this.proxyConfig)
      })
    }
    return new WS(`${this.url}?i=${this._accessToken}`, options)
  }

  /**
   * Send a single `connect` message for the given Misskey channel.
   * @param client Websocket client to send through.
   * @param channel Misskey channel name.
   * @param params Optional channel parameters; omitted from the message when undefined.
   */
  private _sendConnect(client: WS, channel: string, params?: { [key: string]: unknown }) {
    const body: { channel: string; id: string; params?: { [key: string]: unknown } } = {
      channel: channel,
      id: this._channelID
    }
    if (params !== undefined) {
      body.params = params
    }
    client.send(JSON.stringify({ type: 'connect', body: body }))
  }

  /**
   * Connect specified channels in websocket.
   */
  private _channel() {
    const client = this._client
    if (!client) {
      return
    }
    switch (this.channel) {
      case 'conversation':
        this._sendConnect(client, 'main')
        break
      case 'user':
        // The user stream is the combination of the main and the home timeline channels.
        this._sendConnect(client, 'main')
        this._sendConnect(client, 'homeTimeline')
        break
      case 'list':
        this._sendConnect(client, 'userList', { listId: this.listId })
        break
      default:
        this._sendConnect(client, this.channel)
        break
    }
  }

  /**
   * Reconnects to the same endpoint after `_reconnectInterval` milliseconds.
   */
  private _reconnect() {
    setTimeout(() => {
      // stop() may have been called while this timer was pending; never reconnect in that case.
      if (this._connectionClosed) {
        return
      }

      // Skip reconnect when client is connecting.
      // https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L365
      if (this._client && this._client.readyState === WS.CONNECTING) {
        return
      }

      if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
        this._reconnectCurrentAttempts++
        this._clearBinding()
        if (this._client) {
          // In reconnect, we want to close the connection immediately,
          // because reconnect is necessary when some problems occur.
          this._client.terminate()
        }
        // Call connect methods
        console.log('Reconnecting')
        this._client = this._connect()
        this._bindSocket(this._client)
      }
    }, this._reconnectInterval)
  }

  /**
   * Clear binding event for websocket client.
   */
  private _clearBinding() {
    if (this._client) {
      this._client.removeAllListeners('close')
      this._client.removeAllListeners('pong')
      this._client.removeAllListeners('open')
      this._client.removeAllListeners('message')
      this._client.removeAllListeners('error')
    }
  }

  /**
   * Bind event for web socket client.
   * @param client A WebSocket instance.
   */
  private _bindSocket(client: WS) {
    client.on('close', (code: number, _reason: Buffer) => {
      if (code === 1000) {
        this.emit('close', {})
      } else {
        console.log(`Closed connection with ${code}`)
        if (!this._connectionClosed) {
          this._reconnect()
        }
      }
    })
    client.on('pong', () => {
      this._pongWaiting = false
      this.emit('pong', {})
      this._pongReceivedTimestamp = dayjs()
      // An arrow function is required so that `this` is kept when _checkAlive runs.
      setTimeout(() => this._checkAlive(this._pongReceivedTimestamp), this._heartbeatInterval)
    })
    client.on('open', () => {
      this.emit('connect', {})
      this._channel()
      // Call first ping event.
      setTimeout(() => {
        client.ping('')
      }, 10000)
    })
    client.on('message', (data: WS.Data, isBinary: boolean) => {
      this.parser.parse(data, isBinary, this._channelID)
    })
    client.on('error', (err: Error) => {
      this.emit('error', err)
    })
  }

  /**
   * Set up parser when receive message.
   * Parser events are converted with MisskeyAPI.Converter and re-emitted from this instance;
   * parser errors are re-emitted as `parser-error`.
   */
  private _setupParser() {
    this.parser.on('update', (note: MisskeyAPI.Entity.Note) => {
      this.emit('update', MisskeyAPI.Converter.note(note))
    })
    this.parser.on('notification', (notification: MisskeyAPI.Entity.Notification) => {
      const n = MisskeyAPI.Converter.notification(notification)
      if (n instanceof UnknownNotificationTypeError) {
        // Serialize explicitly: interpolating the object directly prints "[object Object]".
        console.warn(`Unknown notification event has received: ${JSON.stringify(notification)}`)
      } else {
        this.emit('notification', n)
      }
    })
    this.parser.on('conversation', (note: MisskeyAPI.Entity.Note) => {
      this.emit('conversation', MisskeyAPI.Converter.noteToConversation(note))
    })
    this.parser.on('error', (err: Error) => {
      this.emit('parser-error', err)
    })
  }

  /**
   * Call ping and wait to pong.
   * If no pong arrives within 10 seconds a reconnect is scheduled.
   * @param timestamp Time at which the pong that scheduled this check was received.
   */
  private _checkAlive(timestamp: Dayjs) {
    const now: Dayjs = dayjs()
    // Block multiple calling, if multiple pong event occur.
    // If the duration is less than the interval, skip the ping.
    if (now.diff(timestamp) > this._heartbeatInterval - 1000 && !this._connectionClosed) {
      // Skip ping when client is connecting.
      // https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L289
      if (this._client && this._client.readyState !== WS.CONNECTING) {
        this._pongWaiting = true
        this._client.ping('')
        setTimeout(() => {
          if (this._pongWaiting) {
            this._pongWaiting = false
            this._reconnect()
          }
        }, 10000)
      }
    }
  }
}
|
|
|
|
/**
|
|
* Parser
|
|
* This class provides a parser for websocket messages.
|
|
*/
|
|
export class Parser extends EventEmitter {
  /**
   * Channel events that are recognized but intentionally produce no event.
   * For renote and followed, the same information also arrives as a notification.
   */
  private static readonly IGNORED_EVENTS: ReadonlySet<string> = new Set([
    'renote',
    'followed',
    'follow',
    'unfollow',
    'receiveFollowRequest',
    'meUpdated',
    'readAllNotifications',
    'readAllUnreadSpecifiedNotes',
    'readAllAntennas',
    'readAllUnreadMentions',
    'unreadNotification'
  ])

  /**
   * Decode one websocket frame and emit the matching event:
   * `heartbeat` for binary or empty frames, `update` for notes, `notification` for
   * notifications, `conversation` for mentions whose visibility is `specified`,
   * and `error` for frames that cannot be parsed or carry an unknown event type.
   * Frames that are not `channel` messages for `channelID` are dropped silently.
   *
   * @param data Raw payload received from the websocket.
   * @param isBinary Whether the payload is binary; text payloads are converted with toString().
   * @param channelID Parse only messages which has same channelID.
   */
  public parse(data: WS.Data, isBinary: boolean, channelID: string) {
    const message = isBinary ? data : data.toString()

    // Anything that is not a non-empty text frame is treated as a heartbeat.
    if (typeof message !== 'string' || message === '') {
      this.emit('heartbeat', {})
      return
    }

    let payload: { id: string; type: string; body: any }
    try {
      const envelope: { type: string; body: { id: string; type: string; body: any } } = JSON.parse(message)
      const relevant = envelope.type === 'channel' && envelope.body && envelope.body.id === channelID
      if (!relevant) {
        return
      }
      payload = envelope.body
    } catch (err) {
      this.emit('error', new Error(`Error parsing websocket reply: ${message}, error message: ${err}`))
      return
    }

    const eventType = payload.type
    if (eventType === 'note') {
      this.emit('update', payload.body as MisskeyAPI.Entity.Note)
    } else if (eventType === 'notification') {
      this.emit('notification', payload.body as MisskeyAPI.Entity.Notification)
    } else if (eventType === 'mention') {
      // Only direct (specified) mentions are surfaced as conversations.
      const mentioned = payload.body as MisskeyAPI.Entity.Note
      if (mentioned.visibility === 'specified') {
        this.emit('conversation', mentioned)
      }
    } else if (!Parser.IGNORED_EVENTS.has(eventType)) {
      this.emit('error', new Error(`Unknown event has received: ${JSON.stringify(payload)}`))
    }
  }
}
|