fin - the marker of the last fragment in a series of fragmented frames, which are used when the length of the transmitted data is not known in advance, for example in streaming. We will not cover this topic in detail for now, since we do not need to solve this problem directly.
rsv1, rsv2, and rsv3 - reserved for future versions of WebSocket in case any protocol extensions appear. This is also irrelevant for our task, so we can safely ignore these fields.
opcode - specifies the type of the frame sent or received, which may contain binary data or text. In addition, there are several types of control frames, used for pings or when one of the parties closes the connection. We will discuss this later.
masked - set to 1 (true) when the data is encoded using a bitmask. Details later.
payload len - the size of the transmitted data. This field requires special processing logic, since its interpretation depends on the size of the data - we will return to this soon.
For example, a header could carry the following values:
{ "fin": 1, "rsv1": 0, "rsv2": 0, "rsv3": 0, "opcode": 2, "masked": 1, "payload_len": 64 }
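To make the layout of these fields concrete, here is a minimal sketch that packs the example values into the first two bytes of a frame. The helper name `pack_header` and its signature are assumptions made for illustration; they are not part of the article's code.

```rust
/// Packs the header fields into the first two bytes of a frame.
/// `pack_header` is a hypothetical helper, not part of the article's code.
fn pack_header(fin: bool, rsv: [bool; 3], opcode: u8, masked: bool, payload_len: u8) -> u16 {
    // First byte: fin, rsv1..rsv3, then the 4-bit opcode.
    let b1 = ((fin as u8) << 7)
        | ((rsv[0] as u8) << 6)
        | ((rsv[1] as u8) << 5)
        | ((rsv[2] as u8) << 4)
        | (opcode & 0x0F);
    // Second byte: the masked flag, then the 7-bit payload length.
    let b2 = ((masked as u8) << 7) | (payload_len & 0x7F);
    ((b1 as u16) << 8) | (b2 as u16)
}

fn main() {
    // fin = 1, rsv1..3 = 0, opcode = 2, masked = 1, payload_len = 64
    let header = pack_header(true, [false; 3], 2, true, 64);
    println!("{:#06X} = {:016b}", header, header); // 0x82C0 = 1000001011000000
}
```

The first byte comes out as 0x82 (fin plus opcode 2) and the second as 0xC0 (the masked bit plus length 64).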
In binary, the same values look like this:
{ "fin": b1, "rsv1": b0, "rsv2": b0, "rsv3": b0, "opcode": b10, "masked": b1, "payload_len": b1000000 }
Packed together, the fields form the bit sequence 1.0.0.0.0010.1.1000000, or in hexadecimal system: 0x82C0.
To get individual fields back out of such a packed value, we use bit masks such as 00001111b or 00100000b. A mask is applied with the & operator (bitwise AND, "and") as follows: A & B. If A = 1b and B = 0b, at the output we get Out = 0b. The same goes for A = 0b and B = 1b. Out = 1b at the output is obtained only if both input bits are ones. That's the whole logic.
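The truth table of AND, and its use as a mask, can be checked with a few lines of Rust. This is only an illustrative sketch; the helper `opcode_of` is a hypothetical name, and the byte 0x82 is taken from the example header 0x82C0.

```rust
/// Extracts the opcode (the low four bits) from the first header byte
/// using the mask 00001111b. `opcode_of` is a hypothetical helper name.
fn opcode_of(first_byte: u8) -> u8 {
    first_byte & 0b0000_1111
}

fn main() {
    // Single-bit truth table for AND.
    for a in 0..2u8 {
        for b in 0..2u8 {
            println!("A = {}b, B = {}b -> Out = {}b", a, b, a & b);
        }
    }
    // 0x82 is the first byte of the example header 0x82C0.
    println!("opcode = {}", opcode_of(0x82));
}
```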
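For values longer than one bit, the & operation (AND) is applied sequentially to each pair of bits. Suppose we have the values A = 101b and B = 011b. In this case, the output is Out = 001b, since the only bit set in both A and B is the third one, 1b.
Byte order matters as soon as a number takes more than one byte, for example two bytes for anything up to 0xFFFF. Take the number 0x0001 - in which order should these two bytes be placed in memory? If we write 0x2A42 so that the first byte in memory is the high byte, 0x2A, such a record corresponds to the big-endian order. In little-endian order we would instead store 0x2A42 as 0x422A. The same applies to 0x0001, which we write in the "little" way in little-endian as 0x0100.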
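The two byte orders can be observed directly with the conversion methods of the standard integer types. The sketch below only uses `to_be_bytes`, `to_le_bytes`, `from_be_bytes` and `from_le_bytes` from the Rust standard library; it is an illustration and not part of the article's code, which relies on the byteorder library instead.

```rust
fn main() {
    let n: u16 = 0x2A42;
    // Big-endian: the high byte 0x2A comes first in memory.
    assert_eq!(n.to_be_bytes(), [0x2A, 0x42]);
    // Little-endian: the low byte 0x42 comes first.
    assert_eq!(n.to_le_bytes(), [0x42, 0x2A]);
    // The same two bytes read in different orders give different numbers.
    assert_eq!(u16::from_be_bytes([0x00, 0x01]), 0x0001);
    assert_eq!(u16::from_le_bytes([0x00, 0x01]), 0x0100);
    println!("byte order checks passed");
}
```

WebSocket uses network byte order, which is big-endian, for all multi-byte header fields.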
Let's first rework the WebSocketClient structure a bit so that our code does not turn into spaghetti. Let me remind you that this structure handles client connections, and its read method is called every time new data arrives from the client. We rename the existing fn read to fn read_handshake. Now we can add a new read implementation that, depending on the state of the connected client, passes control to the appropriate implementation:
fn read(&mut self) {
    match self.state {
        ClientState::AwaitingHandshake => {
            self.read_handshake();
        },
        _ => {}
    }
}
Here we match on self.state and handle the case when the state equals AwaitingHandshake (waiting for the connection to be started over HTTP). For all other possible cases we use the catch-all _ pattern, because the Rust compiler requires match to cover all possible cases for our program to be safe.
While we are working with the state (self.state), let's do another small refactoring, made possible by one of Rust's interesting features. In the first part of the article we parsed the HTTP request headers using a special structure, Parser, which holds the context and state of the HTTP parser. We need this structure only once - while we have not yet switched to the WebSocket protocol - so it would be nice to free the parser's memory once it is no longer needed. We can do this by storing the parser inside the ClientState enum. Code example:
enum ClientState {
    // The parser now lives inside the enum variant:
    AwaitingHandshake(RefCell<Parser<HttpParser>>),
    …
}

struct WebSocketClient {
    // The headers are shared with the parser:
    headers: Rc<RefCell<HashMap<String, String>>>,
    …
    interest: EventSet,
    …
}

// The constructor changes accordingly:
impl WebSocketClient {
    fn new(socket: TcpStream) -> WebSocketClient {
        …
        WebSocketClient {
            socket: socket,
            headers: headers.clone(),
            interest: EventSet::readable(),
            state: ClientState::AwaitingHandshake(RefCell::new(Parser::request(HttpParser {
                current_key: None,
                headers: headers.clone()
            })))
        }
    }
}
Enums that carry values are the same mechanism that stands behind Option and Result, already familiar to us. There is no black magic in these structures, and you can easily implement them yourself if you wish. We use RefCell to encapsulate the mutable state of the parser.
Since the parser now lives inside the enum value (of the ClientState::AwaitingHandshake type), all memory associated with it is released automatically when the state transitions to another enum value - for example, to ClientState::Connected. This is the effect we were after. Now that we have removed http_parser from the client structure, we need to change the read and read_handshake functions:
fn read(&mut self) {
    match self.state {
        ClientState::AwaitingHandshake(_) => {
            self.read_handshake();
        },
        _ => {}
    }
}

fn read_handshake(&mut self) {
    let is_upgrade = if let ClientState::AwaitingHandshake(ref parser_state) = self.state {
        let mut parser = parser_state.borrow_mut();
        parser.parse(&buf);
        parser.is_upgrade()
    } else {
        false
    };

    if is_upgrade {
        // Move on to sending the handshake response:
        self.state = ClientState::HandshakeResponse;
        …
    }
}
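The claim that the parser is freed when the state changes can be checked with a toy model. In the sketch below, `FakeParser`, `State` and `Client` are hypothetical stand-ins for the article's Parser, ClientState and WebSocketClient; the fake parser only records the moment it is dropped.

```rust
use std::cell::Cell;
use std::rc::Rc;

// A stand-in for the HTTP parser; it only records when it is dropped.
struct FakeParser {
    dropped: Rc<Cell<bool>>,
}

impl Drop for FakeParser {
    fn drop(&mut self) {
        self.dropped.set(true);
    }
}

#[allow(dead_code)]
enum State {
    AwaitingHandshake(FakeParser),
    Connected,
}

struct Client {
    state: State,
}

/// Moves a client from the handshake state to `Connected` and reports
/// whether the parser was dropped by that assignment.
fn parser_freed_on_transition() -> bool {
    let dropped = Rc::new(Cell::new(false));
    let mut client = Client {
        state: State::AwaitingHandshake(FakeParser { dropped: dropped.clone() }),
    };
    let alive_before = !dropped.get();
    // Overwriting the enum value drops the old variant together with its payload.
    client.state = State::Connected;
    alive_before && dropped.get() && matches!(client.state, State::Connected)
}

fn main() {
    println!("parser freed on transition: {}", parser_freed_on_transition());
}
```

No explicit cleanup call is needed: assigning a new value to the field is enough for the old payload to be released.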
In read_handshake you can see an unfamiliar expression, if let:
let is_upgrade = if let ClientState::AwaitingHandshake(ref parser_state) = self.state {
if let is a simplified version of the match pattern-matching operator. With it we can match against only one pattern, which in some cases is more convenient than match, which requires handling all possible cases.
Also note the ref keyword in the pattern:
ClientState::AwaitingHandshake(ref parser_state)
ref is used to take the value by reference (that is, to borrow it) - without ref the pattern would read as if we wanted to move the parser state to a new owner, the variable parser_state (or implicitly copy it).
We could have borrowed the parser state earlier, in the read function, when we matched on the current connection state - for example, like this:
match self.state {
    ClientState::AwaitingHandshake(ref parser_state) => {
        self.read_handshake(parser_state);
    },
    …
}
But in that case the compiler reports an error:
error: cannot borrow `*self` as mutable because `self.state.0` is also borrowed as immutable
ClientState::AwaitingHandshake(ref parser_state) => self.read_handshake(...),
                                                    ^~~~
When we borrow ref parser_state from self.state, we also implicitly borrow self, to prevent possible future changes to self.state and moves out of it. When the read_handshake function is then entered, we borrow self once more, this time mutably. As a result, self ends up borrowed mutably while an immutable borrow of it is still alive, which, as you remember, the borrowing rules of Rust forbid. That is why we borrow the parser state inside the read_handshake function.
Now let's create a new frame.rs module, which will contain the code related to working with frames.
use std::io;
use std::io::Read;
use std::iter;
use std::error::Error;
use byteorder::{ReadBytesExt, BigEndian};

const PAYLOAD_LEN_U16: u8 = 126;
const PAYLOAD_LEN_U64: u8 = 127;

#[derive(Debug, Clone, Copy, PartialEq)]
#[allow(dead_code)]
pub enum OpCode {
    TextFrame = 1,
    BinaryFrame = 2,
    ConnectionClose = 8,
    Ping = 9,
    Pong = 0xA
}

impl OpCode {
    fn from(op: u8) -> Option<OpCode> {
        match op {
            1 => Some(OpCode::TextFrame),
            2 => Some(OpCode::BinaryFrame),
            8 => Some(OpCode::ConnectionClose),
            9 => Some(OpCode::Ping),
            0xA => Some(OpCode::Pong),
            _ => None
        }
    }
}

// Debug is derived so that frames can be printed with {:?}.
#[derive(Debug)]
pub struct WebSocketFrameHeader {
    fin: bool,
    rsv1: bool,
    rsv2: bool,
    rsv3: bool,
    masked: bool,
    opcode: OpCode,
    payload_length: u8
}

#[derive(Debug)]
pub struct WebSocketFrame {
    header: WebSocketFrameHeader,
    mask: Option<[u8; 4]>,
    pub payload: Vec<u8>
}

impl WebSocketFrame {
    pub fn read<R: Read>(input: &mut R) -> io::Result<WebSocketFrame> {
        let buf = try!(input.read_u16::<BigEndian>());
        // parse_header reports errors as String, so convert them to io::Error:
        let header = try!(Self::parse_header(buf)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)));

        let len = try!(Self::read_length(header.payload_length, input));
        let mask_key = if header.masked {
            let mask = try!(Self::read_mask(input));
            Some(mask)
        } else {
            None
        };
        let mut payload = try!(Self::read_payload(len, input));

        if let Some(mask) = mask_key {
            Self::apply_mask(mask, &mut payload);
        }

        Ok(WebSocketFrame {
            header: header,
            payload: payload,
            mask: mask_key
        })
    }

    pub fn get_opcode(&self) -> OpCode {
        self.header.opcode.clone()
    }

    fn parse_header(buf: u16) -> Result<WebSocketFrameHeader, String> {
        let opcode_num = ((buf >> 8) as u8) & 0x0F;
        let opcode = OpCode::from(opcode_num);

        if let Some(opcode) = opcode {
            Ok(WebSocketFrameHeader {
                fin: (buf >> 8) & 0x80 == 0x80,
                rsv1: (buf >> 8) & 0x40 == 0x40,
                rsv2: (buf >> 8) & 0x20 == 0x20,
                rsv3: (buf >> 8) & 0x10 == 0x10,
                opcode: opcode,
                masked: buf & 0x80 == 0x80,
                payload_length: (buf as u8) & 0x7F,
            })
        } else {
            Err(format!("Invalid opcode: {}", opcode_num))
        }
    }

    fn apply_mask(mask: [u8; 4], bytes: &mut Vec<u8>) {
        for (idx, c) in bytes.iter_mut().enumerate() {
            *c = *c ^ mask[idx % 4];
        }
    }

    fn read_mask<R: Read>(input: &mut R) -> io::Result<[u8; 4]> {
        let mut buf = [0; 4];
        try!(input.read(&mut buf));
        Ok(buf)
    }

    fn read_payload<R: Read>(payload_len: usize, input: &mut R) -> io::Result<Vec<u8>> {
        let mut payload: Vec<u8> = Vec::with_capacity(payload_len);
        payload.extend(iter::repeat(0).take(payload_len));
        try!(input.read(&mut payload));
        Ok(payload)
    }

    fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> {
        return match payload_len {
            PAYLOAD_LEN_U64 => input.read_u64::<BigEndian>().map(|v| v as usize).map_err(|e| io::Error::from(e)),
            PAYLOAD_LEN_U16 => input.read_u16::<BigEndian>().map(|v| v as usize).map_err(|e| io::Error::from(e)),
            _ => Ok(payload_len as usize) // payload_len < 126
        }
    }
}
Note that in Rust we have to write the pub modifiers for functions, structures, variables, and constants explicitly. They define the interface of the module - that is, the symbols available to other modules for import using the use frame::{a, b, c}; construct.
WebSocketFrameHeader contains the frame header data, and WebSocketFrame is needed to represent received and sent frames. The WebSocketFrame structure also has a read function that lets you read frames not only from a socket, but from any other data source - files, strings, etc. That is why, instead of explicitly specifying the argument type as TcpStream, we use the generic abstraction for data sources, Read. This is a rule worth following - it is rarely necessary to name concrete types for function arguments, so it is better to use traits - this can be useful, for example, when writing tests.
Let's look at the header parsing first:
fn parse_header(buf: u16) -> Result<WebSocketFrameHeader, String> {
    let opcode_num = ((buf >> 8) as u8) & 0x0F;
    let opcode = OpCode::from(opcode_num);

    if let Some(opcode) = opcode {
        Ok(WebSocketFrameHeader {
            fin: (buf >> 8) & 0x80 == 0x80,
            rsv1: (buf >> 8) & 0x40 == 0x40,
            rsv2: (buf >> 8) & 0x20 == 0x20,
            rsv3: (buf >> 8) & 0x10 == 0x10,
            opcode: opcode,
            masked: buf & 0x80 == 0x80,
            payload_length: (buf as u8) & 0x7F,
        })
    } else {
        Err(format!("Invalid opcode: {}", opcode_num))
    }
}
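In addition to the & (AND) operator, here we use the right shift operation - >>. It moves the bits of a number to the right by the given number of positions, so shifting the 16-bit header by 8 leaves only its high byte. It works like this:
let opcode_num = ((buf >> 8) as u8) & 0x0F;
Next, we get an OpCode value from the frame type number:
let opcode = OpCode::from(opcode_num);
We cannot simply cast the number to the enum: if no variant corresponds to opcode_num, the result would be an undefined value.
pub enum OpCode {
    TextFrame = 1,
    BinaryFrame = 2,
    …
}
So, to convert a number to OpCode, we use a separate function:
impl OpCode {
    fn from(op: u8) -> Option<OpCode> {
        match op {
            1 => Some(OpCode::TextFrame),
            2 => Some(OpCode::BinaryFrame),
            …
            _ => None
        }
    }
}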
The function returns Option<OpCode> - for those cases where the number does not match any opcode, we simply return None.
The maximum value that fits into the 7-bit payload len field is 127. At the same time, obviously, the frame size can go far beyond this limit, so we use special logic to determine the actual length in bytes. Quoting the RFC (section 5.2):
The length of the "Payload data", in bytes: if 0-125, that is the payload length. If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length. If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length.
That is, the length is a u16 if payload_len is 126, and a u64 if it is 127. In order not to use these "magic" values in the code, we define a pair of constants with more or less clear names:
const PAYLOAD_LEN_U16: u8 = 126;
const PAYLOAD_LEN_U64: u8 = 127;
fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> {
    return match payload_len {
        PAYLOAD_LEN_U64 => input.read_u64::<BigEndian>().map(|v| v as usize).map_err(From::from),
        PAYLOAD_LEN_U16 => input.read_u16::<BigEndian>().map(|v| v as usize).map_err(From::from),
        _ => Ok(payload_len as usize) // payload_len < 126
    }
}
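To try the three length encodings without a socket, here is a std-only sketch of the same function. It is a variant written for illustration, not the article's implementation: instead of the byteorder methods it uses `read_exact` and `from_be_bytes` from the standard library, and it reads from in-memory byte slices.

```rust
use std::io::{self, Read};

const PAYLOAD_LEN_U16: u8 = 126;
const PAYLOAD_LEN_U64: u8 = 127;

/// Std-only variant of `read_length`: resolves the real payload length from
/// the 7-bit header value and, if needed, the extended length bytes.
fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> {
    match payload_len {
        PAYLOAD_LEN_U64 => {
            let mut buf = [0u8; 8];
            input.read_exact(&mut buf)?;
            Ok(u64::from_be_bytes(buf) as usize)
        }
        PAYLOAD_LEN_U16 => {
            let mut buf = [0u8; 2];
            input.read_exact(&mut buf)?;
            Ok(u16::from_be_bytes(buf) as usize)
        }
        _ => Ok(payload_len as usize), // 0..=125: the value is the length itself
    }
}

fn main() -> io::Result<()> {
    // `&[u8]` implements `Read`, so a byte slice can play the role of the socket.
    let mut short: &[u8] = &[];
    let mut medium: &[u8] = &[0x01, 0x00]; // 256 in big-endian
    println!("{}", read_length(64, &mut short)?);
    println!("{}", read_length(126, &mut medium)?);
    Ok(())
}
```

Because the function only asks for Read, the same code works for a TcpStream, a file, or a test buffer.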
Let's start with the function signature:
fn read_length<R: Read>(payload_len: u8, input: &mut R) -> io::Result<usize> {
The function takes two arguments: payload_len (the value from the header) and input, which uses the parameterized type R. The <R: Read> part defines this type.
Generic types let the read_length function accept as an argument any type that implements the Read trait. In other words, the input variable must conform to a specific interface through which we read bytes from some data source (which can be a network socket, a file, or even just an array of bytes - the Read trait is implemented for all these sources).
The return type is io::Result<usize>. io::Result is a type alias for Result. In the std::io module it is defined as follows:
type Result<T> = std::result::Result<T, io::Error>
So it is shorthand for Result<T, io::Error>, where T is the same kind of type parameter that we used above for the arguments of read_length. We can define our own type aliases in the same way.
Next, we match on the value of payload_len:
return match payload_len {
    PAYLOAD_LEN_U64 => input.read_u64::<BigEndian>().map(|v| v as usize).map_err(From::from),
    …
}
Here we use the byteorder library that we talked about at the beginning. At the top of the module we import it as follows:
use byteorder::{ReadBytesExt, BigEndian};
ReadBytesExt is a trait that provides methods for reading numbers of different sizes - u16, u32 and u64 - in a specific byte order. It works in an interesting way: it extends the Read trait from the standard library, which adds its methods to every structure that implements Read. Rust allows you to extend any type (or structure) this way, adding arbitrary methods to existing and future structures. [3]
This is how ReadBytesExt is defined in the byteorder code:
/// Extends `Read` with methods for reading numbers. (For `std::io`.)
pub trait ReadBytesExt: io::Read {
    fn read_u16<T: ByteOrder>(&mut self) -> Result<u16> {
        // …
        …
    }
    // … read_u32, read_u64, and so on …
}

/// `ReadBytesExt` is implemented for all types that implement `Read`:
impl<R: io::Read + ?Sized> ReadBytesExt for R {}
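The same extension pattern can be reproduced in a few lines. The trait `ReadU16Ext` and its method `read_u16_be` below are hypothetical names invented for this sketch; they mimic the shape of ReadBytesExt but are not part of byteorder.

```rust
use std::io::{self, Read};

/// A hypothetical extension trait in the spirit of `ReadBytesExt`:
/// it adds one method, with a default body, on top of `Read`.
trait ReadU16Ext: Read {
    fn read_u16_be(&mut self) -> io::Result<u16> {
        let mut buf = [0u8; 2];
        self.read_exact(&mut buf)?;
        Ok(u16::from_be_bytes(buf))
    }
}

// Blanket implementation: every `Read` type gets `read_u16_be`. The body is
// empty because the trait already supplies the default method.
impl<R: Read + ?Sized> ReadU16Ext for R {}

fn main() -> io::Result<()> {
    // A byte slice implements `Read`, so the new method is available on it.
    let mut source: &[u8] = &[0x2A, 0x42];
    println!("{:#06X}", source.read_u16_be()?);
    Ok(())
}
```

Nothing about the byte slice type was changed; the method becomes available as soon as the trait is in scope.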
The last line declares a generic type R that covers all structures implementing the Read trait. For this type R (or rather, for a whole set of types) the ReadBytesExt trait is implemented. The implementation body (impl) is empty, since ReadBytesExt already contains the necessary method implementations (which resembles abstract classes from "traditional" OOP languages).
The byte order is selected by a type parameter that implements the ByteOrder trait. The byteorder library already has several such implementations, of which we are most interested in BigEndian and LittleEndian.
input.read_u64::<BigEndian>()
    .map(|v| v as usize)
    .map_err(From::from),
Using the methods of Result, map and map_err, we bring the result Result<u64, byteorder::Error> to the type we need, Result<usize, io::Error>. map here transforms the type of the returned value, and map_err, accordingly, the type of the error.
Next we read the mask, which is present only when the masked flag is 1. We represent it with Option:
let mask_key = if header.masked {
    let mask = try!(Self::read_mask(input));
    Some(mask)
} else {
    None
};
Note the try! macro. This macro allows us to handle routine errors in short form. In this case, it is expanded into this code:
match Self::read_mask(input) {
    Ok(val) => val,
    Err(err) => {
        return Err(From::from(err))
    }
}
It checks the value returned by the read_mask function - if it is not an error, it is unpacked from Ok(...) and returned as the result of the expression:
Ok(val) => val,
Otherwise the error is converted to the error type of the function (io::Error) and immediately returned as the result of the function (interrupting its further execution):
Err(err) => { return Err(From::from(err)) }
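In current Rust the same expansion is written with the `?` operator, which has replaced try! (the macro is deprecated). The sketch below compares the hand-written match with `?`; `FrameError`, `parse_len_manual` and `parse_len_short` are hypothetical names used only for this illustration.

```rust
use std::num::ParseIntError;

// A hypothetical error type for this sketch.
struct FrameError(String);

// `From` is what both the manual expansion and `?` use to convert the error.
impl From<ParseIntError> for FrameError {
    fn from(err: ParseIntError) -> FrameError {
        FrameError(format!("bad length: {}", err))
    }
}

// Written out by hand, the way try! expands.
fn parse_len_manual(s: &str) -> Result<usize, FrameError> {
    let len = match s.parse::<usize>() {
        Ok(val) => val,
        Err(err) => return Err(From::from(err)),
    };
    Ok(len)
}

// The same logic with the `?` operator.
fn parse_len_short(s: &str) -> Result<usize, FrameError> {
    let len = s.parse::<usize>()?;
    Ok(len)
}

fn main() {
    println!("{:?}", parse_len_short("64").ok());
    if let Err(FrameError(msg)) = parse_len_manual("oops") {
        println!("{}", msg);
    }
}
```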
Macros like try! are a simple way to handle errors without cluttering the code. It would be rather tiring and, frankly, not very pleasant to handle every error manually in this way, by matching with match.
Now let's look at read_mask (which, however, is nothing special - we simply read 4 bytes in sequence from the data source into an array):
fn read_mask<R: Read>(input: &mut R) -> io::Result<[u8; 4]> {
    let mut buf = [0; 4];
    try!(input.read(&mut buf));
    Ok(buf)
}
Next, we read the frame data itself:
fn read_payload<R: Read>(payload_len: usize, input: &mut R) -> io::Result<Vec<u8>> {
    let mut payload: Vec<u8> = Vec::with_capacity(payload_len);
    payload.extend(iter::repeat(0).take(payload_len));
    try!(input.read(&mut payload));
    Ok(payload)
}
Let's go through it in more detail:
let mut payload: Vec<u8> = Vec::with_capacity(payload_len);
Vec::with_capacity creates a vector with memory pre-allocated for it. We use a vector, a dynamically allocated array, because with the standard byte array declaration, [0; <size>], we cannot use a variable to specify the size: these are static buffers and their size cannot be changed.
We read the data from Read as follows:
try!(input.read(&mut payload));
To decide how many bytes to read, read uses the size of the buffer passed to it. Since the size of a vector is determined by its length, not its capacity, and our vector payload contains no elements, read would accordingly try to read 0 bytes and return nothing. That is why we first fill the vector:
payload.extend(iter::repeat(0).take(payload_len));
Here we create an iterator that repeats zero and take payload_len elements from it, resulting in a sequence [0, 0, 0, ...payload_len].
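The difference between capacity and length is easy to observe with an in-memory source. The sketch below is an illustration under two assumptions that differ from the article's code: the buffer is created with `vec![0u8; n]` instead of `extend`, and `read_exact` is used instead of a single `read` call, since `read` is allowed to return fewer bytes than requested.

```rust
use std::io::{self, Read};

/// Reads `payload_len` bytes from `input`. Uses `read_exact`, which, unlike a
/// single `read` call, keeps reading until the buffer is full or fails.
fn read_payload<R: Read>(payload_len: usize, input: &mut R) -> io::Result<Vec<u8>> {
    let mut payload = vec![0u8; payload_len]; // length == payload_len
    input.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() -> io::Result<()> {
    let mut source: &[u8] = b"hello";
    // Capacity alone does not help: the vector is still empty, so `read`
    // sees a zero-length buffer and reads nothing.
    let mut empty: Vec<u8> = Vec::with_capacity(5);
    let n = source.read(&mut empty)?;
    println!("read {} bytes into a vector of length {}", n, empty.len());

    let payload = read_payload(5, &mut source)?;
    println!("{:?}", payload);
    Ok(())
}
```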
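The payload then has to be unmasked. The algorithm is described in section 5.3 of the RFC:
To convert masked data into unmasked data, or vice versa, the following algorithm is applied. The same algorithm applies regardless of the direction of the translation, e.g., the same steps are applied to mask the data as to unmask the data.
Octet i of the transformed data ("transformed-octet-i") is the XOR of octet i of the original data ("original-octet-i") with octet at index i modulo 4 of the masking key ("masking-key-octet-j"):
j = i MOD 4
transformed-octet-i = original-octet-i XOR masking-key-octet-j
In code it looks like this:
fn apply_mask(mask: [u8; 4], bytes: &mut Vec<u8>) {
    for (idx, c) in bytes.iter_mut().enumerate() {
        *c = *c ^ mask[idx % 4];
    }
}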
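Since XOR is its own inverse, applying the mask twice restores the original bytes. The sketch below shows this with the masking key and the text "Hello"; the function mirrors apply_mask but takes a slice, which is a small variation assumed here for convenience.

```rust
/// XORs every byte with the corresponding byte of the 4-byte masking key.
fn apply_mask(mask: [u8; 4], bytes: &mut [u8]) {
    for (idx, byte) in bytes.iter_mut().enumerate() {
        *byte ^= mask[idx % 4];
    }
}

fn main() {
    let mask = [0x37, 0xFA, 0x21, 0x3D];
    let mut data = b"Hello".to_vec();

    // Masking: 'H' (0x48) XOR 0x37 = 0x7F, and so on.
    apply_mask(mask, &mut data);
    println!("masked:   {:02X?}", data);

    // Applying the same mask again restores the original text.
    apply_mask(mask, &mut data);
    println!("unmasked: {}", String::from_utf8_lossy(&data));
}
```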
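Here iter_mut() gives us an iterator over mutable references to the bytes, and enumerate() adds the index of each element. The ^ operator is XOR, exclusive "or": the output bit is 1 only when the corresponding bits of A and B differ, that is, when exactly one of them is 1.
Now we can use the frame in WebSocketClient:
impl WebSocketClient {
    // …
    fn read(&mut self) {
        match self.state {
            ClientState::AwaitingHandshake(_) => { … },
            // Handle the `Connected` state:
            ClientState::Connected => {
                let frame = WebSocketFrame::read(&mut self.socket);
                match frame {
                    Ok(frame) => println!("{:?}", frame),
                    Err(e) => println!("Error while reading frame: {}", e)
                }
            },
            _ => {}
        }
    }
}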
We can now start the server with cargo run and check that incoming frames are printed. To send replies, we first need a way to build frame headers:
impl WebSocketFrameHeader {
    fn new_header(len: usize, opcode: OpCode) -> WebSocketFrameHeader {
        WebSocketFrameHeader {
            fin: true,
            rsv1: false, rsv2: false, rsv3: false,
            masked: false,
            payload_length: Self::determine_len(len),
            opcode: opcode
        }
    }

    fn determine_len(len: usize) -> u8 {
        if len < (PAYLOAD_LEN_U16 as usize) {
            len as u8
        } else if len <= (u16::MAX as usize) {
            // 65535 still fits into the 16-bit extended length
            PAYLOAD_LEN_U16
        } else {
            PAYLOAD_LEN_U64
        }
    }
}
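The boundaries of determine_len are easy to get wrong by one, so here is a standalone sketch that checks them. It assumes the inclusive upper bound for the 16-bit form, since 65535 still fits into two bytes and the RFC requires the shortest possible length encoding.

```rust
const PAYLOAD_LEN_U16: u8 = 126;
const PAYLOAD_LEN_U64: u8 = 127;

/// Picks the value of the 7-bit length field for a payload of `len` bytes.
fn determine_len(len: usize) -> u8 {
    if len < PAYLOAD_LEN_U16 as usize {
        len as u8 // 0..=125 is stored directly
    } else if len <= u16::MAX as usize {
        PAYLOAD_LEN_U16 // the real length follows as 2 bytes
    } else {
        PAYLOAD_LEN_U64 // the real length follows as 8 bytes
    }
}

fn main() {
    for len in [0usize, 125, 126, 65535, 65536] {
        println!("{} -> {}", len, determine_len(len));
    }
}
```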
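new_header takes 2 arguments - the length of the data and the frame type. determine_len returns either the length itself or the "marker" value for an extended length (i.e., 126 or 127). Now we can create frames from strings:
impl<'a> From<&'a str> for WebSocketFrame {
    fn from(payload: &str) -> WebSocketFrame {
        WebSocketFrame {
            header: WebSocketFrameHeader::new_header(payload.len(), OpCode::TextFrame),
            payload: Vec::from(payload),
            mask: None
        }
    }
}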
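Here we implement the From trait, which describes conversion from one type into another. In our case it is From<&'a str>, that is, conversion from a string slice. Why is the type written as &'a str rather than simply &str, and what does <'a> in impl<'a> mean? 'a is a lifetime parameter: it names the lifetime of the borrowed string. With From implemented, a frame can be created directly from a string.
Now let's write the counterpart of parse_header - serialize_header:
impl WebSocketFrame {
    // …
    fn serialize_header(hdr: &WebSocketFrameHeader) -> u16 {
        let b1 = ((hdr.fin as u8) << 7)
            | ((hdr.rsv1 as u8) << 6)
            | ((hdr.rsv2 as u8) << 5)
            | ((hdr.rsv3 as u8) << 4)
            | ((hdr.opcode as u8) & 0x0F);

        let b2 = ((hdr.masked as u8) << 7)
            | ((hdr.payload_length as u8) & 0x7F);

        ((b1 as u16) << 8) | (b2 as u16)
    }
}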
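<< is the left bit shift operator. It works like >>, only the bits are moved to the left instead of to the right. | is the bitwise OR operator - it works much like &, but with the opposite rule: the output bit is 1 if at least one of the input bits is 1, and 0 only when both are 0. To combine two bytes into a single 16-bit number (u16), we shift the first byte 8 bits to the left and merge in the second one with OR:
((b1 as u16) << 8) | (b2 as u16)
Now we can add the write function, which writes a frame to any destination (a socket, a file, etc.):
// Requires `use std::io::Write;` and `use byteorder::WriteBytesExt;` at the top of the module.
pub fn write<W: Write>(&self, output: &mut W) -> io::Result<()> {
    let hdr = Self::serialize_header(&self.header);
    try!(output.write_u16::<BigEndian>(hdr));

    match self.header.payload_length {
        PAYLOAD_LEN_U16 => try!(output.write_u16::<BigEndian>(self.payload.len() as u16)),
        PAYLOAD_LEN_U64 => try!(output.write_u64::<BigEndian>(self.payload.len() as u64)),
        _ => {}
    }

    // write_all keeps writing until the whole payload is sent
    try!(output.write_all(&self.payload));
    Ok(())
}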
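To see the bytes that end up on the wire, the serialization can be tried against an in-memory buffer. The sketch below is a std-only approximation, not the article's write method: `write_text_frame` is a hypothetical function that always produces an unmasked, final text frame and uses `to_be_bytes` instead of byteorder.

```rust
use std::io::{self, Write};

/// Writes an unmasked, final text frame with the given payload to `output`.
/// A std-only sketch: the extended length is written with `to_be_bytes`.
fn write_text_frame<W: Write>(payload: &[u8], output: &mut W) -> io::Result<()> {
    let len = payload.len();
    let first: u8 = 0x80 | 0x01; // fin = 1, opcode = 1 (text)
    if len < 126 {
        output.write_all(&[first, len as u8])?;
    } else if len <= u16::MAX as usize {
        output.write_all(&[first, 126])?;
        output.write_all(&(len as u16).to_be_bytes())?;
    } else {
        output.write_all(&[first, 127])?;
        output.write_all(&(len as u64).to_be_bytes())?;
    }
    output.write_all(payload)
}

fn main() -> io::Result<()> {
    // `Vec<u8>` implements `Write`, so it can stand in for the socket.
    let mut wire = Vec::new();
    write_text_frame(b"Hi", &mut wire)?;
    println!("{:02X?}", wire);
    Ok(())
}
```

For the payload "Hi" the buffer holds four bytes: 0x81, 0x02 and the two characters.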
We apply the same refactoring to WebSocketClient.write as we did to read - the old code moves to write_handshake, and write dispatches on the state:
impl WebSocketClient {
    fn write(&mut self) {
        match self.state {
            ClientState::AwaitingHandshake(_) => {
                self.write_handshake();
            },
            _ => {}
        }
    }

    fn write_handshake(&mut self) {
        let headers = self.headers.borrow();
        let response_key = gen_key(&headers.get("Sec-WebSocket-Key").unwrap());
        …
    }
}
Now we need to handle the ClientState::Connected state, in which frames are sent to the client. For that we keep a queue of outgoing frames in WebSocketClient:
struct WebSocketClient {
    socket: TcpStream,
    …
    // Queue of outgoing frames:
    outgoing: Vec<WebSocketFrame>
}
and initialize it in the constructor:
impl WebSocketClient {
    // …
    fn new(socket: TcpStream) -> WebSocketClient {
        let headers = Rc::new(RefCell::new(HashMap::new()));

        WebSocketClient {
            …
            outgoing: Vec::new()
        }
    }
}
Then we extend write:
match self.state {
    ClientState::HandshakeResponse => …,
    ClientState::Connected => {
        println!("Frames to send: {}", self.outgoing.len());

        for frame in self.outgoing.iter() {
            if let Err(e) = frame.write(&mut self.socket) {
                println!("Error on write: {}", e);
            }
        }

        self.outgoing.clear();

        self.interest.remove(EventSet::writable());
        self.interest.insert(EventSet::readable());
    },
    _ => {}
}
And read, so that every received frame gets a reply:
fn read(&mut self) {
    match self.state {
        ClientState::AwaitingHandshake(_) => { … },
        ClientState::Connected => {
            let frame = WebSocketFrame::read(&mut self.socket);
            match frame {
                Ok(frame) => {
                    println!("{:?}", frame);

                    // Queue a reply frame:
                    let reply_frame = WebSocketFrame::from("!");
                    self.outgoing.push(reply_frame);

                    // Switch the socket to writing:
                    self.interest.remove(EventSet::readable());
                    self.interest.insert(EventSet::writable());
                },
                Err(e) => println!("Error while reading frame: {}", e)
            }
        },
        _ => {}
    }
}
Start the server with cargo run, then connect to it from the JavaScript console of the browser:
ws = new WebSocket('ws://127.0.0.1:10000');
ws.onmessage = function (event) { console.log('Received: ', event.data); };
and send a message:
ws.send('!');
Let's tidy up WebSocketClient.read by moving the frame handling into a separate function:
pub fn read(&mut self) {
    match self.state {
        ClientState::AwaitingHandshake(_) => self.read_handshake(),
        ClientState::Connected => self.read_frame(),
        _ => {}
    }
}

fn read_frame(&mut self) {
    let frame = WebSocketFrame::read(&mut self.socket);
    …
}
In read_frame we now look at the frame type:
fn read_frame(&mut self) {
    let frame = WebSocketFrame::read(&mut self.socket);
    match frame {
        Ok(frame) => {
            match frame.get_opcode() {
                OpCode::TextFrame => {
                    println!("{:?}", frame);
                    let reply_frame = WebSocketFrame::from("!");
                    self.outgoing.push(reply_frame);
                },
                _ => {}
            }
            self.interest.remove(EventSet::readable());
            self.interest.insert(EventSet::writable());
        }
        Err(e) => println!("Error while reading frame: {}", e)
    }
}
get_opcode returns the type of the frame, which we match on. Next we add handling of Ping frames:
match frame.get_opcode() {
    OpCode::TextFrame => …,
    OpCode::Ping => {
        self.outgoing.push(WebSocketFrame::pong(&frame));
    }
    _ => {}
}
and a function that creates a Pong frame (it carries the same payload as the ping it answers):
impl WebSocketFrame {
    // …
    pub fn pong(ping_frame: &WebSocketFrame) -> WebSocketFrame {
        let payload = ping_frame.payload.clone();
        WebSocketFrame {
            header: WebSocketFrameHeader::new_header(payload.len(), OpCode::Pong),
            payload: payload,
            mask: None
        }
    }
}
Finally, we handle Close frames:
match frame.get_opcode() {
    OpCode::TextFrame => …,
    OpCode::Ping => …,
    // Reply to ConnectionClose:
    OpCode::ConnectionClose => {
        self.outgoing.push(WebSocketFrame::close_from(&frame));
    },
    _ => {}
}
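When we receive a ConnectionClose frame, we have to answer with a close frame of our own. A close frame may carry a body, and the RFC describes its format as follows:
If there is a body, the first two bytes of the body MUST be a 2-byte unsigned integer (in network byte order) representing a status code with value /code/ defined in Section 7.4.
The close_from function builds the reply, echoing the status code it received:
impl WebSocketFrame {
    …
    pub fn close_from(recv_frame: &WebSocketFrame) -> WebSocketFrame {
        // The status code needs at least 2 bytes of payload:
        let body = if recv_frame.payload.len() >= 2 {
            let status_code = &recv_frame.payload[0..2];
            let mut body = Vec::with_capacity(2);
            body.extend_from_slice(status_code);
            body
        } else {
            Vec::new()
        };
        WebSocketFrame {
            header: WebSocketFrameHeader::new_header(body.len(), OpCode::ConnectionClose),
            payload: body,
            mask: None
        }
    }
}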
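The two status bytes can be decoded into a number with a small helper. `close_status` below is a hypothetical function added for illustration; it is not part of the article's code and only shows how the big-endian status code is read from the body.

```rust
/// Extracts the status code from the payload of a Close frame, if present.
/// `close_status` is a hypothetical helper, not part of the article's code.
fn close_status(payload: &[u8]) -> Option<u16> {
    if payload.len() >= 2 {
        // Network byte order is big-endian.
        Some(u16::from_be_bytes([payload[0], payload[1]]))
    } else {
        None
    }
}

fn main() {
    // 1000 (0x03E8) is the status code for a normal closure.
    println!("{:?}", close_status(&[0x03, 0xE8]));
    // A Close frame without a body carries no status code.
    println!("{:?}", close_status(&[]));
}
```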
Now we change WebSocketClient so that it asks for the hup event once a close frame has been sent:
impl WebSocketClient {
    fn write(&mut self) {
        match self.state {
            ClientState::HandshakeResponse => …,
            ClientState::Connected => {
                // Set when a close frame has been sent:
                let mut close_connection = false;

                for frame in self.outgoing.iter() {
                    if let Err(e) = frame.write(&mut self.socket) {
                        println!("Error on write: {}", e);
                    }

                    // If this was a close frame, remember to close the connection:
                    if frame.is_close() {
                        close_connection = true;
                    }
                }

                self.outgoing.clear();

                self.interest.remove(EventSet::writable());
                // Ask for the `hup` event if the connection is being closed,
                // otherwise keep reading:
                if close_connection {
                    self.interest.insert(EventSet::hup());
                } else {
                    self.interest.insert(EventSet::readable());
                }
            },
            _ => {}
        }
    }
}
is_close is a small helper:
impl WebSocketFrame {
    // …
    pub fn is_close(&self) -> bool {
        self.header.opcode == OpCode::ConnectionClose
    }
}
And WebSocketServer handles the event by removing the client:
impl Handler for WebSocketServer {
    // …
    fn ready(&mut self, event_loop: &mut EventLoop<WebSocketServer>, token: Token, events: EventSet) {
        if events.is_readable() {
            …
        }

        if events.is_writable() {
            …
        }

        // Close the connection and forget the client:
        if events.is_hup() {
            let client = self.clients.remove(&token).unwrap();

            client.socket.shutdown(Shutdown::Both);
            event_loop.deregister(&client.socket);
        }
    }
}
Source: https://habr.com/ru/post/278635/