using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Web.Script.Serialization;

namespace methanum {
    /// <summary>
    /// A message exchanged through the gate: an id, a creation timestamp,
    /// a destination name and a free-form key/value payload.
    /// </summary>
    [DataContract]
    [KnownType(typeof(List<DateTime>))]
    public class Event {
        /// <summary>
        /// A unique id of the event (a fresh <see cref="Guid"/> per constructed instance).
        /// </summary>
        [DataMember]
        public Guid Id { set; get; }

        /// <summary>
        /// DateTime of event creation (taken from <see cref="DateTime.Now"/>, i.e. local time).
        /// </summary>
        [DataMember]
        public DateTime DataTime { get; set; }

        /// <summary>
        /// Target: the destination name receivers use to pick a handler.
        /// </summary>
        [DataMember]
        public string Destination { get; set; }

        /// <summary>
        /// Data container: payload values addressed by string key.
        /// </summary>
        [DataMember]
        public Dictionary<string, object> Data { get; set; }

        /// <summary>
        /// Creates an event with a new id, the current time, an empty payload
        /// and no destination.
        /// </summary>
        public Event() {
            Init();
        }

        /// <summary>
        /// Creates an event addressed to <paramref name="destination"/>.
        /// </summary>
        /// <param name="destination">Destination name stored in <see cref="Destination"/>.</param>
        public Event(string destination) {
            Init();
            Destination = destination;
        }

        // Shared constructor logic: empty payload, fresh id, creation timestamp.
        private void Init() {
            Data = new Dictionary<string, object>();
            Id = Guid.NewGuid();
            DataTime = DateTime.Now;
        }

        /// <summary>
        /// Builds a multi-line dump: the type name in brackets, then every public
        /// property as "name = value;", with <see cref="Data"/> expanded entry by entry.
        /// </summary>
        public override string ToString() {
            var sb = new StringBuilder();
            sb.AppendFormat("[{0}]", GetType().Name);

            foreach (var property in GetType().GetProperties()) {
                if (property.Name != "Data") {
                    sb.AppendFormat("\n{0} = {1};", property.Name, property.GetValue(this, null));
                    continue;
                }

                // Append entries straight into the builder instead of growing
                // an intermediate string once per key.
                sb.Append("\nData = ").Append(" {");
                foreach (var pair in Data) {
                    sb.AppendFormat("\n {0}\t:{1}", pair.Key, pair.Value);
                }
                sb.Append("\n}");
            }

            return sb.ToString();
        }

        /// <summary>
        /// Stores <paramref name="obj"/> under <paramref name="key"/>, replacing any previous value.
        /// </summary>
        public void SetData(string key, object obj) {
            Data[key] = obj;
        }

        /// <summary>
        /// Returns the raw value stored under <paramref name="key"/>, or null when the key is absent.
        /// </summary>
        public object GetObj(string key) {
            object value;
            return Data.TryGetValue(key, out value) ? value : null;
        }

        /// <summary>
        /// Returns the value under <paramref name="key"/> converted with
        /// <see cref="Convert.ToDouble(object)"/>, or <see cref="Double.NaN"/> when the key is absent.
        /// Conversion exceptions are not caught here.
        /// </summary>
        public double GetDbl(string key) {
            object value;
            return Data.TryGetValue(key, out value) ? Convert.ToDouble(value) : Double.NaN;
        }

        /// <summary>
        /// Returns the value under <paramref name="key"/> converted with
        /// <see cref="Convert.ToInt32(object)"/>, or <see cref="Int32.MinValue"/> when the key is absent.
        /// Conversion exceptions are not caught here.
        /// </summary>
        public int GetInt(string key) {
            object value;
            return Data.TryGetValue(key, out value) ? Convert.ToInt32(value) : Int32.MinValue;
        }

        /// <summary>
        /// Returns the value under <paramref name="key"/> converted with
        /// <see cref="Convert.ToBoolean(object)"/>, or false when the key is absent.
        /// </summary>
        public bool GetBool(string key) {
            object value;
            return Data.TryGetValue(key, out value) && Convert.ToBoolean(value);
        }

        /// <summary>
        /// Returns the value under <paramref name="key"/> converted with
        /// <see cref="Convert.ToString(object)"/>, or null when the key is absent.
        /// </summary>
        public string GetStr(string key) {
            object value;
            return Data.TryGetValue(key, out value) ? Convert.ToString(value) : null;
        }

        /// <summary>
        /// Serializes <paramref name="value"/> to a JSON string with
        /// <see cref="JavaScriptSerializer"/> and stores that string under <paramref name="key"/>.
        /// </summary>
        public void SetCustomData(string key, object value) {
            var serializer = new JavaScriptSerializer();
            SetData(key, serializer.Serialize(value));
        }

        /// <summary>
        /// Reads a value previously stored with <see cref="SetCustomData"/>.
        /// </summary>
        /// <param name="key">Payload key.</param>
        /// <param name="valueType">Type to deserialize the stored JSON string into.</param>
        /// <returns>
        /// The deserialized object, or null when the key is absent or the stored
        /// value is not a string (including a stored null).
        /// </returns>
        public object GetCustom(string key, Type valueType) {
            object raw;
            if (!Data.TryGetValue(key, out raw)) return null;

            // "as" also covers a stored null, which previously crashed on GetType().
            var json = raw as string;
            if (json == null) return null;

            var serializer = new JavaScriptSerializer();
            return serializer.Deserialize(json, valueType);
        }
    }
}
using System.ServiceModel;

namespace methanum {
    /// <summary>
    /// Service contract of the gate. Clients call these operations; the gate
    /// calls clients back through <see cref="IListener"/>.
    /// </summary>
    [ServiceContract(CallbackContract = typeof(IListener))]
    public interface IGate {
        /// <summary>
        /// Registers the calling session to receive events.
        /// </summary>
        [OperationContract]
        void Subscribe();

        /// <summary>
        /// Unregisters the calling session.
        /// </summary>
        [OperationContract]
        void KillConnection();

        /// <summary>
        /// Hands an event to the gate for delivery to subscribers.
        /// </summary>
        /// <param name="evt">The event to deliver.</param>
        [OperationContract]
        void Fire(Event evt);
    }
}
using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.ServiceModel.Channels;

namespace methanum {
    /// <summary>
    /// Gate service: keeps the list of subscribed sessions and forwards every
    /// fired event to the callback channel of each subscriber.
    /// </summary>
    public class Gate : IGate {
        // The list is static, so it is shared by every Gate instance. The host may
        // create several instances that run on different threads (depends on
        // service configuration not shown here), so all access goes through the lock.
        private static readonly List<OperationContext> _subscribers = new List<OperationContext>();
        private static readonly object _subscribersSync = new object();

        /// <summary>
        /// Adds the calling session to the subscriber list unless a subscriber
        /// with the same session id is already present.
        /// </summary>
        public void Subscribe() {
            var oc = OperationContext.Current;
            var added = false;

            lock (_subscribersSync) {
                if (!_subscribers.Exists(c => c.SessionId == oc.SessionId)) {
                    _subscribers.Add(oc);
                    added = true;
                }
            }

            if (added) {
                Console.WriteLine("(subscribe \"{0}\")", oc.SessionId);
            }
        }

        /// <summary>
        /// Removes every subscriber entry that has the calling session's id.
        /// </summary>
        public void KillConnection() {
            var oc = OperationContext.Current;

            lock (_subscribersSync) {
                _subscribers.RemoveAll(c => c.SessionId == oc.SessionId);
            }

            Console.WriteLine("(kill \"{0}\")", oc.SessionId);
        }

        /// <summary>
        /// Logs the sender's address and forwards <paramref name="evt"/> to every
        /// subscriber whose channel is open; subscribers with a channel in any
        /// other state are dropped from the list.
        /// </summary>
        /// <param name="evt">The event to broadcast.</param>
        public void Fire(Event evt) {
            var currentOperationContext = OperationContext.Current;
            var remoteEndpoint =
                currentOperationContext.IncomingMessageProperties[RemoteEndpointMessageProperty.Name]
                    as RemoteEndpointMessageProperty;

            var ip = "";
            var port = 0;
            if (remoteEndpoint != null) {
                ip = remoteEndpoint.Address;
                port = remoteEndpoint.Port;
            }

            lock (_subscribersSync) {
                Console.WriteLine("(Fire (event . \"{0}\") (from . \"{1}:{2}\") (subscribers . {3}))",
                    evt.Id, ip, port, _subscribers.Count);

                // Walk backwards so RemoveAt does not shift entries still to be visited.
                for (var i = _subscribers.Count - 1; i >= 0; i--) {
                    var oc = _subscribers[i];

                    if (oc.Channel.State != CommunicationState.Opened) {
                        _subscribers.RemoveAt(i);
                        Console.WriteLine("(dead . \"{0}\")", oc.SessionId);
                        continue;
                    }

                    try {
                        // BeginInvoke starts the callback asynchronously, so a slow
                        // subscriber does not hold the lock or delay the others.
                        var channel = oc.GetCallbackChannel<IListener>();
                        ((DelegateReceive) (channel.Receive)).BeginInvoke(evt, null, null);
                    } catch (Exception e) {
                        Console.WriteLine(e.Message);
                    }
                }
            }
        }
    }
}
using System.ServiceModel;

namespace methanum {
    /// <summary>
    /// Delegate with the same shape as <see cref="IListener.Receive"/>.
    /// </summary>
    /// <param name="evt">The event being delivered.</param>
    public delegate void DelegateReceive(Event evt);

    /// <summary>
    /// Callback contract: the operation the gate invokes on a client to deliver an event.
    /// </summary>
    internal interface IListener {
        /// <summary>
        /// Delivers an event to the listener. Declared one-way, so the caller
        /// does not wait for a reply.
        /// </summary>
        /// <param name="evt">The event being delivered.</param>
        [OperationContract(IsOneWay = true)]
        void Receive(Event evt);
    }
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
using System.Threading;

namespace methanum {
    /// <summary>
    /// Signature of event handlers registered on a <see cref="Connector"/>.
    /// </summary>
    /// <param name="evt">The received event.</param>
    public delegate void CbHandler(Event evt);

    /// <summary>
    /// Client side of the gate: connects over net.tcp, subscribes, sends queued
    /// events from a background thread and dispatches received events to handlers.
    /// </summary>
    public class Connector : IListener {
        private readonly Dictionary<string, CbHandler> _handlers = new Dictionary<string, CbHandler>();
        private readonly Queue<Event> _eventQueue = new Queue<Event>();
        private readonly object _channelSync = new object();

        private NetTcpBinding _binding;
        private EndpointAddress _endpointToAddress;
        private InstanceContext _instance;
        private DuplexChannelFactory<IGate> _channelFactory;
        private IGate _channel;
        private Thread _fireThread;

        // Written by the connect/reconnect path and read by the sender thread.
        private volatile bool _isSubscribed;

        /// <summary>
        /// Raised for every received event, whether or not a destination handler matched.
        /// </summary>
        public event CbHandler ReceiveEvent;

        /// <summary>
        /// Starts each subscriber of <see cref="ReceiveEvent"/> asynchronously.
        /// </summary>
        /// <param name="evt">The received event.</param>
        protected virtual void OnReceive(Event evt) {
            Dispatch(ReceiveEvent, evt);
        }

        /// <summary>
        /// Connects to the gate, blocking until the subscription succeeds,
        /// then starts the background sender thread.
        /// </summary>
        /// <param name="ipAddress">Host and port of the gate, e.g. "localhost:2255".</param>
        public Connector(string ipAddress) {
            init(ipAddress);
        }

        private void init(string ipAddress) {
            _binding = new NetTcpBinding();
            _endpointToAddress = new EndpointAddress(string.Format("net.tcp://{0}", ipAddress));
            _instance = new InstanceContext(this);

            Conect();

            _fireThread = new Thread(FireProc);
            _fireThread.IsBackground = true;
            _fireThread.Start();
        }

        // Retries once per second while the endpoint cannot be found; any other
        // failure is rethrown to the caller.
        private void Conect() {
            _isSubscribed = false;

            while (!_isSubscribed) {
                DuplexChannelFactory<IGate> factory = null;
                try {
                    factory = new DuplexChannelFactory<IGate>(_instance, _binding, _endpointToAddress);
                    var channel = factory.CreateChannel();
                    channel.Subscribe();

                    _channelFactory = factory;
                    _channel = channel;
                    _isSubscribed = true;
                } catch (EndpointNotFoundException) {
                    // Release the failed attempt before retrying; otherwise one
                    // factory is leaked per retry.
                    if (factory != null) factory.Abort();
                    Thread.Sleep(1000);
                } catch {
                    if (factory != null) factory.Abort();
                    throw; // bare throw keeps the original stack trace
                }
            }
        }

        // Tells the gate to drop this session (best effort), releases the old
        // factory and connects again.
        private void ReConect() {
            lock (_channelSync) {
                try {
                    _channel.KillConnection();
                } catch (Exception e) {
                    Console.WriteLine("(ReConect-exception \"{0}\"", e.Message);
                }

                var oldFactory = _channelFactory;
                if (oldFactory != null) oldFactory.Abort();

                Conect();
            }
        }

        /// <summary>
        /// Queues an event; the background thread sends it to the gate.
        /// </summary>
        /// <param name="evt">The event to send.</param>
        public void Fire(Event evt) {
            lock (_eventQueue) {
                _eventQueue.Enqueue(evt);
            }
        }

        // Sender loop: takes one queued event at a time and sends it. An event is
        // removed from the queue before it is sent, so an event whose send fails
        // is not retried; the failure only triggers a reconnect.
        private void FireProc() {
            while (true) {
                var hasEvent = false;
                Event evt = null;

                if (_isSubscribed) {
                    lock (_eventQueue) {
                        if (_eventQueue.Count > 0) {
                            evt = _eventQueue.Dequeue();
                            hasEvent = true;
                        }
                    }
                }

                if (!hasEvent) {
                    Thread.Sleep(10);
                    continue;
                }

                try {
                    _channel.Fire(evt);
                } catch (Exception) {
                    _isSubscribed = false;
                    ReConect();
                }
            }
        }

        /// <summary>
        /// Registers (or replaces) the handler for events addressed to <paramref name="destination"/>.
        /// </summary>
        public void SetHandler(string destination, CbHandler handler) {
            lock (_handlers) {
                _handlers[destination] = handler;
            }
        }

        /// <summary>
        /// Removes the handler for <paramref name="destination"/>, if one is registered.
        /// </summary>
        public void DeleteHandler(string destination) {
            lock (_handlers) {
                _handlers.Remove(destination);
            }
        }

        /// <summary>
        /// Callback invoked when an event arrives: starts the handler registered
        /// for the event's destination (if any), then raises <see cref="ReceiveEvent"/>.
        /// </summary>
        /// <param name="evt">The received event.</param>
        public void Receive(Event evt) {
            CbHandler handler = null;

            // An event built with the parameterless constructor has no destination;
            // a null key would make the dictionary lookup throw.
            if (evt.Destination != null) {
                lock (_handlers) {
                    _handlers.TryGetValue(evt.Destination, out handler);
                }
            }

            Dispatch(handler, evt);
            OnReceive(evt);
        }

        // Starts every target of the delegate asynchronously. BeginInvoke is only
        // valid on a single-target delegate, so multicast delegates are split first.
        private static void Dispatch(CbHandler handler, Event evt) {
            if (handler == null) return;

            foreach (CbHandler target in handler.GetInvocationList()) {
                target.BeginInvoke(evt, null, null);
            }
        }

        /// <summary>
        /// Prints a "ready" banner in green and blocks until a line is read from the console.
        /// </summary>
        public static void HoldProcess() {
            var processName = Process.GetCurrentProcess().ProcessName;
            var defColor = Console.ForegroundColor;

            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine("The {0} is ready", processName);
            Console.WriteLine("Press <Enter> to terminate {0}", processName);
            Console.ForegroundColor = defColor;

            Console.ReadLine();
        }
    }
}
using System;
using System.ServiceModel;

namespace methanum {
    /// <summary>
    /// Hosts the <see cref="Gate"/> service on a net.tcp base address.
    /// </summary>
    public class SrvRunner {
        private ServiceHost _sHost;

        /// <summary>
        /// Opens a service host for <see cref="Gate"/> on net.tcp://0.0.0.0:<paramref name="port"/>
        /// and prints each base address.
        /// </summary>
        /// <param name="port">TCP port to listen on.</param>
        public void Start(int port) {
            var uris = new[] { new Uri(string.Format("net.tcp://0.0.0.0:{0}", port)) };
            var host = new ServiceHost(typeof (Gate), uris);

            try {
                host.Open();
            } catch {
                // A host that failed to open cannot be closed gracefully; release it.
                host.Abort();
                throw;
            }

            _sHost = host;
            foreach (var uri2 in host.BaseAddresses) {
                Console.WriteLine("Start on: {0}", uri2.ToString());
            }
        }

        /// <summary>
        /// Shuts the host down. Does nothing when the host was never started or
        /// has already been stopped.
        /// </summary>
        public void Stop() {
            var host = _sHost;
            if (host == null) return;
            _sHost = null;

            // Close() throws on a faulted communication object; Abort() is the
            // supported way to release it.
            if (host.State == CommunicationState.Faulted) {
                host.Abort();
            } else {
                host.Close();
            }
        }
    }
}
using System;
using System.Linq;
using methanum;

namespace Core {
    /// <summary>
    /// Console entry point of the Core process: runs the gate server on the
    /// port given as the first command-line argument.
    /// </summary>
    internal class CoreMain {
        private static void Main(string[] args) {
            int port = 0;
            bool portGiven = args.Length > 0 && int.TryParse(args[0], out port);
            if (!portGiven) {
                PrintUsageAndExit();
            }

            try {
                var coreSrv = new SrvRunner();
                coreSrv.Start(port);

                Console.WriteLine("The Core is ready.");
                Console.WriteLine("Press <ENTER> to terminate Core.");
                Console.ReadLine();

                coreSrv.Stop();
            } catch (Exception e) {
                Console.WriteLine(e.Message);
            }
        }

        // Prints the command-line usage and ends the process with exit code 1.
        private static void PrintUsageAndExit() {
            Console.WriteLine("Usage:");
            Console.WriteLine("Core.exe port");
            Environment.Exit(1);
        }
    }
}
using System;
using System.Linq;
using methanum;

namespace ClentExamle {
    /// <summary>
    /// Example chat client: asks for a user name, connects to the gate given on
    /// the command line and sends every typed line as a "message" event.
    /// </summary>
    class Program {
        static void Main(string[] args) {
            if ((!args.Any())) {
                Console.WriteLine("Usage:");
                Console.WriteLine("ClentExample.exe coreAddress:port");
                Environment.Exit(1);
            }

            var userName = "";
            while (String.IsNullOrWhiteSpace(userName)) {
                Console.WriteLine("Please write user name:");
                userName = Console.ReadLine();

                // ReadLine returns null at end of input; without this check the
                // prompt would repeat forever.
                if (userName == null) return;
            }

            try {
                var maingate = new Connector(args[0]);
                maingate.SetHandler("message", MsgHandler);
                Console.WriteLine("Hello {0}, now you can send messages", userName);

                // Stop at end of input instead of queueing null messages in a tight loop.
                string msg;
                while ((msg = Console.ReadLine()) != null) {
                    var evt = new Event("message");
                    evt.SetData("name", userName);
                    evt.SetData("text", msg);
                    maingate.Fire(evt);
                }
            } catch (Exception e) {
                Console.WriteLine(e.Message);
            }
        }

        // Prints a received chat message as "[name] >> text".
        private static void MsgHandler(Event evt) {
            Console.WriteLine("[{0}] >> {1}", evt.GetStr("name"), evt.GetStr("text"));
        }
    }
}
Source: https://habr.com/ru/post/280640/
All Articles