
[The Methanum project] Creating tools for building distributed systems with the “Star” topology



The star is by far the most common topology of computer networks. It has several advantages: it scales easily, it is reliable (the failure of one peripheral machine does not affect the others), and it is easy to administer. Naturally, this physical-layer arrangement has long been reproduced at the software level. Nevertheless, I would like to present my own .NET toolkit for building distributed systems with a star topology.

Systems built on such a topology can be organized, for example, as shown in the image below.
[Image: example structure of a system with a star topology]


In this article I describe a minimal version of the toolkit, leaving out many useful features that I have used in my own projects based on this architecture. Even so, it is enough to build genuinely useful systems.

Methanum





The project received the code name Methanum solely because the topology resembles the structure of a methane molecule :). The central node, which acts as the communicator, is called the "Core". The other applications on the network connect to the core and subscribe to events; each of them can also emit events. All data exchange over the network therefore happens through events. An event is an instance of the serializable Event class and can carry arbitrary data. It has at least two fields: the string field Destination, which classifies the event, and the Data field, which holds a key-value dictionary. The key is a string (the argument name); the value is of type object and may contain primitives (int, double, bool ...). Structures need a little help from the programmer to be serialized.

To begin, we create a C# class library project named “methanum” and add files to it as we go.

Event





As already mentioned, data is transmitted through events. An event is a class with a Data field for the payload and a Destination field that identifies the event. I also kept two more fields: Id, a unique identifier, and DataTime, the time the event was created. These extra fields exist purely for convenience, for example when parsing logs. The Event class also contains a number of helper methods that make the programmer's life easier; their purpose should be clear from the code.

Event.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Web.Script.Serialization;

namespace methanum {
    [DataContract]
    [KnownType(typeof(List<DateTime>))]
    public class Event {
        /// <summary>
        /// A unique id of the event
        /// </summary>
        [DataMember]
        public Guid Id { get; set; }

        /// <summary>
        /// DateTime of event creation
        /// </summary>
        [DataMember]
        public DateTime DataTime { get; set; }

        /// <summary>
        /// Target
        /// </summary>
        [DataMember]
        public string Destination { get; set; }

        /// <summary>
        /// Data container
        /// </summary>
        [DataMember]
        public Dictionary<string, object> Data { get; set; }

        public Event() {
            Init();
        }

        public Event(string destination) {
            Init();
            Destination = destination;
        }

        private void Init() {
            Data = new Dictionary<string, object>();
            Id = Guid.NewGuid();
            DataTime = DateTime.Now;
        }

        public override string ToString() {
            var properties = GetType().GetProperties();
            var sb = new StringBuilder();
            sb.AppendFormat("[{0}]", GetType().Name);
            foreach (var property in properties) {
                if (property.Name == "Data") {
                    sb.Append("\nData = ");
                    string s = string.Format(" {0}", '{');
                    s = Data.Keys.Aggregate(s,
                        (current, key) => current + String.Format("\n {0}\t:{1}", key, Data[key]));
                    sb.AppendFormat("{0}\n{1}", s, '}');
                }
                else sb.AppendFormat("\n{0} = {1};", property.Name, property.GetValue(this, null));
            }
            return sb.ToString();
        }

        public void SetData(string key, object obj) {
            Data[key] = obj;
        }

        public object GetObj(string key) {
            return !Data.ContainsKey(key) ? null : Data[key];
        }

        public double GetDbl(string key) {
            return !Data.ContainsKey(key) ? Double.NaN : Convert.ToDouble(Data[key]);
        }

        public int GetInt(string key) {
            return !Data.ContainsKey(key) ? Int32.MinValue : Convert.ToInt32(Data[key]);
        }

        public bool GetBool(string key) {
            return Data.ContainsKey(key) && Convert.ToBoolean(Data[key]);
        }

        public string GetStr(string key) {
            return !Data.ContainsKey(key) ? null : Convert.ToString(Data[key]);
        }

        public void SetCustomData(string key, object value) {
            var serializer = new JavaScriptSerializer();
            var str = serializer.Serialize(value);
            SetData(key, str);
        }

        public object GetCustom(string key, Type valueType) {
            if (!Data.ContainsKey(key)) return null;
            if (Data[key].GetType() != typeof(string)) return null;
            var serializer = new JavaScriptSerializer();
            var str = (string) Data[key];
            var obj = serializer.Deserialize(str, valueType);
            return obj;
        }
    }
}
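The typed getters above signal a missing key with a sentinel value (Int32.MinValue, Double.NaN, false or null) rather than an exception. The following is a minimal sketch of that convention, using a hypothetical MiniEvent class that stands in for Event so the snippet compiles without the WCF and System.Web references. TryGetInt is an illustrative addition and is not part of the article's API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the article's Event class: only the Data
// dictionary and two of the typed getters are reproduced here.
public class MiniEvent
{
    public Dictionary<string, object> Data = new Dictionary<string, object>();

    public void SetData(string key, object value)
    {
        Data[key] = value;
    }

    // Same convention as Event.GetInt: Int32.MinValue means "key not found".
    public int GetInt(string key)
    {
        object value;
        return Data.TryGetValue(key, out value) ? Convert.ToInt32(value) : Int32.MinValue;
    }

    // Same convention as Event.GetDbl: NaN means "key not found".
    public double GetDbl(string key)
    {
        object value;
        return Data.TryGetValue(key, out value) ? Convert.ToDouble(value) : Double.NaN;
    }

    // Illustrative helper (an assumption, not in the article): reports
    // presence explicitly, so a stored Int32.MinValue is not mistaken
    // for a missing key.
    public bool TryGetInt(string key, out int result)
    {
        object value;
        if (Data.TryGetValue(key, out value))
        {
            result = Convert.ToInt32(value);
            return true;
        }
        result = 0;
        return false;
    }
}
```

With this sketch, GetInt("missing") yields Int32.MinValue and GetDbl("missing") yields NaN, so callers that may legitimately store those values would need something like TryGetInt to tell the two cases apart.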



Gate





The heart of the core is the implementation of an interface that we will call the “gate interface”. The gate's main purpose is to register clients and to send events asynchronously in both directions (from the application to the core and back).

IGate.cs
using System.ServiceModel;

namespace methanum {
    [ServiceContract(CallbackContract = typeof(IListener))]
    public interface IGate {
        [OperationContract]
        void Subscribe();

        [OperationContract]
        void KillConnection();

        [OperationContract]
        void Fire(Event evt);
    }
}



Our service contract is duplex. In the forward direction, from the application to the core, we fire events through IGate by calling the void Fire(Event evt) method. The callback, from the core to the application, goes through the IListener interface, which is covered later.
Gates work as follows. The core hosts the Gate class, which implements the IGate interface. Gate has a static _subscribers field that stores all active connections to the core. The Subscribe() method adds the current connection if it has not been added yet. The KillConnection() method removes the current connection. The most interesting method is Fire(Event evt), but there is nothing complicated in it either. Half of the method digs out the caller's IP address and port, only to print them to the console. I left this part in solely to show how to get at the connection address, for example to filter events by allowed addresses or to log them. The main job of the method is to walk through all existing connections and asynchronously call the Receive method of their IListener listeners. If it finds a closed connection, it immediately removes it from the list of active connections.

Gate.cs
using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.ServiceModel.Channels;

namespace methanum {
    public class Gate : IGate {
        // Shared by all Gate instances; access is not synchronized here.
        private static List<OperationContext> _subscribers;

        public Gate() {
            if (_subscribers == null)
                _subscribers = new List<OperationContext>();
        }

        public void Subscribe() {
            var oc = OperationContext.Current;
            if (!_subscribers.Exists(c => c.SessionId == oc.SessionId)) {
                _subscribers.Add(oc);
                Console.WriteLine("(subscribe \"{0}\")", oc.SessionId);
            }
        }

        public void KillConnection() {
            var oc = OperationContext.Current;
            _subscribers.RemoveAll(c => c.SessionId == oc.SessionId);
            Console.WriteLine("(kill \"{0}\")", oc.SessionId);
        }

        public void Fire(Event evt) {
            // Look up the sender's address and port; used only for the console output.
            var currentOperationContext = OperationContext.Current;
            var remoteEndpointMessageProperty =
                currentOperationContext.IncomingMessageProperties[RemoteEndpointMessageProperty.Name]
                    as RemoteEndpointMessageProperty;
            var ip = "";
            var port = 0;
            if (remoteEndpointMessageProperty != null) {
                ip = remoteEndpointMessageProperty.Address;
                port = remoteEndpointMessageProperty.Port;
            }
            Console.WriteLine("(Fire (event . \"{0}\") (from . \"{1}:{2}\") (subscribers . {3}))",
                evt.Id, ip, port, _subscribers.Count);

            // Iterate backwards so that RemoveAt(i) does not skip any subscriber.
            for (var i = _subscribers.Count - 1; i >= 0; i--) {
                var oc = _subscribers[i];
                if (oc.Channel.State == CommunicationState.Opened) {
                    var channel = oc.GetCallbackChannel<IListener>();
                    try {
                        ((DelegateReceive) (channel.Receive)).BeginInvoke(evt, null, null);
                    }
                    catch (Exception e) {
                        Console.WriteLine(e.Message);
                    }
                }
                else {
                    // The connection is closed: drop it from the active list.
                    _subscribers.RemoveAt(i);
                    Console.WriteLine("(dead . \"{0}\")", oc.SessionId);
                }
            }
        }
    }
}
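The loop in Fire combines broadcasting with pruning: it walks the subscriber list from the end so that removing a dead entry does not disturb the indices still to be visited. Here is a small sketch of just that pattern, assuming a hypothetical FakeSubscriber class in place of the WCF OperationContext and a plain string in place of Event. Delivery is synchronous here, unlike the BeginInvoke call in Gate.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a WCF callback channel: an id, an
// open/closed flag and a log of the messages it has received.
public class FakeSubscriber
{
    public string Id;
    public bool IsOpen;
    public List<string> Received = new List<string>();
}

public static class Broadcaster
{
    // Delivers the message to every open subscriber and removes the
    // closed ones. Returns the number of subscribers removed.
    public static int Broadcast(List<FakeSubscriber> subscribers, string message)
    {
        var removed = 0;
        // Walk backwards: RemoveAt(i) only shifts elements with a higher
        // index, all of which have already been visited.
        for (var i = subscribers.Count - 1; i >= 0; i--)
        {
            var subscriber = subscribers[i];
            if (subscriber.IsOpen)
            {
                subscriber.Received.Add(message);
            }
            else
            {
                subscribers.RemoveAt(i);
                removed++;
            }
        }
        return removed;
    }
}
```

A forward loop with RemoveAt would skip the element that slides into the freed slot, which is why the reverse direction is the simpler choice when removing in place.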



Listener





Sending a message from the core to a client requires only one method, Receive, which is defined in the IListener interface.

IListener.cs
using System.ServiceModel;

namespace methanum {
    public delegate void DelegateReceive(Event evt);

    interface IListener {
        [OperationContract(IsOneWay = true)]
        void Receive(Event evt);
    }
}



The Connector class implements the IListener interface and contains all the logic of the interaction between the client application and the core. Creating an instance of the class opens a connection to the core, through which messages are sent and received. Sending and receiving run on separate threads so that neither the applications nor the core block. Events are told apart by their Destination field. Filtering events with if-then-else or switch-case constructs is inconvenient, so a mechanism was implemented that associates a handler with each event of interest. The mapping is stored in the dictionary Dictionary<string, CbHandler> _handlers. When an event arrives, its destination is looked up in the dictionary and, if the key is found, the corresponding handler is called.
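The handler lookup described above can be illustrated in isolation. The sketch below assumes a hypothetical Dispatcher class with Action<string> handlers and a string payload instead of CbHandler and Event, and it calls the handler synchronously so that the behavior is easy to follow; Connector itself invokes handlers asynchronously.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified version of the destination-to-handler mapping.
public class Dispatcher
{
    private readonly Dictionary<string, Action<string>> _handlers =
        new Dictionary<string, Action<string>>();

    // Registers (or replaces) the handler for a destination.
    public void SetHandler(string destination, Action<string> handler)
    {
        _handlers[destination] = handler;
    }

    // Removes the handler; returns false if none was registered.
    public bool DeleteHandler(string destination)
    {
        return _handlers.Remove(destination);
    }

    // Calls the handler registered for the destination, if any.
    // Returns true when a handler was found and called.
    public bool Dispatch(string destination, string payload)
    {
        Action<string> handler;
        if (!_handlers.TryGetValue(destination, out handler)) return false;
        handler(payload);
        return true;
    }
}
```

Registering a handler with SetHandler("message", ...) and then calling Dispatch("message", "hi") runs that handler once, while Dispatch on an unregistered destination simply returns false. This is the branch-free replacement for a growing switch over destination names.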

Connector.cs
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
using System.Threading;

namespace methanum {
    public delegate void CbHandler(Event evt);

    public class Connector : IListener {
        private Dictionary<string, CbHandler> _handlers;
        private NetTcpBinding _binding;
        private EndpointAddress _endpointToAddress;
        private InstanceContext _instance;
        private DuplexChannelFactory<IGate> _channelFactory;
        private IGate _channel;
        private Thread _fireThread;
        private List<Event> _eventQueue;

        public event CbHandler ReceiveEvent;

        private bool _isSubscribed;
        private object _channelSync = new object();

        protected virtual void OnReceive(Event evt) {
            CbHandler handler = ReceiveEvent;
            if (handler != null) handler.BeginInvoke(evt, null, null);
        }

        // ipAddress has the form host:port, for example localhost:2255
        public Connector(string ipAddress) {
            init(ipAddress);
        }

        private void init(string ipAddress) {
            _handlers = new Dictionary<string, CbHandler>();
            _binding = new NetTcpBinding();
            _endpointToAddress = new EndpointAddress(string.Format("net.tcp://{0}", ipAddress));
            _instance = new InstanceContext(this);
            Conect();
            _eventQueue = new List<Event>();
            _fireThread = new Thread(FireProc);
            _fireThread.IsBackground = true;
            _fireThread.Start();
        }

        // Blocks until the core is reachable and the subscription succeeds.
        private void Conect() {
            _isSubscribed = false;
            while (!_isSubscribed) {
                try {
                    _channelFactory = new DuplexChannelFactory<IGate>(_instance, _binding, _endpointToAddress);
                    _channel = _channelFactory.CreateChannel();
                    _channel.Subscribe();
                    _isSubscribed = true;
                }
                catch (EndpointNotFoundException) {
                    // The core is not available yet: wait and retry.
                    // Any other exception propagates to the caller.
                    Thread.Sleep(1000);
                }
            }
        }

        private void ReConect() {
            lock (_channelSync) {
                try {
                    _channel.KillConnection();
                }
                catch (Exception e) {
                    Console.WriteLine("(ReConect-exception \"{0}\")", e.Message);
                }
                Conect();
            }
        }

        // Queues the event; the background thread sends it to the core.
        public void Fire(Event evt) {
            lock (_eventQueue) {
                _eventQueue.Add(evt);
            }
        }

        private void FireProc() {
            while (true) {
                var isHasEventsToFire = false;
                lock (_eventQueue) {
                    isHasEventsToFire = _eventQueue.Any();
                }
                if (_isSubscribed && isHasEventsToFire) {
                    Event evt;
                    lock (_eventQueue) {
                        evt = _eventQueue.First();
                    }
                    try {
                        lock (_eventQueue) {
                            _eventQueue.Remove(evt);
                        }
                        _channel.Fire(evt);
                    }
                    catch (Exception) {
                        if (_isSubscribed) _isSubscribed = false;
                        ReConect();
                    }
                }
                else Thread.Sleep(10);
            }
        }

        public void SetHandler(string destination, CbHandler handler) {
            _handlers[destination] = handler;
        }

        public void DeleteHandler(string destination) {
            if (_handlers.ContainsKey(destination)) _handlers.Remove(destination);
        }

        // Called by the core through the callback channel.
        public void Receive(Event evt) {
            if (_handlers.ContainsKey(evt.Destination)) {
                _handlers[evt.Destination].BeginInvoke(evt, null, null);
            }
            OnReceive(evt);
        }

        public static void HoldProcess() {
            var processName = Process.GetCurrentProcess().ProcessName;
            var defColor = Console.ForegroundColor;
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine("The {0} is ready", processName);
            Console.WriteLine("Press <Enter> to terminate {0}", processName);
            Console.ForegroundColor = defColor;
            Console.ReadLine();
        }
    }
}
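FireProc checks the queue, reads the first event and removes it in three separate lock sections. One possible simplification, shown here only as a sketch and not as the author's implementation, is a small queue type that performs "take the next item if there is one" under a single lock. The OutboundQueue name and its members are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: a FIFO queue whose check-and-remove step is
// a single atomic operation under one lock.
public class OutboundQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();

    // Adds an item to the end of the queue.
    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _items.Enqueue(item);
        }
    }

    // Removes and returns the oldest item. Returns false (and the
    // default value of T) when the queue is empty.
    public bool TryDequeue(out T item)
    {
        lock (_sync)
        {
            if (_items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _items.Dequeue();
            return true;
        }
    }
}
```

A sender loop built on this sketch would call TryDequeue, send the item when it returns true, and sleep briefly otherwise. As in FireProc, an item that has been dequeued is not put back if sending fails; whether to retry it is a separate design decision.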



For convenience, we create one more small class that starts the service.

SrvRunner.cs
using System;
using System.ServiceModel;

namespace methanum {
    public class SrvRunner {
        private ServiceHost _sHost;

        public void Start(int port) {
            var uris = new[] { new Uri(string.Format("net.tcp://0.0.0.0:{0}", port)) };
            _sHost = new ServiceHost(typeof (Gate), uris);
            _sHost.Open();
            foreach (var uri2 in _sHost.BaseAddresses) {
                Console.WriteLine("Start on: {0}", uri2.ToString());
            }
        }

        public void Stop() {
            _sHost.Close();
        }
    }
}



Core





We have implemented all the classes our applications need to communicate. What remains is to create the core to which they will connect. To do this, we add a console application project named “Core” to the solution and reference the methanum assembly from it. Everything has in fact already been written; all that is left is to run it.

CoreMain.cs
using System;
using System.Linq;
using methanum;

namespace Core {
    internal class CoreMain {
        private static void Main(string[] args) {
            int port = 0;
            if ((!args.Any()) || (!int.TryParse(args[0], out port))) {
                Console.WriteLine("Usage:");
                Console.WriteLine("Core.exe port");
                Environment.Exit(1);
            }
            try {
                var coreSrv = new SrvRunner();
                coreSrv.Start(port);
                Console.WriteLine("The Core is ready.");
                Console.WriteLine("Press <ENTER> to terminate Core.");
                Console.ReadLine();
                coreSrv.Stop();
            }
            catch (Exception e) {
                Console.WriteLine(e.Message);
            }
        }
    }
}



Usage example



For demonstration, let's create a primitive messenger: create another console application, add a reference to the methanum assembly, and paste in the contents of the Program.cs file.

Program.cs
using System;
using System.Linq;
using methanum;

namespace ClentExamle {
    class Program {
        static void Main(string[] args) {
            if ((!args.Any())) {
                Console.WriteLine("Usage:");
                Console.WriteLine("ClentExample.exe coreAddress:port");
                Environment.Exit(1);
            }

            var userName = "";
            while (String.IsNullOrWhiteSpace(userName)) {
                Console.WriteLine("Please write user name:");
                userName = Console.ReadLine();
            }

            try {
                var maingate = new Connector(args[0]);
                maingate.SetHandler("message", MsgHandler);
                Console.WriteLine("Hello {0}, now you can send messages", userName);
                while (true) {
                    var msg = Console.ReadLine();
                    var evt = new Event("message");
                    evt.SetData("name", userName);
                    evt.SetData("text", msg);
                    maingate.Fire(evt);
                }
            }
            catch (Exception e) {
                Console.WriteLine(e.Message);
            }
        }

        private static void MsgHandler(Event evt) {
            Console.WriteLine("[{0}] >> {1}", evt.GetStr("name"), evt.GetStr("text"));
        }
    }
}



Now we start Core.exe, specifying the port on the command line, for example “Core 2255”. Then we start several instances of ClentExample.exe with the command “ClentExample localhost:2255”. Each application asks for a user name and then connects to the core. The result is a primitive broadcast chat: each new message is sent by calling maingate.Fire(evt) and received in the MsgHandler(Event evt) handler.



The full source is available in the methanum repository on GitHub.

Source: https://habr.com/ru/post/280640/

