
Using TPL Dataflow for multi-threaded file compression

Using a small example, I will show how the TPL Dataflow library lets you solve the rather non-trivial task of multi-threaded file compression in 15 minutes.

Task


We need to implement efficient file compression using the GZipStream class from the System.IO.Compression namespace. We assume that the files are large and cannot fit entirely in memory.

TPL Dataflow


TPL Dataflow (TDF) is built on top of the TPL (Task Parallel Library) included in .NET 4 and supplements it with a set of primitives for solving more complex tasks than the original library handles. TPL Dataflow uses tasks, thread-safe collections and other features introduced in .NET 4 to add support for parallel processing of data streams. The essence of the library is that you build data processing pipelines by linking different blocks together. Data processing can occur both synchronously and asynchronously. The library ships alongside .NET 4.5 as a separate NuGet package rather than as part of the framework itself.

Solution


To solve this problem, you will need only 3 blocks:
  1. Buffer for data read from data source:
     var buffer = new BufferBlock<byte[]>(); 

  2. Data compression block:
     var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes)); 

    Compression function:
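     private static byte[] Compress(byte[] bytes)
     {
         using (var resultStream = new MemoryStream())
         {
             // leaveOpen: true keeps resultStream usable after the GZipStream is disposed.
             using (var zipStream = new GZipStream(resultStream, CompressionMode.Compress, true))
             {
                 zipStream.Write(bytes, 0, bytes.Length);
             }
             // Read the result only after the GZipStream has been disposed:
             // disposing it flushes the remaining data and the gzip footer.
             return resultStream.ToArray();
         }
     }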

  3. Block that writes the compressed data:
     var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length)); 
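Before wiring the blocks together, it is worth checking that the Compress function produces output that can actually be decompressed. The following is a minimal sketch of such a check, not part of the solution itself: the class name CompressRoundTrip and the Decompress helper are my own illustrative additions, and the sample data is arbitrary.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;

public static class CompressRoundTrip
{
    // Same approach as the Compress function above: each chunk becomes one gzip member.
    public static byte[] Compress(byte[] bytes)
    {
        using (var resultStream = new MemoryStream())
        {
            using (var zipStream = new GZipStream(resultStream, CompressionMode.Compress, true))
            {
                zipStream.Write(bytes, 0, bytes.Length);
            } // disposing the GZipStream flushes the remaining data and the gzip footer
            return resultStream.ToArray();
        }
    }

    // Hypothetical helper, used only to verify the output of Compress.
    public static byte[] Decompress(byte[] compressed)
    {
        using (var input = new MemoryStream(compressed))
        using (var zipStream = new GZipStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            zipStream.CopyTo(output);
            return output.ToArray();
        }
    }

    public static void Main()
    {
        // Highly repetitive sample data, so it should compress well.
        byte[] original = Encoding.UTF8.GetBytes(string.Concat(Enumerable.Repeat("TPL Dataflow ", 1000)));
        byte[] compressed = Compress(original);
        byte[] restored = Decompress(compressed);

        Console.WriteLine(compressed.Length < original.Length); // the data got smaller
        Console.WriteLine(restored.SequenceEqual(original));    // the round trip is lossless
    }
}
```

If either line prints False, the usual suspect is reading the MemoryStream before the GZipStream has been disposed.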


Connect our blocks:
    buffer.LinkTo(compressor);
    compressor.LinkTo(writer);

We will also inform our blocks when the data for them has run out and they can complete their work. This can be done by calling the block's Complete method:
    buffer.Completion.ContinueWith(task => compressor.Complete());
    compressor.Completion.ContinueWith(task => writer.Complete());
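In the released System.Threading.Tasks.Dataflow package, the same completion chain can also be requested when linking, by passing DataflowLinkOptions with PropagateCompletion = true to LinkTo. Here is a minimal sketch of that option on a toy pipeline of integers; the class and block names (PropagateCompletionSketch, source, doubler, sink) are illustrative and not from the compression example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class PropagateCompletionSketch
{
    public static List<int> Run()
    {
        var results = new List<int>();

        var source = new BufferBlock<int>();
        var doubler = new TransformBlock<int, int>(x => x * 2);
        // An ActionBlock handles one message at a time by default,
        // so the list is never touched concurrently.
        var sink = new ActionBlock<int>(x => results.Add(x));

        // When a linked source completes, the target is completed automatically.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(doubler, linkOptions);
        doubler.LinkTo(sink, linkOptions);

        for (int i = 1; i <= 3; i++)
        {
            source.Post(i);
        }

        source.Complete();      // only the head of the pipeline is completed by hand
        sink.Completion.Wait(); // the tail completes once everything has flowed through
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "2, 4, 6"
    }
}
```

Unlike a bare ContinueWith that calls Complete, this option also passes a fault from the source block on to the target.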

As the file is read, we will offer the data to our buffer. This is done by calling the Post method of the block:
 while (!buffer.Post(bytes)) { } 

We need this loop to handle the case when the block is full and does not accept more data.
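The retry loop keeps a CPU core busy while the block is full. The library also offers the SendAsync extension method, which returns a Task<bool> that completes once the block has accepted or permanently declined the message. Below is a minimal sketch of the difference on a deliberately tiny bounded buffer; the class name BoundedPostSketch and the capacity of 2 are arbitrary choices for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedPostSketch
{
    public static bool[] Run()
    {
        // A buffer that holds at most two messages and has no consumer yet.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 2 });

        bool first = buffer.Post(1);  // accepted
        bool second = buffer.Post(2); // accepted, the buffer is now full
        bool third = buffer.Post(3);  // declined immediately because the buffer is full

        // SendAsync does not spin: the returned task stays pending until there is room.
        Task<bool> pending = buffer.SendAsync(3);

        buffer.Receive();               // take one message out, freeing a slot
        bool accepted = pending.Result; // the postponed message is now accepted

        return new[] { first, second, third, accepted };
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "True, True, False, True"
    }
}
```

In a read loop like the one in this article, the spin could therefore be replaced with buffer.SendAsync(bytes).Wait(), at the cost of blocking the reading thread instead of spinning it.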
Upon completion of the reading, we will notify our block that we have run out of data:
 buffer.Complete(); 

Now we just have to wait for the block that writes the compressed data to the stream to finish:
 writer.Completion.Wait(); 


The resulting method:
    public static void Compress(Stream inputStream, Stream outputStream)
    {
        var buffer = new BufferBlock<byte[]>();
        var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes));
        var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length));

        buffer.LinkTo(compressor);
        buffer.Completion.ContinueWith(task => compressor.Complete());
        compressor.LinkTo(writer);
        compressor.Completion.ContinueWith(task => writer.Complete());

        var readBuffer = new byte[BufferSize];
        while (true)
        {
            int readCount = inputStream.Read(readBuffer, 0, BufferSize);
            if (readCount == 0)
            {
                // Read returns 0 only at the end of the stream;
                // a short read alone does not mean the data has run out.
                buffer.Complete();
                break;
            }

            var bytes = new byte[readCount];
            Buffer.BlockCopy(readBuffer, 0, bytes, 0, readCount);
            while (!buffer.Post(bytes)) { }
        }

        writer.Completion.Wait();
    }

We could stop here if not for one "but": this code is no faster than a completely synchronous version, because by default a block processes only one message at a time. To make it faster, we need to tell the compression block to process several messages in parallel. You can do this by passing the necessary options to the block:
    var compressorOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
    var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes), compressorOptions);
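Parallel compression is only safe here because the chunks must still reach the writer in the order they were read, and a TransformBlock by default emits its results in the order the inputs arrived, even when later inputs finish first. The sketch below checks this on a toy example; the class name OrderingSketch and the artificial delays are assumptions made for the demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class OrderingSketch
{
    public static List<int> Run()
    {
        var results = new List<int>();

        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
        var worker = new TransformBlock<int, int>(x =>
        {
            // Earlier inputs sleep longer, so later inputs finish first.
            Thread.Sleep((5 - x) * 20);
            return x * 10;
        }, options);

        // The collector runs one message at a time, so the list needs no locking.
        var collector = new ActionBlock<int>(x => results.Add(x));
        worker.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 4; i++)
        {
            worker.Post(i);
        }

        worker.Complete();
        collector.Completion.Wait();
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "10, 20, 30, 40"
    }
}
```

The results come out in input order despite the reversed completion times, which is what keeps the compressed chunks in file order.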

We also need to handle the situations where data is read faster than it can be compressed, or compressed faster than it can be written; otherwise the queues inside the blocks grow without limit. You can do this by setting the BoundedCapacity property of our blocks:
    var buffer = new BufferBlock<byte[]>(new DataflowBlockOptions { BoundedCapacity = 100 });

    var compressorOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        BoundedCapacity = 100
    };
    var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes), compressorOptions);

    var writerOptions = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 100,
        SingleProducerConstrained = true
    };
    var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length), writerOptions);

The final method looks like this:
    public static void Compress(Stream inputStream, Stream outputStream)
    {
        var buffer = new BufferBlock<byte[]>(new DataflowBlockOptions { BoundedCapacity = 100 });

        var compressorOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            BoundedCapacity = 100
        };
        var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes), compressorOptions);

        var writerOptions = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 100,
            SingleProducerConstrained = true
        };
        var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length), writerOptions);

        buffer.LinkTo(compressor);
        buffer.Completion.ContinueWith(task => compressor.Complete());
        compressor.LinkTo(writer);
        compressor.Completion.ContinueWith(task => writer.Complete());

        var readBuffer = new byte[BufferSize];
        while (true)
        {
            int readCount = inputStream.Read(readBuffer, 0, BufferSize);
            if (readCount == 0)
            {
                // Read returns 0 only at the end of the stream;
                // a short read alone does not mean the data has run out.
                buffer.Complete();
                break;
            }

            var postData = new byte[readCount];
            Buffer.BlockCopy(readBuffer, 0, postData, 0, readCount);
            // Retry until the bounded buffer has room for the chunk.
            while (!buffer.Post(postData)) { }
        }

        writer.Completion.Wait();
    }

We can call it, for example, from a console application like this:
    private const int BufferSize = 16384;

    static void Main(string[] args)
    {
        var stopwatch = Stopwatch.StartNew();
        using (var inputStream = File.OpenRead(@"C:\file.bak"))
        {
            using (var outputStream = File.Create(@"E:\file.gz"))
            {
                Compress(inputStream, outputStream);
            }
        }
        stopwatch.Stop();

        Console.WriteLine();
        Console.WriteLine(string.Format("Time elapsed: {0}s", stopwatch.Elapsed.TotalSeconds));
        Console.ReadKey();
    }


Conclusion


As you can see, TPL Dataflow can seriously simplify multi-threaded data processing tasks. In the tests I conducted, the time required for compression decreased by a factor of almost 3.
You can download the library and read more about it on the official page.

Source: https://habr.com/ru/post/138531/

