
Horizontal websocket scaling in Ruby

Not long ago an article appeared in which the author described his framework for writing applications with Ruby, Sinatra and WebSocket. That solution, however, did not address scaling. A user connected to one node receives notifications and data only about events caused by users of the same node; changes made through another node go unnoticed. To solve this problem, the nodes need a common data bus. I will consider this task in the context of client-to-client messaging.

Data bus


The requirements we impose on the bus are the following:

The bus can be built on a data store that is polled periodically, or on a queue server.
The first option does not satisfy the second condition, because the delivery delay equals the polling period, and shortening the period increases the load on the store. This option is therefore rejected immediately.

The second option fits best. Specialized solutions such as RabbitMQ or ActiveMQ can be used here. Both are serious products with many features and good scalability. They would work, but it is worth asking whether that would be shooting sparrows with a cannon. Redis also provides queue functionality, and with it we get a key-value store, which we need as well.

Redis provides the simplest Pub-Sub mechanism, which is sufficient for our task. It is fast enough, easy to use and has low transmission delays.

Solution


Our system will have the following scheme.



Messages between users of the same node are passed directly; messages between nodes go over the bus.
For this:
  1. the node generates a unique name;
  2. the node subscribes in Redis to messages addressed to that name;
  3. every client connected to the node stores a key-value pair: the client identifier and the identifier of the node it is connected to;
  4. to send a message to a client on another node, the sender looks up that node's name and publishes the message to its channel.
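
The four steps above can be sketched without Redis or WebSockets. What follows is a minimal in-memory simulation under stated assumptions, not the article's implementation: FakeBus stands in for Redis Pub/Sub, a plain Hash stands in for the key-value store, and the Node class with its method names is hypothetical.

```ruby
require 'securerandom'

# Stand-in for Redis Pub/Sub: channel name => list of subscriber callbacks.
class FakeBus
  def initialize
    @channels = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(channel, &handler)
    @channels[channel] << handler
  end

  def publish(channel, message)
    @channels[channel].each { |handler| handler.call(message) }
  end
end

# Hypothetical node: keeps its local clients and reaches other nodes via the bus.
class Node
  attr_reader :name, :inbox

  def initialize(bus, registry)
    @bus = bus
    @registry = registry        # client id => node name (Redis in the article)
    @name = SecureRandom.hex    # step 1: unique node name
    @inbox = Hash.new { |hash, key| hash[key] = [] }
    @local = {}
    # step 2: subscribe to the channel named after this node
    @bus.subscribe(@name) { |msg| deliver(msg) }
  end

  # step 3: record which node the client is connected to
  def connect(client_id)
    @local[client_id] = true
    @registry[client_id] = @name
  end

  # step 4: deliver locally, or look up the target node and publish to its channel
  def send_message(from, to, text)
    msg = { from: from, to: to, text: text }
    return deliver(msg) if @local.key?(to)

    target = @registry[to]
    raise ArgumentError, "unknown client #{to}" unless target

    @bus.publish(target, msg)
  end

  private

  # Stores the message in the recipient's inbox instead of writing to a socket.
  def deliver(msg)
    @inbox[msg[:to]] << msg
  end
end

bus = FakeBus.new
registry = {}
node_a = Node.new(bus, registry)
node_b = Node.new(bus, registry)
node_a.connect('alice')
node_b.connect('bob')
node_a.send_message('alice', 'bob', 'hi')   # crosses the bus and lands on node_b
```

The sender never needs to know whether the recipient is local; only the lookup result decides which path the message takes.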

Now let's implement it


faye-websocket-ruby was chosen as the WebSocket library. Redis is accessed through the standard redis gem (with hiredis), plus sample code for Pub/Sub over EventMachine, because the gem's own implementation works in blocking mode, which is unacceptable when it runs in the same thread as the web server.

    module App
      class << self
        def configuration
          yield(config) if block_given?
          config.sessions = Metriks.counter('total_sessions')
          config.active = Metriks.counter('active_sessions')
        end

        def config
          @config ||= OpenStruct.new(
            redis: nil,
            root: nil
          )
        end

        def id
          @instance_id ||= SecureRandom.hex
        end

        def logger
          @logger ||= Logger.new $stderr
        end

        def register
          config.redis.multi do
            config.redis.set "node_#{App.id}", true
            config.redis.expire "node_#{App.id}", 60*10
          end if config.redis
          EM.next_tick do
            config.sub = PubSub.connect
            config.sub.subscribe App.id do |type, channel, message|
              case type
              when 'message'
                begin
                  json = Oj.load(message, mode: :compat)
                  WS::Base.remote_messsage json
                rescue => ex
                  App.logger.error "ERROR: #{message.class} #{message} #{ex.to_s}"
                end
              else
                App.logger.debug "(#{type}) #{channel}:: #{message}"
              end
            end
            @pingpong = EM.add_periodic_timer(30) do
              App.config.redis.expire "node_#{App.id}", 60
            end
          end
        rescue
          config.redis = nil
        end
      end
    end

The main work of this module is done by the register method, which registers the node on the bus and waits for incoming messages. For monitoring, in case a node drops off, a key named node_%node_id% is created, and every 30 seconds a timer resets its TTL to 60 seconds. This way you can always find out how many nodes are currently on the network and what they are called.
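
The liveness scheme (a key with a TTL that the node keeps refreshing) can be illustrated in isolation. This is a sketch under assumptions: TtlStore is a hypothetical in-memory stand-in for Redis keys with expiry, and the clock is injected so the example does not have to sleep.

```ruby
# Hypothetical in-memory stand-in for Redis keys with a TTL.
class TtlStore
  def initialize(clock)
    @clock = clock        # callable that returns the current time in seconds
    @expires_at = {}
  end

  # Create the key (or refresh it) so that it lives for ttl more seconds.
  def touch(key, ttl)
    @expires_at[key] = @clock.call + ttl
  end

  # Names of the keys whose TTL has not run out yet.
  def alive
    now = @clock.call
    @expires_at.select { |_key, deadline| deadline > now }.keys.sort
  end
end

now = 0
store = TtlStore.new(-> { now })

store.touch('node_a', 60)
store.touch('node_b', 60)

now = 30
store.touch('node_a', 60)   # node_a refreshes on its 30-second timer; node_b has died

now = 70
alive_nodes = store.alive   # node_b's key ran out at t=60, node_a's lasts until t=90
```

Because the refresh period is half the TTL, a single missed refresh does not make a healthy node disappear from the list.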

    module WS
      class Base
        NEXT_RACK = [404, {}, []].freeze

        def self.call(*args)
          instance.call(*args)
        end

        def self.instance
          @instance ||= self.new
        end

        # Entry point for messages that arrive over the bus.
        def self.remote_messsage(json)
          user = User.get json['from']
          instance.send :process, user, json if user
        rescue => ex
          # user is still nil if User.get itself raised, so guard the call
          user.error( { error: ex.to_s } ) if user
        end

        def initialize
          @ws_cache = {}
        end

        # Rack entry point: upgrades the request and attaches the handlers.
        def call(env)
          return NEXT_RACK unless Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, ['xmpp'], ping: 5)
          user = User.register(ws)
          ws.onmessage = lambda do |event|
            json = Oj.load(event.data, mode: :compat)
            process(user, json )
          end
          ws.onclose = lambda do |event|
            App.logger.info [:close, event.code, event.reason]
            user.unregister
            user = nil
          end
          ws.rack_response
        rescue WS::User::NotUnique => ex
          ws.send Oj.dump({ action: :error, data: { error: 'not unique session' } })
          ws.close
          ws.rack_response
        end

        private

        # Single dispatch point: maps the action field to an on_<action> method.
        def process(user, json)
          action = json['action'].to_s
          data = json['data']
          return App.logger.info([:message, 'Empty action']) if action.empty?
          return App.logger.info([:message, "Unknown action #{json['action']}"]) unless user.respond_to? "on_#{action}"
          user.send "on_#{action}", data
        rescue => ex
          user.error({ error: ex.to_s })
          puts ex.to_s
          puts ex.backtrace
        end
      end
    end

This class is responsible for establishing connections and handling messages. The call method creates a new client and attaches the handlers. The class method remote_messsage receives external messages (from the bus). The process method is the single entry point for messages coming directly from a client and for messages coming over the bus.
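
The dispatch idea behind process can be shown on its own. The sketch here is simplified and makes assumptions: DemoClient and dispatch are hypothetical names, and the function returns a status symbol instead of logging.

```ruby
# Hypothetical client with two handlers; only on_* methods are reachable.
class DemoClient
  attr_reader :log

  def initialize
    @log = []
  end

  def on_message(data)
    @log << [:message, data]
  end

  def on_ping(_data)
    @log << [:pong]
  end
end

# Map the 'action' field to an on_<action> method, the way process does.
# Returns :empty or :unknown instead of logging, to keep the sketch small.
def dispatch(client, json)
  action = json['action'].to_s
  return :empty if action.empty?

  handler = "on_#{action}"
  return :unknown unless client.respond_to?(handler)

  client.public_send(handler, json['data'])
  :ok
end

client = DemoClient.new
dispatch(client, { 'action' => 'message', 'data' => { 'text' => 'hi' } })
dispatch(client, { 'action' => 'log' })   # rejected: there is no on_log method
```

The on_ prefix works as an allow-list: a client can trigger only methods that were deliberately exposed, not arbitrary ones such as log.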
Clients
    module WS
      class User
        include UserBehavior
        attr_reader :id

        class Error < StandardError; end
        class RoomFull < Error; end
        class NotFound < Error
          attr_reader :id
          def initialize(id); @id = id end
          def to_s; "User '@#{id}' not found" end
        end
        class NotUnique < Error; end

        class << self
          def cache
            @ws_cache ||= {}
          end

          # Returns the local user, or a RemoteUser proxy if the ID is not cached here.
          def get(id)
            fail NotFound.new(id) if id.to_s.empty?
            # go through the accessor so the cache exists even before the first registration
            cache.fetch(id)
          rescue KeyError
            WS::RemoteUser.new(id)
          end

          def register(ws)
            self.new(ws)
          end

          def unregister(ws)
            url = URI.parse(ws.url)
            id = url.path.split('/').last
            get(id).unregister
          end
        end

        def initialize(ws)
          @ws = ws
          register
          # ping the client every 5 seconds and extend the TTL of its record on reply
          @pingpong = EM.add_periodic_timer(5) do
            @ws.ping('') do
              App.config.redis.expire @id, 15 if App.config.redis
            end
          end
        end

        def unregister
          on_close if respond_to? :on_close
          App.config.active.decrement
          App.config.redis.del @id if App.config.redis
          User.cache.delete(@id)
          @pingpong.cancel
          @pingpong = nil
          @ws = nil
          @id = nil
        end

        def send_client(from, action, data)
          return unless @ws
          data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat)
          @ws.send(data)
        end

        private

        def register
          url = URI.parse(@ws.url)
          @id = url.path.split('/').last
          if App.config.redis
            App.config.redis.multi do
              App.config.redis.set @id, App.id
              App.config.redis.expire @id, 15
            end
            App.config.sessions.increment
            App.config.active.increment
          end
          User.cache[@id] = self
          App.logger.info [:open, @ws.url, @ws.version, @ws.protocol]
          # the original checked :on_close here, which looks like a copy-paste slip
          on_register if respond_to? :on_register
          self
        end
      end

      class RemoteUser
        include UserBehavior
        attr_reader :id
        attr_reader :node

        def initialize(id)
          @id = id.to_s
          fail WS::User::NotFound.new(id) if @id.empty?
          @node = App.config.redis.get(@id).to_s
          fail WS::User::NotFound.new(id) if @node.empty?
        end

        # Publishes the message to the channel of the node that holds the client.
        def send_client(from, action, data)
          return if node.to_s.empty?
          App.logger.info ['REMOTE', self.id, from.id, action]
          data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat)
          App.config.redis.publish node, data
        end
      end
    end


The register method records the user in the store, associating the user ID with the ID of the node the user is connected to, and caches the user in the local list. The unregister method does the opposite: it removes all records of the client and cancels the timer. The timer periodically checks that the client is alive and refreshes the TTL of its record, so that no stale entries are left in Redis.
The client ID is taken from the URL of the connection request. It has the format ws://%hostname%/ws/%user_id%, where user_id is a randomly generated unique sequence.
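
As a small illustration of the ID extraction, the same expression the code uses can be tried on a sample URL. The helper name client_id_from and the host example.com are assumptions made for this sketch.

```ruby
require 'uri'

# Take the last path segment of the connection URL as the client ID.
def client_id_from(url)
  URI.parse(url).path.split('/').last
end

client_id_from('ws://example.com/ws/3f2a9c')   # => "3f2a9c"
client_id_from('ws://example.com/')            # => nil, the path has no segments
```

A URL without a trailing segment yields nil, so a caller would probably want to reject such connections before registering the client.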

The send_client method sends the data to the client itself.

The class method get deserves special attention. Given an ID, it returns the WS::User instance or, if the user is not in the local cache, creates a WS::RemoteUser instance. On creation, WS::RemoteUser checks whether the ID exists in the store and which node it belongs to. If the ID is not found, an exception is raised.

Unlike WS::User, the WS::RemoteUser class defines only one method of its own, send_client, which sends the generated message over the bus to the required node.

Thus, no matter where the client is located, a call to the send_client method will deliver the data to the destination.

    module UserBehavior
      module ClassMethods
        def register_action(action, params = {})
          return App.logger.info ['register_action', "Method #{action} already defined"] if respond_to? action
          block = lambda do |*args|
            if block_given?
              data, from = yield(self, *args)
              send_client from || self, action, data
            else
              send_client self, action, args.first
            end
          end
          define_method action, &block
          define_method "on_#{action}" do |data|
            self.send action, data
          end if params[:passthrough]
        end
      end

      def self.included(base)
        base.instance_exec do
          extend ClassMethods
          register_action :message do |user, from, text|
            [{ to: user.id, text: text }, from]
          end
          register_action :error, passthrough: true
        end
      end

      def on_message(data)
        App.logger.info ['MESSAGE', id, data.to_s]
        to_user_id = data['to']
        to_user = WS::User.get(to_user_id)
        to_user.message self, data['text']
      rescue WS::User::NotFound => ex
        error({ error: ex.to_s })
      end
    end

Event handling itself lives in a separate UserBehavior module, which extends the two previous classes with methods that react to messages. Each message has the fields FROM, ACTION and DATA. The first identifies the sender, the second selects the method, and the third carries the related data. So for an ACTION value of "message", the on_message method is called and receives the value of the DATA field.
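
The message format can be sketched as a round trip. This example makes assumptions: it uses the standard json library instead of Oj, and encode_message and decode_message are hypothetical helper names.

```ruby
require 'json'

# Build the envelope that travels over the socket or the bus.
def encode_message(from_id, action, data)
  JSON.generate({ from: from_id, action: action.to_s, data: data })
end

# Parse an envelope and name the handler that would receive its DATA field.
def decode_message(raw)
  json = JSON.parse(raw)
  { from: json['from'], handler: "on_#{json['action']}", data: json['data'] }
end

raw = encode_message('alice', :message, { to: 'bob', text: 'hi' })
decoded = decode_message(raw)
# decoded[:handler] is "on_message"; decoded[:data] is {"to"=>"bob", "text"=>"hi"}
```

After parsing, the keys of DATA are strings, which is why on_message reads data['to'] and data['text'] rather than symbol keys.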

With this approach, messages pass transparently between connected clients, whether they are on the same node or on different ones. For testing I ran several instances on different ports; the messages were sent and received correctly.

For those who want to try it, the code of the working application is posted on GitHub. It starts simply with rackup.

PS


This solution is not finished. I think there is a lot to improve and some things to cut, but it works well as a starting point.

Source: https://habr.com/ru/post/248737/

