# Process-wide application state: configuration, node identity, logging and
# registration of this node for cross-node message delivery.
module App
  class << self
    # Yields the configuration object to the optional block, then installs the
    # Metriks counters for total and currently active sessions.
    #
    # @yieldparam config [OpenStruct] the mutable configuration object
    # @return [Object] the 'active_sessions' counter (value of the last assignment)
    def configuration
      yield(config) if block_given?
      config.sessions = Metriks.counter('total_sessions')
      config.active = Metriks.counter('active_sessions')
    end

    # @return [OpenStruct] memoized configuration; `redis` and `root` start as nil
    def config
      @config ||= OpenStruct.new(redis: nil, root: nil)
    end

    # @return [String] random hex identifier of this process, generated once
    def id
      @instance_id ||= SecureRandom.hex
    end

    # @return [Logger] memoized logger writing to $stderr
    def logger
      @logger ||= Logger.new $stderr
    end

    # Registers this node.
    #
    # When a redis client is configured, stores the "node_<id>" key with a
    # 10 minute expiry. On the next EventMachine tick it connects PubSub,
    # subscribes to the channel named after this node's id and starts a
    # 30 second timer that resets the key's expiry to 60 seconds.
    #
    # A StandardError raised synchronously here is logged and redis is
    # disabled by setting `config.redis` to nil.
    def register
      key = "node_#{id}"
      redis = config.redis
      if redis
        redis.multi do
          redis.set key, true
          redis.expire key, 60 * 10
        end
      end

      EM.next_tick do
        config.sub = PubSub.connect
        config.sub.subscribe App.id do |type, channel, message|
          case type
          when 'message'
            begin
              json = Oj.load(message, mode: :compat)
              WS::Base.remote_messsage json
            rescue => ex
              App.logger.error "ERROR: #{message.class} #{message} #{ex.to_s}"
            end
          else
            App.logger.debug "(#{type}) #{channel}:: #{message}"
          end
        end

        @pingpong = EM.add_periodic_timer(30) do
          # Redis is optional and may have been disabled by the rescue below,
          # so re-read it on every tick instead of assuming it is present.
          current = App.config.redis
          current.expire key, 60 if current
        end
      end
    rescue => ex
      logger.error "Node registration failed (#{ex.class}: #{ex.message}); disabling redis"
      config.redis = nil
    end
  end
end
module WS
  # Rack endpoint that upgrades websocket requests and dispatches incoming
  # JSON messages to `on_<action>` handlers on the connected user.
  class Base
    # Response returned for requests that are not websocket upgrades.
    NEXT_RACK = [404, {}, []].freeze

    # Rack entry point; delegates to the shared instance.
    def self.call(*args)
      instance.call(*args)
    end

    # @return [WS::Base] memoized shared instance
    def self.instance
      @instance ||= new
    end

    # Handles a message that arrived from another node.
    #
    # @param json [Hash] decoded message; 'from' names the originating user
    def self.remote_messsage(json)
      user = User.get json['from']
      instance.send :process, user, json if user
    rescue => ex
      # `user` is still nil when the lookup itself failed, so there is nobody
      # to notify in that case; log instead of raising NoMethodError on nil.
      if user
        user.error({ error: ex.to_s })
      else
        App.logger.error "Remote message dropped: #{ex}"
      end
    end

    def initialize
      # NOTE(review): not read anywhere in this class — confirm before removing.
      @ws_cache = {}
    end

    # Upgrades a websocket request and wires the socket callbacks.
    #
    # @param env [Hash] Rack environment
    # @return [Array, Object] NEXT_RACK for non-websocket requests, otherwise
    #   the socket's rack response
    def call(env)
      return NEXT_RACK unless Faye::WebSocket.websocket?(env)

      ws = Faye::WebSocket.new(env, ['xmpp'], ping: 5)
      user = User.register(ws)

      ws.onmessage = lambda do |event|
        begin
          json = Oj.load(event.data, mode: :compat)
        rescue => ex
          # A malformed frame must not raise into the event loop; report it
          # to the sender the same way handler failures are reported.
          user.error({ error: ex.to_s }) if user
          next
        end
        process(user, json)
      end

      ws.onclose = lambda do |event|
        App.logger.info [:close, event.code, event.reason]
        user.unregister
        user = nil
      end

      ws.rack_response
    rescue WS::User::NotUnique
      ws.send Oj.dump({ action: :error, data: { error: 'not unique session' } })
      ws.close
      ws.rack_response
    end

    private

    # Dispatches one decoded message to the user's `on_<action>` handler.
    # Empty and unknown actions are logged and ignored; an exception raised
    # while handling is sent back to the user as an error and logged.
    #
    # @param user [Object] receiver of the `on_<action>` call
    # @param json [Hash] decoded message with 'action' and 'data' keys
    def process(user, json)
      action = json['action'].to_s
      data = json['data']
      return App.logger.info([:message, 'Empty action']) if action.empty?

      handler = "on_#{action}"
      unless user.respond_to? handler
        return App.logger.info([:message, "Unknown action #{json['action']}"])
      end

      # respond_to? only admits public methods; public_send keeps it that way.
      user.public_send handler, data
    rescue => ex
      user.error({ error: ex.to_s }) if user
      App.logger.error ex.to_s
      App.logger.error Array(ex.backtrace).join("\n")
    end
  end
end
module WS
  # A websocket session connected to this node. Instances are kept in a
  # class-level cache keyed by id; the id is the last path segment of the
  # socket URL.
  class User
    include UserBehavior

    attr_reader :id

    class Error < StandardError; end
    class RoomFull < Error; end

    # Raised when a user id is blank or cannot be resolved.
    class NotFound < Error
      attr_reader :id

      def initialize(id)
        @id = id
      end

      def to_s
        "User '@#{id}' not found"
      end
    end

    class NotUnique < Error; end

    class << self
      # @return [Hash] locally connected users keyed by id
      def cache
        @ws_cache ||= {}
      end

      # Resolves a user id to a local session or, failing that, a RemoteUser.
      #
      # @param id [#to_s] user id
      # @return [WS::User, WS::RemoteUser]
      # @raise [NotFound] when the id is blank, or from RemoteUser.new
      def get(id)
        raise NotFound.new(id) if id.to_s.empty?

        # Go through `cache` rather than @ws_cache, which is nil until the
        # first session registers.
        cache.fetch(id) { WS::RemoteUser.new(id) }
      end

      # Creates and registers a session for the given socket.
      def register(ws)
        new(ws)
      end

      # Unregisters the local session that belongs to the given socket.
      #
      # @raise [NotFound] when no local session exists for the socket's id
      def unregister(ws)
        id = URI.parse(ws.url).path.split('/').last
        # Only local sessions can be unregistered; a RemoteUser has no
        # `unregister`, so look in the local cache only.
        user = cache[id]
        raise NotFound.new(id) unless user

        user.unregister
      end
    end

    # @param ws [Object] socket; must respond to url, version, protocol,
    #   ping and send (the methods this class calls on it)
    def initialize(ws)
      @ws = ws
      register
      # Every 5 seconds ping the client; when the ping block runs, push the
      # redis key's expiry back to 15 seconds.
      @pingpong = EM.add_periodic_timer(5) do
        @ws.ping('') do
          App.config.redis.expire @id, 15 if App.config.redis
        end
      end
    end

    # Tears the session down: runs the optional on_close hook, decrements the
    # active counter, deletes the redis key and cache entry and cancels the
    # ping timer. Calling it again on the same instance is a no-op.
    def unregister
      return unless @ws # already unregistered

      on_close if respond_to? :on_close
      App.config.active.decrement
      App.config.redis.del @id if App.config.redis
      User.cache.delete(@id)
      @pingpong.cancel if @pingpong
      @pingpong = nil
      @ws = nil
      @id = nil
    end

    # Serializes a message and writes it to this session's socket.
    #
    # @param from [#id] sender
    # @param action [#to_s] action name
    # @param data [Object] payload
    def send_client(from, action, data)
      return unless @ws

      payload = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat)
      @ws.send(payload)
    end

    private

    # Derives the id from the socket URL, stores the id -> node mapping in
    # redis (15 second expiry) when redis is configured, updates the session
    # counters, caches the instance and runs the optional on_register hook.
    def register
      @id = URI.parse(@ws.url).path.split('/').last

      redis = App.config.redis
      if redis
        redis.multi do
          redis.set @id, App.id
          redis.expire @id, 15
        end
      end

      # Counted regardless of redis: `unregister` decrements unconditionally,
      # so incrementing only with redis would drive the counter negative.
      App.config.sessions.increment
      App.config.active.increment

      User.cache[@id] = self
      App.logger.info [:open, @ws.url, @ws.version, @ws.protocol]
      on_register if respond_to? :on_register
      self
    end
  end

  # Proxy for a user whose id is mapped in redis to a node; messages are
  # published to that node's channel.
  class RemoteUser
    include UserBehavior

    attr_reader :id, :node

    # @param id [#to_s] user id
    # @raise [WS::User::NotFound] when the id is blank, redis is not
    #   configured, or redis holds no node for the id
    def initialize(id)
      @id = id.to_s
      raise WS::User::NotFound.new(id) if @id.empty?

      redis = App.config.redis
      # Without redis nothing can be looked up: report NotFound rather than
      # failing with NoMethodError on nil.
      @node = redis ? redis.get(@id).to_s : ''
      raise WS::User::NotFound.new(id) if @node.empty?
    end

    # Serializes a message and publishes it on the owning node's channel.
    def send_client(from, action, data)
      return if node.to_s.empty?

      App.logger.info ['REMOTE', id, from.id, action]
      payload = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat)
      App.config.redis.publish node, payload
    end
  end
end
# Messaging behaviour shared by user classes. The including class must
# provide `id` and `send_client(from, action, data)`.
module UserBehavior
  module ClassMethods
    # Defines an instance method named `action` that delivers a message via
    # `send_client`.
    #
    # With a block, the block receives the instance followed by the call's
    # arguments and returns `[data, from]`; `from` falls back to the instance
    # when nil. Without a block, the first argument is sent as the data with
    # the instance as sender.
    #
    # @param action [Symbol, String] name of the method to define
    # @param params [Hash] `:passthrough` additionally defines `on_<action>`,
    #   which forwards its single argument to `action`
    # @return [Symbol, nil, Object] the `on_<action>` name when passthrough
    #   is set, nil otherwise, or the logger's result when refused
    def register_action(action, params = {}, &formatter)
      # Check instance methods of this class: `respond_to?` here would test
      # the class object itself (e.g. :name, :hash), not its instances.
      if method_defined?(action) || private_method_defined?(action)
        return App.logger.info ['register_action', "Method #{action} already defined"]
      end

      define_method(action) do |*args|
        if formatter
          data, from = formatter.call(self, *args)
          send_client from || self, action, data
        else
          send_client self, action, args.first
        end
      end

      return unless params[:passthrough]

      define_method("on_#{action}") { |data| send(action, data) }
    end
  end

  # Extends the including class with ClassMethods and registers the default
  # `message` and `error` actions.
  def self.included(base)
    base.extend ClassMethods
    base.register_action(:message) do |user, from, text|
      [{ to: user.id, text: text }, from]
    end
    base.register_action :error, passthrough: true
  end

  # Handles an incoming 'message' action: resolves the recipient named by
  # data['to'] and delivers data['text'] with this user as the sender. An
  # unresolvable recipient is reported back to this user as an error.
  #
  # @param data [Hash] payload with 'to' and 'text' keys
  def on_message(data)
    App.logger.info ['MESSAGE', id, data.to_s]
    recipient = WS::User.get(data['to'])
    recipient.message self, data['text']
  rescue WS::User::NotFound => ex
    error({ error: ex.to_s })
  end
end
# Source: https://habr.com/ru/post/248737/
# All Articles