Skip to content

Commit

Permalink
refactor: publishHogeStreamとStreamのEventEmitterに型定義する (#7769)
Browse files Browse the repository at this point in the history
* wip

* wip

* wip

* ✌️

* add main stream

* packedNotificationSchemaを更新

* read:gallery, write:gallery, read:gallery-likes, write:gallery-likesに翻訳を追加

* fix

* ok

* add header, choice, invitation

* add header, choice, invitation

* test

* fix

* fix

* yatta

* remove no longer needed "as PackedUser/PackedNote"

* clean up

* add simple-schema

* fix lint

* fix lint

* wip

* wip!

* wip

* fix

* wip

* wip

* ✌️

* 送信側に型エラーがないことを3回確認した

* ✌️

* wip

* update typescript

* define items in full Schema

* edit comment

* edit comment

* edit comment

* Update src/prelude/types.ts

Co-authored-by: Acid Chicken (硫酸鶏) <[email protected]>

* #7769 (comment)

* user packとnote packの型不整合を修正

* revert #7772 (comment)

* revert #7772 (comment)

* user packとnote packの型不整合を修正

* add prelude/types.ts

* emoji

* signin

* game

* matching

* clean up

* ev => data

* refactor

* clean up

* add type

* antenna

* channel

* fix

* add Packed type

* add PackedRef

* fix lint

* add emoji schema

* add reversiGame

* add reversiMatching

* remove signin schema (use Signin entity)

* add schemas refs, fix Packed type

* wip PackedHoge => Packed<'Hoge'>

* add Packed type

* note-reaction

* user

* user-group

* user-list

* note

* app, messaging-message

* notification

* drive-file

* drive-folder

* following

* muting

* blocking

* hashtag

* page

* app (with modifying schema)

* import user?

* channel

* antenna

* clip

* gallery-post

* emoji

* Packed

* reversi-matching

* update stream.ts

* #7769 (comment)

* fix lint

* clean up?

* add changelog

* add changelog

* add changelog

* fix: アンテナが既読にならないのを修正

* revert fix

* #7769 (comment)

* spec => payload

* edit commetn

Co-authored-by: Acid Chicken (硫酸鶏) <[email protected]>
  • Loading branch information
tamaina and acid-chicken authored Oct 20, 2021
1 parent 5ca6e6b commit 69b56f6
Show file tree
Hide file tree
Showing 12 changed files with 383 additions and 60 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@
"seedrandom": "3.0.5",
"sharp": "0.29.1",
"speakeasy": "2.0.0",
"strict-event-emitter-types": "2.0.0",
"stringz": "2.1.0",
"style-loader": "3.3.0",
"summaly": "2.4.1",
Expand Down
2 changes: 1 addition & 1 deletion src/models/repositories/signin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Signin } from '@/models/entities/signin';
@EntityRepository(Signin)
export class SigninRepository extends Repository<Signin> {
public async pack(
src: any,
src: Signin,
) {
return src;
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/api/common/read-messaging-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export async function readGroupMessagingMessage(
id: In(messageIds)
});

const reads = [];
const reads: MessagingMessage['id'][] = [];

for (const message of messages) {
if (message.userId === userId) continue;
Expand Down
2 changes: 1 addition & 1 deletion src/server/api/endpoints/antennas/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export default define(meta, async (ps, user) => {
notify: ps.notify,
});

publishInternalEvent('antennaUpdated', Antennas.findOneOrFail(antenna.id));
publishInternalEvent('antennaUpdated', await Antennas.findOneOrFail(antenna.id));

return await Antennas.pack(antenna.id);
});
11 changes: 5 additions & 6 deletions src/server/api/stream/channels/antenna.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Channel from '../channel';
import { Notes } from '@/models/index';
import { isMutedUserRelated } from '@/misc/is-muted-user-related';
import { isBlockerUserRelated } from '@/misc/is-blocker-user-related';
import { StreamMessages } from '../types';

export default class extends Channel {
public readonly chName = 'antenna';
Expand All @@ -19,11 +20,9 @@ export default class extends Channel {
}

@autobind
private async onEvent(data: any) {
const { type, body } = data;

if (type === 'note') {
const note = await Notes.pack(body.id, this.user, { detail: true });
private async onEvent(data: StreamMessages['antenna']['payload']) {
if (data.type === 'note') {
const note = await Notes.pack(data.body.id, this.user, { detail: true });

// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (isMutedUserRelated(note, this.muting)) return;
Expand All @@ -34,7 +33,7 @@ export default class extends Channel {

this.send('note', note);
} else {
this.send(type, body);
this.send(data.type, data.body);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/server/api/stream/channels/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Notes, Users } from '@/models/index';
import { isMutedUserRelated } from '@/misc/is-muted-user-related';
import { isBlockerUserRelated } from '@/misc/is-blocker-user-related';
import { User } from '@/models/entities/user';
import { StreamMessages } from '../types';
import { Packed } from '@/misc/schema';

export default class extends Channel {
Expand Down Expand Up @@ -52,7 +53,7 @@ export default class extends Channel {
}

@autobind
private onEvent(data: any) {
private onEvent(data: StreamMessages['channel']['payload']) {
if (data.type === 'typing') {
const id = data.body;
const begin = this.typers[id] == null;
Expand Down
24 changes: 11 additions & 13 deletions src/server/api/stream/channels/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,33 @@ export default class extends Channel {
public async init(params: any) {
// Subscribe main stream channel
this.subscriber.on(`mainStream:${this.user!.id}`, async data => {
const { type } = data;
let { body } = data;

switch (type) {
switch (data.type) {
case 'notification': {
if (this.muting.has(body.userId)) return;
if (body.note && body.note.isHidden) {
const note = await Notes.pack(body.note.id, this.user, {
if (data.body.userId && this.muting.has(data.body.userId)) return;

if (data.body.note && data.body.note.isHidden) {
const note = await Notes.pack(data.body.note.id, this.user, {
detail: true
});
this.connection.cacheNote(note);
body.note = note;
data.body.note = note;
}
break;
}
case 'mention': {
if (this.muting.has(body.userId)) return;
if (body.isHidden) {
const note = await Notes.pack(body.id, this.user, {
if (this.muting.has(data.body.userId)) return;
if (data.body.isHidden) {
const note = await Notes.pack(data.body.id, this.user, {
detail: true
});
this.connection.cacheNote(note);
body = note;
data.body = note;
}
break;
}
}

this.send(type, body);
this.send(data.type, data.body);
});
}
}
6 changes: 4 additions & 2 deletions src/server/api/stream/channels/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivit
import Channel from '../channel';
import { UserGroupJoinings, Users, MessagingMessages } from '@/models/index';
import { User, ILocalUser, IRemoteUser } from '@/models/entities/user';
import { UserGroup } from '@/models/entities/user-group';
import { StreamMessages } from '../types';

export default class extends Channel {
public readonly chName = 'messaging';
Expand All @@ -12,7 +14,7 @@ export default class extends Channel {
private otherpartyId: string | null;
private otherparty: User | null;
private groupId: string | null;
private subCh: string;
private subCh: `messagingStream:${User['id']}-${User['id']}` | `messagingStream:${UserGroup['id']}`;
private typers: Record<User['id'], Date> = {};
private emitTypersIntervalId: ReturnType<typeof setInterval>;

Expand Down Expand Up @@ -45,7 +47,7 @@ export default class extends Channel {
}

@autobind
private onEvent(data: any) {
private onEvent(data: StreamMessages['messaging']['payload'] | StreamMessages['groupMessaging']['payload']) {
if (data.type === 'typing') {
const id = data.body;
const begin = this.typers[id] == null;
Expand Down
35 changes: 17 additions & 18 deletions src/server/api/stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { AccessToken } from '@/models/entities/access-token';
import { UserProfile } from '@/models/entities/user-profile';
import { publishChannelStream, publishGroupMessagingStream, publishMessagingStream } from '@/services/stream';
import { UserGroup } from '@/models/entities/user-group';
import { StreamEventEmitter, StreamMessages } from './types';
import { Packed } from '@/misc/schema';

/**
Expand All @@ -28,7 +29,7 @@ export default class Connection {
public followingChannels: Set<ChannelModel['id']> = new Set();
public token?: AccessToken;
private wsConnection: websocket.connection;
public subscriber: EventEmitter;
public subscriber: StreamEventEmitter;
private channels: Channel[] = [];
private subscribingNotes: any = {};
private cachedNotes: Packed<'Note'>[] = [];
Expand All @@ -46,8 +47,8 @@ export default class Connection {

this.wsConnection.on('message', this.onWsConnectionMessage);

this.subscriber.on('broadcast', async ({ type, body }) => {
this.onBroadcastMessage(type, body);
this.subscriber.on('broadcast', data => {
this.onBroadcastMessage(data);
});

if (this.user) {
Expand All @@ -57,43 +58,41 @@ export default class Connection {
this.updateFollowingChannels();
this.updateUserProfile();

this.subscriber.on(`user:${this.user.id}`, ({ type, body }) => {
this.onUserEvent(type, body);
});
this.subscriber.on(`user:${this.user.id}`, this.onUserEvent);
}
}

@autobind
private onUserEvent(type: string, body: any) {
switch (type) {
private onUserEvent(data: StreamMessages['user']['payload']) { // { type, body }と展開するとそれぞれ型が分離してしまう
switch (data.type) {
case 'follow':
this.following.add(body.id);
this.following.add(data.body.id);
break;

case 'unfollow':
this.following.delete(body.id);
this.following.delete(data.body.id);
break;

case 'mute':
this.muting.add(body.id);
this.muting.add(data.body.id);
break;

case 'unmute':
this.muting.delete(body.id);
this.muting.delete(data.body.id);
break;

// TODO: block events

case 'followChannel':
this.followingChannels.add(body.id);
this.followingChannels.add(data.body.id);
break;

case 'unfollowChannel':
this.followingChannels.delete(body.id);
this.followingChannels.delete(data.body.id);
break;

case 'updateUserProfile':
this.userProfile = body;
this.userProfile = data.body;
break;

case 'terminate':
Expand Down Expand Up @@ -145,8 +144,8 @@ export default class Connection {
}

@autobind
private onBroadcastMessage(type: string, body: any) {
this.sendMessageToWs(type, body);
private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) {
this.sendMessageToWs(data.type, data.body);
}

@autobind
Expand Down Expand Up @@ -249,7 +248,7 @@ export default class Connection {
}

@autobind
private async onNoteStreamMessage(data: any) {
private async onNoteStreamMessage(data: StreamMessages['note']['payload']) {
this.sendMessageToWs('noteUpdated', {
id: data.body.id,
type: data.type,
Expand Down
Loading

0 comments on commit 69b56f6

Please sign in to comment.