
Convenient binary data source instead of Stream

How do your .NET components usually receive binary data? Setting aside the trivial case where all the data is already in a byte array, I am sure the answer is: as a System.IO.Stream. For reading, a Stream essentially offers only one operation: read up to a specified number of bytes into a specified byte array (buffer). Reading with this operation causes two kinds of difficulty and one inefficient use of resources.

Problem number one: if several components need data from the same source, then once one component has read some data from the Stream, that data is "consumed" and the other components can no longer get at it. Problem number two: we need the data as whole blocks, but a read may leave a block only partially in the buffer (only three bytes of a 32-bit number, only half the letters of a word, and so on). The inefficient use of resources comes from the fact that each component reading the data has to create its own read buffer. Below I propose an easy-to-use solution to these difficulties that lets you clean up your code, achieve high read performance, and build general-purpose components.

First, let us consider how these difficulties can be solved without giving up Stream.

The problem of partially available blocks can be solved in principle, but at the cost of other problems. The first solution is to read the data one byte at a time until we have the required amount, which can cause a severe drop in performance. The second solution is to check, before each access to the buffer, whether we have reached its end and, if so, to read again. The checks, the Stream reads, and the subsequent adjustment of the buffer index before each access heavily clutter the code and invite numerous bugs. All of this can be moved into a separate method, keeping the meaningful code clean, but each component still has to be equipped with that method separately.
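The separate method mentioned above could look roughly like the following sketch. The names StreamReadHelper and ReadFully are made up for illustration and do not come from any library; newer versions of .NET (7 and later) ship a similar built-in, Stream.ReadExactly.

```csharp
using System;
using System.IO;

// Hypothetical helper, shown only to illustrate the "separate method" approach.
public static class StreamReadHelper
{
    // Keeps calling Stream.Read until exactly `count` bytes have been stored
    // in `buffer` starting at `offset`, because a single Read may return fewer.
    public static void ReadFully(Stream stream, byte[] buffer, int offset, int count)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));

        while (count > 0)
        {
            int read = stream.Read(buffer, offset, count);
            if (read == 0)
            {
                // Zero bytes read means the stream ended before the block was complete.
                throw new EndOfStreamException("Stream ended before the requested block was read.");
            }

            offset += read;
            count -= read;
        }
    }
}
```

Even with such a helper, every component still needs its own buffer and the data is still consumed by whichever component reads it first.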
Sharing one Stream among several reading components is unsolvable in principle, especially when, for the sake of performance, data is read in large chunks. The only option is to pass the Stream to just one component and hand the data to the others in some other, non-standard way. This approach makes it impossible to create general-purpose components and rules out composing or replacing them.

Judging by an analysis of several projects, a significant part of their code is devoted to working around these two difficulties. Sometimes the actual purpose of a project gets lost in the thicket of code that handles reading from the data source.

A clarification prompted by the comments on this article: the solution I propose below does not change the model of how a data source is used. Other models, in which the source pushes data as notifications rather than the consumer requesting it, are outside the scope of this article; they can be implemented and used independently of my solution or together with it.

To solve these difficulties for good, a fundamental decision is needed: abandon Stream and replace it with an analogue (let's call it BufferedSource) that exposes a buffer accessible to all components.
In our BufferedSource, the read method is replaced by two: one that actually fills the buffer and one that skips (consumes, removes) used data from the buffer. And, of course, we add a method that makes sure the buffer contains at least the specified amount of data, reading it into the buffer if necessary.

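```csharp
public interface IBufferedSource
{
    // The buffer holding the source data, plus the offset and size of the data available in it.
    byte[] Buffer { get; }
    int Offset { get; }
    int Count { get; }

    // Makes sure the buffer holds at least the specified amount of data, reading more if necessary.
    void EnsureBuffer (int size);

    // Skips (consumes) the specified amount of data from the buffer.
    void SkipBuffer (int size);
}
```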


As soon as I started using this interface in practice, I needed to add more properties and methods to it. For sources that supply data in portions of a fixed size (for example, cryptographic transformations), a method was needed that reads as much data as possible into the buffer without knowing in what portions the source can provide it. To avoid unnecessary method calls (especially relevant for asynchronous methods), I added a property indicating that the source has already read all available data into the buffer and cannot add anything more. To optimize the case where a large amount of data is not used but simply skipped, a skip method was needed that skips not only the data in the buffer but also the data beyond it. Such a skip saves the source the work of processing the data during reading and can take advantage of the fast seeking that sources such as files or memory provide.

After implementing many sources of different types, and consumers for them, I arrived at the following interface, which contains only the essentials that cannot be implemented outside the source. Any additional requirements are met, as they arise, with extension methods.

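```csharp
public interface IBufferedSource
{
    // The buffer holding the source data, plus the offset and size of the data available in it.
    byte[] Buffer { get; }
    int Offset { get; }
    int Count { get; }

    // Indicates that the source is exhausted.
    bool IsExhausted { get; }

    // Fills the buffer with source data, adding to the data already there. Returns the amount of data added.
    int FillBuffer ();

    // Makes sure the buffer holds at least the specified amount of data. Reads more data if necessary.
    void EnsureBuffer (int size);

    // Skips the specified amount of data from the buffer.
    void SkipBuffer (int size);

    // Tries to skip the specified amount of source data, including data not yet in the buffer. Returns the amount actually skipped.
    long TrySkip (long size);
}
```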


The EnsureBuffer () and SkipBuffer () methods are speed-critical because the data consumer calls them frequently. In the overwhelming majority of calls they should do nothing but simple arithmetic on the buffer offset and size. That is why they are kept separate from FillBuffer () and TrySkip (), which, on the contrary, will in most cases trigger a resource-intensive read of new data.
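To make the fast path and the slow path concrete, here is a minimal sketch of a Stream-backed source. It is not the repository's StreamBufferedSource: the class name SketchSource, the private ReadMore method, and the choice to throw EndOfStreamException when the source ends too early are all assumptions made for illustration.

```csharp
using System;
using System.IO;

// Hypothetical sketch; only the members needed to show the two paths are included.
public sealed class SketchSource
{
    private readonly Stream _stream;
    private readonly byte[] _buffer; // supplied by the caller and never replaced
    private int _offset;
    private int _count;

    public SketchSource(Stream stream, byte[] buffer)
    {
        _stream = stream ?? throw new ArgumentNullException(nameof(stream));
        _buffer = buffer ?? throw new ArgumentNullException(nameof(buffer));
    }

    public byte[] Buffer => _buffer;
    public int Offset => _offset;
    public int Count => _count;
    public bool IsExhausted { get; private set; }

    // Fast path: nothing but index arithmetic.
    public void SkipBuffer(int size)
    {
        if (size < 0 || size > _count) throw new ArgumentOutOfRangeException(nameof(size));
        _offset += size;
        _count -= size;
    }

    // Fast path when the data is already buffered; reads only when it is not.
    public void EnsureBuffer(int size)
    {
        if (size < 0 || size > _buffer.Length) throw new ArgumentOutOfRangeException(nameof(size));
        while (size > _count)
        {
            if (ReadMore() == 0)
            {
                // Assumption: an early end of data is reported with an exception.
                throw new EndOfStreamException("Source ended before the requested size was available.");
            }
        }
    }

    // Slow path: moves unread bytes to the start of the buffer, then reads more.
    private int ReadMore()
    {
        if (IsExhausted) return 0;
        if (_offset > 0)
        {
            Array.Copy(_buffer, _offset, _buffer, 0, _count);
            _offset = 0;
        }

        int read = _stream.Read(_buffer, _count, _buffer.Length - _count);
        if (read == 0) IsExhausted = true;
        else _count += read;
        return read;
    }
}
```

In this sketch a consumer that repeatedly calls EnsureBuffer (4) and SkipBuffer (4) on a large buffer touches the Stream only when the buffered data runs out; every other call is a comparison and two additions.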

Some essential rules for how a source implementing IBufferedSource must behave cannot be expressed in the interface itself, so I describe them in XML comments and contracts. The most important rule: the Buffer property does not change during the entire lifetime of the source. The second rule: the IsExhausted property changes from False to True only once during the lifetime of the source, and once True it never changes back. The third rule allows the FillBuffer () method to return zero only when IsExhausted = True and Count = 0. An additional wish, in line with the original goal of avoiding duplicate buffers: data sources do not create any buffers themselves but accept ready-made ones in the constructor.

With all the requirements in mind, let us turn to concrete implementations. Since the goal is to replace Stream, the first implementation was the very simple ArrayBufferedSource, an analogue of MemoryStream. The next implementations were a source that receives its data from a Stream (StreamBufferedSource) and the reverse: a Stream that receives its data from a source (BufferedSourceStream). Continuing the theme of replacing Stream, I created CryptoTransformingBufferedSource, an analogue of CryptoStream: it takes data from another source and applies the specified cryptographic transformation to it. Then I created the pass-through source ObservableBufferedSource, which simply mirrors another source but, each time data is consumed, notifies an IProgress recipient, which makes it possible to display the progress of consumption. A more complex pass-through source is SizeLimitedBufferedSource, which limits another source to a specified size. To ease migration from Stream, I added the following extension methods for IBufferedSource: Read (byte_array), CopyTo (destination_stream), ReadAllBytes (), and ReadAllText (encoding). All of this is published in an open repository on GitHub.

How do you write a consumer of an IBufferedSource? Here, for example, is the parsing of an AVI video clip stored in a file in base-64 encoding.

```csharp
using System;
using System.IO;

static class Program
{
    static void Main (string[] args)
    {
        using (var stream = new System.IO.FileStream (@"c:\test.avi.base64", FileMode.Open, FileAccess.Read))
        {
            // Chain of sources: file -> base-64 decoding -> AVI parser.
            var fileSource = new StreamBufferedSource (stream, new byte[1024]);
            var transform = new System.Security.Cryptography.FromBase64Transform ();
            var aviSource = new CryptoTransformingBufferedSource (fileSource, transform, new byte[1024]);
            ParseAvi (aviSource);
        }
    }

    static void ParseAvi (IBufferedSource source)
    {
        do
        {
            // Every chunk starts with a 4-byte identifier and a 4-byte size.
            source.EnsureBuffer (8);
            var id = System.Text.Encoding.ASCII.GetString (source.Buffer, source.Offset, 4);
            var size = (long)BitConverter.ToUInt32 (source.Buffer, source.Offset + 4);
            source.SkipBuffer (8);
            if (id == "avih")
            {
                // Hand the chunk to its parser as a source limited to the chunk size.
                var chunkSource = new SizeLimitedBufferedSource (source, size);
                ParseAvihChunk (chunkSource);
            }

            // Skip the chunk body, whether or not it was parsed.
            source.TrySkip (size);
        }
        while (!source.IsEmpty ());
    }

    static void ParseAvihChunk (IBufferedSource source)
    {
        // The whole header is requested at once, then fields are read at fixed offsets.
        source.EnsureBuffer (56);
        var microSecPerFrame = BitConverter.ToUInt32 (source.Buffer, source.Offset);
        var flags = BitConverter.ToUInt32 (source.Buffer, source.Offset + 12);
        var totalFrames = BitConverter.ToUInt32 (source.Buffer, source.Offset + 16);
        var width = BitConverter.ToUInt32 (source.Buffer, source.Offset + 32);
        var height = BitConverter.ToUInt32 (source.Buffer, source.Offset + 36);
    }
}
```


Not stopping there, I propose a further development of IBufferedSource for sources that consist of a sequence of information blocks. For example, AVI or MKV files consist of separate chunks, each of which is conveniently treated as a separate data source. Implementing the IPartitionedBufferedSource interface makes it possible to treat a single source as a source of one part at a time, with the ability to move on to the next part.

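```csharp
public interface IPartitionedBufferedSource : IBufferedSource
{
    // Tries to skip the rest of the current part and move to the next one. Returns false if there is no next part.
    bool TrySkipPart ();
}
```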


Note that in IPartitionedBufferedSource the semantics of the inherited IsExhausted property change. It now means that a single part (portion, block) is exhausted, which does not rule out moving on to the next one.

Here are two implementations of IPartitionedBufferedSource right away. TemplateSeparatedBufferedSource lets you read a source as separate parts delimited by a fixed separator. For example, the individual fields of an HTTP header are separated from each other by the two bytes 13 and 10. The abstract base class EvaluatorPartitionedBufferedSourceBase lets you, by inheriting from it, define your own function that finds the boundary between parts in an arbitrary way. For example, I used it to read the individual parts of a multipart MIME mail message. Both implementations can be found in the same repository on GitHub.
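The core of a fixed-separator source is finding the separator inside the available part of the buffer. The following is a minimal sketch of such a search, not the code of TemplateSeparatedBufferedSource; the names SeparatorSearch and IndexOf are invented for illustration, and a real implementation would also have to handle a separator that straddles two buffer fills.

```csharp
using System;

// Hypothetical helper illustrating a fixed-separator search in a buffer segment.
public static class SeparatorSearch
{
    // Returns the index in `buffer` of the first occurrence of `separator`
    // within the segment [offset, offset + count), or -1 if it does not occur.
    public static int IndexOf(byte[] buffer, int offset, int count, byte[] separator)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (separator == null || separator.Length == 0)
        {
            throw new ArgumentException("Separator must not be empty.", nameof(separator));
        }

        // Last position at which the whole separator still fits into the segment.
        int last = offset + count - separator.Length;
        for (int i = offset; i <= last; i++)
        {
            int j = 0;
            while (j < separator.Length && buffer[i + j] == separator[j])
            {
                j++;
            }

            if (j == separator.Length) return i;
        }

        return -1;
    }
}
```

With a result of -1 a partitioned source would presumably fill the buffer further and search again; with a non-negative result, everything before that index belongs to the current part.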

Now let us use all of these classes to parse a fairly complex structure: an e-mail message. The message contains a collection of headers and a collection of parts, each of which begins with its own collection of headers. Note that the parsing of the different entities is implemented by completely independent methods that each take an IBufferedSource as input.

```csharp
using System;
using System.IO;

static class Program
{
    static void Main (string[] args)
    {
        using (var fs = new System.IO.FileStream (@"c:\message.eml", FileMode.Open, FileAccess.Read))
        {
            var fileSource = new StreamBufferedSource (fs, new byte[1024]);
            ParseMultipartMessage (fileSource);
        }
    }

    static void ParseMultipartMessage (IBufferedSource source)
    {
        // The header section ends with an empty line (CR LF CR LF).
        var headerSource = new TemplateSeparatedBufferedSource (source, new byte[] { 0x0d, 0x0a, 0x0d, 0x0a });
        var fieldSource = new HeaderFieldSource (headerSource);
        do
        {
            var field = ParseField (fieldSource);
        }
        while (fieldSource.TrySkipPart ());

        // Move past the header section to the body.
        headerSource.TrySkipPart ();

        // Each body part is parsed as an independent entity.
        var bodyPartsSource = new BodyPartSource ("boundary--", source);
        while (bodyPartsSource.TrySkipPart ())
        {
            var entity = ParseEntity (bodyPartsSource);
        }
    }

    static void ParseEntity (IBufferedSource source)
    {
        // An entity has the same layout: its own headers, then the body.
        var headerSource = new TemplateSeparatedBufferedSource (source, new byte[] { 0x0d, 0x0a, 0x0d, 0x0a });
        var fieldSource = new HeaderFieldSource (headerSource);
        do
        {
            var field = ParseField (fieldSource);
        }
        while (fieldSource.TrySkipPart ());

        headerSource.TrySkipPart ();
        var body = source.ReadAllBytes ();
    }
}
```

Code explanations
The HeaderFieldSource and BodyPartSource classes derive from EvaluatorPartitionedBufferedSourceBase because TemplateSeparatedBufferedSource cannot be used here: the separator between parts is ambiguous. In particular, in a header the individual fields are delimited not by any line break but only by one that is followed by a non-whitespace character. For the individual parts of the message, the separator (specified in the boundary parameter of the header) may or may not be followed by a line break.


The next step is to add asynchronous methods to our BufferedSource, but that topic is not simple, so I will leave it for future articles.

To sum up, I am satisfied with the result. BufferedSource successfully replaces Stream and solves all of the problems of working with it described above. Existing components that use Stream can be converted gradually, thanks to BufferedSource methods that mirror those of Stream and to the adapter classes.

Later, the project was converted to a Portable Class Library, Profile328 (.NET Framework 4, Silverlight 5, Windows 8, Windows Phone 8.1, Windows Phone Silverlight 8), tests were added, and a NuGet package was created.

Source: https://habr.com/ru/post/231275/

