📜 ⬆️ ⬇️

Angular2 + Websocket + RxJS + Rails5

image

Hello! This article is about how to connect the Angular2 client application with a Rails 5 server using Websocket.

Rails


For example, we need the simplest ApplicationCable :: Channel:

class ChatChannel < ApplicationCable::Channel def subscribed stream_from 'chat_channel' end def index ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) } end def create( data ) Message.create( name: data[ 'name' ], text: data[ 'text' ] ); ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) } end def unsubscribed end end 

')

Angular2


WebSocketService


First we need a service that will provide our application data exchange with the server:

 import { Injectable } from "@angular/core"; import { Subject, Observable, Subscription } from 'rxjs/Rx'; import { WebSocketSubject } from "rxjs/observable/dom/WebSocketSubject"; @Injectable() export class WebSocketService { private ws: WebSocketSubject<Object>; private socket: Subscription; private url: string; public message: Subject<Object> = new Subject(); public opened: Subject<boolean> = new Subject(); public close():void{ this.socket.unsubscribe(); this.ws.complete(); } public sendMessage( message:string ):void{ this.ws.next( message ); } public start( url: string ):void{ let self = this; this.url = url; this.ws = Observable.webSocket( this.url ); this.socket = this.ws.subscribe( { next: ( data:MessageEvent ) => { if( data[ 'type' ] == 'welcome' ){ self.opened.next( true ); } this.message.next( data ); }, error: () => { self.opened.next( false ); this.message.next( { type: 'closed' } ); self.socket.unsubscribe(); setTimeout( () => { self.start( self.url ); }, 1000 ); }, complete: () => { this.message.next( { type: 'closed' } ); } } ); } } 

This service has 3 private and 2 public variables, as well as 3 public functions.

 private ws: WebSocketSubject<Object>; private socket: Subscription; private url: string; 

ws is the observed WebSocketSubject variable.
socket is a variable for subscribing to ws.
url - link to the socket.

 public message: Subject<Object> = new Subject(); public opened: Subject<boolean> = new Subject(); 

message - an observable variable to which all data from the socket is translated.
opened - an observable variable that monitors the opening / closing of the connection with the socket.

 public close():void{ this.socket.unsubscribe(); this.ws.complete(); } 

Function to close the socket.

 public sendMessage( message:string ):void{ this.ws.next( message ); } 

Function to send data to the socket.

 public start( url: string ):void{ let self = this; this.url = url; this.ws = Observable.webSocket( this.url ); this.socket = this.ws.subscribe( { next: ( data:MessageEvent ) => { if( data[ 'type' ] == 'welcome' ){ self.opened.next( true ); } this.message.next( data ); }, error: () => { self.opened.next( false ); this.message.next( { type: 'closed' } ); self.socket.unsubscribe(); setTimeout( () => { self.start( self.url ); }, 1000 ); }, complete: () => { this.message.next( { type: 'closed' } ); } } ); } 

This function opens a connection with the socket by writing its object to the observed variable and subscribes to the broadcast from it. When the connection is lost every second tries to restore the connection.

ChannelWebsocketService


Inherited service for subscribing to Rails5 Action Cable channels:

 import { Injectable } from "@angular/core"; import { Subject } from "rxjs/Subject"; import { WebSocketService } from "./websocket.service"; @Injectable() export class ChannelWebsocketService { private socketStarted: boolean; public observableData: Subject<Object> = new Subject(); public identifier:Object = {}; public identifierStr: string; public subscribed: Subject<boolean> = new Subject(); constructor( private websocketService: WebSocketService ){ this.observeOpened(); this.observeMessage(); } private static encodeIdentifier( identifier:string ):Object{ return JSON.parse( identifier ); } private static getDataString( parameters:Object ):string{ let first = true, result = ''; for ( let key in parameters ){ if( first ){ first = false; result += `\"${ key }\":\"${ parameters[ key ] }\"`; } else { result += `, \"${ key }\":\"${ parameters[ key ] }\"`; } } return `{ ${ result } }`; } private getSubscribeString():string{ this.identifierStr = ChannelWebsocketService.getDataString( this.identifier ); return JSON.stringify( { command: 'subscribe', identifier: this.identifierStr } ); }; private isThisChannel( data:Object ):boolean { if( data[ 'identifier' ] ){ let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] ); if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){ return true; } } return false; } private observeMessage(){ let self = this; this.websocketService.message.subscribe( ( data: Object ) => { if( self.isThisChannel( data ) ){ if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){ this.subscribed.next( true ); } else if ( data[ 'message' ] ){ this.observableData.next( data[ 'message' ] ); } } } ); } private observeOpened(){ let self = this; this.websocketService.opened.subscribe( ( data: boolean ) => { self.socketStarted = data; if( data ){ self.subscribe(); } } ); } private subscribe(){ this.websocketService.sendMessage( this.getSubscribeString() ); } public send( data: Object ){ this.websocketService.sendMessage( JSON.stringify( { command:'message', identifier: this.identifierStr, data: ChannelWebsocketService.getDataString( data ) } ) ); } public unsubscribe(){ this.websocketService.sendMessage( JSON.stringify( { command: 'unsubscribe', identifier: this.identifierStr } ) ); this.subscribed.next( false ); } } 

This service has 2 private and 3 public variables, as well as 7 private and 2 public functions.

 private socketStarted: boolean; private identifierStr: string; 

socketStarted - a variable in which the state of the subscription to the socket is broadcast.
identifierStr is a specially prepared string identifier for the Rails5 Action Cable channel.

 public observableData: Subject<Object> = new Subject(); public identifier:Object = {}; public subscribed: Subject<boolean> = new Subject(); 

observableData - the observable variable in which the message is written from the socket for the channel.
identifier - object identifier for the Rails5 Action Cable channel.
subscribed - an observable variable in which the subscription state is written.

 constructor( private websocketService: WebSocketService ){ this.observeOpened(); this.observeMessage(); } ... private observeMessage(){ let self = this; this.websocketService.message.subscribe( ( data: Object ) => { if( self.isThisChannel( data ) ){ if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){ this.subscribed.next( true ); } else if ( data[ 'message' ] ){ this.observableData.next( data[ 'message' ] ); } } } ); } private observeOpened(){ let self = this; this.websocketService.opened.subscribe( ( data: boolean ) => { self.socketStarted = data; if( data ){ self.subscribe(); } } ); } 

In the constructor of this service, we call 2 functions: observeMessage and observeOpened, which track the data sent by the socket and the state of the socket, respectively.

 private static encodeIdentifier( identifier:string ):Object{ return JSON.parse( identifier ); } private static getDataString( parameters:Object ):string{ let first = true, result = ''; for ( let key in parameters ){ if( first ){ first = false; result += `\"${ key }\":\"${ parameters[ key ] }\"`; } else { result += `, \"${ key }\":\"${ parameters[ key ] }\"`; } } return `{ ${ result } }`; } 

encodeIdentifier - the static private function decodes the identifier string that the socket returned to identify the message as belonging to the channel.
getDataString - converts an object to a string format that accepts Rails5 Action Cable.

 private getSubscribeString():string{ this.identifierStr = ChannelWebsocketService.getDataString( this.identifier ); return JSON.stringify( { command: 'subscribe', identifier: this.identifierStr } ); }; 

Returns a string to subscribe to a Rails5 Action Cable channel.

 private isThisChannel( data:Object ):boolean { if( data[ 'identifier' ] ){ let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] ); if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){ return true; } } return false; } 

Determines the message belonging to the channel to the socket.

 private subscribe(){ this.websocketService.sendMessage( this.getSubscribeString() ); } 

Signs to the channel.

 public send( data: Object ){ this.websocketService.sendMessage( JSON.stringify( { command:'message', identifier: this.identifierStr, data: ChannelWebsocketService.getDataString( data ) } ) ); } 

Sends data to the Rails5 Action Cable;

 public unsubscribe(){ this.websocketService.sendMessage( JSON.stringify( { command: 'unsubscribe', identifier: this.identifierStr } ) ); this.subscribed.next( false ); } 

Unsubscribes from the channel.

ChatChannelService


The service inherited from ChannelWebsocketService to subscribe to the ChatChannel channel:

 import { Injectable } from "@angular/core"; import { ChannelWebsocketService } from "./channel.websocket.service"; import { WebSocketService } from "./websocket.service"; @Injectable() export class ChatChannelService extends ChannelWebsocketService { constructor( websocketService: WebSocketService ){ super( websocketService ); this.identifier = { channel: 'ChatChannel' }; } } 

In the constructor of this service, the identifier variable is redefined to identify the channel.

Chatcomponent


Component that using ChatChannelService accepts / sends data to the channel.
I don't give an example of code, it is in GitHub, the link to which is given below

Example


Here you can download an example.

To start the client application, go to the “client” folder and call:

 npm install gulp 

I hope this article will help to clarify this issue.

Source: https://habr.com/ru/post/318040/


All Articles