Typed stream (#13)

* Update streaming.ts

* Update streaming.ts

* wip
This commit is contained in:
syuilo
2021-05-17 19:50:31 +09:00
committed by GitHub
parent d7d02cd2bc
commit 99276028ae
3 changed files with 174 additions and 10 deletions

View File

@@ -3,6 +3,7 @@ import { EventEmitter } from 'eventemitter3';
import ReconnectingWebsocket from 'reconnecting-websocket';
import { stringify } from 'querystring';
import { markRaw } from '@vue/reactivity';
import { MeDetailed, MessagingMessage, Note, Notification, PageEvent, User } from './entities';
function urlQuery(obj: {}): string {
return stringify(Object.entries(obj)
@@ -10,10 +11,84 @@ function urlQuery(obj: {}): string {
.reduce((a, [k, v]) => (a[k] = v, a), {} as Record<string, any>));
}
type FIXME = any;
type ChannelDef = {
main: {
events: {
notification: (payload: Notification) => void;
mention: (payload: Note) => void;
reply: (payload: Note) => void;
renote: (payload: Note) => void;
follow: (payload: User) => void; // 自分が他人をフォローしたとき
followed: (payload: User) => void; // 他人が自分をフォローしたとき
unfollow: (payload: User) => void; // 自分が他人をフォロー解除したとき
meUpdated: (payload: MeDetailed) => void;
pageEvent: (payload: PageEvent) => void;
};
};
homeTimeline: {
events: {
note: (payload: Note) => void;
};
};
localTimeline: {
events: {
note: (payload: Note) => void;
};
};
hybridTimeline: {
events: {
note: (payload: Note) => void;
};
};
globalTimeline: {
events: {
note: (payload: Note) => void;
};
};
messaging: {
events: {
message: (payload: MessagingMessage) => void;
deleted: (payload: MessagingMessage['id']) => void;
read: (payload: MessagingMessage['id'][]) => void;
typing: (payload: User['id']) => void;
};
};
};
type NoteUpdatedEvent = {
id: Note['id'];
type: 'reacted';
body: {
reaction: string;
userId: User['id'];
};
} | {
id: Note['id'];
type: 'deleted';
body: {
deletedAt: string;
};
} | {
id: Note['id'];
type: 'pollVoted';
body: {
choice: number;
userId: User['id'];
};
};
type StreamEvents = {
_connected_: void;
_disconnected_: void;
noteUpdated: (payload: NoteUpdatedEvent) => void;
};
/**
* Misskey stream connection
*/
export default class Stream extends EventEmitter {
export default class Stream extends EventEmitter<StreamEvents> {
private stream: ReconnectingWebsocket;
public state: 'initializing' | 'reconnecting' | 'connected' = 'initializing';
private sharedConnectionPools: Pool[] = [];
@@ -38,7 +113,7 @@ export default class Stream extends EventEmitter {
}
@autobind
public useSharedConnection(channel: string, name?: string): SharedConnection {
public useSharedConnection<C extends keyof ChannelDef>(channel: C, name?: string): SharedConnection<ChannelDef[C]['events']> {
let pool = this.sharedConnectionPools.find(p => p.channel === channel);
if (pool == null) {
@@ -62,7 +137,7 @@ export default class Stream extends EventEmitter {
}
@autobind
public connectToChannel(channel: string, params?: any): NonSharedConnection {
public connectToChannel<C extends keyof ChannelDef>(channel: C, params?: any): NonSharedConnection<ChannelDef[C]['events']> {
const connection = markRaw(new NonSharedConnection(this, channel, params));
this.nonSharedConnections.push(connection);
return connection;
@@ -227,7 +302,7 @@ class Pool {
}
}
abstract class Connection extends EventEmitter {
abstract class Connection<Events extends Record<string, any> = any> extends EventEmitter<Events> {
public channel: string;
protected stream: Stream;
public abstract id: string;
@@ -261,7 +336,7 @@ abstract class Connection extends EventEmitter {
public abstract dispose(): void;
}
class SharedConnection extends Connection {
class SharedConnection<Events = any> extends Connection<Events> {
private pool: Pool;
public get id(): string {
@@ -288,7 +363,7 @@ class SharedConnection extends Connection {
}
}
class NonSharedConnection extends Connection {
class NonSharedConnection<Events = any> extends Connection<Events> {
public id: string;
protected params: any;