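And perhaps the most difficult problem here is working with memory safely. Incorrect memory handling is behind many bugs: buffer overflows, memory leaks, double frees, dangling references, dereferencing pointers to memory that has already been freed, and so on. Such errors sometimes lead to serious security problems. For example, Heartbleed, the recent high-profile bug in OpenSSL, was caused by nothing more than careless memory handling. And this is only the tip of the iceberg: nobody knows how many similar holes lurk in the software we use every day.

Rust addresses this with the concept of ownership. Every value has exactly one owner, and ownership can be transferred (moved) to another variable:

    let foo = vec![1, 2, 3];
    // Create a new vector (array) containing 1, 2, 3
    // and bind it to the local variable `foo`.

    let bar = foo;
    // Move ownership of the vector to `bar`.
    // From now on `foo` can no longer be used,
    // because the "owner" is now `bar`.

When the owner goes out of scope (a block delimited by { and }), the value is freed automatically.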
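The move described above, and the automatic cleanup at the end of the scope, can be observed with a small std-only sketch. The `Noisy` type and both functions are hypothetical, introduced only for illustration: `Noisy` counts how many times it is destroyed through a `Drop` implementation, which makes the moment of deallocation visible.

```rust
use std::cell::Cell;

// Hypothetical helper type: increments a shared counter when it is dropped,
// so we can see exactly when Rust frees a value.
struct Noisy<'a> {
    drops: &'a Cell<u32>,
}

impl<'a> Drop for Noisy<'a> {
    fn drop(&mut self) {
        self.drops.set(self.drops.get() + 1);
    }
}

// Moves a value from `foo` to `bar` inside a block and reports how many
// drops have happened once the block has ended.
fn move_then_leave_scope(drops: &Cell<u32>) -> u32 {
    {
        let foo = Noisy { drops };
        let bar = foo; // ownership moves to `bar`
        // let again = foo; // would not compile: use of moved value `foo`
        let _still_owned = &bar;
    } // `bar` goes out of scope here and is dropped exactly once
    drops.get()
}

// The vector from the text: after the move, only `bar` can be used.
fn sum_after_move() -> i32 {
    let foo = vec![1, 2, 3];
    let bar = foo;
    bar.iter().sum()
}

fn main() {
    let drops = Cell::new(0);
    println!("drops after scope: {}", move_then_leave_scope(&drops));
    println!("sum: {}", sum_after_move());
}
```

Because `foo` was moved, it is never dropped on its own; only the final owner releases the value, which is what rules out double frees.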
To handle I/O efficiently, we will use the operating system's multiplexing APIs, which are built around an event loop. The Linux kernel provides the epoll mechanism for this [7], and FreeBSD and OS X provide kqueue [8]. The event loop in Node.js and the EventMachine gem in Ruby work in exactly the same way.
The same is true of the nginx web server, which relies exclusively on asynchronous I/O [9].
The rest of the text assumes that you already have Rust installed. If not, follow the documentation on the official site.
Rust comes with cargo, a tool that plays a role similar to Maven, Composer, npm, or rake: it manages our application's dependencies, builds the project, runs tests, and, most importantly, simplifies creating a new project:

    cargo new chat --bin

The --bin flag tells Cargo to create an executable application rather than a library. This generates two files:

    Cargo.toml
    src/main.rs

Cargo.toml contains the project description and the project's dependencies (similar to package.json in JavaScript). src/main.rs is the main source file and the entry point of our program. To build and run the program, use cargo run. The same command also reports errors in the code, if there are any.

If you are a happy Emacs user, you will be glad to know that it works with Cargo out of the box: just install the rust-mode package from the MELPA repository and set the compile command to cargo build.
We will use the mio library. Its development version 0.4 contains many useful changes that are also incompatible with older versions, so we pull it directly from GitHub by adding these lines to Cargo.toml:

    [dependencies.mio]
    git = "https://github.com/carllerche/mio"

Then we import it in main.rs:

    extern crate mio;
    use mio::*;

Using mio is easy. First, we create an event loop by calling the EventLoop::new() function. An empty loop is of no use, though, so let's immediately add event handling for our chat by defining a structure whose functions implement the Handler interface:

    struct WebSocketServer;

And the Handler implementation for it:

    impl Handler for WebSocketServer {
        // Handler requires two associated types.
        // We don't use either of them yet,
        // so we pick simple placeholder types:
        type Timeout = usize;
        type Message = ();
    }

    fn main() {
        let mut event_loop = EventLoop::new().unwrap();
        // Create an instance of our Handler:
        let mut handler = WebSocketServer;
        // ...and hand it to the event loop to run:
        event_loop.run(&mut handler).unwrap();
    }

Note the &mut on the last line. It means that we temporarily lend "ownership" of the value to another binding, with permission to change (mutate) the data.
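The difference between moving a value and borrowing it can be illustrated like this:

    // Without borrowing, `owner` is the "owner" of the value:
    let mut owner = value;
    {
        // Move the value into a new binding:
        let mut borrow = owner;
        // Now we can change it:
        borrow.mutate();
        // ...and we must give ownership back explicitly:
        owner = borrow;
    }

    // With borrowing, `owner` stays the "owner" the whole time:
    let mut owner = value;
    {
        // Borrow the value mutably:
        let borrow = &mut owner;
        // While the borrow is active, `owner` cannot be used.
        // Change the value through the borrow:
        borrow.mutate();
        // At the end of the scope the borrow ends
        // and `owner` can be used again.
    }

Besides &mut, the mutable borrow, there is also the immutable borrow (&). It places no limits on reading, only on writing: as long as immutable borrows are in scope, the value cannot be changed or borrowed through &mut.

Now run the cargo run command: Cargo will download all the necessary dependencies, compile the program (with a few warnings that we can ignore for now), and launch it. To stop the program, press Ctrl+C.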
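The borrowing rules above can be checked with a compact std-only sketch. The helper `append_one` is hypothetical and stands in for `mutate()` from the snippet; the commented-out lines show what the compiler would reject.

```rust
// Hypothetical stand-in for `borrow.mutate()`: changes the value
// through a mutable borrow without taking ownership of it.
fn append_one(v: &mut Vec<i32>) {
    v.push(1);
}

// Reads through an immutable borrow; any number of these may coexist.
fn total(v: &[i32]) -> i32 {
    v.iter().sum()
}

fn borrow_demo() -> (usize, i32) {
    let mut owner = vec![10, 20];
    {
        let borrow = &mut owner; // exclusive, mutable borrow
        append_one(borrow);
        // let r = &owner; // rejected: `borrow` is still used below
        append_one(borrow);
    } // the mutable borrow has ended; `owner` is usable again

    let a = &owner; // several immutable borrows at once are fine...
    let b = &owner;
    // owner.push(3); // ...but mutation is rejected while `a` and `b` are live
    let sum = total(a) + total(b);
    (owner.len(), sum)
}

fn main() {
    let (len, sum) = borrow_demo();
    println!("len = {}, sum of two reads = {}", len, sum);
}
```

Ownership never leaves `owner` here, so unlike the first variant in the comparison there is nothing to hand back explicitly.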
To accept connections we need a server socket, and for this we use TcpListener from the mio::tcp module. Creating a server-side TCP socket is fairly straightforward: we bind to a specific address (IP address plus port number), listen on the socket, and accept connections. We won't stray far from that:

    use mio::tcp::*;
    use std::net::SocketAddr;

    ...

    let address = "0.0.0.0:10000".parse::<SocketAddr>().unwrap();
    let server_socket = TcpListener::bind(&address).unwrap();

    event_loop.register(&server_socket,
                        Token(0),
                        EventSet::readable(),
                        PollOpt::edge()).unwrap();

Let's go through this code line by line. First, we import into main.rs the module for working with TCP and the SocketAddr structure that describes a socket address. Add these lines to the top of the file:

    use mio::tcp::*;
    use std::net::SocketAddr;

Then we parse the string "0.0.0.0:10000" into an address structure and bind a socket to that address:

    let address = "0.0.0.0:10000".parse::<SocketAddr>().unwrap();
    let server_socket = TcpListener::bind(&address).unwrap();

Since bind expects an argument of type SocketAddr, we would not strictly need to spell out the type and clutter the code: the Rust compiler can infer it on its own.

Note the unwrap call on the function's result: this is an error-handling pattern in Rust, and we will return to this topic soon.

Next, we register the socket in the event loop:

    event_loop.register(&server_socket,
                        Token(0),
                        EventSet::readable(),
                        PollOpt::edge()).unwrap();

The register call is more involved. The function takes the following arguments:

- &server_socket: the socket we want to receive events for.
- Token(0): a token that the event loop associates with the socket, so that we can tell which socket an event belongs to. Here it marks the server socket waiting for connections.
- EventSet::readable(): the events we subscribe to. For a server socket this means a single event, a new client establishing a connection.
- PollOpt::edge(): events are edge-triggered rather than level-triggered.

The difference between the two modes is easiest to see with the readable() event. With level triggering, we get a notification whenever the socket buffer contains data available for reading.
With edge triggering, we get a notification only at the moment the socket receives new data. That is, if we did not read the whole contents of the buffer while handling the event, we will not get any new notifications until new data arrives. A more detailed explanation (in English) can be found in this answer on Stack Overflow.

Now run the program again with the cargo run command. The terminal still shows nothing but a blinking cursor, but if we run the netstat command separately, we will see that our socket is waiting for connections on port 10000:

    $ netstat -ln | grep 10000
    tcp        0      0 127.0.0.1:10000          0.0.0.0:*               LISTEN
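The level/edge distinction described above can be simulated without any sockets. The sketch below is a toy model, not how mio or epoll are implemented: a hypothetical `Socket` only tracks how many bytes sit in its buffer and whether new data arrived since the last poll. A handler that reads just 10 of 100 buffered bytes per notification is polled three times in each mode.

```rust
// Toy model of a socket's receive buffer (hypothetical; not a real socket).
struct Socket {
    buffered: usize,   // bytes waiting to be read
    new_arrival: bool, // did new data arrive since the last poll?
}

impl Socket {
    fn receive(&mut self, n: usize) {
        self.buffered += n;
        self.new_arrival = true;
    }

    fn read(&mut self, n: usize) {
        self.buffered -= n.min(self.buffered);
    }

    // Level-triggered: notify as long as there is unread data.
    fn poll_level(&mut self) -> bool {
        self.new_arrival = false;
        self.buffered > 0
    }

    // Edge-triggered: notify only if new data arrived since the last poll.
    fn poll_edge(&mut self) -> bool {
        let fired = self.new_arrival;
        self.new_arrival = false;
        fired
    }
}

// Returns (level-triggered notifications, edge-triggered notifications).
fn count_notifications() -> (usize, usize) {
    let mut level = Socket { buffered: 0, new_arrival: false };
    let mut edge = Socket { buffered: 0, new_arrival: false };
    level.receive(100);
    edge.receive(100);

    let (mut level_hits, mut edge_hits) = (0, 0);
    for _ in 0..3 {
        if level.poll_level() {
            level_hits += 1;
            level.read(10);
        }
        if edge.poll_edge() {
            edge_hits += 1;
            edge.read(10);
        }
    }
    (level_hits, edge_hits)
}

fn main() {
    let (level, edge) = count_notifications();
    println!("level-triggered: {}, edge-triggered: {}", level, edge);
}
```

The edge-triggered socket still holds 90 unread bytes but stays silent, which is why edge-triggered handlers usually keep reading until the socket reports that nothing is left.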
A WebSocket connection starts with an ordinary HTTP request: the client sends a request with the headers Connection: Upgrade and Upgrade: websocket, and we must respond to it in a specific way. And that's all. We don't need to write a full-fledged web server that serves files, static content, and so on; there are more advanced and better-suited tools for that (nginx, for example).
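Recognizing such a request comes down to checking two header values. Below is a deliberately simplified std-only sketch. It assumes the complete request head is already in memory as a string, whereas the server in this article parses it incrementally with a streaming parser. The function name `is_websocket_upgrade` is hypothetical. Header names are compared case-insensitively, and Connection may list several tokens.

```rust
// Returns true if the HTTP request head asks to upgrade to WebSocket.
// Simplification: assumes the whole head (up to the blank line) is available.
fn is_websocket_upgrade(request: &str) -> bool {
    let mut connection_upgrade = false;
    let mut upgrade_websocket = false;

    // Skip the request line, then inspect "Name: value" header lines.
    for line in request.lines().skip(1) {
        if line.is_empty() {
            break; // a blank line ends the headers
        }
        if let Some((name, value)) = line.split_once(':') {
            let (name, value) = (name.trim(), value.trim());
            if name.eq_ignore_ascii_case("Connection") {
                // Connection may list several tokens, e.g. "keep-alive, Upgrade".
                connection_upgrade = value
                    .split(',')
                    .any(|token| token.trim().eq_ignore_ascii_case("upgrade"));
            } else if name.eq_ignore_ascii_case("Upgrade") {
                upgrade_websocket = value.eq_ignore_ascii_case("websocket");
            }
        }
    }
    connection_upgrade && upgrade_websocket
}

fn main() {
    let request = "GET /chat HTTP/1.1\r\n\
                   Host: 127.0.0.1:10000\r\n\
                   Connection: keep-alive, Upgrade\r\n\
                   Upgrade: websocket\r\n\
                   \r\n";
    println!("upgrade requested: {}", is_websocket_upgrade(request));
}
```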
Now let's accept connections from clients and keep track of them. The updated server code looks like this:

    use std::collections::HashMap;

    struct WebSocketServer {
        socket: TcpListener,
        clients: HashMap<Token, TcpStream>,
        token_counter: usize
    }

    const SERVER_TOKEN: Token = Token(0);

    impl Handler for WebSocketServer {
        type Timeout = usize;
        type Message = ();

        fn ready(&mut self, event_loop: &mut EventLoop<WebSocketServer>,
                 token: Token, events: EventSet) {
            match token {
                SERVER_TOKEN => {
                    let client_socket = match self.socket.accept() {
                        Err(e) => {
                            println!("Accept error: {}", e);
                            return;
                        },
                        Ok(None) => panic!("Accept has returned 'None'"),
                        Ok(Some(sock)) => sock
                    };

                    self.token_counter += 1;
                    let new_token = Token(self.token_counter);

                    self.clients.insert(new_token, client_socket);
                    event_loop.register(&self.clients[&new_token],
                                        new_token, EventSet::readable(),
                                        PollOpt::edge() | PollOpt::oneshot()).unwrap();
                },
                // Events on client sockets are not handled yet;
                // without this arm the match would not be exhaustive.
                _ => {}
            }
        }
    }

There's quite a lot going on here, so let's go through it step by step. First, we need to add state to the WebSocketServer structure: it will store the server socket and the sockets of the connected clients.

    use std::collections::HashMap;

    struct WebSocketServer {
        socket: TcpListener,
        clients: HashMap<Token, TcpStream>,
        token_counter: usize
    }

Here we use the HashMap data structure from the standard collections library, std::collections. It is the standard implementation of hash tables (also known as dictionaries or associative arrays). As keys we use the tokens we are already familiar with, which must be unique for each connection. To generate unique tokens, we use a simple counter, the token_counter field in the structure.

Now the Handler trait from the mio library comes in handy again:

    impl Handler for WebSocketServer

In the implementation we need to override the callback function ready. Overriding here means that the Handler trait already contains a dummy ready function, along with stubs for some other callback functions.
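The token bookkeeping above can be illustrated with standard types only. In this sketch `Token` is a local stand-in for mio's type (assumed here to be a plain wrapper around `usize`), and `Registry` is hypothetical: it hands out a fresh token per connection and stores per-client data (just a name) in a `HashMap`, the way WebSocketServer stores client sockets.

```rust
use std::collections::HashMap;

// Local stand-in for mio::Token: a newtype around usize.
// PartialEq + Eq are what allow a constant to be used as a match pattern.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Token(usize);

const SERVER_TOKEN: Token = Token(0);

// Hypothetical registry mapping tokens to per-client data.
struct Registry {
    clients: HashMap<Token, String>,
    token_counter: usize,
}

impl Registry {
    fn new() -> Registry {
        // Start above 0 so no client can ever receive SERVER_TOKEN.
        Registry { clients: HashMap::new(), token_counter: 1 }
    }

    // Allocates a unique token and stores the client under it.
    fn add_client(&mut self, name: &str) -> Token {
        self.token_counter += 1;
        let token = Token(self.token_counter);
        self.clients.insert(token, name.to_string());
        token
    }

    // Dispatches on the token, as `ready` does.
    fn describe(&self, token: Token) -> String {
        match token {
            SERVER_TOKEN => "listening socket".to_string(),
            other => match self.clients.get(&other) {
                Some(name) => format!("client {}", name),
                None => "unknown token".to_string(),
            },
        }
    }
}

fn main() {
    let mut registry = Registry::new();
    let alice = registry.add_client("alice");
    println!("{:?} -> {}", alice, registry.describe(alice));
    println!("{:?} -> {}", SERVER_TOKEN, registry.describe(SERVER_TOKEN));
}
```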
That default implementation does nothing useful, of course, so we need to define our own version of the function to handle the events we are interested in:

    fn ready(&mut self, event_loop: &mut EventLoop<WebSocketServer>,
             token: Token, events: EventSet)

The function receives the token of the socket the event occurred on, as well as an EventSet containing a set of flags with information about the event (readable if the socket is ready for reading, or writable if it is ready for writing).

    match token {
        SERVER_TOKEN => {
            ...
        }
    }

match resembles the switch statement of "traditional" imperative programming languages, but is far more capable. In Java, for example, switch is limited to a specific set of types and works only with integers, strings, and enums. In Rust, match can compare values of almost any type, including multiple values, structures, and so on. Besides matching, match can also capture the contents or parts of a pattern, somewhat like capture groups in regular expressions.

Here we match the token against Token(0): as you remember, it is associated with the listening socket. To make our intentions clearer to anyone reading the code, we defined this token as the constant SERVER_TOKEN:

    const SERVER_TOKEN: Token = Token(0);

So the match in this case is equivalent to match token { Token(0) => ... }.

    let client_socket = match self.socket.accept() {
        Err(e) => {
            println!("Accept error: {}", e);
            return;
        },
        Ok(None) => unreachable!(),
        Ok(Some(sock)) => sock
    };

Now that we know we are dealing with the server socket, we can accept the connection by calling accept(), which returns the client socket wrapped in the type Result<Option<TcpStream>>. Result is a special type that is fundamental to error handling in Rust: it wraps the outcome of an operation that may fail, holding either a successful value (Ok) or an error (Err), such as a failure, a timeout, and so on.

For every Result we can call unwrap(), which provides the default behavior: it stops the program with an error if something went wrong, and "unpacks" the function's result from the Result container if everything is fine.
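To illustrate the claims about match above (several values matched at once, guards, captured parts of a pattern, and match used as an expression), here is a small std-only sketch. The `Event` struct and the `classify` function are hypothetical and unrelated to mio.

```rust
// Hypothetical event description used only to demonstrate `match`.
struct Event {
    token: usize,
    readable: bool,
    writable: bool,
}

fn classify(event: &Event) -> String {
    // `match` is an expression: every arm produces a String.
    match (event.token, event.readable, event.writable) {
        // A literal pattern, like a `case` in switch.
        (0, true, _) => "new connection".to_string(),
        // Several values matched at once; `t` captures the token.
        (t, true, true) => format!("client {} is readable and writable", t),
        // A guard adds an extra condition to a pattern.
        (t, true, false) if t > 1000 => format!("busy client {}", t),
        (t, true, false) => format!("client {} is readable", t),
        (t, false, true) => format!("client {} is writable", t),
        // `_` is a catch-all that keeps the match exhaustive.
        _ => "nothing to do".to_string(),
    }
}

fn main() {
    let e = Event { token: 3, readable: true, writable: false };
    println!("{}", classify(&e));

    // Destructuring a structure directly in a pattern:
    let Event { token, .. } = e;
    println!("token = {}", token);
}
```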
So by using unwrap(), we say that we only care about the immediate result and that we are fine with the program stopping if an error occurs.

For accept(), however, using unwrap() would be unwise: under an unlucky set of circumstances, the call could bring down our server and disconnect all users. So we simply log the error and carry on:

    Err(e) => {
        println!("Accept error: {}", e);
        return;
    },

Option is a "wrapper" similar to Result that expresses the presence or absence of a value. Absence is denoted by None; otherwise the value takes the form Some(value). As you have probably guessed, this type is comparable to null or None in other languages, except that Option is safer: all "null" values are localized and (like Result) must be explicitly unpacked before use. So you will never see the "famous" NullReferenceException unless you deliberately ask for it.

Next, we unpack the result returned by accept():

    Ok(None) => unreachable!(),

Here None should not be returned: accept() returns it only if we try to call this function on a client (that is, non-listening) socket. Since we are sure we are dealing with a server socket, execution should never reach this branch in a normal situation, so we use the special unreachable!() macro, which stops the program with an error.

    let client_socket = match self.socket.accept() {
        ...
        Ok(Some(sock)) => sock
    };

match is not just a statement but an expression (that is, match also returns a value), and besides matching it lets us capture values. So we can use it to assign results to variables, which is exactly what we do above: we unpack the value from the Result<Option<TcpStream>> type and assign it to the variable client_socket.
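The Result<Option<...>> pattern can be reproduced with the standard library alone. In this sketch `try_accept` and `poll_once` are hypothetical helpers (not part of std or mio): `try_accept` uses a non-blocking std `TcpListener` and maps the "would block" error (no pending connection) to `Ok(None)`. This mirrors, as an assumption, the way mio-style APIs use Option. The caller unpacks the nested value with match used as an expression, logging errors instead of calling unwrap().

```rust
use std::io;
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

// Hypothetical helper: accept a connection without blocking.
//   Ok(Some(stream)) -> a client connected
//   Ok(None)         -> no pending connection right now (WouldBlock)
//   Err(e)           -> a real error
fn try_accept(listener: &TcpListener) -> io::Result<Option<TcpStream>> {
    match listener.accept() {
        Ok((stream, _addr)) => Ok(Some(stream)),
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

// `match` as an expression: turns the nested result into a description.
fn poll_once(listener: &TcpListener) -> &'static str {
    match try_accept(listener) {
        Err(_) => "error (logged, server keeps running)",
        Ok(None) => "no client yet",
        Ok(Some(_client)) => "accepted a client",
    }
}

fn main() -> io::Result<()> {
    // Port 0 asks the OS for any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    listener.set_nonblocking(true)?;
    println!("before connect: {}", poll_once(&listener));

    let _client = TcpStream::connect(listener.local_addr()?)?;
    // The kernel may need a moment to queue the connection, so retry briefly.
    for _ in 0..100 {
        let status = poll_once(&listener);
        if status != "no client yet" {
            println!("after connect: {}", status);
            break;
        }
        thread::sleep(Duration::from_millis(10));
    }
    Ok(())
}
```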
Next, we register the new client socket in the event loop:

    self.token_counter += 1;
    let new_token = Token(self.token_counter);

    self.clients.insert(new_token, client_socket);
    event_loop.register(&self.clients[&new_token],
                        new_token,
                        EventSet::readable(),
                        PollOpt::edge() | PollOpt::oneshot()).unwrap();

Besides PollOpt::edge(), we have added a new option, PollOpt::oneshot(). It tells the event loop to temporarily remove the socket's registration once an event fires, which helps keep the server code simple. Without this option, we would have to track the socket's current state by hand (can we write now, can we read now, and so on). Instead, we simply register the socket again every time, with whatever set of options and subscriptions we need at that moment. On top of that, this approach is useful for multithreaded event loops, but more on that next time.

Since our WebSocketServer structure has become more complex, we need to change the code that registers the server in the event loop. The changes are quite simple and mostly concern initializing the new structure:

    let mut server = WebSocketServer {
        token_counter: 1,        // Start the token counter from 1
        clients: HashMap::new(), // Create an empty hash table for the clients
        socket: server_socket    // Hand ownership of the server socket to the structure
    };

    event_loop.register(&server.socket,
                        SERVER_TOKEN,
                        EventSet::readable(),
                        PollOpt::edge()).unwrap();

    event_loop.run(&mut server).unwrap();

Now we need to parse the HTTP requests that clients send. For this we will use the http-muncher HTTP parsing library, adding it to the dependency list. The library adapts the HTTP parser from Node.js for Rust (that parser is itself derived from nginx's), which lets it process requests in streaming mode. This will be very useful for TCP connections.

Add the dependency to Cargo.toml:

    [dependencies]
    http-muncher = "0.2.0"

Now the code:

    extern crate http_muncher;
    use http_muncher::{Parser, ParserHandler};

    struct HttpParser;
    impl ParserHandler for HttpParser { }

    struct WebSocketClient {
        socket: TcpStream,
        http_parser: Parser<HttpParser>
    }

    impl WebSocketClient {
        fn read(&mut self) {
            loop {
                let mut buf = [0; 2048];
                match self.socket.try_read(&mut buf) {
                    Err(e) => {
                        println!("Error while reading socket: {:?}", e);
                        return
                    },
                    Ok(None) =>
                        // The socket buffer has no more data.
                        break,
                    Ok(Some(len)) => {
                        self.http_parser.parse(&buf[0..len]);
                        if self.http_parser.is_upgrade() {
                            // ...
                            break;
                        }
                    }
                }
            }
        }

        fn new(socket: TcpStream) -> WebSocketClient {
            WebSocketClient {
                socket: socket,
                http_parser: Parser::request(HttpParser)
            }
        }
    }

And the changes to ready in the WebSocketServer structure:

    match token {
        SERVER_TOKEN => {
            ...
            self.clients.insert(new_token, WebSocketClient::new(client_socket));
            event_loop.register(&self.clients[&new_token].socket,
                                new_token,
                                EventSet::readable(),
                                PollOpt::edge() | PollOpt::oneshot()).unwrap();
            ...
        },
        token => {
            let client = self.clients.get_mut(&token).unwrap();
            client.read();
            event_loop.reregister(&client.socket,
                                  token,
                                  EventSet::readable(),
                                  PollOpt::edge() | PollOpt::oneshot()).unwrap();
        }
    }

Let's go through it. First we import the library and define a structure for the parser:

    extern crate http_muncher;
    use http_muncher::{Parser, ParserHandler};

    struct HttpParser;
    impl ParserHandler for HttpParser { }

Here we implement the ParserHandler trait, which contains some useful callback functions (just as Handler from mio does for the WebSocketServer structure). These callbacks are invoked as soon as the parser has something useful: HTTP headers, the request body, and so on. Right now, though, we only need to find out whether the client sent the special set of headers for switching the HTTP connection to the WebSocket protocol. The parser structure already has the functions we need for that, so we won't override the callbacks for now and will keep their default implementations.

There is one catch, though: the HTTP parser keeps state, so we need a new HttpParser instance for every new client. Since each client will keep its own parser state, let's create a new structure that describes an individual client:

    struct WebSocketClient {
        socket: TcpStream,
        http_parser: Parser<HttpParser>
    }

Since this structure now holds the client socket as well, we can replace HashMap<Token, TcpStream> with HashMap<Token, WebSocketClient> in the server structure.

If we put all the reading logic into ready, the code would quickly turn into spaghetti. So let's add a separate read implementation to the WebSocketClient structure:

    impl WebSocketClient {
        fn read(&mut self) {
            ...
        }
    }

Inside it we read data from the socket in a loop:

    loop {
        let mut buf = [0; 2048];
        match self.socket.try_read(&mut buf) {
            ...
        }
    }

We create an endless loop (loop { ... }), allocate 2 KB of memory for a buffer to hold the data, and try to read the incoming data into it.

The try_read call may fail, so we match on the Result type:

    match self.socket.try_read(&mut buf) {
        Err(e) => {
            println!("Error while reading socket: {:?}", e);
            return
        },
        ...
    }

Next we check whether there is anything left to read:

    match self.socket.try_read(&mut buf) {
        ...
        Ok(None) =>
            // The socket buffer has no more data.
            break,
        ...
    }

try_read returns Ok(None) once we have read all the data available from the client. When that happens, we break out of the endless loop and go on waiting for new events.

Finally, here is how we handle the case where try_read has written data into our buffer:

    match self.socket.try_read(&mut buf) {
        ...
        Ok(Some(len)) => {
            self.http_parser.parse(&buf[0..len]);

            if self.http_parser.is_upgrade() {
                // ...
                break;
            }
        }
    }

Here we pass the data to the parser and then check whether the client has asked to upgrade the connection (that is, sent the Connection: Upgrade header).

Lastly, the new function makes it more convenient to create instances of the WebSocketClient structure:

    fn new(socket: TcpStream) -> WebSocketClient {
        WebSocketClient {
            socket: socket,
            http_parser: Parser::request(HttpParser)
        }
    }

You can think of new as a constructor. Here we simply create a WebSocketClient instance, but bear in mind that we could do the same without a "constructor" function. It is more a matter of convenience: without constructor functions, code can easily become needlessly repetitive. After all, the DRY principle ("don't repeat yourself") was invented for a reason.

Note also that we don't write return explicitly: Rust automatically returns the last expression of a function as its result.

    http_parser: Parser::request(HttpParser)

Here we create a new Parser instance using the associated function Parser::request. As an argument we pass a newly created instance of the HttpParser structure defined earlier.

Now that we have the client code, we make the following changes in the ready handler:

    match token {
        SERVER_TOKEN => {
            ...
        },
        token => {
            let client = self.clients.get_mut(&token).unwrap();
            client.read();
            event_loop.reregister(&client.socket, token, EventSet::readable(),
                                  PollOpt::edge() | PollOpt::oneshot()).unwrap();
        }
    }

We have added a new match arm that handles all tokens other than SERVER_TOKEN, that is, events on client sockets. Using the token, we borrow a mutable reference to the corresponding client structure from the hash table:

    let client = self.clients.get_mut(&token).unwrap();

Then we call the read function we defined above:

    client.read();

Finally, we re-register the client in the event loop (because of oneshot()):

    event_loop.reregister(&client.socket, token, EventSet::readable(),
                          PollOpt::edge() | PollOpt::oneshot()).unwrap();

As you can see, this differs from registering a client only in that we call reregister instead of register, passing all the same parameters.

Now we know when a client wants to establish a WebSocket connection, so it's time to respond. The response looks like this:

    HTTP/1.1 101 Switching Protocols
    Connection: Upgrade
    Upgrade: websocket

plus one more header, Sec-WebSocket-Accept, whose value we have to compute. According to the RFC (RFC 6455), this is done by specific rules: we take the Sec-WebSocket-Key header sent by the client, append a fixed string ("258EAFA5-E914-47DA-95CA-C5AB0DC85B11") to it, hash the result with the SHA-1 algorithm, and finally encode it all in base64.

For this we need two more libraries. Add them to Cargo.toml:

    [dependencies]
    ...
    rustc-serialize = "0.3.15"
    sha1 = "0.1.1"

rustc-serialize contains functions for encoding binary data in base64, and sha1, as the name suggests, is for SHA-1 hashing. The function that generates the response key:

    extern crate sha1;
    extern crate rustc_serialize;

    use rustc_serialize::base64::{ToBase64, STANDARD};

    fn gen_key(key: &String) -> String {
        let mut m = sha1::Sha1::new();
        let mut buf = [0u8; 20];

        m.update(key.as_bytes());
        m.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11".as_bytes());

        m.output(&mut buf);

        buf.to_base64(STANDARD)
    }

In gen_key we create a new SHA-1 hash instance, feed it the key sent by the client, then the constant string defined in the RFC, and return the result as a base64-encoded string.

But to use this function, we first need the Sec-WebSocket-Key header. Let's go back to the HTTP parser from the previous section.
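To see what gen_key computes without pulling in the sha1 and rustc-serialize crates, here is a dependency-free sketch of the same RFC 6455 calculation. The SHA-1 and base64 routines below are minimal hand-written versions for illustration only: they are not the crates used in the article and are not meant for production use. The name `accept_key` is hypothetical. RFC 6455 includes a worked example: the client key dGhlIHNhbXBsZSBub25jZQ== must produce s3pPLMBiTxaQ9kYGzzhZRbK+xOo=.

```rust
const WS_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Minimal SHA-1 (FIPS 180-4), for illustration only.
fn sha1(data: &[u8]) -> [u8; 20] {
    let mut h: [u32; 5] = [0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476, 0xC3D2E1F0];

    // Padding: 0x80, zeros up to 56 mod 64, then the bit length (big-endian).
    let mut msg = data.to_vec();
    let bit_len = (data.len() as u64).wrapping_mul(8);
    msg.push(0x80);
    while msg.len() % 64 != 56 {
        msg.push(0);
    }
    msg.extend_from_slice(&bit_len.to_be_bytes());

    for chunk in msg.chunks(64) {
        let mut w = [0u32; 80];
        for i in 0..16 {
            w[i] = u32::from_be_bytes([chunk[4 * i], chunk[4 * i + 1], chunk[4 * i + 2], chunk[4 * i + 3]]);
        }
        for i in 16..80 {
            w[i] = (w[i - 3] ^ w[i - 8] ^ w[i - 14] ^ w[i - 16]).rotate_left(1);
        }

        let (mut a, mut b, mut c, mut d, mut e) = (h[0], h[1], h[2], h[3], h[4]);
        for (i, &wi) in w.iter().enumerate() {
            let (f, k) = match i {
                0..=19 => ((b & c) | (!b & d), 0x5A827999),
                20..=39 => (b ^ c ^ d, 0x6ED9EBA1),
                40..=59 => ((b & c) | (b & d) | (c & d), 0x8F1BBCDC),
                _ => (b ^ c ^ d, 0xCA62C1D6),
            };
            let temp = a.rotate_left(5).wrapping_add(f).wrapping_add(e).wrapping_add(k).wrapping_add(wi);
            e = d;
            d = c;
            c = b.rotate_left(30);
            b = a;
            a = temp;
        }
        for (state, value) in h.iter_mut().zip([a, b, c, d, e]) {
            *state = state.wrapping_add(value);
        }
    }

    let mut out = [0u8; 20];
    for (i, word) in h.iter().enumerate() {
        out[4 * i..4 * i + 4].copy_from_slice(&word.to_be_bytes());
    }
    out
}

// Standard base64 with '=' padding.
fn base64(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::new();
    for chunk in bytes.chunks(3) {
        let b1 = *chunk.get(1).unwrap_or(&0);
        let b2 = *chunk.get(2).unwrap_or(&0);
        let n = (u32::from(chunk[0]) << 16) | (u32::from(b1) << 8) | u32::from(b2);
        for i in 0..4 {
            if i <= chunk.len() {
                out.push(ALPHABET[((n >> (18 - 6 * i)) & 0x3F) as usize] as char);
            } else {
                out.push('=');
            }
        }
    }
    out
}

// Same steps as gen_key: key + GUID, SHA-1, base64.
fn accept_key(client_key: &str) -> String {
    let mut input = client_key.as_bytes().to_vec();
    input.extend_from_slice(WS_GUID.as_bytes());
    base64(&sha1(&input))
}

fn main() {
    println!("{}", accept_key("dGhlIHNhbXBsZSBub25jZQ=="));
}
```

The 28-character result is the value that goes into the Sec-WebSocket-Accept response header.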
As you remember, the ParserHandler type lets us override callbacks that are invoked when new headers are received. Now is the time to take advantage of that and improve the implementation of the corresponding structure:

    use std::cell::RefCell;
    use std::rc::Rc;

    struct HttpParser {
        current_key: Option<String>,
        headers: Rc<RefCell<HashMap<String, String>>>
    }

    impl ParserHandler for HttpParser {
        fn on_header_field(&mut self, s: &[u8]) -> bool {
            self.current_key = Some(std::str::from_utf8(s).unwrap().to_string());
            true
        }

        fn on_header_value(&mut self, s: &[u8]) -> bool {
            self.headers.borrow_mut()
                .insert(self.current_key.clone().unwrap(),
                        std::str::from_utf8(s).unwrap().to_string());
            true
        }

        fn on_headers_complete(&mut self) -> bool {
            false
        }
    }

When the parser reports a header name, we remember it in current_key; when the value arrives, we store the name/value pair in the headers table. There is a catch, though: headers must be shared by two owners, WebSocketClient and ParserHandler.
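This two-owner situation, and the Rc<RefCell<...>> answer to it, can be reproduced without http-muncher. In the sketch below, `HeaderCollector` is a hypothetical stand-in for HttpParser whose two methods play the role of the on_header_field/on_header_value callbacks, and `Client` (also hypothetical) holds a second handle to the same header table. One deliberate difference: the sketch takes `current_key` with `take()` instead of cloning it.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

type Headers = Rc<RefCell<HashMap<String, String>>>;

// Stand-in for HttpParser: receives header names and values one at a time.
struct HeaderCollector {
    current_key: Option<String>,
    headers: Headers,
}

impl HeaderCollector {
    fn on_header_field(&mut self, s: &[u8]) {
        self.current_key = Some(String::from_utf8_lossy(s).into_owned());
    }

    fn on_header_value(&mut self, s: &[u8]) {
        if let Some(key) = self.current_key.take() {
            // borrow_mut() is checked at runtime: it panics if the table
            // is already borrowed somewhere else at this moment.
            self.headers.borrow_mut().insert(key, String::from_utf8_lossy(s).into_owned());
        }
    }
}

// Stand-in for WebSocketClient: the second owner of the same table.
struct Client {
    headers: Headers,
    parser: HeaderCollector,
}

impl Client {
    fn new() -> Client {
        let headers: Headers = Rc::new(RefCell::new(HashMap::new()));
        Client {
            // clone() copies the pointer and bumps the reference count,
            // so both fields point at one HashMap.
            headers: headers.clone(),
            parser: HeaderCollector { current_key: None, headers },
        }
    }

    fn header(&self, name: &str) -> Option<String> {
        self.headers.borrow().get(name).cloned()
    }
}

fn main() {
    let mut client = Client::new();
    client.parser.on_header_field(b"Sec-WebSocket-Key");
    client.parser.on_header_value(b"dGhlIHNhbXBsZSBub25jZQ==");
    println!("key seen by client: {:?}", client.header("Sec-WebSocket-Key"));
    println!("owners of the table: {}", Rc::strong_count(&client.headers));
}
```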
To resolve this conflict, Rust has a special type, Rc: a wrapper with reference counting (which can be thought of as a form of garbage collection). Essentially, we transfer ownership to the Rc container, which in turn we can safely share among many owners with the help of some black language magic: we just clone the Rc value with the clone() function, and the container manages the memory for us.

There is a caveat, though: a value inside Rc is immutable, and because of compiler restrictions we cannot change it. In fact, this is simply a consequence of Rust's rule about mutability: a value can have as many immutable borrows as you like, but it can be changed only through a single owner (or a single mutable borrow). If we try to modify a value inside Rc, we will get a compilation error.

This is where another type comes to the rescue: RefCell. It solves the problem through the mechanism of interior mutability. Simply put, RefCell lets us defer all the borrowing checks to runtime instead of having them verified statically at compile time. So we need to wrap our headers in two containers at once, Rc<RefCell<...>> (which, admittedly, looks rather scary to the unprepared mind).

Let's look at these lines from HttpParser:

    self.headers.borrow_mut()
        .insert(self.current_key.clone().unwrap(),
                ...

borrow_mut() corresponds roughly to &mut, except that all checks limiting the number of borrows are performed dynamically while the program runs. So it is up to us, not the compiler, to keep a close eye on them; otherwise a runtime error may occur.

The headers will be owned directly by the WebSocketClient structure, so let's add a new field to it and write a new constructor function:

    // Import RefCell and Rc:
    use std::cell::RefCell;
    use std::rc::Rc;

    ...

    struct WebSocketClient {
        socket: TcpStream,
        http_parser: Parser<HttpParser>,

        // Add the headers to WebSocketClient:
        headers: Rc<RefCell<HashMap<String, String>>>
    }

    impl WebSocketClient {
        fn new(socket: TcpStream) -> WebSocketClient {
            let headers = Rc::new(RefCell::new(HashMap::new()));

            WebSocketClient {
                socket: socket,

                // The client keeps one handle to the headers...
                headers: headers.clone(),

                http_parser: Parser::request(HttpParser {
                    current_key: None,

                    // ...and the parser gets another one:
                    headers: headers.clone()
                })
            }
        }

        ...
    }

Now WebSocketClient has access to the parsed headers, so we can find the one we are interested in: Sec-WebSocket-Key. Once we have the client's key, composing the response is easy. We just need to assemble a string and write it to the client socket.

To do that, we need the event loop to tell us when the socket is ready for writing, which means changing the EventSet to EventSet::writable() when the socket is re-registered. Remember this line?

    event_loop.reregister(&client.socket, token, EventSet::readable(),
                          PollOpt::edge() | PollOpt::oneshot()).unwrap();

We can store the set of events we are interested in as part of the client's state, so let's change the WebSocketClient structure:

    struct WebSocketClient {
        socket: TcpStream,
        http_parser: Parser<HttpParser>,
        headers: Rc<RefCell<HashMap<String, String>>>,

        // Add a new field, `interest`:
        interest: EventSet
    }

Now change the re-registration accordingly:

    event_loop.reregister(&client.socket, token,
                          client.interest, // Take the `EventSet` from the client structure
                          PollOpt::edge() | PollOpt::oneshot()).unwrap();

All that's left is to change the value of interest in the right places. To simplify this, let's formalize it with connection states:

    #[derive(PartialEq)]
    enum ClientState {
        AwaitingHandshake,
        HandshakeResponse,
        Connected
    }

Here we define all possible states of a connected client. The first one, AwaitingHandshake, means that we expect a new client to connect over HTTP. HandshakeResponse is the state in which we respond to the client over HTTP. And finally, Connected is the state in which we have successfully established the connection and communicate with the client using the WebSocket protocol.

Add the state to the client structure:

    struct WebSocketClient {
        socket: TcpStream,
        http_parser: Parser<HttpParser>,
        headers: Rc<RefCell<HashMap<String, String>>>,
        interest: EventSet,

        // Add the client state:
        state: ClientState
    }

    impl WebSocketClient {
        fn new(socket: TcpStream) -> WebSocketClient {
            let headers = Rc::new(RefCell::new(HashMap::new()));

            WebSocketClient {
                socket: socket,
                ...
                // Initial events that interest us
                interest: EventSet::readable(),
                // Initial state
                state: ClientState::AwaitingHandshake
            }
        }
    }

Now we can change the state in read. Remember these lines?

    match self.socket.try_read(&mut buf) {
        ...
        Ok(Some(len)) => {
            if self.http_parser.is_upgrade() {
                // ...
                break;
            }
        }
    }

Let's replace the placeholder in the is_upgrade() branch with code that changes the connection state:

    if self.http_parser.is_upgrade() {
        // Switch the state to HandshakeResponse
        self.state = ClientState::HandshakeResponse;

        // Change the set of events we are interested in to Writable
        // (that is, the socket being ready for writing):
        self.interest.remove(EventSet::readable());
        self.interest.insert(EventSet::writable());

        break;
    }

Once we have changed the interest to Writable, we add the code needed to send the handshake response.

We will change the ready function in the implementation of the WebSocketServer structure. Writing the response to the socket is simple (and hardly differs from reading); we only need to tell one type of event from the other:

    fn ready(&mut self, event_loop: &mut EventLoop<WebSocketServer>,
             token: Token, events: EventSet) {
        // Is the socket ready for reading?
        if events.is_readable() {
            // Move all read handling code here
            match token {
                SERVER_TOKEN => { ... },
                ...
            }
            ...
        }

        // Is the socket ready for writing?
        if events.is_writable() {
            let client = self.clients.get_mut(&token).unwrap();
            client.write();
            event_loop.reregister(&client.socket, token, client.interest,
                                  PollOpt::edge() | PollOpt::oneshot()).unwrap();
        }
    }

What remains is to build the response and send it:

    use std::fmt;
    ...
    impl WebSocketClient {
        fn write(&mut self) {
            // Get the headers out of the Rc<RefCell<...>> wrapper:
            let headers = self.headers.borrow();

            // Compute the response key from the client's key:
            let response_key = gen_key(&headers.get("Sec-WebSocket-Key").unwrap());

            // Build the response string. Rust's string formatting is similar
            // to printf in C or format in Python, but the format string is
            // checked at compile time, so a mismatched placeholder is a
            // compilation error rather than a runtime bug.
            let response = fmt::format(format_args!("HTTP/1.1 101 Switching Protocols\r\n\
                                                     Connection: Upgrade\r\n\
                                                     Sec-WebSocket-Accept: {}\r\n\
                                                     Upgrade: websocket\r\n\r\n", response_key));

            // Write the response to the socket:
            self.socket.try_write(response.as_bytes()).unwrap();

            // Change the client state:
            self.state = ClientState::Connected;

            // Switch back to `readable()` (to read new messages from the client):
            self.interest.remove(EventSet::writable());
            self.interest.insert(EventSet::readable());
        }
    }

To check that everything works, run the following JavaScript, for example in a browser's developer console:

    var ws = new WebSocket('ws://127.0.0.1:10000');

    ws.onopen = function () {
        console.log('Connection is successful');
    };

Checking ws.readyState immediately after creating the WebSocket would not work, because the connection is still being established at that point; the open event fires once the handshake has completed.
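The client state machine described above can be modeled with std alone. This is a simplified sketch built on assumptions: `Interest` is a hypothetical two-flag replacement for mio's EventSet, the transition methods are hypothetical names, and `handshake_response` builds the same response text as write() using format!, which produces the same String as fmt::format(format_args!(...)).

```rust
// Hypothetical replacement for mio's EventSet: just two flags.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Interest {
    readable: bool,
    writable: bool,
}

#[derive(Debug, PartialEq)]
enum ClientState {
    AwaitingHandshake,
    HandshakeResponse,
    Connected,
}

struct Client {
    state: ClientState,
    interest: Interest,
}

impl Client {
    fn new() -> Client {
        Client {
            state: ClientState::AwaitingHandshake,
            interest: Interest { readable: true, writable: false },
        }
    }

    // Called when the parser reports an upgrade request.
    fn on_upgrade_request(&mut self) {
        self.state = ClientState::HandshakeResponse;
        self.interest = Interest { readable: false, writable: true };
    }

    // Called once the response has been written to the socket.
    fn on_response_written(&mut self) {
        self.state = ClientState::Connected;
        self.interest = Interest { readable: true, writable: false };
    }
}

// Builds the handshake response for an already computed accept key.
fn handshake_response(accept_key: &str) -> String {
    format!(
        "HTTP/1.1 101 Switching Protocols\r\n\
         Connection: Upgrade\r\n\
         Sec-WebSocket-Accept: {}\r\n\
         Upgrade: websocket\r\n\r\n",
        accept_key
    )
}

fn main() {
    let mut client = Client::new();
    client.on_upgrade_request();
    print!("{}", handshake_response("s3pPLMBiTxaQ9kYGzzhZRbK+xOo="));
    client.on_response_written();
    println!("{:?} {:?}", client.state, client.interest);
}
```

Keeping the interest next to the state means the re-registration call never has to guess which events to ask for: it just reads the client's current interest.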
… unique_ptr and shared_ptr from C++.
… malloc(). Instead, local variables are allocated on the stack and preallocated memory is used.
… malloc() and free() have the same problem due to memory fragmentation.
… the pthread_create(3) man page gives 2 MB as the default thread stack size on Linux/x86-32.

Source: https://habr.com/ru/post/268609/