Reconnect, or the organization of reconnection to the server, is a primary factor when working with websockets, since Network breaks, server crashes, or other errors that cause a disconnection can cause the application to crash.
It is important to note that reconnection attempts should not be too frequent and should not continue indefinitely, since this behavior is capable of hanging the client.
"Subscriber-subscriber (publisher-subscriber or pub / pub) is a behavioral design pattern of messaging in which message senders, called publishers, are not directly tied to the subscribers' program code ). Instead, messages are divided into classes and do not contain information about their subscribers, if any. Similarly, subscribers deal with one or more classes of messages, abstracting from specific publishers. ”
0x80, <length - one or several bytes>, <message body>
“A data model is an abstract, self-sufficient, logical definition of objects, operators, and other elements that together make up an abstract data access machine with which the user interacts. These objects allow you to model the data structure, and the operators - the behavior of the data. "
export interface IWebsocketService { // addEventListener<T>(topics: string[], id?: number): Observable<T>; runtimeIgnore(topics: string[]): void; runtimeRemoveIgnore(topics: string[]): void; sendMessage(event: string, data: any): void; } export interface WebSocketConfig { // DI url: string; ignore?: string[]; garbageCollectInterval?: number; options?: Options; } export interface ITopic<T> { // Pub/Sub [hash: string]: MessageSubject<T>; } export interface IListeners { // [topic: string]: ITopic<any>; } export interface IBuffer { // ws.message type: string; data: number[]; } export interface IWsMessage { // ws.message event: string; buffer: IBuffer; } export interface IMessage { // id: number; text: string; } export type ITopicDataType = IMessage[] | number | string[]; // callMessage
export class MessageSubject<T> extends Subject<T> { constructor( private listeners: IListeners, // private topic: string, // private id: string // id ) { super(); } /* * next, * , * garbageCollect */ public next(value?: T): void { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const {observers} = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].next(value); } if (!len) { this.garbageCollect(); // } } } /* * garbage collector * */ private garbageCollect(): void { delete this.listeners[this.topic][this.id]; // Subject if (!Object.keys(this.listeners[this.topic]).length) { // delete this.listeners[this.topic]; } } }
export const WS_API = { EVENTS: { MESSAGES: 'messages', COUNTER: 'counter', UPDATE_TEXTS: 'update-texts' }, COMMANDS: { SEND_TEXT: 'set-text', REMOVE_TEXT: 'remove-text' } };
import { InjectionToken } from '@angular/core'; export const config: InjectionToken<string> = new InjectionToken('websocket');
import Dexie from 'dexie'; import { IMessage, IWsMessage } from './websocket.interfaces'; import { WS_API } from './websocket.events'; class MessagesDatabase extends Dexie { // Dexie typescript public messages!: Dexie.Table<IMessage, number>; // id is number in this case constructor() { super('MessagesDatabase'); // this.version(1).stores({ // messages: '++id,text' }); } }
export const modelParser = (message: IWsMessage) => { if (message && message.buffer) { /* */ const encodeUint8Array = String.fromCharCode .apply(String, new Uint8Array(message.buffer.data)); const parseData = JSON.parse(encodeUint8Array); let MessagesDB: MessagesDatabase; // IndexedDB if (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[] if (!MessagesDB) { MessagesDB = new MessagesDatabase(); } parseData.forEach((messageData: IMessage) => { /* */ MessagesDB.transaction('rw', MessagesDB.messages, async () => { /* , */ if ((await MessagesDB.messages .where({id: messageData.id}).count()) === 0) { const id = await MessagesDB.messages .add({id: messageData.id, text: messageData.text}); console.log(`Addded message with id ${id}`); } }).catch(e => { console.error(e.stack || e); }); }); return MessagesDB.messages.toArray(); // IMessage[] } if (message.event === WS_API.EVENTS.COUNTER) { // counter return new Promise(r => r(parseData)); // } if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // text const texts = []; parseData.forEach((textData: string) => { texts.push(textData); }); return new Promise(r => r(texts)); // } } else { console.log(`[${Date()}] Buffer is "undefined"`); } };
@NgModule({ imports: [ CommonModule ] }) export class WebsocketModule { public static config(wsConfig: WebSocketConfig): ModuleWithProviders { return { ngModule: WebsocketModule, providers: [{provide: config, useValue: wsConfig}] }; } }
private listeners: IListeners; // private uniqueId: number; // id private websocket: ReconnectingWebSocket; // constructor(@Inject(config) private wsConfig: WebSocketConfig) { this.uniqueId = -1; this.listeners = {}; this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : []; // this.connect(); } ngOnDestroy() { this.websocket.close(); // }
private connect(): void { // ReconnectingWebSocket config const options = { connectionTimeout: 1000, // , maxRetries: 10, // , ...this.wsConfig.options }; // this.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options); this.websocket.addEventListener('open', (event: Event) => { // console.log(`[${Date()}] WebSocket connected!`); }); this.websocket.addEventListener('close', (event: CloseEvent) => { // console.log(`[${Date()}] WebSocket close!`); }); this.websocket.addEventListener('error', (event: ErrorEvent) => { // console.error(`[${Date()}] WebSocket error!`); }); this.websocket.addEventListener('message', (event: MessageEvent) => { // this.onMessage(event); }); setInterval(() => { // this.garbageCollect(); }, (this.wsConfig.garbageCollectInterval || 10000)); }
private garbageCollect(): void { for (const event in this.listeners) { if (this.listeners.hasOwnProperty(event)) { const topic = this.listeners[event]; for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; // Subject if (!subject.observers.length) { delete topic[key]; } } } , if (!Object.keys(topic).length) { delete this.listeners[event]; } } } }
private onMessage(event: MessageEvent): void { const message = JSON.parse(event.data); for (const name in this.listeners) { if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) { const topic = this.listeners[name]; const keys = name.split('/'); // const isMessage = keys.includes(message.event); const model = modelParser(message); // if (isMessage && typeof model !== 'undefined') { model.then((data: ITopicDataType) => { // Subject this.callMessage<ITopicDataType>(topic, data); }); } } } }
private callMessage<T>(topic: ITopic<T>, data: T): void { for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; if (subject) { // subject.next(data); } else { console.log(`[${Date()}] Topic Subject is "undefined"`); } } } }
private addTopic<T>(topic: string, id?: number): MessageSubject<T> { const token = (++this.uniqueId).toString(); const key = id ? token + id : token; // id const hash = sha256.hex(key); // SHA256- id if (!this.listeners[topic]) { this.listeners[topic] = <any>{}; } return this.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash); }
public addEventListener<T>(topics: string | string[], id?: number): Observable<T> { if (topics) { // const topicsKey = typeof topics === 'string' ? topics : topics.join('/'); return this.addTopic<T>(topicsKey, id).asObservable(); } else { console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`); } }
public sendMessage(event: string, data: any = {}): void { // , if (event && this.websocket.readyState === 1) { this.websocket.send(JSON.stringify({event, data})); } else { console.log('Send error!'); } }
public runtimeIgnore(topics: string[]): void { if (topics && topics.length) { // this.wsConfig.ignore.push(...topics); } }
public runtimeRemoveIgnore(topics: string[]): void { if (topics && topics.length) { topics.forEach((topic: string) => { // const topicIndex = this.wsConfig.ignore.findIndex(t => t === topic); if (topicIndex > -1) { // this.wsConfig.ignore.splice(topicIndex, 1); } }); } }
@NgModule({ declarations: [ AppComponent ], imports: [ BrowserModule, ReactiveFormsModule, WebsocketModule.config({ url: environment.ws, // "ws://mywebsocketurl" // ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2], garbageCollectInterval: 60 * 1000, // options: { connectionTimeout: 1000, // maxRetries: 10 // } }) ], providers: [], bootstrap: [AppComponent] }) export class AppModule { }
@Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.css'] }) export class AppComponent implements OnInit, OnDestroy { private messages$: Observable<IMessage[]>; private messagesMulti$: Observable<IMessage[]>; private counter$: Observable<number>; private texts$: Observable<string[]>; public form: FormGroup; constructor( private fb: FormBuilder, private wsService: WebsocketService) { } ngOnInit() { this.form = this.fb.group({ text: [null, [ Validators.required ]] }); // get messages this.messages$ = this.wsService .addEventListener<IMessage[]>(WS_API.EVENTS.MESSAGES); // get messages multi this.messagesMulti$ = this.wsService .addEventListener<IMessage[]>([ WS_API.EVENTS.MESSAGES, WS_API.EVENTS.MESSAGES_1 ]); // get counter this.counter$ = this.wsService .addEventListener<number>(WS_API.EVENTS.COUNTER); // get texts this.texts$ = this.wsService .addEventListener<string[]>(WS_API.EVENTS.UPDATE_TEXTS); } ngOnDestroy() { } public sendText(): void { if (this.form.valid) { this.wsService .sendMessage(WS_API.COMMANDS.SEND_TEXT, this.form.value.text); this.form.reset(); } } public removeText(index: number): void { this.wsService.sendMessage(WS_API.COMMANDS.REMOVE_TEXT, index); } }
Source: https://habr.com/ru/post/419099/
All Articles