I recently read a post, UDP and C# async/await, that describes solving a simple task: polling devices over UDP from a single client. Using async/await really does reduce the amount of code compared with hand-written asynchronous calls. On the other hand, it creates many problems with task synchronization, concurrent access to data, and exception handling. The resulting solution is very error-prone: the author's original version contained bugs where resources were not released.
Can it be made easier and more reliable?
What is the actual problem?
The problem is in the UdpClient.Receive(Async) method. This method is not reentrant: if the client is already waiting for a datagram, you cannot call the method again. Even if no error is raised, one caller can easily receive a datagram that another “thread” was expecting. Therefore, you need to write additional code that synchronizes user actions with the state of the UdpClient.
async/await and the Task Parallel Library have no ready-made synchronization tools for this. You either have to write the code by hand, as in the original article, or use a ready-made library such as TPL Dataflow. But, alas, Dataflow is very heavy.
Reactive Extensions
Instead of TPL Dataflow, you can use Reactive Extensions (RX). RX describes asynchronous data streams (asynchronous sequences). RX has many functions for creating and manipulating data flows. RX allows you to work not only with IO, but also with “event streams” generated by interface elements. This allows the entire program to be described as a set of data streams.
Code example
To solve the original problem, you need to add the Rx-Main library from NuGet to the project and write several helpers:
// Wraps a single ReceiveAsync call as a one-element sequence.
public static IObservable<UdpReceiveResult> ReceiveObservable(this UdpClient client)
{
    return client.ReceiveAsync().ToObservable();
}

// Wraps a single SendAsync call as a one-element sequence.
public static IObservable<int> SendObservable(this UdpClient client, byte[] msg, int bytes, string ip, int port)
{
    return client.SendAsync(msg, bytes, ip, port).ToObservable();
}

// An endless sequence of datagrams: a new ReceiveAsync starts after each result.
public static IObservable<UdpReceiveResult> ReceiveStream(this UdpClient client)
{
    return Observable.Defer(() => client.ReceiveObservable()).Repeat();
}
The first two helpers turn a Task into an IObservable (a one-element asynchronous sequence) using the ToObservable extension method.
The last helper shows an example of sequence manipulation.
Observable.Defer postpones calling the sequence factory passed as its parameter until a subscriber appears.
The .Repeat() extension method repeats the source sequence indefinitely.
Together, the two methods create an endless loop that receives datagrams from the socket.
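The interplay of Defer and Repeat can be tried without a socket. The following is a minimal sketch, assuming the System.Reactive NuGet package (the name under which Rx-Main is published today). FakeReceiveAsync is a hypothetical stand-in for UdpClient.ReceiveAsync that simply returns an increasing counter.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class DeferRepeatSketch
{
    private static int calls;

    // Hypothetical stand-in for UdpClient.ReceiveAsync:
    // every call produces the next counter value.
    private static Task<int> FakeReceiveAsync()
    {
        calls++;
        return Task.FromResult(calls);
    }

    public static IList<int> Run()
    {
        calls = 0;
        return Observable
            .Defer(() => FakeReceiveAsync().ToObservable()) // factory runs on each subscription
            .Repeat()                                       // resubscribe after every completion
            .Take(3)                                        // stop the otherwise endless loop
            .ToList()
            .Wait();                                        // block until the list is ready
    }
}
```

Run() yields the values 1, 2, 3: each repetition invokes the factory again, so a fresh "receive" is started only after the previous one has completed.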
Now the method of sending and receiving data:
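public IObservable<byte[]> SendReceiveUdpAsync(byte[] msg, string ip, int port, int timeOut)
{
    // Send the request, then listen on the shared receive stream
    // for a datagram coming from the same address and port.
    var o = from _ in this.client.SendObservable(msg, msg.Length, ip, port)
            from r in receiveStream
            where r.RemoteEndPoint.Address.ToString() == ip && r.RemoteEndPoint.Port == port
            select r.Buffer;

    // Take the first matching reply, or fail with a timeout.
    return o.Take(1).Timeout(TimeSpan.FromMilliseconds(timeOut));
}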
Yes, RX supports LINQ for asynchronous sequences.
This LINQ expression is quite hard to understand without knowing RX, but its essence is simple: after receiving the result from the SendObservable stream, subscribe to the receiveStream stream and take only those elements that satisfy the predicate in where, returning the buffer of each received datagram. Then the first element of the resulting sequence is taken and a timeout is attached.
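For readers who find the query syntax opaque, it may help to see the same shape next to method syntax. This is a sketch under assumptions, not the author's code: it needs the System.Reactive package, and the send and replies sequences are hypothetical stand-ins for SendObservable and receiveStream.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class QuerySketch
{
    // Hypothetical stand-ins: one "send finished" signal and four "replies".
    private static readonly IObservable<int> send = Observable.Return(0);
    private static readonly IObservable<int> replies = new[] { 1, 2, 3, 4 }.ToObservable();

    public static IList<int> QuerySyntax()
    {
        var query = from sent in send        // wait for the send to finish
                    from r in replies        // then subscribe to the replies
                    where r % 2 == 0         // keep only matching replies
                    select r;
        return query.Take(1).Timeout(TimeSpan.FromSeconds(1)).ToList().Wait();
    }

    public static IList<int> MethodSyntax()
    {
        // Two "from" clauses correspond to SelectMany; "where" to Where.
        var query = send.SelectMany(sent => replies).Where(r => r % 2 == 0);
        return query.Take(1).Timeout(TimeSpan.FromSeconds(1)).ToList().Wait();
    }
}
```

Both methods return a list containing only 2, the first reply that passes the filter.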
The most important part of the code is the receiveStream definition:
receiveStream = client.ReceiveStream().Publish().RefCount();
Hot, cold and warm sequences
When you work with RX sequences, it is important to know their “temperature”.
Cold sequences are those that come into existence when a subscriber subscribes and disappear when that subscriber goes away.
The ReceiveStream extension method returns just such a sequence. This means that each subscriber gets its own sequence, so there would be several concurrent calls to UdpClient.ReceiveAsync, and the problem described at the beginning would not be solved.
Hot sequences are those that exist independently of subscribers, for example the sequence of the user's mouse movements. The Publish function in the code above turns a cold sequence into a hot one. But this brings another problem: a purely hot sequence would start receiving as soon as it is connected, possibly before anything has been sent. If no port is specified in the UdpClient constructor and Receive is called before Send, an error occurs.
Therefore, we need an intermediate option: the sequence should be shared by all subscribers and should exist only as long as there is at least one subscriber. Such a sequence is called “warm” and is created by calling RefCount.
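The difference between a cold and a “warm” sequence can be made visible by counting how often the underlying source is started. The sketch below assumes the System.Reactive package. The counting source is a hypothetical stand-in for ReceiveStream, where every start would mean one more ReceiveAsync loop on the same socket.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class WarmSequenceSketch
{
    // Returns how many times the source was started for two subscribers,
    // first without sharing (cold), then with Publish().RefCount() (warm).
    public static (int Cold, int Warm) CountSourceSubscriptions()
    {
        int subscriptions = 0;

        // A cold source that never emits; it only counts its starts.
        IObservable<int> cold = Observable.Create<int>(observer =>
        {
            subscriptions++;
            return Disposable.Empty;
        });

        using (cold.Subscribe(_ => { }))
        using (cold.Subscribe(_ => { }))
        {
            // Two subscribers are active here, each with its own source.
        }
        int coldCount = subscriptions;

        subscriptions = 0;
        IObservable<int> warm = cold.Publish().RefCount();
        using (warm.Subscribe(_ => { }))
        using (warm.Subscribe(_ => { }))
        {
            // Two subscribers are active here, sharing one source.
        }
        return (coldCount, subscriptions);
    }
}
```

The cold variant starts the source twice, the warm one only once. That single shared start is what keeps UdpClient.ReceiveAsync from being called concurrently.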
Subscribe to events
For testing, I also wrote a “server” function:
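public IDisposable Listen(Func<UdpReceiveResult, byte[]> process)
{
    return receiveStream.Subscribe(async r =>
    {
        // Build the reply and send it back to whoever sent the datagram.
        // Assumes a SendObservable overload taking an IPEndPoint (not shown above).
        var msg = process(r);
        await client.SendObservable(msg, msg.Length, r.RemoteEndPoint);
    });
}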
The Subscribe method allows you to specify the action that will be called on each element of the asynchronous sequence. You can also specify an action for the end of the sequence and for the exception.
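The three callbacks can be illustrated with a small sketch, assuming the System.Reactive package. Observe is a hypothetical helper that records what each callback receives. The log is complete when the method returns only for sources that emit synchronously, such as Observable.Range or Observable.Throw.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SubscribeSketch
{
    public static List<string> Observe(IObservable<int> source)
    {
        var log = new List<string>();
        source.Subscribe(
            x => log.Add("next " + x),             // called for each element
            ex => log.Add("error " + ex.Message),  // called if the sequence fails
            () => log.Add("completed"));           // called when the sequence ends
        return log;
    }
}
```

For Observable.Range(1, 2) the log holds "next 1", "next 2", "completed". For Observable.Throw<int>(new InvalidOperationException("boom")) it holds the single entry "error boom".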
Also note that RX supports async/await, so you do not need to know RX to use code built on asynchronous sequences.
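As an illustration, a caller could consume a sequence like the one returned by SendReceiveUdpAsync with a plain await. This is a sketch assuming the System.Reactive package. FirstOrTimeoutAsync and the string "timeout" are hypothetical, chosen only to show how the TimeoutException raised by Timeout surfaces to the caller.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AwaitSketch
{
    public static async Task<string> FirstOrTimeoutAsync(IObservable<string> replies, int timeOutMs)
    {
        try
        {
            // Awaiting an observable yields its last element; after Take(1)
            // that is the first reply.
            return await replies.Take(1).Timeout(TimeSpan.FromMilliseconds(timeOutMs));
        }
        catch (TimeoutException)
        {
            // Timeout signals a missing reply as an ordinary exception.
            return "timeout";
        }
    }
}
```

Awaiting FirstOrTimeoutAsync(Observable.Return("pong"), 1000) gives "pong", while passing Observable.Never<string>() with a short timeout gives "timeout".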
Conclusion
The resulting code contains no loops, no explicit synchronization, and no explicit thread or task creation. At the same time, the code is completely asynchronous and safe.
RX is definitely worth exploring, even if you never end up using it. The core of RX was derived by applying the principle of monad duality to the standard IEnumerable and IEnumerator interfaces, which is why RX turned out compact and powerful. In addition, RX is also available for JavaScript, C++, Java, Scala, Python, and Ruby.
The source code, together with the client and the server, is posted on GitHub: github.com/gandjustas/UdpRxSample .