📜 ⬆️ ⬇️

Event-oriented HTTP server in C # using Rx and HttpListener

Big enough name? Yes? In this post I will show you an alternative approach to creating a simple event-oriented HTTP server in C #, using the power of Reactive Extensions .

Introduction

I’m not very good at explaining, so I’ll quote a very interesting article from Dan York about the node.js event model:
The “traditional” mode of web servers has always been based on the threading model. When you start Apache or any other web server, it starts accepting connections. When it accepts a connection, it keeps the connection open until it finishes processing the page or another transaction. If reading a page from disk or writing results to a database takes several microseconds, then the web server is blocked for I / O operations. (This is referred to as “blocking I / O”). To scale this type of server, you will need to run additional copies of the server itself (referred to as “thread-based”, because each copy usually requires an additional operating system thread).
In contrast, Node.JS uses an event-oriented model, in which the web server receives requests, quickly puts them in for processing, then is taken as the next request. When the initial request is completed, it returns to the processing queue and when it reaches the end of the queue, the results are returned (or whatever the next action requires). This model is very efficient and scalable, because the web server usually always accepts requests, because does not wait for any read or write operation to complete. (This method is referred to as “non-blocking I / O” or “event-oriented I / O”).

What happens in the .NET world?

A lot of things happen around this in the .NET ecosystem:

Alternative approach

Using the HttpListener class and Reactive Extensions, we can create something like this:
public class HttpServer : IObservable<RequestContext>, IDisposable { private readonly HttpListener listener; private readonly IObservable<RequestContext> stream; public HttpServer(string url) { listener = new HttpListener(); listener.Prefixes.Add(url); listener.Start(); stream = ObservableHttpContext(); } private IObservable<RequestContext> ObservableHttpContext() { return Observable.Create<RequestContext>(obs => Observable.FromAsyncPattern<HttpListenerContext>(listener.BeginGetContext, listener.EndGetContext)() .Select(c => new RequestContext(c.Request, c.Response)) .Subscribe(obs)) .Repeat() .Retry() .Publish() .RefCount(); } public void Dispose() { listener.Stop(); } public IDisposable Subscribe(IObserver<RequestContext> observer) { return stream.Subscribe(observer); } } 

Some notes to this code:

Usage example

You can create any type of web application based on this concept. The application of the “hello world” level will look like this:
 static void Main() { //a stream os messages var subject = new Subject<string>(); using(var server = new HttpServer("http://*:5555/")) { var handler = server.Where(ctx => ctx.Request.Url.EndsWith("/hello")) .Subscribe(ctx => ctx.Respond(new StringResponse("world"))); Console.ReadLine(); handler.Dispose(); } } 

I recommend that everything you do be asynchronous. For example, if you are connecting to a database, this should be an asynchronous operation, and you will need to hold together callbacks / observables / Tasks, etc.

There is an even more interesting application that I would like to share, and which is called long polling :
Long polling is a variation of the traditional polling technique and allows you to emulate sending information from the server to the client. With long polling, the client requests information from the server in the same manner as with a normal request. However, if the server does not have any available information for the client, instead of sending an empty response, the server holds the request and waits for the availability of information.

So, we have the simplest example of long polling, working through the above code:
 class Program { static void Main() { //a stream os messages var subject = new Subject<string>(); using(var server = new HttpServer("http://*:5555/")) { //the listeners stream and subscription var listeners = server .Where(ctx => ctx.Request.HttpMethod == "GET") .Subscribe(ctx => subject.Take(1) //wait the next message to end the request .Subscribe(m => ctx.Respond(new StringResponse(m)))); //the publishing stream and subscrition var publisher = server .Where(ctx => ctx.Request.HttpMethod == "POST") .Subscribe(ctx => ctx.Request.InputStream.ReadBytes(ctx.Request.ContentLength) .Subscribe(bts => { ctx.Respond(new EmptyResponse(201)); subject.OnNext(Encoding.UTF8.GetString(bts)); })); Console.ReadLine(); listeners.Dispose(); publisher.Dispose(); } } } 

As you can see, we are making the observers work ... There is no blocking operation. Even reading from a stream is an asynchronous operation.

Want to see a working code?

Below is a video demonstrating how the code works:
image
And, at the end, the source code is published here under opensource, if you want to delve into it step by step or just study it.
Special thanks to Gustavo Machado, Silvio Massari and the guys from the Nancy framework for the advice and part of the code I stole from them.

')

Source: https://habr.com/ru/post/129861/


All Articles