Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kill any from streaming API Implementation #14251

Merged
merged 7 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Note
- デッキUIの新着ノートをサウンドで通知する機能の追加(v2024.5.0)に伴い、以前から動作しなくなっていたクライアント設定内の「アンテナ受信」「チャンネル通知」サウンドを削除しました。
- Streaming APIにて入力が不正な場合にはそのメッセージを無視するようになりました。 #14251

### General
- Feat: 通報を受けた際、または解決した際に、予め登録した宛先に通知を飛ばせるように(mail or webhook) #13705
Expand Down Expand Up @@ -76,6 +77,7 @@
- Fix: ソーシャルタイムラインにローカルタイムラインに表示される自分へのリプライが表示されない問題を修正
- Fix: リノートのミュートが適用されるまでに時間がかかることがある問題を修正
(Cherry-picked from https://github.com/Type4ny-Project/Type4ny/commit/e9601029b52e0ad43d9131b555b614e56c84ebc1)
- Fix: Steaming APIが不正なデータを受けた場合の動作が不安定である問題 #14251

### Misskey.js
- Feat: `/drive/files/create` のリクエストに対応(`multipart/form-data`に対応)
Expand Down
28 changes: 17 additions & 11 deletions packages/backend/src/core/GlobalEventService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ type SerializedAll<T> = {
[K in keyof T]: Serialized<T[K]>;
};

type UndefinedAsNullAll<T> = {
[K in keyof T]: T[K] extends undefined ? null : T[K];
}

export interface InternalEventTypes {
userChangeSuspendedState: { id: MiUser['id']; isSuspended: MiUser['isSuspended']; };
userChangeDeletedState: { id: MiUser['id']; isDeleted: MiUser['isDeleted']; };
Expand Down Expand Up @@ -248,55 +252,57 @@ export interface InternalEventTypes {
userKeypairUpdated: { userId: MiUser['id']; };
}

type EventTypesToEventPayload<T> = EventUnionFromDictionary<UndefinedAsNullAll<SerializedAll<T>>>;

// name/messages(spec) pairs dictionary
export type GlobalEvents = {
internal: {
name: 'internal';
payload: EventUnionFromDictionary<SerializedAll<InternalEventTypes>>;
payload: EventTypesToEventPayload<InternalEventTypes>;
};
broadcast: {
name: 'broadcast';
payload: EventUnionFromDictionary<SerializedAll<BroadcastTypes>>;
payload: EventTypesToEventPayload<BroadcastTypes>;
};
main: {
name: `mainStream:${MiUser['id']}`;
payload: EventUnionFromDictionary<SerializedAll<MainEventTypes>>;
payload: EventTypesToEventPayload<MainEventTypes>;
};
drive: {
name: `driveStream:${MiUser['id']}`;
payload: EventUnionFromDictionary<SerializedAll<DriveEventTypes>>;
payload: EventTypesToEventPayload<DriveEventTypes>;
};
note: {
name: `noteStream:${MiNote['id']}`;
payload: EventUnionFromDictionary<SerializedAll<NoteStreamEventTypes>>;
payload: EventTypesToEventPayload<NoteStreamEventTypes>;
};
userList: {
name: `userListStream:${MiUserList['id']}`;
payload: EventUnionFromDictionary<SerializedAll<UserListEventTypes>>;
payload: EventTypesToEventPayload<UserListEventTypes>;
};
roleTimeline: {
name: `roleTimelineStream:${MiRole['id']}`;
payload: EventUnionFromDictionary<SerializedAll<RoleTimelineEventTypes>>;
payload: EventTypesToEventPayload<RoleTimelineEventTypes>;
};
antenna: {
name: `antennaStream:${MiAntenna['id']}`;
payload: EventUnionFromDictionary<SerializedAll<AntennaEventTypes>>;
payload: EventTypesToEventPayload<AntennaEventTypes>;
};
admin: {
name: `adminStream:${MiUser['id']}`;
payload: EventUnionFromDictionary<SerializedAll<AdminEventTypes>>;
payload: EventTypesToEventPayload<AdminEventTypes>;
};
notes: {
name: 'notesStream';
payload: Serialized<Packed<'Note'>>;
};
reversi: {
name: `reversiStream:${MiUser['id']}`;
payload: EventUnionFromDictionary<SerializedAll<ReversiEventTypes>>;
payload: EventTypesToEventPayload<ReversiEventTypes>;
};
reversiGame: {
name: `reversiGameStream:${MiReversiGame['id']}`;
payload: EventUnionFromDictionary<SerializedAll<ReversiGameEventTypes>>;
payload: EventTypesToEventPayload<ReversiGameEventTypes>;
};
};

Expand Down
8 changes: 8 additions & 0 deletions packages/backend/src/misc/json-value.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/

export type JsonValue = JsonArray | JsonObject | string | number | boolean | null;
export type JsonObject = {[K in string]?: JsonValue};
export type JsonArray = JsonValue[];
59 changes: 36 additions & 23 deletions packages/backend/src/server/api/stream/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { CacheService } from '@/core/CacheService.js';
import { MiFollowing, MiUserProfile } from '@/models/_.js';
import type { StreamEventEmitter, GlobalEvents } from '@/core/GlobalEventService.js';
import { ChannelFollowingService } from '@/core/ChannelFollowingService.js';
import type { JsonObject } from '@/misc/json-value.js';
import type { ChannelsService } from './ChannelsService.js';
import type { EventEmitter } from 'events';
import type Channel from './channel.js';
Expand All @@ -28,7 +29,7 @@ export default class Connection {
private wsConnection: WebSocket.WebSocket;
public subscriber: StreamEventEmitter;
private channels: Channel[] = [];
private subscribingNotes: any = {};
private subscribingNotes: Partial<Record<string, number>> = {};
private cachedNotes: Packed<'Note'>[] = [];
public userProfile: MiUserProfile | null = null;
public following: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {};
Expand Down Expand Up @@ -101,7 +102,7 @@ export default class Connection {
*/
@bindThis
private async onWsConnectionMessage(data: WebSocket.RawData) {
let obj: Record<string, any>;
let obj: JsonObject;

try {
obj = JSON.parse(data.toString());
Expand All @@ -111,6 +112,8 @@ export default class Connection {

const { type, body } = obj;

if (typeof body !== 'object' || body === null || Array.isArray(body)) return;

switch (type) {
case 'readNotification': this.onReadNotification(body); break;
case 'subNote': this.onSubscribeNote(body); break;
Expand Down Expand Up @@ -151,7 +154,7 @@ export default class Connection {
}

@bindThis
private readNote(body: any) {
private readNote(body: JsonObject) {
const id = body.id;

const note = this.cachedNotes.find(n => n.id === id);
Expand All @@ -163,24 +166,22 @@ export default class Connection {
}

@bindThis
private onReadNotification(payload: any) {
private onReadNotification(payload: JsonObject) {
this.notificationService.readAllNotification(this.user!.id);
}

/**
* 投稿購読要求時
*/
@bindThis
private onSubscribeNote(payload: any) {
if (!payload.id) return;

if (this.subscribingNotes[payload.id] == null) {
this.subscribingNotes[payload.id] = 0;
}
private onSubscribeNote(payload: JsonObject) {
if (!payload.id || typeof payload.id !== 'string') return;

this.subscribingNotes[payload.id]++;
const current = this.subscribingNotes[payload.id] ?? 0;
const updated = current + 1;
this.subscribingNotes[payload.id] = updated;

if (this.subscribingNotes[payload.id] === 1) {
if (updated === 1) {
this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
}
}
Expand All @@ -189,11 +190,14 @@ export default class Connection {
* 投稿購読解除要求時
*/
@bindThis
private onUnsubscribeNote(payload: any) {
if (!payload.id) return;

this.subscribingNotes[payload.id]--;
if (this.subscribingNotes[payload.id] <= 0) {
private onUnsubscribeNote(payload: JsonObject) {
if (!payload.id || typeof payload.id !== 'string') return;

const current = this.subscribingNotes[payload.id];
if (current == null) return;
const updated = current - 1;
this.subscribingNotes[payload.id] = updated;
if (updated <= 0) {
delete this.subscribingNotes[payload.id];
this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
}
Expand All @@ -212,25 +216,30 @@ export default class Connection {
* チャンネル接続要求時
*/
@bindThis
private onChannelConnectRequested(payload: any) {
private onChannelConnectRequested(payload: JsonObject) {
const { channel, id, params, pong } = payload;
this.connectChannel(id, params, channel, pong);
if (typeof id !== 'string') return;
if (typeof channel !== 'string') return;
if (typeof pong !== 'boolean' && typeof pong !== 'undefined' && pong !== null) return;
if (typeof params !== 'undefined' && (typeof params !== 'object' || params === null || Array.isArray(params))) return;
this.connectChannel(id, params, channel, pong ?? undefined);
}

/**
* チャンネル切断要求時
*/
@bindThis
private onChannelDisconnectRequested(payload: any) {
private onChannelDisconnectRequested(payload: JsonObject) {
const { id } = payload;
if (typeof id !== 'string') return;
this.disconnectChannel(id);
}

/**
* クライアントにメッセージ送信
*/
@bindThis
public sendMessageToWs(type: string, payload: any) {
public sendMessageToWs(type: string, payload: JsonObject) {
this.wsConnection.send(JSON.stringify({
type: type,
body: payload,
Expand All @@ -241,7 +250,7 @@ export default class Connection {
* チャンネルに接続
*/
@bindThis
public connectChannel(id: string, params: any, channel: string, pong = false) {
public connectChannel(id: string, params: JsonObject | undefined, channel: string, pong = false) {
const channelService = this.channelsService.getChannelService(channel);

if (channelService.requireCredential && this.user == null) {
Expand Down Expand Up @@ -288,7 +297,11 @@ export default class Connection {
* @param data メッセージ
*/
@bindThis
private onChannelMessageRequested(data: any) {
private onChannelMessageRequested(data: JsonObject) {
if (typeof data.id !== 'string') return;
if (typeof data.type !== 'string') return;
if (typeof data.body === 'undefined') return;

const channel = this.channels.find(c => c.id === data.id);
if (channel != null && channel.onMessage != null) {
channel.onMessage(data.type, data.body);
Expand Down
13 changes: 8 additions & 5 deletions packages/backend/src/server/api/stream/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { isInstanceMuted } from '@/misc/is-instance-muted.js';
import { isUserRelated } from '@/misc/is-user-related.js';
import { isRenotePacked, isQuotePacked } from '@/misc/is-renote.js';
import type { Packed } from '@/misc/json-schema.js';
import type { JsonObject, JsonValue } from '@/misc/json-value.js';
import type Connection from './Connection.js';

/**
Expand Down Expand Up @@ -81,10 +82,12 @@ export default abstract class Channel {
this.connection = connection;
}

public send(payload: { type: string, body: JsonValue }): void
public send(type: string, payload: JsonValue): void
@bindThis
public send(typeOrPayload: any, payload?: any) {
const type = payload === undefined ? typeOrPayload.type : typeOrPayload;
const body = payload === undefined ? typeOrPayload.body : payload;
public send(typeOrPayload: { type: string, body: JsonValue } | string, payload?: JsonValue) {
const type = payload === undefined ? (typeOrPayload as { type: string, body: JsonValue }).type : (typeOrPayload as string);
const body = payload === undefined ? (typeOrPayload as { type: string, body: JsonValue }).body : payload;

this.connection.sendMessageToWs('channel', {
id: this.id,
Expand All @@ -93,11 +96,11 @@ export default abstract class Channel {
});
}

public abstract init(params: any): void;
public abstract init(params: JsonObject): void;

public dispose?(): void;

public onMessage?(type: string, body: any): void;
public onMessage?(type: string, body: JsonValue): void;
}

export type MiChannelService<T extends boolean> = {
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/server/api/stream/channels/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { Injectable } from '@nestjs/common';
import { bindThis } from '@/decorators.js';
import type { JsonObject } from '@/misc/json-value.js';
import Channel, { type MiChannelService } from '../channel.js';

class AdminChannel extends Channel {
Expand All @@ -14,7 +15,7 @@ class AdminChannel extends Channel {
public static kind = 'read:admin:stream';

@bindThis
public async init(params: any) {
public async init(params: JsonObject) {
// Subscribe admin stream
this.subscriber.on(`adminStream:${this.user!.id}`, data => {
this.send(data);
Expand Down
6 changes: 4 additions & 2 deletions packages/backend/src/server/api/stream/channels/antenna.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Injectable } from '@nestjs/common';
import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
import { bindThis } from '@/decorators.js';
import type { GlobalEvents } from '@/core/GlobalEventService.js';
import type { JsonObject } from '@/misc/json-value.js';
import Channel, { type MiChannelService } from '../channel.js';

class AntennaChannel extends Channel {
Expand All @@ -27,8 +28,9 @@ class AntennaChannel extends Channel {
}

@bindThis
public async init(params: any) {
this.antennaId = params.antennaId as string;
public async init(params: JsonObject) {
if (typeof params.antennaId !== 'string') return;
this.antennaId = params.antennaId;

// Subscribe stream
this.subscriber.on(`antennaStream:${this.antennaId}`, this.onEvent);
Expand Down
6 changes: 4 additions & 2 deletions packages/backend/src/server/api/stream/channels/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { Packed } from '@/misc/json-schema.js';
import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
import { bindThis } from '@/decorators.js';
import { isRenotePacked, isQuotePacked } from '@/misc/is-renote.js';
import type { JsonObject } from '@/misc/json-value.js';
import Channel, { type MiChannelService } from '../channel.js';

class ChannelChannel extends Channel {
Expand All @@ -27,8 +28,9 @@ class ChannelChannel extends Channel {
}

@bindThis
public async init(params: any) {
this.channelId = params.channelId as string;
public async init(params: JsonObject) {
if (typeof params.channelId !== 'string') return;
this.channelId = params.channelId;

// Subscribe stream
this.subscriber.on('notesStream', this.onNote);
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/server/api/stream/channels/drive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { Injectable } from '@nestjs/common';
import { bindThis } from '@/decorators.js';
import type { JsonObject } from '@/misc/json-value.js';
import Channel, { type MiChannelService } from '../channel.js';

class DriveChannel extends Channel {
Expand All @@ -14,7 +15,7 @@ class DriveChannel extends Channel {
public static kind = 'read:account';

@bindThis
public async init(params: any) {
public async init(params: JsonObject) {
// Subscribe drive stream
this.subscriber.on(`driveStream:${this.user!.id}`, data => {
this.send(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
import { bindThis } from '@/decorators.js';
import { RoleService } from '@/core/RoleService.js';
import { isRenotePacked, isQuotePacked } from '@/misc/is-renote.js';
import type { JsonObject } from '@/misc/json-value.js';
import Channel, { type MiChannelService } from '../channel.js';

class GlobalTimelineChannel extends Channel {
Expand All @@ -32,12 +33,12 @@ class GlobalTimelineChannel extends Channel {
}

@bindThis
public async init(params: any) {
public async init(params: JsonObject) {
const policies = await this.roleService.getUserPolicies(this.user ? this.user.id : null);
if (!policies.gtlAvailable) return;

this.withRenotes = params.withRenotes ?? true;
this.withFiles = params.withFiles ?? false;
this.withRenotes = !!(params.withRenotes ?? true);
this.withFiles = !!(params.withFiles ?? false);

// Subscribe events
this.subscriber.on('notesStream', this.onNote);
Expand Down
Loading
Loading