Rust in detail, part 2

Part 2: Sending and receiving messages


In this series of articles we build a scalable real-time chat server, examining every step in full detail. The aim is to show Rust in practical use while studying systems programming concepts and system APIs along the way.

The second part is a direct continuation of the first, so if you missed it (or have forgotten the context), I recommend reading it first. In this part we continue implementing the WebSocket protocol.

14 Frames


Now we are ready to start exchanging real messages. But first we need to understand how the WebSocket protocol encodes data for transmission. The description can be found in the RFC, which requires us to pack data into frames. A frame consists of a header containing meta information (frame type and so on) and a payload, that is, the data we want to send.

The frame header begins with 2 bytes (16 bits), which look like this:

[Figure: layout of the first two header bytes: FIN (1 bit), RSV1, RSV2 and RSV3 (1 bit each), opcode (4 bits), MASK (1 bit), payload length (7 bits). Each cell of the diagram represents one bit (one or zero).]


Note that all this data is packed into only 2 octets (an octet is a byte of 8 bits). For example, take this object serialized as JSON:

 { "fin" : 1, "rsv1" : 0, "rsv2" : 0, "rsv3" : 0, "opcode" : 2, "masked" : 1, "payload_len": 64 } 

All of these values can also be encoded in binary format, as groups of bits:

 { "fin" : b1, "rsv1" : b0, "rsv2" : b0, "rsv3" : b0, "opcode" : b10, "masked" : b1, "payload_len": b1000000 } 

Or just as one big binary number: 1.0.0.0.0010.1.1000000, or in hexadecimal: 0x82C0.
These are different views of the same frame header received from a client. To work with it conveniently, we need to split the binary number into a structure with separate fields.

15 Unpacking Headers


To pull the information we need out of a binary number, we use a technique known as bitmasks. It sounds scary, but the idea is actually quite simple.

We simply "select" the bits that we want to "extract" from the number:


In the diagram, gray squares stand for zeros and yellow squares for ones.
Thus the diagram corresponds to the bitmask 00001111b.


Following the same logic, this is the bitmask 00100000b .

We "overlay" the mask with the & operator (bitwise AND) as follows: value & mask.
Here is how it works:


Again, the gray squares correspond to 0 and the green ones to 1.
With two inputs A = 1b and B = 0b, the output is Out = 0b.
The same holds for A = 0b and B = 1b.
The output is Out = 1b only if both inputs are ones. That is the whole logic.

When there are several bits, AND is applied to each bit position in turn. Suppose we have the values A = 101b and B = 011b. The output is then Out = 001b, since the only bit set in both A and B is the lowest one.


Now, using the same logic, we can apply masks with "selected" bits, examples of which we saw at the beginning, to extract parts of the frame header that we are interested in.
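
As a minimal sketch of the idea, here is how such masks apply to the two bytes of the example header 0x82C0. The helper name `select_bits` is made up for this illustration and is not part of the chat server.

```rust
/// Hypothetical helper: keeps only the bits of `value` that are set in `mask`.
fn select_bits(value: u8, mask: u8) -> u8 {
    value & mask
}

fn main() {
    // The AND example from the text: 101b & 011b = 001b.
    assert_eq!(select_bits(0b101, 0b011), 0b001);

    // The two bytes of the example header 0x82C0.
    let first: u8 = 0x82;
    let second: u8 = 0xC0;

    let fin = select_bits(first, 0b1000_0000) != 0;
    let opcode = select_bits(first, 0b0000_1111);
    let masked = select_bits(second, 0b1000_0000) != 0;
    let payload_len = select_bits(second, 0b0111_1111);

    println!("fin={} opcode={} masked={} payload_len={}", fin, opcode, masked, payload_len);
}
```

The extracted values are the same ones that appear in the JSON view of the header: opcode 2 and payload length 64, with the fin and masked bits set.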

16 Byte order


Byte order (endianness [1]) determines how a sequence of two or more bytes is laid out in memory. The order can run from the least significant byte to the most significant (little-endian), or from the most significant to the least significant (big-endian).

Let's see what the difference is.

A 16-bit word can hold numbers up to 0xFFFF.
Take the value 0x0001, for example: in which order should its two bytes be placed in memory?


The simplest analogy is writing on paper. European languages are usually written from left to right. We write Arabic numerals the same way: high-order digits (thousands, hundreds) on the left, low-order digits (tens, ones) on the right. In a multi-byte number, the high-order bytes are the ones that carry the bulk of the value: for the number 0x2A42, the high byte is 0x2A. Writing the high byte first corresponds to big-endian order.

At the same time, some languages (for example, Hebrew and Arabic) are written from right to left. Little-endian order works in a similar way: the low byte always comes first, so the number 0x2A42 is stored as the bytes 0x42, 0x2A.

Here is how these numbers look in memory:


In general, this is all you need to know about byte order: just remember the two-byte number 0x0001, which little-endian stores "little end first", as the bytes 0x01, 0x00.

We need to know all this for one simple reason: data sent over the network arrives in big-endian order, the traditional order for network protocols, also called "network byte order". At the same time, x86 (and x86-64) processors use little-endian order [2], so numbers have to be converted from one order to the other. Sounds hard? In fact, we can make our lives much easier with a library from crates.io, the standard crate repository.

The library we are interested in is called byteorder. It lets us work with bytes quite simply: we just state the desired byte order when reading or writing. We will look at an example a little later.
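
Before turning to byteorder, the difference between the two orders can be seen with the standard library alone. The sketch below uses only the integer methods `to_be_bytes`, `to_le_bytes` and `from_be_bytes`. The function `read_network_u16` is a name invented for this example.

```rust
/// Hypothetical helper: decodes two bytes received from the network
/// (big-endian) into a native u16, whatever the CPU's own byte order is.
fn read_network_u16(bytes: [u8; 2]) -> u16 {
    u16::from_be_bytes(bytes)
}

fn main() {
    let n: u16 = 0x2A42;
    // The same number laid out in memory in the two byte orders.
    println!("big-endian:    {:02X?}", n.to_be_bytes());
    println!("little-endian: {:02X?}", n.to_le_bytes());
    println!("from network:  {:#06X}", read_network_u16([0x2A, 0x42]));
}
```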

17 Refactoring the connection handling


Before we implement byte unpacking, let's warm up by reworking the WebSocketClient structure a little, so that our code does not turn into spaghetti. As a reminder, this structure handles client connections, and its read method is called every time new data arrives from the client.

First, let's move the connection handling code into a separate function by simply renaming fn read to fn read_handshake. Now we can add a new read implementation that passes control to the appropriate handler, depending on the state of the connected client:

    fn read(&mut self) {
        match self.state {
            ClientState::AwaitingHandshake => {
                self.read_handshake();
            },
            _ => {}
        }
    }

Everything is simple here: we match on the current value of self.state and handle the case where the state is AwaitingHandshake (waiting for the connection to be set up over HTTP). For all other cases we use the catch-all _ pattern, because the Rust compiler requires a match to cover every possible case.

Since we have touched on the current connection state (self.state), let's do another small refactoring that Rust's features make possible. In the first part we parsed the HTTP request headers with a special structure, Parser, which holds the context and state of the HTTP parser. We need this structure only once, before we switch to the WebSocket protocol, so it would be nice to free the parser's memory once we no longer need it.

We can achieve this with a simple trick: we move the parser state into the ClientState enum. Code example:

    enum ClientState {
        // The parser now lives inside the enum variant:
        AwaitingHandshake(RefCell<Parser<HttpParser>>),
        …
    }

    struct WebSocketClient {
        // The parser field is gone from the structure:
        headers: Rc<RefCell<HashMap<String, String>>>,
        …
        interest: EventSet,
        …
    }

    // The parser is created together with the initial state:
    impl WebSocketClient {
        fn new(socket: TcpStream) -> WebSocketClient {
            …
            WebSocketClient {
                socket: socket,
                headers: headers.clone(),
                interest: EventSet::readable(),
                state: ClientState::AwaitingHandshake(RefCell::new(Parser::request(HttpParser {
                    current_key: None,
                    headers: headers.clone()
                })))
            }
        }
    }

What does this give us, and how is it even possible? Strictly speaking, enums are structures with some special properties. More precisely, enums in Rust are algebraic data types (ADTs), which you may have met in functional languages (OCaml, Haskell and others). ADTs let us define complex data structures (including recursive ones) using only the type system. Some of the types built into the language are implemented this way, for example the already familiar Option and Result. There is no black magic in them, and you can easily implement them yourself if you wish.

Thus Rust lets us store state together with the type information. In this case, however, we access the enum's fields through a shared (immutable) reference, so we use RefCell to wrap the mutable state of the parser.

Because we have transferred ownership of the parser to the enum (more precisely, to the ClientState::AwaitingHandshake variant), all memory associated with it is released automatically when the state changes to another variant, for example ClientState::Connected. This is the effect we wanted. Now that http_parser is gone from the client structure, we need to change the read and read_handshake functions:

    fn read(&mut self) {
        match self.state {
            ClientState::AwaitingHandshake(_) => {
                self.read_handshake();
            },
            _ => {}
        }
    }

    fn read_handshake(&mut self) {
        let is_upgrade = if let ClientState::AwaitingHandshake(ref parser_state) = self.state {
            let mut parser = parser_state.borrow_mut();
            parser.parse(&buf);
            parser.is_upgrade()
        } else {
            false
        };

        if is_upgrade {
            // Switch to the next state:
            self.state = ClientState::HandshakeResponse;
            …
        }
    }
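
The claim that the parser is freed on a state change can be checked with a small standalone sketch. Everything in it is an assumption made for illustration: `FakeParser` stands in for the real HTTP parser and merely counts how often it is dropped, and `finish_handshake` is an invented helper.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Stand-in for the HTTP parser; increments a shared counter when dropped.
struct FakeParser {
    drops: Rc<Cell<u32>>,
}

impl Drop for FakeParser {
    fn drop(&mut self) {
        self.drops.set(self.drops.get() + 1);
    }
}

enum ClientState {
    AwaitingHandshake(FakeParser),
    Connected,
}

/// Replaces the state; the old variant and the parser inside it are dropped here.
fn finish_handshake(state: &mut ClientState) {
    *state = ClientState::Connected;
}

fn main() {
    let drops = Rc::new(Cell::new(0));
    let mut state = ClientState::AwaitingHandshake(FakeParser { drops: drops.clone() });

    println!("drops before: {}", drops.get());
    finish_handshake(&mut state);
    println!("drops after:  {}", drops.get());
}
```

Assigning a new variant is all it takes: no explicit cleanup call is needed, because the previous value of the enum is dropped as part of the assignment.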

In read_handshake you can see an unfamiliar expression, if let :

 let is_upgrade = if let ClientState::AwaitingHandshake(ref parser_state) = self.state { 

if let is a simplified version of the match operator. It matches against a single pattern only, which in some cases is more convenient than match, which requires every possible case to be handled.

Also note the pattern we are matching against, and in particular the ref keyword:

 ClientState::AwaitingHandshake(ref parser_state) 

ref is used to bind the value by reference (that is, to borrow it). Without ref, the pattern would mean that we want to move the parser state to a new owner, the variable parser_state (or implicitly copy it).

You may have noticed that we could have obtained a reference to the parser state in the read function, when we matched on the current connection state, for example like this:

    match self.state {
        ClientState::AwaitingHandshake(ref parser_state) => {
            self.read_handshake(parser_state);
        },
        …
    }

But this approach violates the rules of the borrow checker, so we would get the following error:

    error: cannot borrow `*self` as mutable because `self.state.0` is also borrowed as immutable
        ClientState::AwaitingHandshake(ref parser_state) => self.read_handshake(...),
                                                            ^~~~
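
Here is a minimal, self-contained sketch of if let with a ref binding. The enum is a simplified stand-in for the article's ClientState (a String plays the role of the parser state), and `pending_len` is a hypothetical function.

```rust
enum ClientState {
    AwaitingHandshake(String), // the String stands in for the parser state
    Connected,
}

/// Returns the length of the pending handshake data, or 0 in any other state.
fn pending_len(state: &ClientState) -> usize {
    // `ref data` borrows the String in place instead of moving it out of `state`.
    if let ClientState::AwaitingHandshake(ref data) = *state {
        data.len()
    } else {
        0
    }
}

fn main() {
    let state = ClientState::AwaitingHandshake("GET /".to_string());
    println!("{}", pending_len(&state));
    println!("{}", pending_len(&ClientState::Connected));
}
```

In current Rust the compiler can usually infer the borrow when matching through a reference, so `if let ClientState::AwaitingHandshake(data) = state` works as well. The explicit ref form shown in the article remains valid.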


This happens because, when we destructure self.state with ref parser_state, we also implicitly borrow self, to prevent later modifications of and moves out of self.state.

Meanwhile, on entry to read_handshake we borrow self again, this time mutably. The result is an attempt to borrow self mutably while it is still borrowed immutably, which, as you remember, Rust's rules forbid. That is why we borrow the parser state inside read_handshake instead.

All this may not seem very convenient, but this is how Rust protects us from memory corruption: by following these strict but simple rules, we can be sure that our program is free of this class of errors.
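
The working arrangement can be reproduced in a reduced form. The sketch below is an assumption-laden model rather than the server's code: a byte buffer in a RefCell replaces the parser, and the handshake counts as finished when the buffer ends with an empty line.

```rust
use std::cell::RefCell;

enum State {
    AwaitingHandshake(RefCell<Vec<u8>>), // the buffer stands in for the parser
    Connected,
}

struct Client {
    state: State,
}

impl Client {
    fn new() -> Client {
        Client { state: State::AwaitingHandshake(RefCell::new(Vec::new())) }
    }

    fn read(&mut self, data: &[u8]) {
        // `_` binds nothing, so `self` is not borrowed by the pattern
        // and the `&mut self` call below is allowed.
        match self.state {
            State::AwaitingHandshake(_) => self.read_handshake(data),
            _ => {}
        }
    }

    fn read_handshake(&mut self, data: &[u8]) {
        // The borrow of `self.state` lives only inside this `if let`.
        let done = if let State::AwaitingHandshake(ref buf) = self.state {
            let mut buf = buf.borrow_mut();
            buf.extend_from_slice(data);
            buf.ends_with(b"\r\n\r\n")
        } else {
            false
        };

        if done {
            // No borrow is outstanding any more, so the assignment compiles.
            self.state = State::Connected;
        }
    }

    fn is_connected(&self) -> bool {
        matches!(self.state, State::Connected)
    }
}

fn main() {
    let mut client = Client::new();
    client.read(b"GET / HTTP/1.1\r\n");
    client.read(b"\r\n");
    println!("connected: {}", client.is_connected());
}
```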

18 Unpacking the frame


Finally, let's write the code that unpacks the frame header. Before we get to it, though, it would be good to split the project into modules: as the number of lines grows, keeping all the code in one file becomes quite inconvenient. We will start with the frame.rs module, which will contain the code for working with frames.

Below is the complete listing of the module. The code is fairly long, but don't be alarmed: we will go through it in detail afterwards:

    use std::io;
    use std::io::Read;
    use std::iter;
    use std::error::Error;

    use byteorder::{ReadBytesExt, BigEndian};

    const PAYLOAD_LEN_U16: u8 = 126;
    const PAYLOAD_LEN_U64: u8 = 127;

    #[derive(Debug, Clone, Copy, PartialEq)]
    #[allow(dead_code)]
    pub enum OpCode {
        TextFrame = 1,
        BinaryFrame = 2,
        ConnectionClose = 8,
        Ping = 9,
        Pong = 0xA
    }

    impl OpCode {
        fn from(op: u8) -> Option<OpCode> {
            match op {
                1 => Some(OpCode::TextFrame),
                2 => Some(OpCode::BinaryFrame),
                8 => Some(OpCode::ConnectionClose),
                9 => Some(OpCode::Ping),
                0xA => Some(OpCode::Pong),
                _ => None
            }
        }
    }

    // Debug is derived because frames are printed with {:?} later on.
    #[derive(Debug)]
    pub struct WebSocketFrameHeader {
        fin: bool,
        rsv1: bool,
        rsv2: bool,
        rsv3: bool,
        masked: bool,
        opcode: OpCode,
        payload_length: u8
    }

    #[derive(Debug)]
    pub struct WebSocketFrame {
        header: WebSocketFrameHeader,
        mask: Option<[u8; 4]>,
        pub payload: Vec<u8>
    }

    impl WebSocketFrame {
        pub fn read<R: Read>(input: &mut R) -> io::Result<WebSocketFrame> {
            let buf = try!(input.read_u16::<BigEndian>());
            // parse_header reports errors as a String, so convert it to io::Error:
            let header = try!(Self::parse_header(buf)
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e)));

            let len = try!(Self::read_length(header.payload_length, input));
            let mask_key = if header.masked {
                let mask = try!(Self::read_mask(input));
                Some(mask)
            } else {
                None
            };
            let mut payload = try!(Self::read_payload(len, input));

            if let Some(mask) = mask_key {
                Self::apply_mask(mask, &mut payload);
            }

            Ok(WebSocketFrame {
                header: header,
                payload: payload,
                mask: mask_key
            })
        }

        pub fn get_opcode(&self) -> OpCode {
            self.header.opcode.clone()
        }

        fn parse_header(buf: u16) -> Result<WebSocketFrameHeader, String> {
            let opcode_num = ((buf >> 8) as u8) & 0x0F;
            let opcode = OpCode::from(opcode_num);

            if let Some(opcode) = opcode {
                Ok(WebSocketFrameHeader {
                    fin: (buf >> 8) & 0x80 == 0x80,
                    rsv1: (buf >> 8) & 0x40 == 0x40,
                    rsv2: (buf >> 8) & 0x20 == 0x20,
                    rsv3: (buf >> 8) & 0x10 == 0x10,
                    opcode: opcode,

                    masked: buf & 0x80 == 0x80,
                    payload_length: (buf as u8) & 0x7F,
                })
            } else {
                Err(format!("Invalid opcode: {}", opcode_num))
            }
        }

        fn apply_mask(mask: [u8; 4], bytes: &mut Vec<u8>) {
            for (idx, c) in bytes.iter_mut().enumerate() {
                *c = *c ^ mask[idx % 4];
            }
        }

        fn read_mask<R: Read>(input: &mut R) -> io::Result<[u8; 4]> {
            let mut buf = [0; 4];
            try!(input.read(&mut buf));
            Ok(buf)
        }

        fn read_payload<R: Read>(payload_len: usize, input: &mut R) -> io::Result<Vec<u8>> {
            let mut payload: Vec<u8> = Vec::with_capacity(payload_len);
            payload.extend(iter::repeat(0).take(payload_len));
            try!(input.read(&mut payload));
            Ok(payload)
        }

        fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> {
            return match payload_len {
                PAYLOAD_LEN_U64 => input.read_u64::<BigEndian>().map(|v| v as usize).map_err(|e| io::Error::from(e)),
                PAYLOAD_LEN_U16 => input.read_u16::<BigEndian>().map(|v| v as usize).map_err(|e| io::Error::from(e)),
                _ => Ok(payload_len as usize) // payload_len < 126
            }
        }
    }

The first change you may notice is the explicit use of the pub modifier on functions, structures and constants. It defines the module's interface, that is, the symbols that other modules can import with the use frame::{a, b, c}; construct.

Then we define two new structures: WebSocketFrameHeader holds the frame header data, and WebSocketFrame represents received and sent frames. WebSocketFrame also has a read function that can read frames not only from a socket but from any other data source: files, strings and so on. That is why, instead of fixing the argument type as TcpStream, we use the generic Read trait, which abstracts over data sources. This is a good rule to follow: function arguments rarely need concrete types, so it is usually better to accept traits, which is useful when writing tests, for example.
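
The benefit for testing is easy to demonstrate. In the sketch below, `read_header_bytes` is a hypothetical function, not part of frame.rs. Because it accepts any Read, a plain byte slice can play the role of the socket. The `?` operator used here is the current spelling of the try! macro.

```rust
use std::io::{self, Read};

/// Hypothetical helper: reads the first two header bytes from any `Read` source.
fn read_header_bytes<R: Read>(input: &mut R) -> io::Result<[u8; 2]> {
    let mut buf = [0u8; 2];
    input.read_exact(&mut buf)?;
    Ok(buf)
}

fn main() {
    // `&[u8]` implements `Read`, so no network connection is needed.
    let mut source: &[u8] = &[0x81, 0x05, b'h'];
    let header = read_header_bytes(&mut source).unwrap();
    println!("{:02X?}", header);
}
```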

Reading the header


Let's start the analysis of the code with unpacking the header:

    fn parse_header(buf: u16) -> Result<WebSocketFrameHeader, String> {
        let opcode_num = ((buf >> 8) as u8) & 0x0F;
        let opcode = OpCode::from(opcode_num);

        if let Some(opcode) = opcode {
            Ok(WebSocketFrameHeader {
                fin: (buf >> 8) & 0x80 == 0x80,
                rsv1: (buf >> 8) & 0x40 == 0x40,
                rsv2: (buf >> 8) & 0x20 == 0x20,
                rsv3: (buf >> 8) & 0x10 == 0x10,
                opcode: opcode,

                masked: buf & 0x80 == 0x80,
                payload_length: (buf as u8) & 0x7F,
            })
        } else {
            Err(format!("Invalid opcode: {}", opcode_num))
        }
    }

In addition to bitwise & (AND), we use the right shift operator, >>.
The idea here is even simpler than with bitwise AND. It works like this:



That is, we simply shift the bits to the right by a given number of positions, which makes them easier to work with using bitmasks.

 let opcode_num = ((buf >> 8) as u8) & 0x0F; 

In the example above, we shift the 8 high bits to the right and apply the mask we already know:


The rest of the process differs only in that we apply different masks to other parts of the header.
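
To watch the shift and the mask at work, the sketch below prints each intermediate step for the example header 0x82C0. The function `opcode_bits` is a name chosen for this example. Its body is the same expression as in parse_header.

```rust
/// Extracts the 4-bit opcode from a 16-bit frame header.
fn opcode_bits(header: u16) -> u8 {
    ((header >> 8) as u8) & 0x0F
}

fn main() {
    let header: u16 = 0x82C0;
    println!("{:016b}  header", header);
    println!("{:016b}  header >> 8", header >> 8);
    println!("{:016b}  (header >> 8) & 0x0F", opcode_bits(header));
}
```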

Then we create the OpCode from the frame type number:

 let opcode = OpCode::from(opcode_num); 

In such cases it is always worth using enums, because they provide type safety. With untyped numbers and constants it is very easy to make a mistake, because nothing stops you from using a number that does not correspond to any real opcode; opcode_num would then hold an invalid value.
In addition, we can associate each variant of the enum with a number:

    pub enum OpCode {
        TextFrame = 1,
        BinaryFrame = 2,
        …
    }

To obtain the corresponding OpCode from a number, we use a separate function:

    impl OpCode {
        fn from(op: u8) -> Option<OpCode> {
            match op {
                1 => Some(OpCode::TextFrame),
                2 => Some(OpCode::BinaryFrame),
                …
                _ => None
            }
        }
    }

The function is safe because its return value has the type Option<OpCode>: when the number does not match any opcode, we simply return None.

Determining the data size


Now we need to determine the size of the data. A 7-bit field of the frame header is responsible for this, so the maximum possible value of payload len is 127. At the same time, the frame size can obviously go far beyond this limit, so special logic is used to determine the actual length in bytes. A quotation from the RFC (section 5.2):

The size of the data, in bytes: if 0-125, this is the size of the data. If 126, the size is given by the next 2 bytes, interpreted as a 16-bit unsigned integer. If 127, the size is given by the next 8 bytes, interpreted as a 64-bit unsigned integer.

In Rust terms, we read a u16 if payload_len is 126 and a u64 if it is 127. To avoid "magic" values in the code, we define a pair of constants with reasonably clear names:

 const PAYLOAD_LEN_U16: u8 = 126; const PAYLOAD_LEN_U64: u8 = 127; 

And we write a separate function to read the data size:

    fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> {
        return match payload_len {
            PAYLOAD_LEN_U64 => input.read_u64::<BigEndian>().map(|v| v as usize).map_err(From::from),
            PAYLOAD_LEN_U16 => input.read_u16::<BigEndian>().map(|v| v as usize).map_err(From::from),
            _ => Ok(payload_len as usize) // payload_len < 126
        }
    }

The code is quite dense, so let's take it apart step by step. First of all, look at the function signature:

 fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> { 

It takes two arguments: payload_len (the value from the header) and input, whose type is the type parameter R. The <R: Read> part of the signature declares this parameter.

Parameterized (or generic) types allow functions to accept and return values whose type is not fixed in advance; they are a kind of placeholder for types.

In read_length we accept as an argument any type that implements the Read trait. In other words, the input variable must provide a specific interface through which we read bytes from some data source (a network socket, a file, or even just a byte array: Read is implemented for all of them).

Finally, we return a result of type io::Result<usize>. io::Result is a type alias for Result. In the std::io module it is defined as follows:

 type Result<T> = std::result::Result<T, io::Error> 

That is, it is just shorthand for the type Result<T, io::Error>, where T is the same kind of type parameter that we used above for the arguments of read_length. We can define our own type aliases in the same way.

Next we match on the variable payload_len:

 return match payload_len { PAYLOAD_LEN_U64 => input.read_u64::<BigEndian>().map(|v| v as usize).map_err(From::from), … } 

Here we use the byteorder library that we talked about earlier. At the top of the module we import it as follows:

 use byteorder::{ReadBytesExt, BigEndian}; 

ReadBytesExt is a trait that provides methods for reading numbers of different sizes (u16, u32 and u64) in a given byte order. It works in an interesting way: it extends the Read trait from the standard library, so its methods become available on every type that implements Read. Rust lets us extend any type in this way, adding methods to existing and future types. [3]

Using this feature is quite easy. For example, this is how ReadBytesExt is implemented in the byteorder code:

    /// Extends `Read` with methods for reading numbers. (For `std::io`.)
    pub trait ReadBytesExt: io::Read {
        fn read_u16<T: ByteOrder>(&mut self) -> Result<u16> {
            // … implementation …
        }

        // … read_u32, read_u64 and so on …
    }

    /// All types that implement `Read` get the methods of `ReadBytesExt` for free:
    impl<R: io::Read + ?Sized> ReadBytesExt for R {}

Pay attention to the last line, where the most interesting thing happens. It declares a generic type R that stands for every type implementing the Read trait, and implements the ReadBytesExt trait for this type R (or rather, for this whole set of types). The implementation body (impl) is empty, since ReadBytesExt already contains the method implementations it needs (which resembles abstract classes in "traditional" OOP languages).

The byte order is also specified through a type parameter: it is given by a type that implements the ByteOrder trait. The byteorder library already has several such implementations, of which BigEndian and LittleEndian interest us most.

This is how we use all of it in our code, reading 8 bytes in big-endian (network) order:
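
The same pattern fits in a few lines of standard-library code. The trait `ReadU16Ext` and its method `read_u16_be` below are invented for this illustration and only imitate the shape of ReadBytesExt; they are not the byteorder API. The `?` operator is the current spelling of try!.

```rust
use std::io::{self, Read};

/// Hypothetical extension trait in the style of `ReadBytesExt`.
trait ReadU16Ext: Read {
    /// Reads two bytes and interprets them as a big-endian u16.
    fn read_u16_be(&mut self) -> io::Result<u16> {
        let mut buf = [0u8; 2];
        self.read_exact(&mut buf)?;
        Ok(u16::from_be_bytes(buf))
    }
}

// Blanket implementation: every `Read` type gets `read_u16_be` for free.
impl<R: Read + ?Sized> ReadU16Ext for R {}

fn main() {
    // A byte slice implements `Read`, so it now has the new method too.
    let mut source: &[u8] = &[0x01, 0x00, 0xFF];
    println!("{}", source.read_u16_be().unwrap());
}
```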

 input.read_u64::<BigEndian>() .map(|v| v as usize) .map_err(From::from), 

Using the Result methods map and map_err, we convert the result from Result<u64, byteorder::Error> to the type we need, Result<usize, io::Error>. map transforms the type of the returned value and map_err, correspondingly, the type of the error.
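
All three branches of the length logic can be exercised without a socket. The following is a sketch that assumes only the standard library: the helpers `read_u16_be` and `read_u64_be` are written by hand here in place of the byteorder methods, so no error conversion with map_err is needed.

```rust
use std::io::{self, Read};

const PAYLOAD_LEN_U16: u8 = 126;
const PAYLOAD_LEN_U64: u8 = 127;

/// Hand-written stand-in for byteorder's `read_u16::<BigEndian>()`.
fn read_u16_be<R: Read>(input: &mut R) -> io::Result<u16> {
    let mut buf = [0u8; 2];
    input.read_exact(&mut buf)?;
    Ok(u16::from_be_bytes(buf))
}

/// Hand-written stand-in for byteorder's `read_u64::<BigEndian>()`.
fn read_u64_be<R: Read>(input: &mut R) -> io::Result<u64> {
    let mut buf = [0u8; 8];
    input.read_exact(&mut buf)?;
    Ok(u64::from_be_bytes(buf))
}

/// Same shape as the article's `read_length`; `map` converts the value type.
fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> {
    match payload_len {
        PAYLOAD_LEN_U16 => read_u16_be(input).map(|v| v as usize),
        PAYLOAD_LEN_U64 => read_u64_be(input).map(|v| v as usize),
        _ => Ok(payload_len as usize), // 0..=125: the field itself is the length
    }
}

fn main() {
    let mut extended: &[u8] = &[0x01, 0x00];
    println!("{:?}", read_length(126, &mut extended));

    let mut unused: &[u8] = &[];
    println!("{:?}", read_length(5, &mut unused));
}
```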

Mask


According to the protocol, after reading the frame size we need to read a 4-byte mask, but only if the mask bit is 1.
This is a good moment to use the Option type:

    let mask_key = if header.masked {
        let mask = try!(Self::read_mask(input));
        Some(mask)
    } else {
        None
    }

In this piece of code you can see a new interesting construct: the try! macro. This macro lets us handle routine errors in a concise form. In this case it expands into the following code:

    match Self::read_mask(input) {
        Ok(val) => val,
        Err(err) => {
            return Err(From::from(err))
        }
    }

First it checks the value returned by the read_mask function: if it is not an error, the value is unpacked from Ok(...) and becomes the result of the expression:

 Ok(val) => val, 

If the result is an error, it is converted to the error type returned by the enclosing function (in this case io::Error) and immediately returned from the function, interrupting its execution:

 Err(err) => { return Err(From::from(err)) } 

The try! macro is a simple way to handle errors without cluttering the code. It would be rather tiring and, frankly, not very pleasant to handle every error manually in this way, with a match.

Let's look at the implementation of read_mask (which is nothing special: we simply read 4 bytes from the data source into an array):

    fn read_mask<R: Read>(input: &mut R) -> io::Result<[u8; 4]> {
        let mut buf = [0; 4];
        try!(input.read(&mut buf));
        Ok(buf)
    }

After reading the mask we can proceed to reading the frame data itself:

    fn read_payload<R: Read>(payload_len: usize, input: &mut R) -> io::Result<Vec<u8>> {
        let mut payload: Vec<u8> = Vec::with_capacity(payload_len);
        payload.extend(iter::repeat(0).take(payload_len));
        try!(input.read(&mut payload));
        Ok(payload)
    }
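
In current versions of Rust the try! macro is deprecated in favour of the `?` operator, which does the same early return. The sketch below shows a mask-reading function written both ways. It is an illustration rather than the module's code, and it uses read_exact (an assumption of this example) so that a short read is reported as an error.

```rust
use std::io::{self, Read};

/// Reads the 4-byte mask, propagating errors with `?`.
fn read_mask<R: Read>(input: &mut R) -> io::Result<[u8; 4]> {
    let mut buf = [0u8; 4];
    input.read_exact(&mut buf)?; // on error: return early from the function
    Ok(buf)
}

/// The same logic with the early return written out by hand.
fn read_mask_explicit<R: Read>(input: &mut R) -> io::Result<[u8; 4]> {
    let mut buf = [0u8; 4];
    match input.read_exact(&mut buf) {
        Ok(()) => {}
        Err(err) => return Err(From::from(err)),
    }
    Ok(buf)
}

fn main() {
    let mut source: &[u8] = &[1, 2, 3, 4, 5];
    println!("{:?}", read_mask(&mut source));

    let mut too_short: &[u8] = &[1, 2];
    println!("{:?}", read_mask_explicit(&mut too_short).is_err());
}
```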

Explanations are needed here. First we create a buffer that will contain the frame data:

 let mut payload: Vec<u8> = Vec::with_capacity(payload_len); 

Vec::with_capacity creates a vector with memory pre-allocated for it. We use a vector, a dynamically allocated array, because the standard way of declaring byte arrays, [0; N], does not let us use a variable for the size: such arrays have a fixed size that must be known at compile time.

Vectors let us create arrays of arbitrary size. To use them effectively, you need to know the difference between capacity and length.


Whereas the length is the number of elements in the vector, the capacity is the number of elements the vector can hold without reallocating memory. When a vector is reallocated, all its elements are moved to another area of memory, which is obviously not fast (especially with large elements), so if we guess the required capacity correctly, the vector will work faster.

There is an interesting detail connected with this. We read data from the Read source as follows:

    try!(input.read(&mut payload));

Since we do not state the number of bytes to read anywhere explicitly, the read method uses the size of the buffer passed to it. The size of a vector is determined by its length, not its capacity, and a freshly created payload vector contains no elements, so read would try to read 0 bytes and return nothing.

For this reason we use a trick, pre-filling the buffer with zeros:

    payload.extend(iter::repeat(0).take(payload_len));

We create an iterator that repeats zero endlessly and limit it to payload_len elements, which yields a sequence [0, 0, 0, ...] of payload_len zeros.
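
Both effects can be reproduced with a byte slice as the data source. The sketch below is not the module's code: it builds the zero-filled buffer with `vec![0; n]`, a shorter equivalent of the extend trick, and it uses read_exact, which fails instead of silently returning fewer bytes than requested.

```rust
use std::io::{self, Read};

/// Reads exactly `payload_len` bytes into a zero-filled buffer.
fn read_payload<R: Read>(payload_len: usize, input: &mut R) -> io::Result<Vec<u8>> {
    let mut payload = vec![0u8; payload_len]; // length == payload_len
    input.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() {
    let data = [1u8, 2, 3, 4];

    // Capacity is reserved but the length is 0: `read` sees an empty buffer.
    let mut empty: Vec<u8> = Vec::with_capacity(4);
    let mut source: &[u8] = &data;
    let n = source.read(&mut empty).unwrap();
    println!("length={} bytes read={}", empty.len(), n);

    let mut source: &[u8] = &data;
    println!("{:?}", read_payload(4, &mut source).unwrap());
}
```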

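That's all: now we have a vector with the frame data. All that remains is to apply the mask to it, following the description in the RFC:

Section 5.3

The masking does not affect the length of the "Payload data". To convert masked data into unmasked data, or vice versa, the following algorithm is applied. The same algorithm applies regardless of the direction of the translation, e.g., the same steps are applied to mask the data as to unmask the data.

Octet i of the transformed data ("transformed-octet-i") is the XOR of octet i of the original data ("original-octet-i") with octet at index i modulo 4 of the masking key ("masking-key-octet-j"):

    j = i MOD 4
    transformed-octet-i = original-octet-i XOR masking-key-octet-j


In code, it looks like this:

    fn apply_mask(mask: [u8; 4], bytes: &mut Vec<u8>) {
        for (idx, c) in bytes.iter_mut().enumerate() {
            *c = *c ^ mask[idx % 4];
        }
    }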

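iter_mut() gives us an iterator over mutable references to the bytes, and enumerate() pairs each element with its index.


The ^ operator is bitwise XOR. It works much like AND, with one difference: the output bit is 1 only when the input bits A and B differ (that is, when exactly one of them is 1).

Note that masking is not encryption (the key is transmitted together with the data). WebSocket uses it to protect intermediaries such as proxies against attacks of the "cache poisoning" kind. [4]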
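
Because XOR is its own inverse, applying the same mask twice restores the original data. The sketch below checks this on the word "Hello" with the key 0x37 0xFA 0x21 0x3D. The function takes a slice instead of a Vec, which is a small liberty taken for this example.

```rust
/// XORs every byte with the matching byte of the 4-byte masking key.
fn apply_mask(mask: [u8; 4], bytes: &mut [u8]) {
    for (idx, byte) in bytes.iter_mut().enumerate() {
        *byte ^= mask[idx % 4];
    }
}

fn main() {
    let mask = [0x37, 0xFA, 0x21, 0x3D];
    let mut data = b"Hello".to_vec();

    apply_mask(mask, &mut data); // mask
    println!("masked:   {:02X?}", data);

    apply_mask(mask, &mut data); // unmask with the same key
    println!("unmasked: {}", String::from_utf8_lossy(&data));
}
```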

Results


With that in place, we can read frames in WebSocketClient:

    impl WebSocketClient {
        // … other methods …

        fn read(&mut self) {
            match self.state {
                ClientState::AwaitingHandshake(_) => { … },
                // Read a frame when the client is in the `Connected` state:
                ClientState::Connected => {
                    let frame = WebSocketFrame::read(&mut self.socket);
                    match frame {
                        Ok(frame) => println!("{:?}", frame),
                        Err(e) => println!("Error while reading frame: {}", e)
                    }
                },
                _ => {}
            }
        }
    }

To try it out, start the server with cargo run.


19 Sending frames


To send frames, we first need a way to build frame headers:

    impl WebSocketFrameHeader {
        fn new_header(len: usize, opcode: OpCode) -> WebSocketFrameHeader {
            WebSocketFrameHeader {
                fin: true,
                rsv1: false, rsv2: false, rsv3: false,
                masked: false,
                payload_length: Self::determine_len(len),
                opcode: opcode
            }
        }

        fn determine_len(len: usize) -> u8 {
            if len < (PAYLOAD_LEN_U16 as usize) {
                len as u8
            } else if len <= (u16::MAX as usize) {
                PAYLOAD_LEN_U16
            } else {
                PAYLOAD_LEN_U64
            }
        }
    }

new_header takes 2 arguments: the payload length and the frame type. determine_len converts the real length into the value of the header's length field (that is, into 126 or 127 for longer payloads).
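
The boundaries are easy to get wrong, so here is a standalone sketch of determine_len with its edge cases. It uses <= for the 16-bit limit, on the assumption that a length of exactly 65535 should still take the 2-byte form, since the RFC asks for the minimal encoding of the length.

```rust
const PAYLOAD_LEN_U16: u8 = 126;
const PAYLOAD_LEN_U64: u8 = 127;

/// Maps a real payload length to the value of the 7-bit length field.
fn determine_len(len: usize) -> u8 {
    if len < (PAYLOAD_LEN_U16 as usize) {
        len as u8 // 0..=125 fits into the field itself
    } else if len <= (u16::MAX as usize) {
        PAYLOAD_LEN_U16 // the real length follows as 2 bytes
    } else {
        PAYLOAD_LEN_U64 // the real length follows as 8 bytes
    }
}

fn main() {
    for len in [0usize, 125, 126, 65_535, 65_536] {
        println!("{:>6} -> {}", len, determine_len(len));
    }
}
```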

Next, we need a way to create a frame from a string:

    impl<'a> From<&'a str> for WebSocketFrame {
        fn from(payload: &str) -> WebSocketFrame {
            WebSocketFrame {
                header: WebSocketFrameHeader::new_header(payload.len(), OpCode::TextFrame),
                payload: Vec::from(payload),
                mask: None
            }
        }
    }

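Here we implement the From trait, which converts a value of one type into another; in this case it is From<&'a str>. But what does &'a str mean, how does it differ from &str, and what is <'a> in impl<'a>?

This is a lifetime parameter, one of the concepts specific to Rust. Every reference has a lifetime, the region of code in which it is guaranteed to be valid, and the compiler uses lifetimes to make sure that a reference never outlives the data it points to. Usually the compiler infers lifetimes itself; here the lifetime is named explicitly, 'a, in the impl header.

Lifetimes are described in more detail in the Rust book.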

Now we need to turn a frame into bytes. Let's start with the header and write the inverse of parse_header, serialize_header:

    impl WebSocketFrame {
        // … other methods …

        fn serialize_header(hdr: &WebSocketFrameHeader) -> u16 {
            let b1 = ((hdr.fin as u8) << 7)
                | ((hdr.rsv1 as u8) << 6)
                | ((hdr.rsv2 as u8) << 5)
                | ((hdr.rsv3 as u8) << 4)
                | ((hdr.opcode as u8) & 0x0F);

            let b2 = ((hdr.masked as u8) << 7)
                | ((hdr.payload_length as u8) & 0x7F);

            ((b1 as u16) << 8) | (b2 as u16)
        }
    }

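Here we use the << operator, the left shift. It works like >>, only in the opposite direction:


We also use the | operator, bitwise OR, to combine the parts. The output bit is 1 if at least one of the input bits is 1, and 0 only if both are 0:


To get the final 16-bit number (u16), we shift the first byte left by 8 bits and combine it with the second: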

 ((b1 as u16) << 8) | (b2 as u16) 

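Now the write function, which can write a frame to any destination (a socket, a file, and so on):

    pub fn write<W: Write>(&self, output: &mut W) -> io::Result<()> {
        let hdr = Self::serialize_header(&self.header);
        try!(output.write_u16::<BigEndian>(hdr));

        match self.header.payload_length {
            PAYLOAD_LEN_U16 => try!(output.write_u16::<BigEndian>(self.payload.len() as u16)),
            PAYLOAD_LEN_U64 => try!(output.write_u64::<BigEndian>(self.payload.len() as u64)),
            _ => {}
        }

        try!(output.write(&self.payload));
        Ok(())
    }

We write the 2 header bytes, then the extended length (if the payload is longer than 125 bytes), and finally the data itself. We do not apply a mask: according to the RFC, a server must not mask the frames it sends.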
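
The byte layout produced by these steps can be checked against a Vec<u8>, which implements Write. The function `write_text_frame` below is a hypothetical standalone encoder for unmasked text frames, written with the standard library only. It uses write_all (an assumption of this sketch) so that the whole buffer is written or an error is returned.

```rust
use std::io::{self, Write};

/// Hypothetical encoder for an unmasked text frame (FIN = 1, opcode = 1).
fn write_text_frame<W: Write>(payload: &[u8], output: &mut W) -> io::Result<()> {
    let len = payload.len();
    let first: u8 = 0x80 | 0x01; // FIN bit | text opcode

    if len < 126 {
        output.write_all(&[first, len as u8])?;
    } else if len <= (u16::MAX as usize) {
        output.write_all(&[first, 126])?;
        output.write_all(&(len as u16).to_be_bytes())?; // network byte order
    } else {
        output.write_all(&[first, 127])?;
        output.write_all(&(len as u64).to_be_bytes())?;
    }

    output.write_all(payload)
}

fn main() {
    let mut out: Vec<u8> = Vec::new();
    write_text_frame(b"Hello", &mut out).unwrap();
    println!("{:02X?}", out);
}
```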

The frame part is done! Now let's return to WebSocketClient.write: as before, we move the existing code into write_handshake and add a new write that dispatches on the state:

    impl WebSocketClient {
        fn write(&mut self) {
            match self.state {
                ClientState::AwaitingHandshake(_) => {
                    self.write_handshake();
                },
                _ => {}
            }
        }

        fn write_handshake(&mut self) {
            let headers = self.headers.borrow();
            let response_key = gen_key(&headers.get("Sec-WebSocket-Key").unwrap());
            …
        }
    }

What is left is the ClientState::Connected state, in which we send frames to the client. For that we need a queue of outgoing frames, so let's add one to WebSocketClient:

    struct WebSocketClient {
        socket: TcpStream,
        …
        // Queue of outgoing frames:
        outgoing: Vec<WebSocketFrame>
    }

And initialize it in the constructor:

    impl WebSocketClient {
        // … other methods …

        fn new(socket: TcpStream) -> WebSocketClient {
            let headers = Rc::new(RefCell::new(HashMap::new()));

            WebSocketClient {
                …
                outgoing: Vec::new()
            }
        }
    }

Then, in write, we send the queued frames:

    match self.state {
        ClientState::HandshakeResponse => …,
        ClientState::Connected => {
            println!("Frames to send: {}", self.outgoing.len());

            for frame in self.outgoing.iter() {
                if let Err(e) = frame.write(&mut self.socket) {
                    println!("Error while writing frame: {}", e);
                }
            }

            self.outgoing.clear();

            self.interest.remove(EventSet::writable());
            self.interest.insert(EventSet::readable());
        },
        _ => {}
    }

To test sending, let's reply to every incoming frame. Change read accordingly:

    fn read(&mut self) {
        match self.state {
            ClientState::AwaitingHandshake(_) => { … },
            ClientState::Connected => {
                let frame = WebSocketFrame::read(&mut self.socket);
                match frame {
                    Ok(frame) => {
                        println!("{:?}", frame);

                        // Queue a reply frame:
                        let reply_frame = WebSocketFrame::from("!");
                        self.outgoing.push(reply_frame);

                        // Switch the socket to writing so that the reply gets sent:
                        self.interest.remove(EventSet::readable());
                        self.interest.insert(EventSet::writable());
                    },
                    Err(e) => println!("Error while reading frame: {}", e)
                }
            },
            _ => {}
        }
    }

Start the server with cargo run, open the browser's JavaScript console, and connect to the server:

    ws = new WebSocket('ws://127.0.0.1:10000');
    ws.onmessage = function (event) {
        console.log('Received message: ', event.data);
    };

And send a message:

 ws.send('!'); 



20 Control frames


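So far we have handled only data frames. The protocol also defines control frames (ping, pong and close), which we need to support as well.

First, let's move frame reading out of WebSocketClient.read into a separate function:

    pub fn read(&mut self) {
        match self.state {
            ClientState::AwaitingHandshake(_) => self.read_handshake(),
            ClientState::Connected => self.read_frame(),
            _ => {}
        }
    }

    fn read_frame(&mut self) {
        let frame = WebSocketFrame::read(&mut self.socket);
        …
    }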

Before handling ping and pong, let's make read_frame dispatch on the frame type:

    fn read_frame(&mut self) {
        let frame = WebSocketFrame::read(&mut self.socket);

        match frame {
            Ok(frame) => {
                match frame.get_opcode() {
                    OpCode::TextFrame => {
                        println!("{:?}", frame);
                        let reply_frame = WebSocketFrame::from("!");
                        self.outgoing.push(reply_frame);
                    },
                    _ => {}
                }
                self.interest.remove(EventSet::readable());
                self.interest.insert(EventSet::writable());
            }
            Err(e) => println!("Error while reading frame: {}", e)
        }
    }

get_opcode returns the frame's opcode, which we match on.

Now let's handle Ping frames:

    match frame.get_opcode() {
        OpCode::TextFrame => …,
        OpCode::Ping => {
            self.outgoing.push(WebSocketFrame::pong(&frame));
        }
        _ => {}
    }

And add a function that builds the Pong reply (with the same payload as the ping):

    impl WebSocketFrame {
        // … other methods …

        pub fn pong(ping_frame: &WebSocketFrame) -> WebSocketFrame {
            let payload = ping_frame.payload.clone();
            WebSocketFrame {
                header: WebSocketFrameHeader::new_header(payload.len(), OpCode::Pong),
                payload: payload,
                mask: None
            }
        }
    }

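Close frames are handled in a similar way:

    match frame.get_opcode() {
        OpCode::TextFrame => …,
        OpCode::Ping => …,
        // Reply to a ConnectionClose frame with a close frame of our own:
        OpCode::ConnectionClose => {
            self.outgoing.push(WebSocketFrame::close_from(&frame));
        },
        _ => {}
    }

A ConnectionClose frame may carry a body that states the reason for closing. According to the RFC, if there is a body:

the first two bytes of the body MUST be a 2-byte unsigned integer (in network byte order) representing a status code with value /code/ defined in Section 7.4.

Section 7.4 lists the status codes; we do not need to interpret them, only to send them back. So close_from simply echoes the status code it received:

    impl WebSocketFrame {
        …

        pub fn close_from(recv_frame: &WebSocketFrame) -> WebSocketFrame {
            // Echo the 2-byte status code if the received frame carries one:
            let body = if recv_frame.payload.len() >= 2 {
                let status_code = &recv_frame.payload[0..2];
                let mut body = Vec::with_capacity(2);
                body.extend_from_slice(status_code);
                body
            } else {
                Vec::new()
            };
            WebSocketFrame {
                header: WebSocketFrameHeader::new_header(body.len(), OpCode::ConnectionClose),
                payload: body,
                mask: None
            }
        }
    }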
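
The body-building step can be tried in isolation. In the sketch below, `close_body` and `status_code` are hypothetical helper names. The example checks the payload length with `>= 2`, an assumption made here so that a malformed 1-byte body does not cause an out-of-bounds slice.

```rust
/// Echoes the 2-byte status code of a received close frame, if it has one.
fn close_body(received: &[u8]) -> Vec<u8> {
    if received.len() >= 2 {
        received[0..2].to_vec()
    } else {
        Vec::new()
    }
}

/// Decodes the status code, which is stored in network byte order.
fn status_code(body: &[u8]) -> Option<u16> {
    if body.len() >= 2 {
        Some(u16::from_be_bytes([body[0], body[1]]))
    } else {
        None
    }
}

fn main() {
    // Status code 1000 (0x03E8) followed by a textual reason.
    let received = [0x03, 0xE8, b'b', b'y', b'e'];
    let body = close_body(&received);
    println!("{:02X?} -> {:?}", body, status_code(&body));
}
```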

That is not all: after sending the close frame we must also close the TCP connection. To do this, WebSocketClient signals the event loop through the hup event:

    impl WebSocketClient {
        fn write(&mut self) {
            match self.state {
                ClientState::HandshakeResponse => …,
                ClientState::Connected => {
                    // Becomes true if a close frame is among the outgoing frames:
                    let mut close_connection = false;

                    for frame in self.outgoing.iter() {
                        if let Err(e) = frame.write(&mut self.socket) {
                            println!("Error while writing frame: {}", e);
                        }

                        // If we have just sent a close frame, close the connection afterwards:
                        if frame.is_close() {
                            close_connection = true;
                        }
                    }

                    self.outgoing.clear();

                    self.interest.remove(EventSet::writable());
                    // Either ask for the `hup` event to close the connection,
                    // or go back to waiting for incoming data:
                    if close_connection {
                        self.interest.insert(EventSet::hup());
                    } else {
                        self.interest.insert(EventSet::readable());
                    }
                },
                _ => {}
            }
        }
    }

We also need a helper that tells whether a frame is a close frame:

    impl WebSocketFrame {
        // … other methods …

        pub fn is_close(&self) -> bool {
            self.header.opcode == OpCode::ConnectionClose
        }
    }

Finally, we handle the hup event in WebSocketServer:

    impl Handler for WebSocketServer {
        // … other methods …

        fn ready(&mut self, event_loop: &mut EventLoop<WebSocketServer>, token: Token, events: EventSet) {
            if events.is_readable() {
                …
            }

            if events.is_writable() {
                …
            }

            // Close the connection:
            if events.is_hup() {
                let client = self.clients.remove(&token).unwrap();

                client.socket.shutdown(Shutdown::Both);
                event_loop.deregister(&client.socket);
            }
        }
    }

21


. , WebSocket , WebSocket .

, . , RSS- .

, , GitHub . -, - .

C !

Notes


[1] , , " ", «» (little-endian) «» (big-endian). ↑
[2] «» , x86. , : , , — . , little-endian, , . ↑
[3] - C#, — . , , . ↑
[4] , *(.)*. " Talking to Yourself for Fun and Profit ". ↑



, podust .

Source: https://habr.com/ru/post/278635/
