
Messaging between roles and instances

This is the second article in a series on the architectural solutions behind the AtContent.com service.

While working on AtContent.com, we ran into the problem of synchronizing instances within a role, as well as synchronizing different roles. The standard SDK tools do not solve this in a couple of lines of code, so we developed our own solution for communication between instances. It lets you run a task on all instances of a role at once, or on a single instance chosen either by some rule or at random.


The essence of the solution is communication between role instances over HTTP. To do this safely, you need to create an Endpoint in the role settings.

Figure 1. Create Endpoint

Set the endpoint type to Internal and specify any unused private port. This keeps the exchange inside the data center. Once the endpoint is added, you can address it over HTTP and send requests to it.

To process requests on an instance, add a Generic Handler to the Web Role. Here it is called Communicator.ashx.


Figure 2. Creating a Generic Handler

In principle, messaging can already be set up at this point, but several problems remain.

The first problem is security. The handler added earlier is also reachable at the external address and port, so it must be protected on entry. The simplest way is to add a parameter to the POST request that acts as a key.

The second problem is addressing the Internal Endpoint of an instance. For this the SDK offers a great tool, ServiceRuntime. To use it, import the appropriate namespaces:
    using Microsoft.WindowsAzure;
    using Microsoft.WindowsAzure.ServiceRuntime;

After that, we can select the role instances we need and send POST requests to them:
    // Also requires: System.Collections.Generic, System.IO, System.Net, System.Text, System.Web

    // Request parameters
    var Params = new Dictionary<string, string>();
    Params["key"] = "key_value";

    // Build the POST body
    string STDIN = "";
    foreach (var Item in Params)
    {
        STDIN += Item.Key + "=" + HttpUtility.UrlEncode(Item.Value, Encoding.UTF8) + "&";
    }
    byte[] sentData = Encoding.UTF8.GetBytes(STDIN);

    foreach (RoleInstance roleInst in RoleEnvironment.Roles["WebRole1"].Instances)
    {
        // WebRequest to the internal endpoint of this instance
        WebRequest reqPOST = WebRequest.Create(
            "http://" + roleInst.InstanceEndpoints["InterEndpoint"].IPEndpoint + "/Communicator.ashx");
        reqPOST.Method = "POST";
        reqPOST.Timeout = 120000;
        reqPOST.ContentType = "application/x-www-form-urlencoded";

        // Send the data
        reqPOST.ContentLength = sentData.Length;
        using (Stream sendStream = reqPOST.GetRequestStream())
        {
            sendStream.Write(sentData, 0, sentData.Length);
        }

        // Read the reply; disposing the response releases the connection
        using (WebResponse resp = reqPOST.GetResponse())
        using (StreamReader sr = new StreamReader(resp.GetResponseStream()))
        {
            string s = sr.ReadToEnd();
        }
    }

This example sends a request to every instance of the "WebRole1" role at once. You can add further parameters to the Params dictionary for use when the task is processed.
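When more parameters are added, it can be convenient to move the body encoding into a small helper. The following is only a sketch: the `FormBody` class and its `Build` method are hypothetical names, not part of the original code, and it uses `Uri.EscapeDataString` instead of `HttpUtility.UrlEncode` so that it needs no System.Web reference.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical helper, not part of the original article's code.
public static class FormBody
{
    // Encodes the pairs as application/x-www-form-urlencoded bytes (UTF-8),
    // joining them with "&" so that no trailing separator is left behind.
    public static byte[] Build(IDictionary<string, string> parameters)
    {
        var pairs = parameters.Select(p =>
            Uri.EscapeDataString(p.Key) + "=" + Uri.EscapeDataString(p.Value ?? ""));
        return Encoding.UTF8.GetBytes(string.Join("&", pairs));
    }
}
```

With such a helper, `sentData` in the loop above could be obtained as `FormBody.Build(Params)`. Both names and values are escaped, so a value containing `&` or `=` cannot break the body apart.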

If you want to send a task to a specific instance, or to a random one, simply select an instance from RoleEnvironment.Roles["WebRole1"].Instances according to the rule you need and send the request only to it.

In Communicator.ashx, all that remains is to process this POST request and execute the necessary commands:
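The selection rule itself is not shown in the article. As a rough sketch, it could look like the following. `InstancePicker`, `PickRandom` and `PickByKey` are hypothetical names, and the key-based variant assumes every caller sees the instance list in the same order, which the source does not state.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the original article's code.
public static class InstancePicker
{
    private static readonly Random Rng = new Random();

    // Returns a random element of a non-empty list.
    public static T PickRandom<T>(IList<T> instances)
    {
        if (instances == null || instances.Count == 0)
            throw new ArgumentException("No instances to choose from.", "instances");
        lock (Rng) // Random is not thread-safe
        {
            return instances[Rng.Next(instances.Count)];
        }
    }

    // Returns the same element for the same routing key while the list is
    // unchanged, so that related tasks land on the same instance.
    public static T PickByKey<T>(IList<T> instances, string routingKey)
    {
        if (instances == null || instances.Count == 0)
            throw new ArgumentException("No instances to choose from.", "instances");
        // FNV-1a style hash; string.GetHashCode may differ between processes.
        uint hash = 2166136261;
        foreach (char c in routingKey ?? "")
        {
            hash = unchecked((hash ^ c) * 16777619);
        }
        return instances[(int)(hash % (uint)instances.Count)];
    }
}
```

Since `RoleEnvironment.Roles["WebRole1"].Instances` is a read-only collection that implements `IList<RoleInstance>`, it can be passed directly, for example `InstancePicker.PickRandom(RoleEnvironment.Roles["WebRole1"].Instances)`. The request is then sent to that one instance instead of looping over all of them.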
    // Reject requests that do not carry the expected key
    if (context.Request.Form["key"] != "key_value") return;
    // Process the task here
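Because the handler is also reachable from outside, the key comparison is the only gate. One possible hardening, shown here purely as a sketch, is to compare the keys without stopping at the first mismatching character. `KeyCheck.IsValid` is a hypothetical name, and where the expected key is stored is left open.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of the original article's code.
public static class KeyCheck
{
    // Compares the supplied key with the expected one without exiting early
    // on the first mismatch, so response timing reveals less about the key.
    public static bool IsValid(string supplied, string expected)
    {
        if (supplied == null || expected == null) return false;
        byte[] a = Encoding.UTF8.GetBytes(supplied);
        byte[] b = Encoding.UTF8.GetBytes(expected);
        int diff = a.Length ^ b.Length; // a length mismatch alone makes the result false
        for (int i = 0; i < a.Length && i < b.Length; i++)
        {
            diff |= a[i] ^ b[i];
        }
        return diff == 0;
    }
}
```

In the handler this would replace the direct comparison, for example `if (!KeyCheck.IsValid(context.Request.Form["key"], "key_value")) return;`. A missing `key` field yields `null` and is rejected.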

The most common scenario in our service is changing the instance cache.

Sending a message to a service instance (Worker Role) is somewhat more complicated. The standard way to interact with it is through queues (Queue), but that does not mean you cannot talk to service instances over HTTP. As with the application instance, add an Endpoint of type Internal and specify a private port for it. Then, when the service instance starts, create an HttpListener that will process incoming POST requests.
    private HttpListener listener = null;
    private AutoResetEvent connectionWaitHandle = new AutoResetEvent(false);

    public override void Run()
    {
        // Start the HttpListener on a background thread
        Thread HttpListenerThread = null;
        while (true)
        {
            if (HttpListenerThread == null)
            {
                HttpListenerThread = new Thread(new ThreadStart(HttpListenerHandler));
                HttpListenerThread.IsBackground = true;
                HttpListenerThread.Start();
            }
            Thread.Sleep(1000);
        }
    }

    protected void HttpListenerHandler()
    {
        // Create the HttpListener and bind it to ServiceEndpoint
        if (listener == null)
        {
            listener = new HttpListener();
            var HostEndpoint = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["ServiceEndpoint"].IPEndpoint.ToString();
            listener.Prefixes.Add(string.Format("http://{0}/", HostEndpoint));
            listener.Start();
        }
        while (true)
        {
            // Accept the next connection and wait until the callback has picked it up
            IAsyncResult result = listener.BeginGetContext(HandleAsyncConnection, listener);
            connectionWaitHandle.WaitOne();
        }
    }

    private void HandleAsyncConnection(IAsyncResult result)
    {
        // Handle the POST request
        HttpListener listener = (HttpListener)result.AsyncState;
        HttpListenerContext context = listener.EndGetContext(result);
        connectionWaitHandle.Set();
        var Request = context.Request;
        var Response = context.Response;
        try
        {
            if (Request.HttpMethod == "POST")
            {
                Stream BodyStream = context.Request.InputStream;
                var encoding = context.Request.ContentEncoding;
                var reader = new StreamReader(BodyStream, encoding);
                var PostParams = HttpUtility.ParseQueryString(reader.ReadToEnd(), encoding);
                // Reject requests that do not carry the expected key
                if (PostParams["key"] != "key_value") return;
                // Process the task here
            }
        }
        finally
        {
            // Always close the response, including when the key check fails
            Response.OutputStream.Close();
        }
    }

The above is the code of the WorkerRole.cs class, or rather the part of it that changed. Note that processing requests requires the System.Web namespace, so you also need to add the corresponding Reference to the WorkerRole project.

With all this in place, we have a working mechanism for reaching instances both within a single role and across roles. There are many scenarios for such interaction. I will cover some of them in the next articles of the series: "Caching data on an instance and managing caching" and "Effective management of processing by cloud queues (Queue)".

The mechanism described in this article will, with some modifications, be available as part of the open-source CPlase library for Windows Azure, which will soon be made publicly available.
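In the worker role code, System.Web is needed only for `HttpUtility.ParseQueryString`. If adding that reference is undesirable, the form body could also be parsed by hand. This is a minimal sketch under that assumption, with `FormParser` being a hypothetical name rather than anything from the original solution.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the original article's code.
public static class FormParser
{
    // Parses "a=1&b=two+words" into a dictionary.
    // If a name occurs more than once, the last value wins.
    public static Dictionary<string, string> Parse(string body)
    {
        var result = new Dictionary<string, string>();
        if (string.IsNullOrEmpty(body)) return result;
        foreach (string pair in body.Split(new[] { '&' }, StringSplitOptions.RemoveEmptyEntries))
        {
            int eq = pair.IndexOf('=');
            string name = eq < 0 ? pair : pair.Substring(0, eq);
            string value = eq < 0 ? "" : pair.Substring(eq + 1);
            result[Decode(name)] = Decode(value);
        }
        return result;
    }

    // In form encoding "+" stands for a space; other characters are percent-encoded.
    private static string Decode(string s)
    {
        return Uri.UnescapeDataString(s.Replace('+', ' '));
    }
}
```

Unlike the `NameValueCollection` returned by `ParseQueryString`, the dictionary indexer throws for a missing name, so the key check would use `TryGetValue` instead of `PostParams["key"]`. Empty segments, such as the trailing `&` produced by the sending loop, are skipped.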


Source: https://habr.com/ru/post/140461/

