📜 ⬆️ ⬇️

UDP and C # Reactive Extensions

Recently I read a post about UDP and C # async / await , in which the description of solving a simple task of polling devices over UDP by one client. Solving a problem with async \ await really reduces the amount of code compared to the manual implementation of asynchronous calls. On the other hand, it creates many problems with the synchronization of tasks, competitive access to data and exception handling. The resulting solution is very error prone. The original version of the author contained non-release resource errors.

Can it be made easier and more reliable?


What is the actual problem?


The problem is in the UdpClient.Receive (- Async ) method. This method is not reenterable, that is, if the client is already waiting for the arrival of the datagram, then you cannot call this method again. Even if the error does not come out, then it is quite possible to get the datagram expected by another “stream”. Therefore, you need to write additional code that synchronizes user actions and the state of the UdpClient .
')
async \ await and Tasks Parallel Library does not have ready-made synchronization tools. It is necessary either to write code with your hands, as in the original article, or to use ready-made libraries, like TPL Dataflow . But, alas, Dataflow is very heavy.

Reactive Extensions


Instead of TPL Dataflow, you can use Reactive Extensions (RX). RX describes asynchronous data streams (asynchronous sequences). RX has many functions for creating and manipulating data flows. RX allows you to work not only with IO, but also with “event streams” generated by interface elements. This allows the entire program to be described as a set of data streams.

Code example
To solve the original problem, you will need to add the Rx-Main library from NuGet to the project and write several helpers:
 public static IObservable<UdpReceiveResult> ReceiveObservable(this UdpClient client) { return client.ReceiveAsync().ToObservable(); } public static IObservable<int> SendObservable(this UdpClient client, byte[] msg, int bytes, string ip, int port) { return client.SendAsync(msg, bytes, ip, port).ToObservable(); } public static IObservable<UdpReceiveResult> ReceiveStream(this UdpClient client) { return Observable.Defer(() => client.ReceiveObservable()).Repeat(); } 

The first two helpers transform Task into IObservable (one-element asynchronous sequence) using an extension method.
The last helper shows an example of sequence manipulation.
Observable.Defer - defers a call to the sequence constructor in a parameter before the subscriber appears.
The extension method .Repeat() repeats the infinite source sequence.
Together, the two methods create an endless loop of getting datagrams from a socket.

Now the method of sending and receiving data:
 public IObservable<byte[]> SendReceiveUdpAsync(byte[] msg, string ip, int port, int timeOut) { var o = from _ in this.client.SendObservable(msg, msg.Length, ip, port) from r in receiveStream where r.RemoteEndPoint.Address.ToString() == ip && r.RemoteEndPoint.Port == port select r.Buffer; return o.Take(1).Timeout(TimeSpan.FromMilliseconds(timeOut)); } 

Yes, RX supports Linq for asynchronous sequences.
This Linq expression is quite difficult to understand without knowing RX, but its essence is very simple: after receiving the result from the SendObservable stream SendObservable subscribe to the receiveStream stream and get only those elements that satisfy the predicate in where to return the buffer from the received datagram. Then one result of the resulting sequence is taken and a timeout is hung.

The most important part of the code is the receiveStream definition:
 receiveStream = client.ReceiveStream().Publish().RefCount(); 


Hot, cold and warm sequences
When you work with RX sequences, it is important to know their “temperature”.

Cold sequences are those that appear when a subscriber appears in a sequence and disappear when the subscriber ceases to exist.
The ReceiveStream extension ReceiveStream returns just such a sequence. This means that each subscriber will have his own sequence, that is, there will be several calls to UdpClient.ReceiveAsync and the problem described at the beginning cannot be solved.

Hot sequences - which exist independently of the subscribers. For example, the sequence of mouse movements of the user. The Publish function in the code above allows you to turn a cold sequence into a hot one. But it carries another problem. If the UdpClient constructor UdpClient not specify the port and call Receive before the Send call, an error will occur.

Therefore, we need an intermediate option - the sequence should be common for all subscribers and should exist as long as there is at least one subscriber. This sequence is called “warm” and is created by calling RefCount .

Subscribe to events
For testing, I also wrote a “server” function:
 public IDisposable Listen(Func<UdpReceiveResult, byte[]> process) { return receiveStream.Subscribe(async r => { var msg = process(r); await client.SendObservable(msg, msg.Length, r.RemoteEndPoint); }); } 

The Subscribe method allows you to specify the action that will be called on each element of the asynchronous sequence. You can also specify an action for the end of the sequence and for the exception.

Also note that RX supports async \ await, that is, you do not need to know RX to use code built on the basis of asynchronous sequences.

Conclusion


The resulting code does not contain a single loop, a single explicit synchronization, or a single thread or task creation. At the same time, the code is completely asynchronous and secure.
RX is definitely worth exploring, even if you won't use it. The main part of Rx was invented by applying the principle of duality of monads to standard interfaces IEnumerable and IEnumerator, so the RX was compact and powerful. In addition, RX is also available for JavaScript, C ++, Java, Scala and Python, Ruby.

The source code together with the client and the server was posted on github - github.com/gandjustas/UdpRxSample .

Source: https://habr.com/ru/post/238445/


All Articles