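    // Create a vector (array) containing 1, 2, 3 and bind it to `foo`.
    let foo = vec![1, 2, 3];
    // Ownership of the vector now moves to `bar`.
    let bar = foo;
    // From here on `foo` can no longer be used: its value has been moved.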
The event loop in Node.js and the EventMachine gem in Ruby work in exactly the same way.
The same is true for the nginx web server, which uses exclusively asynchronous I/O [9].
The rest of this article assumes that you already have Rust installed. If not, follow the documentation on the official site.
Rust ships with cargo, a tool that performs functions similar to Maven, Composer, npm, or rake: it manages the dependencies of our application, builds the project, runs tests, and, most importantly, simplifies the process of creating a new project. To create the project, run:
    cargo new chat --bin
The --bin flag tells Cargo to create a runnable application rather than a library.
This gives us two files, Cargo.toml and src/main.rs. Cargo.toml contains the project description and links to its dependencies (similar to package.json in JavaScript). src/main.rs is the main source file and the entry point of our program.
To build and run the program, use cargo run. The same command displays errors in the code, if any.
If you are a happy Emacs user, you will be glad to know that it works with Cargo out of the box: just install the rust-mode package from the MELPA repository and configure the compile command to run cargo build.
For the event loop we will use the mio library. There are many useful changes in its development version 0.4, which are also incompatible with older versions. Therefore, we pull it in directly from GitHub by adding these lines to Cargo.toml:
    [dependencies.mio]
    git = "https://github.com/carllerche/mio"
Then we import the library in main.rs:
    extern crate mio;
    use mio::*;
Using mio is easy. First of all, let's create an event loop by calling the EventLoop::new() function. An empty loop is of no use, however, so let's immediately add event handling to our chat by defining a structure with functions that conform to the Handler interface:
    struct WebSocketServer;
Then we implement Handler for it:
    impl Handler for WebSocketServer {
        // Handler has default implementations for its callbacks,
        // so we only need to specify the concrete types
        // that mio uses for timeouts and messages:
        type Timeout = usize;
        type Message = ();
    }
Now we can start the event loop:
    fn main() {
        let mut event_loop = EventLoop::new().unwrap();
        // Create a new instance of our Handler:
        let mut handler = WebSocketServer;
        // ... and pass it to the event loop:
        event_loop.run(&mut handler).unwrap();
    }
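Note the &mut on the last line. This means that we temporarily transfer “ownership” of the value, binding it to another variable with permission to change (mutate) the data. Conceptually, a mutable borrow behaves like this (pseudocode):
    // Bind the value to its owner:
    let mut owner = value;
    {
        // Hand the value over to a temporary holder:
        let mut borrow = owner;
        // The owner cannot use the value now; the holder can change it:
        borrow.mutate();
        // Give the value back to the owner:
        owner = borrow;
    }
The code above is equivalent to this:
    // Bind the value to its owner:
    let mut owner = value;
    {
        // Borrow the value mutably:
        let borrow = &mut owner;
        // While this borrow is alive, the value is reachable only through it:
        borrow.mutate();
        // The value returns to the owner automatically
        // when the borrow goes out of scope.
    }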
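The two pseudocode snippets above can be condensed into a small runnable sketch. The helper names `push_through_borrow` and `consume` are made up for this illustration; the first shows a mutable borrow that ends with its scope, the second shows ownership being moved away for good.

```rust
/// Appends a value through a mutable borrow; the caller keeps ownership.
fn push_through_borrow(items: &mut Vec<i32>, value: i32) {
    items.push(value);
}

/// Takes ownership of the vector and returns its length.
fn consume(items: Vec<i32>) -> usize {
    items.len()
}

fn main() {
    let mut owner = vec![1, 2, 3];
    {
        // Mutable borrow: while `borrow` is alive, `owner` cannot be used.
        let borrow = &mut owner;
        push_through_borrow(borrow, 4);
    } // The borrow ends here and `owner` is usable again.
    assert_eq!(owner, vec![1, 2, 3, 4]);

    // Move: after this call `owner` is gone; using it again would not compile.
    let len = consume(owner);
    println!("consumed a vector of {} items", len);
}
```

Uncommenting a use of `owner` after the `consume` call is a quick way to see the compiler's "use of moved value" error for yourself.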
Besides &mut there is also the immutable borrow, &. Unlike &mut, the mutable borrow, it places no limits on reading, only on writing: as long as immutable borrows are in scope, the value cannot be changed or borrowed through &mut.
Now run the “cargo run” command and Cargo will download all the necessary dependencies, compile the program (with some warnings that we can ignore for now), and launch it. To interrupt the program, press Ctrl+C.
To accept connections we will use TcpListener from the mio::tcp package. The process of creating a server-side TCP socket is fairly straightforward: we bind to a specific address (IP + port number), listen on the socket, and accept connections. We will not deviate much from it:
    use mio::tcp::*;
    use std::net::SocketAddr;
    ...
    let address = "0.0.0.0:10000".parse::<SocketAddr>().unwrap();
    let server_socket = TcpListener::bind(&address).unwrap();
    event_loop.register(&server_socket,
                        Token(0),
                        EventSet::readable(),
                        PollOpt::edge()).unwrap();
Let's go through it line by line. First we import into the main.rs module the package for working with TCP and the SocketAddr structure describing a socket address, by adding these lines to the beginning of the file:
    use mio::tcp::*;
    use std::net::SocketAddr;
Then we parse the string "0.0.0.0:10000" into the structure describing the address, to which the socket will be bound:
    let address = "0.0.0.0:10000".parse::<SocketAddr>().unwrap();
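The parsing step can be tried on its own with nothing but the standard library. This is a minimal sketch; `parse_address` is a helper invented here to show a non-panicking alternative to `unwrap()`.

```rust
use std::net::SocketAddr;

/// Parses an "ip:port" string, returning None instead of panicking on bad input.
fn parse_address(text: &str) -> Option<SocketAddr> {
    text.parse::<SocketAddr>().ok()
}

fn main() {
    // The target type can be named with the turbofish, as in the article...
    let a = "0.0.0.0:10000".parse::<SocketAddr>().unwrap();
    // ...or through a type annotation on the binding.
    let b: SocketAddr = "0.0.0.0:10000".parse().unwrap();
    assert_eq!(a, b);
    println!("port = {}", a.port());

    // Input without a port is not a valid socket address.
    assert!(parse_address("0.0.0.0").is_none());
}
```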
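Notice how the compiler can infer types for us: since TcpListener::bind expects an argument of type SocketAddr, we do not need to spell the type out everywhere and clutter the code; the Rust compiler is able to determine it itself.
Next we create the listening socket and bind it to the address:
    let server_socket = TcpListener::bind(&address).unwrap();
You may also have noticed that we almost always call unwrap on the result of a function. This is an error handling pattern in Rust, and we will return to this topic soon.
Finally, we register the socket in the event loop:
    event_loop.register(&server_socket,
                        Token(0),
                        EventSet::readable(),
                        PollOpt::edge()).unwrap();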
The register call is more complicated. Besides the socket itself, the function takes the following arguments:
- A token, which identifies the socket. Here we associate the Token(0) token with the server socket waiting for connections.
- A set of events to subscribe to. EventSet::readable() in the case of a server socket signs us up for only one event: the establishment of a connection with a new client.
- Poll options. PollOpt::edge() means that events are edge-triggered rather than level-triggered.
The difference is as follows. If we subscribe to the readable() event with a level trigger, we are alerted whenever the socket buffer has readable data. With an edge trigger, we are alerted at the moment the socket receives new data; that is, if we did not read the entire contents of the buffer while processing the event, we will not receive further alerts until new data arrives. A more detailed description (in English) is given in an answer on Stack Overflow.
Now let's run the resulting code with the cargo run command. In the terminal we still will not see anything except the blinking cursor, but if we separately execute the netstat command, we will see that our socket is waiting for connections on port 10000:
    $ netstat -ln | grep 10000
    tcp 0 0 127.0.0.1:10000 0.0.0.0:* LISTEN
A WebSocket connection starts with an ordinary HTTP request carrying the headers Connection: Upgrade and Upgrade: websocket, and we must respond to this request in a certain way. And that's all: we do not need to write a full-fledged web server that serves files, static content, etc.; there are more advanced and suitable tools for that (for example, the same nginx).
To begin with, let's teach the server to accept client connections:
    use std::collections::HashMap;

    struct WebSocketServer {
        socket: TcpListener,
        clients: HashMap<Token, TcpStream>,
        token_counter: usize
    }

    const SERVER_TOKEN: Token = Token(0);

    impl Handler for WebSocketServer {
        type Timeout = usize;
        type Message = ();

        fn ready(&mut self, event_loop: &mut EventLoop<WebSocketServer>,
                 token: Token, events: EventSet)
        {
            match token {
                SERVER_TOKEN => {
                    let client_socket = match self.socket.accept() {
                        Err(e) => {
                            println!("Accept error: {}", e);
                            return;
                        },
                        Ok(None) => panic!("accept returned 'None'"),
                        Ok(Some(sock)) => sock
                    };

                    self.token_counter += 1;
                    let new_token = Token(self.token_counter);

                    self.clients.insert(new_token, client_socket);
                    event_loop.register(&self.clients[&new_token],
                                        new_token, EventSet::readable(),
                                        PollOpt::edge() | PollOpt::oneshot()).unwrap();
                }
            }
        }
    }
Let's start with the WebSocketServer structure: it will store the server socket and the sockets of the connected clients.
    use std::collections::HashMap;

    struct WebSocketServer {
        socket: TcpListener,
        clients: HashMap<Token, TcpStream>,
        token_counter: usize
    }
To store the client sockets we use the HashMap data structure from the standard collection library, std::collections; it is the standard implementation of hash tables (also known as dictionaries and associative arrays). As keys we will use the tokens that are already familiar to us, which must be unique for each connection.
To begin with, we can generate tokens in a simple way, with a counter that is incremented for each new connection. This is what the token_counter variable in the structure is for.
Next, the Handler trait from the mio library comes in handy again:
    impl Handler for WebSocketServer
In the trait implementation we need to override a callback function, ready. Overriding here means that the Handler trait already contains a dummy ready function and stubs for some other callback functions. The default implementation defined in the trait does not, of course, do anything useful, so we need to define our own version of the function to handle the events of interest to us:
    fn ready(&mut self, event_loop: &mut EventLoop<WebSocketServer>,
             token: Token, events: EventSet)
This function is called when an event fires on a registered socket. Through its parameters we receive the event loop, the token associated with the event source, and an EventSet that contains a set of flags with information about the event (readable when the socket is available for reading, or writable for writing, respectively).
First of all, we check which socket the event came from by matching on the token:
    match token {
        SERVER_TOKEN => {
            ...
        }
    }
The match syntax resembles the standard switch construct of “traditional” imperative programming languages, but offers many more possibilities. For example, in Java the switch construct is limited to a specific set of types and works only for numbers, strings, and enums. In Rust, however, match allows you to compare values of almost any type, including multiple values, structures, etc. In addition to matching, match also allows capturing the content or parts of patterns, in a manner similar to regular expressions.
In the example above we match against the Token(0) token; as you remember, it is associated with the listening socket. To make our intentions clearer when reading the code, we defined this token as the constant SERVER_TOKEN:
    const SERVER_TOKEN: Token = Token(0);
So the match expression in this case is equivalent to this:
    match token { Token(0) => ... }
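Matching against a named constant and capturing part of a value can be shown without mio. In this sketch `Token` is a local stand-in for mio's type and `describe` is a made-up helper; note that a struct constant can be used as a pattern only if its type derives both `PartialEq` and `Eq`.

```rust
// A local stand-in for mio's Token, defined here so the example is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Token(usize);

const SERVER_TOKEN: Token = Token(0);

/// Tells the listening socket apart from client sockets by token.
fn describe(token: Token) -> String {
    match token {
        // A constant used as a pattern: matches only Token(0).
        SERVER_TOKEN => "listening socket".to_string(),
        // A destructuring pattern: captures the inner number as `id`.
        Token(id) => format!("client socket #{}", id),
    }
}

fn main() {
    println!("{}", describe(Token(0)));
    println!("{}", describe(Token(7)));
}
```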
Now that we are sure we are dealing with the server socket, we can accept the client connection:
    let client_socket = match self.socket.accept() {
        Err(e) => {
            println!("Accept error: {}", e);
            return;
        },
        Ok(None) => unreachable!(),
        Ok(Some(sock)) => sock
    };
Here we match again, this time on the result of accept(), which returns the client socket wrapped in the type Result<Option<TcpStream>>. Result is a special type that is fundamental to error handling in Rust: it is a wrapper around “uncertain” results, such as errors, timeouts, etc.
In each individual case we can decide for ourselves what to do with such results, but handling every error properly is tedious. This is where the already familiar unwrap() comes in: it provides standard behavior, aborting the program in the event of an error and “unpacking” the result of the function from the Result container if everything is in order. Thus, by using unwrap(), we state that we are only interested in the immediate result, and that we are fine with the program stopping in case of an error.
This is acceptable in some places, but in the case of accept() it would be unwise to use unwrap(), since under unlucky circumstances the call could stop our server and disconnect all users. Therefore, we simply print the error to the log and carry on:
    Err(e) => {
        println!("Accept error: {}", e);
        return;
    },
Option is a wrapper similar to Result that expresses the presence or absence of a value. The absence of a value is indicated by None; otherwise, the value takes the form Some(value). As you have probably guessed, this type is comparable to null or None in other languages, only Option is safer because all empty values are localized and (like Result) require mandatory “unpacking” before use, so you will never see the famous NullReferenceException error unless you ask for it yourself.
So let's unpack the result returned by accept():
    Ok(None) => unreachable!(),
In this case a None result is not expected: accept() returns it only if we call this function on a client (that is, non-listening) socket. Since we are sure that we are dealing with the server socket, execution should never reach this piece of code in a normal situation, so we use the special unreachable!() construct, which aborts the program with an error.
    let client_socket = match self.socket.accept() {
        ...
        Ok(Some(sock)) => sock
    }
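The three-way match on a `Result<Option<T>>` can be exercised in isolation. The sketch below uses a hypothetical `fake_accept` function in place of a real socket; unlike the article's code, it treats `Ok(None)` as an ordinary outcome rather than calling `unreachable!()`, which is an assumption made only to keep the example free of panics.

```rust
use std::io;

/// Hypothetical stand-in for accept(): Err on failure, Ok(None) when there
/// is nothing to accept, Ok(Some(id)) for a new connection.
fn fake_accept(step: u32) -> io::Result<Option<u32>> {
    match step {
        0 => Err(io::Error::new(io::ErrorKind::Other, "accept failed")),
        1 => Ok(None),
        id => Ok(Some(id)),
    }
}

/// Unpacks both wrappers with a single match and reports what happened.
fn handle(step: u32) -> String {
    // `match` is an expression, so the captured value can be bound directly.
    let id = match fake_accept(step) {
        Err(e) => return format!("error: {}", e),
        Ok(None) => return "nothing to accept".to_string(),
        Ok(Some(id)) => id,
    };
    format!("accepted connection {}", id)
}

fn main() {
    for step in [0, 1, 5] {
        println!("{}", handle(step));
    }
}
```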
And here is the most interesting part. Since match is not just a statement but an expression (that is, match also returns a result), in addition to matching it also allows you to capture values. Thus we can use it to assign results to variables, which is what we do above: we unpack the value from the type Result<Option<TcpStream>> and assign it to the variable client_socket.
We store the resulting socket in the hash table, not forgetting to increment the token counter:
    let new_token = Token(self.token_counter);
    self.clients.insert(new_token, client_socket);
    self.token_counter += 1;
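The bookkeeping above (a counter that hands out unique tokens and a hash table keyed by them) can be sketched with the standard library alone. `Registry` and the string "clients" are placeholders invented for this example; the real server stores sockets instead.

```rust
use std::collections::HashMap;

// A local stand-in for mio's Token; it must be hashable to serve as a key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Token(usize);

/// Hypothetical registry that stores clients under unique tokens.
struct Registry {
    clients: HashMap<Token, String>,
    token_counter: usize,
}

impl Registry {
    fn new() -> Registry {
        // Token 0 is reserved for the server socket, so counting starts at 1.
        Registry { clients: HashMap::new(), token_counter: 1 }
    }

    /// Stores a client and returns the token it was registered under.
    fn add(&mut self, client: String) -> Token {
        let token = Token(self.token_counter);
        self.clients.insert(token, client);
        self.token_counter += 1;
        token
    }
}

fn main() {
    let mut registry = Registry::new();
    let alice = registry.add("alice".to_string());
    let bob = registry.add("bob".to_string());
    println!("{:?} -> {}", alice, registry.clients[&alice]);
    println!("{:?} -> {}", bob, registry.clients[&bob]);
}
```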
Finally, we subscribe to events from the socket we have just accepted by registering it in the event loop:
    event_loop.register(&self.clients[&new_token],
                        new_token, EventSet::readable(),
                        PollOpt::edge() | PollOpt::oneshot()).unwrap();
You may have noticed a difference in the arguments: besides PollOpt::edge(), we have added a new option, PollOpt::oneshot(). It tells the loop to temporarily remove the registration of a socket when an event fires, which simplifies the server code. Without this option we would need to track the current state of the socket manually: can we write now, can we read now, etc. Instead, we will simply re-register the socket every time with the set of options and subscriptions we need at the moment. On top of that, this approach is useful for multi-threaded event loops, but more on that next time.
Finally, because the WebSocketServer structure has become more complicated, we need to change the code that registers the server in the event loop. The changes are quite simple and mainly concern the initialization of the new structure:
    let mut server = WebSocketServer {
        token_counter: 1,        // Start counting client tokens from 1
        clients: HashMap::new(), // An empty hash table (HashMap) of clients
        socket: server_socket    // Hand ownership of the server socket to the structure
    };

    event_loop.register(&server.socket,
                        SERVER_TOKEN,
                        EventSet::readable(),
                        PollOpt::edge()).unwrap();
    event_loop.run(&mut server).unwrap();
The next step is to parse HTTP. We will use the http-muncher HTTP parsing library, adding it to the dependency list. The library adapts the HTTP parser from Node.js (which in turn is based on the parser in nginx) for Rust; it allows processing requests in streaming mode, which will be very useful for TCP connections.
Let's add the dependency to Cargo.toml:
    [dependencies]
    http-muncher = "0.2.0"
And here is the code that uses it:
    extern crate http_muncher;
    use http_muncher::{Parser, ParserHandler};

    struct HttpParser;
    impl ParserHandler for HttpParser { }

    struct WebSocketClient {
        socket: TcpStream,
        http_parser: Parser<HttpParser>
    }

    impl WebSocketClient {
        fn read(&mut self) {
            loop {
                let mut buf = [0; 2048];
                match self.socket.try_read(&mut buf) {
                    Err(e) => {
                        println!("Read error: {:?}", e);
                        return
                    },
                    Ok(None) =>
                        // The socket buffer has no more bytes.
                        break,
                    Ok(Some(len)) => {
                        self.http_parser.parse(&buf[0..len]);
                        if self.http_parser.is_upgrade() {
                            // ...
                            break;
                        }
                    }
                }
            }
        }

        fn new(socket: TcpStream) -> WebSocketClient {
            WebSocketClient {
                socket: socket,
                http_parser: Parser::request(HttpParser)
            }
        }
    }
We also need to make a few changes to the ready function in the WebSocketServer structure:
    match token {
        SERVER_TOKEN => {
            ...
            self.clients.insert(new_token, WebSocketClient::new(client_socket));
            event_loop.register(&self.clients[&new_token].socket, new_token, EventSet::readable(),
                                PollOpt::edge() | PollOpt::oneshot()).unwrap();
            ...
        },
        token => {
            let mut client = self.clients.get_mut(&token).unwrap();
            client.read();
            event_loop.reregister(&client.socket, token, EventSet::readable(),
                                  PollOpt::edge() | PollOpt::oneshot()).unwrap();
        }
    }
Let's go through the new code piece by piece. First, we import the library and add a handler structure for the parser:
    extern crate http_muncher;
    use http_muncher::{Parser, ParserHandler};

    struct HttpParser;
    impl ParserHandler for HttpParser { }
Here we implement the ParserHandler trait, which contains some useful callback functions (just as Handler from mio does in the case of the WebSocketServer structure). These callbacks are called as soon as the parser has any useful information: HTTP headers, request content, and so on. For now, though, we only need to find out whether the client sent the set of special headers that switch the HTTP connection to the WebSocket protocol. The parser structure already has the necessary functions for this, so we will not override the callbacks yet, leaving their default implementations.
There is one detail, however: the HTTP parser keeps state, which means that we need to create a new instance of HttpParser for every new client. Given that each client will keep its own parser state, let's create a new structure that describes an individual client:
    struct WebSocketClient {
        socket: TcpStream,
        http_parser: Parser<HttpParser>
    }
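Since the client socket is now stored in this structure, we can replace the definition HashMap<Token, TcpStream> with HashMap<Token, WebSocketClient> in the server structure.
In addition, it is convenient to move the client handling code into the same structure: if we keep everything in a single ready function, the code will quickly turn into spaghetti. So let's add a separate read implementation to the WebSocketClient structure:
    impl WebSocketClient {
        fn read(&mut self) {
            ...
        }
    }
This function needs no parameters, because all the state it requires is already inside the structure. Now we can start reading the data coming from the client:
    loop {
        let mut buf = [0; 2048];
        match self.socket.try_read(&mut buf) {
            ...
        }
    }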
What is going on here? We start an infinite loop (the loop { ... } construct), allocate 2 KB of memory for the buffer where we will write data, and try to read incoming data into it.
The call to try_read may fail, so we pattern-match on the Result type:
    match self.socket.try_read(&mut buf) {
        Err(e) => {
            println!("Read error: {:?}", e);
            return
        },
        ...
    }
Then we check whether there are still bytes left to read in the TCP socket buffer:
    match self.socket.try_read(&mut buf) {
        ...
        Ok(None) =>
            // The socket buffer has no more bytes.
            break,
        ...
    }
try_read returns Ok(None) if we have read all the data available from the client. When this happens, we break out of the infinite loop and continue to wait for new events.
And finally, the case when try_read has written data to our buffer:
    match self.socket.try_read(&mut buf) {
        ...
        Ok(Some(len)) => {
            self.http_parser.parse(&buf[0..len]);
            if self.http_parser.is_upgrade() {
                // ...
                break;
            }
        }
    }
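The shape of this read loop can be tried without a socket. The sketch below is an approximation that uses the blocking `std::io::Read` trait, where `Ok(0)` signals that no data is left, instead of mio's `try_read`, where the article's code checks for `Ok(None)`; `read_all` is a name invented for the example.

```rust
use std::io::{Cursor, Read};

/// Drains a reader in 2 KB chunks and returns everything that was read.
fn read_all<R: Read>(source: &mut R) -> std::io::Result<Vec<u8>> {
    let mut received = Vec::new();
    loop {
        let mut buf = [0u8; 2048];
        match source.read(&mut buf) {
            // Reading failed: report the error to the caller.
            Err(e) => return Err(e),
            // Nothing left to read: leave the loop.
            Ok(0) => break,
            // `len` bytes arrived: keep only the filled part of the buffer.
            Ok(len) => received.extend_from_slice(&buf[0..len]),
        }
    }
    Ok(received)
}

fn main() {
    // 5000 bytes do not fit into one 2 KB buffer, so the loop runs several times.
    let mut source = Cursor::new(vec![7u8; 5000]);
    let data = read_all(&mut source).unwrap();
    println!("read {} bytes", data.len());
}
```

Slicing with `&buf[0..len]` matters: the rest of the buffer still holds zeros (or stale bytes from an earlier pass) and must not be handed to the parser.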
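Here we pass the received data to the parser and immediately check whether the HTTP headers contain a request to “switch” the connection to WebSocket mode (more precisely, the header Connection: Upgrade).
The last improvement is the new function, which we need in order to make it more convenient to create instances of the WebSocketClient structure:
    fn new(socket: TcpStream) -> WebSocketClient {
        WebSocketClient {
            socket: socket,
            http_parser: Parser::request(HttpParser)
        }
    }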
This is a so-called associated function, which in behavior largely resembles static methods from the traditional object-oriented approach, and the new function in particular can be compared to a constructor. Here we simply create an instance of WebSocketClient, but it should be understood that we can do the same without the “constructor” function; it is more a matter of convenience, because without constructor functions the code can become needlessly repetitive. After all, the DRY principle (“don't repeat yourself”) was invented for a reason.
There are a couple more details. First, note that we do not use the return keyword explicitly: Rust automatically returns the last expression of a function as its result.
And this line requires explanation:
    http_parser: Parser::request(HttpParser)
Here we create a new instance of the Parser structure using the associated function Parser::request. As an argument, we pass a newly created instance of the previously defined HttpParser structure.
Now that we have sorted out the clients, we can return to the server code, where we make the following changes in the ready handler:
    match token {
        SERVER_TOKEN => { ... },
        token => {
            let mut client = self.clients.get_mut(&token).unwrap();
            client.read();
            event_loop.reregister(&client.socket, token, EventSet::readable(),
                                  PollOpt::edge() | PollOpt::oneshot()).unwrap();
        }
    }
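We have added a new arm to match, which handles all tokens other than SERVER_TOKEN, that is, events on client sockets. With the token at hand, we can borrow a mutable reference to the corresponding client structure instance from the hash table:
    let mut client = self.clients.get_mut(&token).unwrap();
Now let's call the read function that we defined above for this client:
    client.read();
At the end we must re-register the client in the event loop (because of oneshot()):
    event_loop.reregister(&client.socket, token, EventSet::readable(),
                          PollOpt::edge() | PollOpt::oneshot()).unwrap();
As you can see, the differences from the client socket registration procedure are minor: in essence, we just change the name of the called function from register to reregister, passing all the same parameters.
Now we know when a client wants to establish a WebSocket connection, and we can think about how to respond to such requests. In essence, we need to send back a simple set of headers:
    HTTP/1.1 101 Switching Protocols
    Connection: Upgrade
    Upgrade: websocket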
There is one important detail, though: the WebSocket protocol also obliges us to send a properly composed Sec-WebSocket-Accept header. According to the RFC, this must be done following certain rules: we need to take and remember the Sec-WebSocket-Key header sent by the client, append a certain static string ("258EAFA5-E914-47DA-95CA-C5AB0DC85B11") to it, then hash the result with the SHA-1 algorithm, and finally encode it all in base64.
The Rust standard library has no functions for working with SHA-1 and base64, so let's add the necessary libraries to our Cargo.toml:
    [dependencies]
    ...
    rustc-serialize = "0.3.15"
    sha1 = "0.1.1"
The rustc-serialize library contains functions for encoding binary data in base64, and sha1, obviously, is for SHA-1 hashing.
The function that generates the response key is quite simple:
    extern crate sha1;
    extern crate rustc_serialize;

    use rustc_serialize::base64::{ToBase64, STANDARD};

    fn gen_key(key: &String) -> String {
        let mut m = sha1::Sha1::new();
        let mut buf = [0u8; 20];

        m.update(key.as_bytes());
        m.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11".as_bytes());

        m.output(&mut buf);

        return buf.to_base64(STANDARD);
    }
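To see exactly what the two crates compute, here is a dependency-free sketch of the same derivation. It is not the article's implementation: `sha1`, `base64`, and `accept_key` are hand-written for illustration only, and real code should keep using well-tested libraries. The sample key and expected answer come from the example in RFC 6455.

```rust
/// Plain SHA-1 over a byte slice (illustration only).
fn sha1(data: &[u8]) -> [u8; 20] {
    let mut h: [u32; 5] = [0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476, 0xC3D2E1F0];
    // Padding: 0x80, zeros up to 56 mod 64, then the bit length (big-endian u64).
    let mut msg = data.to_vec();
    let bit_len = (data.len() as u64) * 8;
    msg.push(0x80);
    while msg.len() % 64 != 56 {
        msg.push(0);
    }
    msg.extend_from_slice(&bit_len.to_be_bytes());

    for chunk in msg.chunks(64) {
        // Expand the 16 input words into an 80-word schedule.
        let mut w = [0u32; 80];
        for i in 0..16 {
            w[i] = u32::from_be_bytes([chunk[4 * i], chunk[4 * i + 1], chunk[4 * i + 2], chunk[4 * i + 3]]);
        }
        for i in 16..80 {
            w[i] = (w[i - 3] ^ w[i - 8] ^ w[i - 14] ^ w[i - 16]).rotate_left(1);
        }
        let (mut a, mut b, mut c, mut d, mut e) = (h[0], h[1], h[2], h[3], h[4]);
        for i in 0..80 {
            let (f, k) = match i {
                0..=19 => ((b & c) | (!b & d), 0x5A827999u32),
                20..=39 => (b ^ c ^ d, 0x6ED9EBA1),
                40..=59 => ((b & c) | (b & d) | (c & d), 0x8F1BBCDC),
                _ => (b ^ c ^ d, 0xCA62C1D6),
            };
            let temp = a.rotate_left(5)
                .wrapping_add(f)
                .wrapping_add(e)
                .wrapping_add(k)
                .wrapping_add(w[i]);
            e = d;
            d = c;
            c = b.rotate_left(30);
            b = a;
            a = temp;
        }
        h[0] = h[0].wrapping_add(a);
        h[1] = h[1].wrapping_add(b);
        h[2] = h[2].wrapping_add(c);
        h[3] = h[3].wrapping_add(d);
        h[4] = h[4].wrapping_add(e);
    }
    let mut out = [0u8; 20];
    for (i, word) in h.iter().enumerate() {
        out[4 * i..4 * i + 4].copy_from_slice(&word.to_be_bytes());
    }
    out
}

/// Standard base64 with '=' padding (illustration only).
fn base64(bytes: &[u8]) -> String {
    const TABLE: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::new();
    for chunk in bytes.chunks(3) {
        let b = [chunk[0], *chunk.get(1).unwrap_or(&0), *chunk.get(2).unwrap_or(&0)];
        let n = ((b[0] as u32) << 16) | ((b[1] as u32) << 8) | (b[2] as u32);
        out.push(TABLE[((n >> 18) & 63) as usize] as char);
        out.push(TABLE[((n >> 12) & 63) as usize] as char);
        out.push(if chunk.len() > 1 { TABLE[((n >> 6) & 63) as usize] as char } else { '=' });
        out.push(if chunk.len() > 2 { TABLE[(n & 63) as usize] as char } else { '=' });
    }
    out
}

/// Sec-WebSocket-Accept = base64(SHA-1(client key + fixed GUID)).
fn accept_key(client_key: &str) -> String {
    let mut input = client_key.as_bytes().to_vec();
    input.extend_from_slice(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
    base64(&sha1(&input))
}

fn main() {
    // Sample key from RFC 6455; the expected answer is s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    println!("{}", accept_key("dGhlIHNhbXBsZSBub25jZQ=="));
}
```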
We take a reference to the key string as the argument of gen_key, create a new instance of the SHA-1 hash, add the key sent by the client to it, then add the constant string defined in the RFC, and return the result as a base64-encoded string.
But in order to use this function, we first need to get the Sec-WebSocket-Key header from the client. Let's go back to the HTTP parser from the previous section. As you remember, the ParserHandler trait allows us to override callbacks that are called when new headers are received. Now is the time to take advantage of this opportunity; let's improve the implementation of the corresponding structure:
    use std::cell::RefCell;
    use std::rc::Rc;

    struct HttpParser {
        current_key: Option<String>,
        headers: Rc<RefCell<HashMap<String, String>>>
    }

    impl ParserHandler for HttpParser {
        fn on_header_field(&mut self, s: &[u8]) -> bool {
            self.current_key = Some(std::str::from_utf8(s).unwrap().to_string());
            true
        }

        fn on_header_value(&mut self, s: &[u8]) -> bool {
            self.headers.borrow_mut()
                .insert(self.current_key.clone().unwrap(),
                        std::str::from_utf8(s).unwrap().to_string());
            true
        }

        fn on_headers_complete(&mut self) -> bool {
            false
        }
    }
This piece of code is simple in itself, but it brings us to a new important concept: shared ownership. As you already know, in Rust a value can have only one owner, but sometimes we need to share ownership. In this case, the headers hash table must be accessible to two owners, WebSocketClient and ParserHandler.
Rc is a wrapper with reference counting (which can be considered a kind of garbage collection). Basically, we transfer ownership to the Rc container, which in turn we can safely share among many owners with the help of some black language magic: we just clone the Rc value using the clone() function, and the container manages the memory for us.
There is a catch, however: the value that Rc contains is immutable, and because of the compiler's restrictions we cannot affect it in any way. In fact, this is just a consequence of Rust's rule about data mutability: you can have as many borrows of a variable as you like, but you can change it only if there is a single owner. If we try to modify the contents of Rc directly, a compilation error will occur.
This is the problem that RefCell solves. It does so through the mechanism of interior mutability. Simply put, RefCell allows us to defer all the borrow checking rules to runtime, instead of checking them statically at compile time. Thus, we need to wrap our headers in two containers at once, Rc<RefCell<...>> (which, of course, looks pretty scary to unprepared minds).
Let's look at these lines from the HttpParser handler:
    self.headers.borrow_mut()
        .insert(self.current_key.clone().unwrap(),
        ...
This construct largely corresponds to a mutable borrow, &mut, with the difference that all checks limiting the number of borrows are carried out dynamically while the program runs, so it is we, not the compiler, who must watch this closely; otherwise a runtime error may occur.
The headers variable will be directly owned by the WebSocketClient structure, so let's add new properties to it and write a new constructor function:
    // Import RefCell and Rc from the standard library
    use std::cell::RefCell;
    use std::rc::Rc;

    ...

    struct WebSocketClient {
        socket: TcpStream,
        http_parser: Parser<HttpParser>,

        // Add the headers to the WebSocketClient definition:
        headers: Rc<RefCell<HashMap<String, String>>>
    }

    impl WebSocketClient {
        fn new(socket: TcpStream) -> WebSocketClient {
            let headers = Rc::new(RefCell::new(HashMap::new()));

            WebSocketClient {
                socket: socket,

                // Give the first clone to the client:
                headers: headers.clone(),

                http_parser: Parser::request(HttpParser {
                    current_key: None,

                    // ... and the second one to the parser:
                    headers: headers.clone()
                })
            }
        }

        ...
    }
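The sharing scheme can be reduced to a few lines that run without mio or http-muncher. In this sketch `Parser` and `Client` are simplified stand-ins invented for the example: the parser writes headers through its clone of the `Rc`, and the client reads them through its own.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

type Headers = Rc<RefCell<HashMap<String, String>>>;

/// Simplified stand-in for the HTTP parser handler: it only records headers.
struct Parser {
    headers: Headers,
}

impl Parser {
    fn on_header(&mut self, name: &str, value: &str) {
        // borrow_mut() is checked at runtime; it panics if another borrow is active.
        self.headers.borrow_mut().insert(name.to_string(), value.to_string());
    }
}

/// Simplified stand-in for the client: it shares the header table with the parser.
struct Client {
    headers: Headers,
    parser: Parser,
}

impl Client {
    fn new() -> Client {
        let headers: Headers = Rc::new(RefCell::new(HashMap::new()));
        Client {
            // Cloning an Rc copies the pointer and bumps the reference count,
            // it does not copy the hash table.
            headers: headers.clone(),
            parser: Parser { headers },
        }
    }

    fn header(&self, name: &str) -> Option<String> {
        self.headers.borrow().get(name).cloned()
    }
}

fn main() {
    let mut client = Client::new();
    client.parser.on_header("Sec-WebSocket-Key", "dGhlIHNhbXBsZSBub25jZQ==");
    println!("{:?}", client.header("Sec-WebSocket-Key"));
    println!("owners: {}", Rc::strong_count(&client.headers));
}
```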
Now WebSocketClient has access to the parsed headers, and therefore we can find among them the one we are interested in, Sec-WebSocket-Key. Given that we have the client's key, composing the response will not cause any difficulties. We just need to piece together a string and write it to the client socket.
But since we cannot simply send data to a non-blocking socket, we first need to ask the event loop to notify us when the socket becomes available for writing. This is easy to do: we need to change the EventSet to EventSet::writable() when the socket is re-registered.
Remember this line?
    event_loop.reregister(&client.socket, token, EventSet::readable(),
                          PollOpt::edge() | PollOpt::oneshot()).unwrap();
We can store the set of events that interest us in the client state; let's change the WebSocketClient structure:
    struct WebSocketClient {
        socket: TcpStream,
        http_parser: Parser<HttpParser>,
        headers: Rc<RefCell<HashMap<String, String>>>,

        // Add a new field, `interest`:
        interest: EventSet
    }
Now let's change the re-registration procedure accordingly:
    event_loop.reregister(&client.socket, token,
                          client.interest, // Pass the `EventSet` stored in the client
                          PollOpt::edge() | PollOpt::oneshot()).unwrap();
All that remains is to change the value of interest in the right places. To simplify this process, let's formalize it using connection states:
    #[derive(PartialEq)]
    enum ClientState {
        AwaitingHandshake,
        HandshakeResponse,
        Connected
    }
The idea is simple. The first state, AwaitingHandshake, means that we expect a new client to connect using HTTP. HandshakeResponse means the state in which we respond to the client via HTTP. And finally, Connected is the state in which we have successfully established a connection with the client and communicate with it using the WebSocket protocol.
Let's add a state variable to the client structure:
    struct WebSocketClient {
        socket: TcpStream,
        http_parser: Parser<HttpParser>,
        headers: Rc<RefCell<HashMap<String, String>>>,
        interest: EventSet,

        // Add the client state:
        state: ClientState
    }
And add the initial values of the new variables to the constructor:
    impl WebSocketClient {
        fn new(socket: TcpStream) -> WebSocketClient {
            let headers = Rc::new(RefCell::new(HashMap::new()));

            WebSocketClient {
                socket: socket,
                ...
                // Initial events that interest us
                interest: EventSet::readable(),

                // Initial state
                state: ClientState::AwaitingHandshake
            }
        }
    }
Now we can change the state in the read function. Remember these lines?
    match self.socket.try_read(&mut buf) {
        ...
        Ok(Some(len)) => {
            if self.http_parser.is_upgrade() {
                // ...
                break;
            }
        }
    }
Let's replace the stub in the is_upgrade() condition block with the code for changing the connection state:
    if self.http_parser.is_upgrade() {
        // Change the current state to HandshakeResponse
        self.state = ClientState::HandshakeResponse;

        // Change the set of events of interest to Writable
        // (i.e. the socket becoming available for writing):
        self.interest.remove(EventSet::readable());
        self.interest.insert(EventSet::writable());

        break;
    }
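The state transitions described above can be modeled without any sockets. In this sketch `Interest` is a simplified stand-in for mio's `EventSet` (a single value instead of a set of flags), and the method names `on_upgrade_request` and `on_response_written` are invented for the illustration.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ClientState {
    AwaitingHandshake,
    HandshakeResponse,
    Connected,
}

/// Simplified stand-in for mio's EventSet: one interest at a time.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Interest {
    Readable,
    Writable,
}

struct Client {
    state: ClientState,
    interest: Interest,
}

impl Client {
    fn new() -> Client {
        Client { state: ClientState::AwaitingHandshake, interest: Interest::Readable }
    }

    /// Called when the parser reports an upgrade request.
    fn on_upgrade_request(&mut self) {
        if self.state == ClientState::AwaitingHandshake {
            self.state = ClientState::HandshakeResponse;
            // We now want to be told when the socket can be written to.
            self.interest = Interest::Writable;
        }
    }

    /// Called after the handshake response has been written.
    fn on_response_written(&mut self) {
        if self.state == ClientState::HandshakeResponse {
            self.state = ClientState::Connected;
            // Back to waiting for incoming data.
            self.interest = Interest::Readable;
        }
    }
}

fn main() {
    let mut client = Client::new();
    client.on_upgrade_request();
    println!("{:?} / {:?}", client.state, client.interest);
    client.on_response_written();
    println!("{:?} / {:?}", client.state, client.interest);
}
```

Guarding each transition with a check of the current state keeps an out-of-order event from moving the connection into a state it should not be in.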
Now that the set of events of interest has been changed to Writable, let's add the code needed to send the response that establishes the connection.
We will change the ready function in the implementation of the WebSocketServer structure. The procedure of writing the response to the socket is simple (and practically does not differ from the reading procedure); we only need to separate one type of event from the other:
    fn ready(&mut self, event_loop: &mut EventLoop<WebSocketServer>,
             token: Token, events: EventSet) {
        // Are we dealing with a read event?
        if events.is_readable() {
            // Move all read handling code here
            match token {
                SERVER_TOKEN => { ... },
                ...
            }
            ...
        }

        // Handle write events:
        if events.is_writable() {
            let mut client = self.clients.get_mut(&token).unwrap();
            client.write();
            event_loop.reregister(&client.socket, token, client.interest,
                                  PollOpt::edge() | PollOpt::oneshot()).unwrap();
        }
    }
All that is left is to assemble and send the response string:
    use std::fmt;
    ...
    impl WebSocketClient {
        fn write(&mut self) {
            // Borrow the headers hash table from the Rc<RefCell<...>> container:
            let headers = self.headers.borrow();

            // Generate the response key from the client's key:
            let response_key = gen_key(&headers.get("Sec-WebSocket-Key").unwrap());

            // Here we use a special function to format the string.
            // Analogues exist in many other languages (printf in C, format in Python, etc.),
            // but Rust has an interesting difference: thanks to macros,
            // the format string is parsed at compile time.
            let response = fmt::format(format_args!("HTTP/1.1 101 Switching Protocols\r\n\
                                                     Connection: Upgrade\r\n\
                                                     Sec-WebSocket-Accept: {}\r\n\
                                                     Upgrade: websocket\r\n\r\n", response_key));

            // Write the response to the socket:
            self.socket.try_write(response.as_bytes()).unwrap();

            // Change the client state:
            self.state = ClientState::Connected;

            // And switch the interest back to `readable()` (reading):
            self.interest.remove(EventSet::writable());
            self.interest.insert(EventSet::readable());
        }
    }
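The response string can be built and inspected separately from the socket code. This sketch uses the standard `format!` macro rather than `fmt::format(format_args!(...))`; `handshake_response` is a helper name invented here, and the sample key is the one from RFC 6455.

```rust
/// Builds the HTTP response that completes the WebSocket handshake.
fn handshake_response(accept_key: &str) -> String {
    // A trailing backslash inside a string literal skips the line break
    // and the indentation of the next line.
    format!(
        "HTTP/1.1 101 Switching Protocols\r\n\
         Connection: Upgrade\r\n\
         Upgrade: websocket\r\n\
         Sec-WebSocket-Accept: {}\r\n\r\n",
        accept_key
    )
}

fn main() {
    let response = handshake_response("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=");
    // The response is what would be passed to the socket as bytes.
    print!("{}", response);
    println!("{} bytes", response.as_bytes().len());
}
```

Each header line must end with CRLF, and the empty line at the end tells the client that the header section is complete.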
Now we can try connecting to the server from the developer console of a browser:
    ws = new WebSocket('ws://127.0.0.1:10000');

    if (ws.readyState == WebSocket.OPEN) {
        console.log('Connection is successful');
    }
unique_ptr and shared_ptr from C++.
malloc(). Instead, local variables are allocated on the stack and preallocated memory is used.
malloc() and free() have the same problem due to memory fragmentation.
pthread_create(3) tells you about 2 MB on a 32-bit Linux system.
Source: https://habr.com/ru/post/268609/