System.IO.Pipelines is a new library that simplifies code organization in .NET. It is difficult to ensure high performance and accuracy if you have to deal with complex code. The task of System.IO.Pipelines is to simplify the code. More under the cut!

The library came as a result of the efforts of the .NET Core development team, which sought to make Kestrel one of
the fastest web servers in the industry . It was originally conceived as part of the Kestrel implementation, but evolved into a reusable API, available in version 2.1 as the first class BCL API (System.IO.Pipelines).
')
What problems does she solve?
To properly analyze data from a stream or socket, you need to write a large amount of standard code. At the same time, there are many pitfalls that complicate both the code itself and its support.
What difficulties arise today?
Let's start with a simple task. We need to write a TCP server that receives line-delimited messages from the client (\ n).
TCP Server with NetworkStream
REMOVAL: As with any task that requires high performance, each case should be considered based on the characteristics of your application. It is possible that spending resources on the use of various approaches, which will be discussed further, does not make sense if the scale of the network application is not very large.
Normal .NET code before using pipelines looks like this:
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; await stream.ReadAsync(buffer, 0, buffer.Length);
see
sample1.cs on
githubProbably, this code will work with local testing, but it has some errors:
- Perhaps after a single call to ReadAsync, the entire message will not be received (until the end of the line).
- It ignores the result of the work of the stream.ReadAsync () method - the amount of data actually transferred to the buffer.
- The code does not handle receiving multiple lines in a single ReadAsync call.
These are the most common errors in reading streaming data. To avoid them, you need to make a number of changes:
- You need to buffer incoming data until a new string is found.
- It is necessary to analyze all the rows returned to the buffer.
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; var bytesBuffered = 0; var bytesConsumed = 0; while (true) { var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered); if (bytesRead == 0) {
see
sample2.cs on
githubI repeat: this could work with local testing, but sometimes there are lines longer than 1 KB (1024 bytes). It is necessary to increase the size of the input buffer until a new line is found.
In addition, we collect buffers into an array when processing long strings. We can improve this process using ArrayPool, which will avoid re-allocation of buffers during the analysis of long rows coming from the client.
async Task ProcessLinesAsync(NetworkStream stream) { byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); var bytesBuffered = 0; var bytesConsumed = 0; while (true) {
see sample3.cs on githubThe code works, but now the buffer size has changed, as a result, many copies of it appear. More memory is also used, since the logic does not reduce the buffer after processing the lines. To avoid this, you can save a list of buffers, and not to change the buffer size each time when lines are received longer than 1 KB.
In addition, we do not increase the buffer size of 1 KB, until it is completely empty. This means that we will send smaller and smaller buffers to ReadAsync, resulting in an increase in the number of calls to the operating system.
We will try to eliminate this and allocate a new buffer as soon as the size of the existing one is less than 512 bytes:
public class BufferSegment { public byte[] Buffer { get; set; } public int Count { get; set; } public int Remaining => Buffer.Length - Count; } async Task ProcessLinesAsync(NetworkStream stream) { const int minimumBufferSize = 512; var segments = new List<BufferSegment>(); var bytesConsumed = 0; var bytesConsumedBufferIndex = 0; var segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) }; segments.Add(segment); while (true) {
see sample4.cs on githubAs a result, the code is significantly complicated. During the search for a delimiter, we track the filled buffers. To do this, use the List, which displays the buffered data when searching for a new row delimiter. As a result, ProcessLine and IndexOf will take a List instead of byte [], offset and count. The parsing logic will start processing one or more segments of the buffer.
And now the server will process partial messages and use the combined memory to reduce the total memory consumption. However, a number of changes need to be made:
- From ArrayPoolbyte, we use only Byte [] - the standard managed arrays. In other words, when executing the ReadAsync or WriteAsync function, the validity period of the buffers is tied to the time the asynchronous operation is performed (to interact with the operating system's own I / O API). Since pinned memory cannot move, this affects the performance of the garbage collector and can cause array fragmentation. It may be necessary to change the pool implementation, depending on how long the asynchronous operation waits for execution.
- Throughput can be improved by breaking the link between the logic of reading and processing. We get the effect of batch processing, and now the parsing logic will be able to read large amounts of data, processing large blocks of buffers, rather than analyzing individual lines. As a result, the code becomes even more complicated:
- It is necessary to create two cycles, working independently of each other. The first will read data from the socket, and the second will analyze the buffers.
- We need a way to tell the parsing logic that the data is available.
- You also need to determine what happens if the loop reads data from the socket too quickly. We need a way to regulate the read cycle if the syntax analysis logic does not keep up with it. This is commonly referred to as "flow control" or "flow resistance."
- We need to make sure that the data is transferred safely. Now the set of buffers is used both by the read cycle and the syntax analysis cycle, they work independently from each other on different threads.
- The memory management logic is also involved in two different code fragments: borrowing data from the buffer pool, which reads data from the socket, and returning from the buffer pool, which is the logic of the syntax analysis.
- You need to be extremely careful with returning buffers after executing the parsing logic. Otherwise, there is a possibility that we will return the buffer to which the socket reading logic is still being written.
The difficulty begins to roll off (and this is not all cases!). To create a high-performance network, you need to write very complex code.
The goal of System.IO.Pipelines is to simplify this procedure.
TCP Server and System.IO.Pipelines
Let's see how System.IO.Pipelines works:
async Task ProcessLinesAsync(Socket socket) { var pipe = new Pipe(); Task writing = FillPipeAsync(socket, pipe.Writer); Task reading = ReadPipeAsync(pipe.Reader); return Task.WhenAll(reading, writing); } async Task FillPipeAsync(Socket socket, PipeWriter writer) { const int minimumBufferSize = 512; while (true) {
see sample5.cs on githubIn the pipeline version of our row reader there are two cycles:
- FillPipeAsync reads from the socket and writes to PipeWriter.
- ReadPipeAsync reads from PipeReader and analyzes incoming lines.
Unlike the first examples, there are no specially assigned buffers. This is one of the main features of System.IO.Pipelines. All buffer management tasks are passed to PipeReader / PipeWriter implementations.
The procedure is simplified: we use code only for business logic, instead of implementing complex buffer management.
In the first loop, PipeWriter.GetMemory (int) is first called to get a certain amount of memory from the main recorder. It then calls PipeWriter.Advance (int), which tells PipeWriter how much data is actually written to the buffer. This is followed by a call to PipeWriter.FlushAsync () so that PipeReader can access the data.
The second loop consumes buffers that were recorded by PipeWriter, but initially came from a socket. When a request is returned to PipeReader.ReadAsync (), we get a ReadResult containing two important messages: the data read in the form ReadOnlySequence, and the logical data type IsCompleted, which tells the reader whether the recorder has completed the job (EOF). When the end-of-line (EOL) separator is found and the string is analyzed, we divide the buffer into parts to skip the already processed fragment. After that, PipeReader.AdvanceTo is called, and it tells PipeReader how much data has been consumed.
At the end of each cycle, both the reader and the recorder are completed. As a result, the main channel frees all allocated memory.
System.IO.Pipelines
Partial reading
In addition to memory management, System.IO.Pipelines performs another important function: it looks at the data in the channel, but does not consume it.
PipeReader has two main APIs: ReadAsync and AdvanceTo. ReadAsync receives data from the channel, AdvanceTo informs PipeReader that these buffers are no longer required by the reader, so you can get rid of them (for example, return to the main buffer pool).
Below is an example of an HTTP analyzer that reads data from channel partial data buffers until it receives a suitable starting line.

ReadOnlySequenceT
The channel implementation stores a list of associated buffers transferred between PipeWriter and PipeReader. PipeReader.ReadAsync reveals ReadOnlySequence, which is a new BCL type and consists of one or more ReadOnlyMemory <T> segments. It is similar to Span or Memory, which gives us the opportunity to look at arrays and strings.

Inside the channel there are pointers that show where the reader and the recorder are located in the general set of selected data, and also update them as they are written and read the data. SequencePosition is a single point in the linked list of buffers and is used to efficiently separate ReadOnlySequence <T>.
Since ReadOnlySequence <T> supports one segment and more, the standard operation of high-performance logic is the separation of fast and slow paths based on the number of segments.
As an example, we present a function that converts ASCII ReadOnlySequence to a string:
string GetAsciiString(ReadOnlySequence<byte> buffer) { if (buffer.IsSingleSegment) { return Encoding.ASCII.GetString(buffer.First.Span); } return string.Create((int)buffer.Length, buffer, (span, sequence) => { foreach (var segment in sequence) { Encoding.ASCII.GetChars(segment.Span, span); span = span.Slice(segment.Length); } }); }
see
sample6.cs on
githubFlow Resistance and Flow Control
Ideally, reading and analysis work together: the reading stream consumes data from the network and puts it into buffers, while the analysis stream creates the appropriate data structures. Analysis usually takes longer than just copying data blocks from the network. As a result, the reading stream can easily overload the analysis stream. Therefore, the reading stream will have to either slow down or consume more memory in order to save data for the analysis stream. To ensure optimal performance, a balance is needed between the pause frequency and the allocation of large amounts of memory.
To solve this problem, the pipeline has two flow control functions: PauseWriterThreshold and ResumeWriterThreshold. PauseWriterThreshold determines how much data needs to be buffered before pausing PipeWriter.FlushAsync. ResumeWriterThreshold determines how much memory the reader may consume before resuming the operation of the recorder.

PipeWriter.FlushAsync is “blocked” when the amount of data in the pipeline stream exceeds the limit set in PauseWriterThreshold and is “unlocked” when it falls below the limit set in ResumeWriterThreshold. To prevent the consumption limit being exceeded, only two values ​​are used.
I / O scheduling
When using async / await, subsequent operations are usually invoked either in pool threads or in the current SynchronizationContext.
When implementing I / O, it is very important to carefully control where it is executed in order to use the processor cache more efficiently. This is critical for high performance applications such as web servers. System.IO.Pipelines uses PipeScheduler to determine where to place asynchronous callbacks. This allows you to very precisely control which threads to use for I / O.
A practical example is the Kestrel Libuv transport, in which I / O callbacks are executed through dedicated channels of the event loop.
There are other benefits of the PipeReader pattern.
- Some base systems support “standby without buffering”: the buffer does not need to be allocated until the available data appears in the base system. So, in Linux with epoll, you can not provide a buffer for reading until the data is prepared. This avoids a situation where there are many threads waiting for data, and you need to immediately reserve a huge amount of memory.
- The default pipeline simplifies the recording of network code unit tests: the parsing logic is separate from the network code, and the unit tests run this logic only in buffers in memory, and do not consume it directly from the network. It also makes it easy to test complex patterns with partial data sent. ASP.NET Core uses it to test various aspects of the Kestrel http parser.
- Systems that allow custom code to use the main OS buffers (for example, the registered Windows I / O API) are initially suitable for using pipelines, because the PipeReader implementation always provides buffers.
Other related types
We also added a number of new simple BCL types to System.IO.Pipelines:
- MemoryPoolT , IMemoryOwnerT , MemoryManagerT . ArrayPoolT has been added to .NET Core 1.0, and .NET Core 2.1 now has a more general abstract representation for the pool that works with any MemoryTs. We get an extensibility point that allows for more advanced distribution strategies, as well as control of buffer management (for example, using predefined buffers instead of exclusively managed arrays).
- The iBufferWriterT is a receiver for recording synchronous buffered data (implemented by PipeWriter).
- IValueTaskSource - ValueTaskT has existed since the release of .NET Core 1.1, but in .NET Core 2.1 it has acquired extremely efficient tools that provide uninterrupted asynchronous operations without distribution. For more information, see here .
How to use conveyors?
The APIs are in the
System.IO.Pipelines nuget package.
An example of a .NET Server 2.1 server application that uses pipelines to handle inline messages (from the example above) see
here . It can be run using the dotnet run (or Visual Studio). In the example, data transfer from the socket on port 8087 is expected, then the received messages are written to the console. To connect to port 8087, you can use a client, such as netcat or putty. Send a text message and see how it works.
At the moment, the pipeline is working in Kestrel and SignalR, and we hope that it will find more widespread use in many network libraries and components of the .NET community in the future.