This document will help you learn about futures, a crate for the Rust programming language that provides a zero-cost implementation of futures and streams. Futures are available in many other programming languages, such as C++, Java, and Scala, and the futures crate draws inspiration from the libraries of those languages. It is ergonomic, however, and adheres to Rust's zero-cost abstraction philosophy: no memory allocations are required to create and compose futures, and only one allocation is needed for the Task that drives them. Futures are intended to be the foundation of asynchronous, composable, high-performance I/O in Rust, and early benchmarks show that a simple HTTP server built on futures is really fast.
This publication is a translation of the official futures-rs tutorial.
This documentation is divided into several sections:
- installation and a "hello, world" example;
- the Future trait;
- the Stream trait;
- concrete implementations of Future and Stream;
- returning futures;
- Task and Future.

The futures crate
requires Rust version 1.10.0 or higher, which can easily be installed using Rustup. The crate is tested and works on Windows, macOS, and Linux, but PRs for other platforms are always welcome. You can add futures to your project's Cargo.toml as follows:
[dependencies]
futures = { git = "https://github.com/alexcrichton/futures-rs" }
tokio-core = { git = "https://github.com/tokio-rs/tokio-core" }
tokio-tls = { git = "https://github.com/tokio-rs/tokio-tls" }
Note: this library is in active development and for now requires getting the source code directly from git, but the crate will be published on crates.io later.
Here we add a dependency on three crates:
- futures: the definition and core implementation of Future and Stream;
- tokio-core: bindings to the mio crate, providing concrete Future and Stream implementations for TCP and UDP;
- tokio-tls: a TLS implementation built on top of futures.

The futures crate itself is a low-level implementation of futures that does not assume any particular runtime or I/O layer. For the examples below we will use the concrete implementations available in tokio-core to show how futures and streams can be used to perform complex I/O with zero abstraction cost.
Now that we have everything we need, let's write our first program. As a hello-world example, we will download the Rust home page:
extern crate futures;
extern crate tokio_core;
extern crate tokio_tls;

use std::net::ToSocketAddrs;

use futures::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use tokio_tls::ClientContext;

fn main() {
    let mut core = Core::new().unwrap();
    let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap();

    let socket = TcpStream::connect(&addr, &core.handle());

    let tls_handshake = socket.and_then(|socket| {
        let cx = ClientContext::new().unwrap();
        cx.handshake("www.rust-lang.org", socket)
    });
    let request = tls_handshake.and_then(|socket| {
        tokio_core::io::write_all(socket, "\
            GET / HTTP/1.0\r\n\
            Host: www.rust-lang.org\r\n\
            \r\n\
        ".as_bytes())
    });
    let response = request.and_then(|(socket, _)| {
        tokio_core::io::read_to_end(socket, Vec::new())
    });

    let (_, data) = core.run(response).unwrap();
    println!("{}", String::from_utf8_lossy(&data));
}
If you put this code into a file at src/main.rs and run the cargo run command, the HTML of the Rust home page will be printed.
Note: rustc 1.10 compiles this example slowly; starting from 1.11, compilation is faster.
There is a lot going on here, so let's walk through it line by line. Take a look at the main() function:
let mut core = Core::new().unwrap();
let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap();
This creates the event loop on which all I/O will be performed. Then we resolve the host name "www.rust-lang.org" using the to_socket_addrs method from the standard library.
Next:
let socket = TcpStream::connect(&addr, &core.handle());
We get a handle to the event loop and connect to the host with TcpStream::connect. Notably, TcpStream::connect returns a future: the socket is not actually connected yet; the connection will happen later.
After the socket becomes available, we need to perform three steps to download the rust-lang.org home page:
1. Perform a TLS handshake. The home page is served only over HTTPS, so we have to connect to port 443 and follow the TLS protocol.
2. Send an HTTP GET request. As part of this guide we will write the request by hand, although in production code you should use an HTTP client built on futures.
3. Read the response.
Consider each of these steps in detail.
First step:
let tls_handshake = socket.and_then(|socket| {
    let cx = ClientContext::new().unwrap();
    cx.handshake("www.rust-lang.org", socket)
});
Here we use the and_then method of the Future trait, calling it on the result of TcpStream::connect. The and_then method takes a closure that receives the value produced by the previous future; in this case socket has type TcpStream. Note that the closure passed to and_then will not run if TcpStream::connect returns an error.
Once the socket is available, we create a client TLS context with ClientContext::new. This type, from the tokio-tls crate, represents the client half of a TLS connection. Next we call the handshake method to actually perform the TLS handshake. The first argument is the domain name we are connecting to, and the second is the I/O object (in this case, the socket).
Like TcpStream::connect earlier, the handshake method returns a future. The TLS handshake may take some time, since the client and server need to perform some I/O, verify certificates, and so on. Once the future completes, it yields a TlsStream, similar to the TcpStream we saw above.
The and_then combinator does a lot of work behind the scenes, ensuring that futures execute in the right order and keeping track of them in flight. The value it returns also implements Future, so we can keep chaining computations.
Next, send an HTTP request:
let request = tls_handshake.and_then(|socket| {
    tokio_core::io::write_all(socket, "\
        GET / HTTP/1.0\r\n\
        Host: www.rust-lang.org\r\n\
        \r\n\
    ".as_bytes())
});
Here we take the future from the previous step (tls_handshake) and use and_then again to continue the computation. The write_all combinator writes the entire HTTP request, issuing as many writes as necessary. The future returned by write_all completes once all of the data has been written to the socket. Notably, the TlsStream transparently encrypts the data we write before sending it to the socket.
The third and final part of the request looks like this:
let response = request.and_then(|(socket, _)| {
    tokio_core::io::read_to_end(socket, Vec::new())
});
Here the previous future, request, is chained once more, this time to the read_to_end combinator. This future reads all of the data from the socket into the provided buffer and yields the buffer once the connection reaches EOF. As before, reading from the socket transparently decrypts the data received from the server, so we read the decrypted version.
If execution stopped here, you might be surprised: nothing would actually happen. That is because everything we have done so far only describes future computations; we have not actually run them. Up to this point we have performed no I/O, issued no HTTP request, and so on.
To actually run the futures and drive them to completion, we need to start the event loop:
let (_, data) = core.run(response).unwrap();
println!("{}", String::from_utf8_lossy(&data));
Here the response future is passed to the event loop, asking it to run the future to completion. The event loop runs until that result is available. Notably, the core.run(..) call blocks the calling thread until the future completes and its result can be returned. This means that data has type Vec<u8>, which we can then print to stdout as usual.
Whew! We have seen futures used to initiate a TCP connection, build chains of computation, and read data from a socket. But this only hints at what futures can do; next, let's look at the details.
The Future trait is the core of the futures crate. It represents an asynchronous computation and its eventual result.
Take a look at the following code:
trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

    // ...
}
This definition probably raises a few questions:
- the Item and Error associated types;
- the poll method.

Let's analyze them in detail.
Item and Error

type Item;
type Error;
The first thing you probably noticed about the Future trait is that it has two associated types. These are the types of values that the future can resolve to: every Future can be thought of as resolving to a Result<Self::Item, Self::Error>.
These two types come up very often in where clauses when accepting futures and in type signatures when returning them.
For example, when returning a future, you can write:
fn foo() -> Box<Future<Item = u32, Error = io::Error>> {
    // ...
}
Or, when accepting a future:
fn foo<F>(future: F)
    where F: Future<Error = io::Error>,
          F::Item: Clone,
{
    // ...
}
poll
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
The whole Future trait is built around this method: poll is the sole entry point for extracting the value computed by a future. As a user of futures, you will rarely need to call this method directly; most of the time you will interact with futures through combinators, which build higher-level abstractions on top of them. Still, knowing how futures work under the hood will be useful.
Consider the poll method in more detail.
Pay attention to the &mut self argument, which imposes a number of restrictions and guarantees:
- only one entity can poll a future at a time;
- while poll is being called, futures can change their state;
- when a future is not being polled, ownership of it can be transferred to another entity.

In fact, the Poll type is just an alias:
type Poll<T, E> = Result<Async<T>, E>;
Let's also take a look at the Async enumeration:
pub enum Async<T> {
    Ready(T),
    NotReady,
}
Through this enumeration, futures communicate whether their value is ready for use. If an error occurs, Err is returned immediately. Otherwise, the Async enumeration indicates whether the value of the future has been fully computed or is not yet ready.
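To make this concrete, here is a minimal sketch of a hand-written future whose poll immediately reports a ready value (the AlwaysReady type is invented for this illustration and is not part of the futures crate):

extern crate futures;

use futures::{Async, Future, Poll};

// A toy future that is immediately ready with a fixed value.
struct AlwaysReady(u32);

impl Future for AlwaysReady {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<u32, ()> {
        // No waiting is required, so report the value as ready right away.
        Ok(Async::Ready(self.0))
    }
}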
The Future trait, like Iterator, does not define what happens when poll is called after the future has already resolved. This means that implementors of Future do not need to maintain extra state just to check whether poll has already returned a result successfully.
If a call to poll returns NotReady, the future still needs to know when to try again. To achieve this, the future must guarantee that, having returned NotReady, the current task will be notified once the value becomes available.
The park method is the main entry point for delivering such notifications. This function returns a Task, which is Send and 'static and whose main method is unpark. Calling unpark indicates that the future can make progress and may be able to produce a value.
More detailed documentation can be found here.
At this point it may seem that the poll method could add a bit of pain to your workflow. What if you have a future that resolves to a String, and you want to turn it into a future that resolves to a u32? For compositions like this, the Future trait provides a large number of combinators.
These combinators are similar to the combinators on Iterator: each one takes a future and returns a new future.
For example, we could write:
fn parse<F>(future: F) -> Box<Future<Item=u32, Error=F::Error>>
    where F: Future<Item=String> + 'static,
{
    Box::new(future.map(|string| {
        string.parse::<u32>().unwrap()
    }))
}
Here map is used to transform a future that resolves to a String into a future that resolves to a u32. Boxing the result is not always necessary and is discussed in more detail in the section on returning futures.
Combinators allow you to express concepts such as:
- changing the type of a future (map, map_err);
- running another future after the first one completes (then, and_then, or_else);
- finding out which of two futures resolves first (select);
- waiting for two futures to complete (join);
- defining the behavior of poll after the computation has finished (fuse).

Using combinators is similar to using the Iterator trait in Rust or futures in Scala. Most manipulations of futures end up using these combinators. All of the combinators are zero-cost: they allocate no memory, and the implementation is optimized as if you had written the result by hand.
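As a small illustration of chaining combinators (a sketch that assumes the finished constructor described later in this tutorial and the blocking wait method), two futures can be joined and their results combined without any allocation:

extern crate futures;

use futures::{finished, Future};

fn main() {
    // Two already-completed futures.
    let a = finished::<u32, ()>(2);
    let b = finished::<u32, ()>(3);

    // join waits for both; map transforms the resulting pair into a single value.
    let sum = a.join(b).map(|(x, y)| x + y);

    // wait drives the future to completion on the current thread.
    assert_eq!(sum.wait(), Ok(5));
}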
Stream
Previously we looked at the Future trait, which is useful when computing just a single value. But sometimes a computation is better represented as a stream of values; for example, a TCP listener produces many TCP connections over its lifetime. Let's see which entities from the standard library correspond to Future and Stream:
# items | Sync | Async | Common operations
---|---|---|---
1 | Result | Future | map, and_then
∞ | Iterator | Stream | map, fold, collect
Let's take a look at the Stream type:
trait Stream {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}
You may have noticed that the Stream type is very similar to the Future type. The main difference is that the poll method returns Option<Self::Item>, not Self::Item.
A stream produces many optional values over time, signaling the end of the stream by returning Ready(None). At its core, a Stream is an asynchronous sequence of values produced in a particular order.
In fact, a Stream can be viewed as a specialized future: the into_future method converts it into a future that yields the next value of the stream together with the stream itself, so that more values can be obtained later. This also allows streams to be composed with arbitrary futures using the basic future combinators.
Like the Future type, the Stream type provides a large number of combinators. In addition to future-like combinators (for example, then ), stream-specific combinators are supported, such as fold .
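Here is a small sketch of both ideas (it assumes the stream::iter constructor introduced later in this tutorial and the blocking wait method): into_future splits off the first element, and fold collapses the rest:

extern crate futures;

use futures::{Future, Stream};
use futures::stream;

fn main() {
    // A finished stream built from an iterator of Results.
    let s = stream::iter(vec![Ok::<u32, ()>(1), Ok(2), Ok(3)].into_iter());

    // into_future yields the next element together with the rest of the stream.
    let (first, rest) = s.into_future().wait().ok().unwrap();
    assert_eq!(first, Some(1));

    // fold the remaining elements into a sum.
    let sum = rest.fold(0, |acc, x| Ok::<u32, ()>(acc + x)).wait().unwrap();
    assert_eq!(sum, 5);
}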
An example of the Stream type
We saw an example of using futures at the beginning of this tutorial; now let's look at an example of using streams, via the incoming method. This simple server accepts connections, writes "Hello!" to each client, and closes the socket:
extern crate futures;
extern crate tokio_core;

use futures::stream::Stream;
use tokio_core::reactor::Core;
use tokio_core::net::TcpListener;

fn main() {
    let mut core = Core::new().unwrap();
    let address = "127.0.0.1:8080".parse().unwrap();
    let listener = TcpListener::bind(&address, &core.handle()).unwrap();

    let addr = listener.local_addr().unwrap();
    println!("Listening for connections on {}", addr);

    let clients = listener.incoming();
    let welcomes = clients.and_then(|(socket, _peer_addr)| {
        tokio_core::io::write_all(socket, b"Hello!\n")
    });
    let server = welcomes.for_each(|(_socket, _welcome)| {
        Ok(())
    });

    core.run(server).unwrap();
}
As before, let's go through the lines:
let mut core = Core::new().unwrap();
let address = "127.0.0.1:8080".parse().unwrap();
let listener = TcpListener::bind(&address, &core.handle()).unwrap();
Here we initialize the event loop, as before, and call the TcpListener::bind method with a handle to it to create a TCP listener that will accept sockets.
Next, take a look at the following code:
let server = listener.and_then(|listener| {
    // ...
});
Here you can see that TcpListener::bind, like TcpStream::connect, does not return a TcpListener directly; rather, a future computes it. We then use the and_then method of Future to define what happens once the TCP listener becomes available.
Once we have the TCP listener, we can inspect its state:
let addr = listener.local_addr().unwrap();
println!("Listening for connections on {}", addr);
We call the local_addr method to print the address the listener is bound to. At this point we know the port has been bound successfully, so clients are able to connect.
Next, we create a stream:
let clients = listener.incoming();
Here the incoming method returns a Stream of pairs of a TcpStream and a SocketAddr. This is similar to the standard library's TcpListener and its accept method, except that here we receive all connection events as a stream rather than accepting sockets manually.
The clients stream produces sockets continuously. This mirrors how servers work: they accept clients in a loop and dispatch them to the rest of the system for handling.
Now, having a stream of client connections, we can manipulate it using standard Stream type methods:
let welcomes = clients.and_then(|(socket, _peer_addr)| {
    tokio_core::io::write_all(socket, b"Hello!\n")
});
Here we use the and_then method of Stream to perform an action on each element of the stream. In this case we chain a computation onto each element (each TcpStream). We saw write_all earlier: it writes the given data buffer to the given socket.
As a result, welcomes is now a stream of sockets to which the "Hello!" greeting has been written. In this tutorial we are done with each connection at that point, so we collapse the whole welcomes stream into a future using the for_each method:
welcomes.for_each(|(_socket, _welcome)| {
    Ok(())
})
Here we take the result of the previous future, write_all, and discard it, which drops the socket and causes it to be closed.
It should be noted that an important limitation of this server is the lack of parallelism. Streams represent in-order processing of data: here the order is the order in which sockets were received, and the and_then and for_each methods preserve it. Chaining therefore has the effect that each socket taken from the stream has all of its operations completed before the next socket is processed.
If, instead, we want to manage all clients in parallel, we can use the spawn method:
let clients = listener.incoming();
let welcomes = clients.map(|(socket, _peer_addr)| {
    tokio_core::io::write_all(socket, b"hello!\n")
});
let handle = core.handle();
let server = welcomes.for_each(|future| {
    handle.spawn(future.then(|_| Ok(())));
    Ok(())
});
Instead of and_then, we use the map method, which turns the stream of clients into a stream of futures. In the closure passed to for_each we then call the spawn method, which runs each future concurrently on the event loop. Note that spawn requires a future whose item and error types are both ().
By now you should have a clear understanding of the Future and Stream traits, how they are implemented, and how to combine them. But where do all these futures come from in the first place? Let's take a look at a few concrete implementations of futures and streams.
First of all, any value that is already available can be turned into an immediately "ready" future. The done, failed, and finished functions serve this purpose: done takes a Result<T, E> and returns a future with Item = T and Error = E, while failed and finished let you specify just the E or the T value, leaving the other associated type as a wildcard.
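For example (a sketch that uses the blocking wait method just to check the results):

extern crate futures;

use std::io;
use futures::{done, failed, finished, Future};

fn main() {
    // done wraps an existing Result into an already-completed future.
    let a = done::<u32, io::Error>(Ok(1));
    // finished fixes the success value; failed fixes the error value.
    let b = finished::<u32, io::Error>(2);
    let c = failed::<u32, String>("oops".to_string());

    assert_eq!(a.wait().unwrap(), 1);
    assert_eq!(b.wait().unwrap(), 2);
    assert!(c.wait().is_err());
}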
For streams, the equivalent of an already-"ready" value is the iter function, which creates a stream yielding the elements of the given iterator. For values that are not immediately ready, there are also many general-purpose implementations of Future and Stream, the first of which is the oneshot function:
extern crate futures;

use std::thread;
use futures::Future;

fn expensive_computation() -> u32 {
    // ...
    200
}

fn main() {
    let (tx, rx) = futures::oneshot();

    thread::spawn(move || {
        tx.complete(expensive_computation());
    });

    let rx = rx.map(|x| x + 3);
}
Here the oneshot function returns two halves, much like mpsc::channel. The first half, tx (the "transmitter"), is of type Complete and is used to complete the oneshot, providing a value to the future on the other end. The Complete::complete method transmits the value to the receiving half.

The second half, rx (the "receiver"), is of type Oneshot, which implements Future. Its Item type is T, the type of value the oneshot carries. Its Error type is Canceled, which occurs when the Complete half is dropped without completing the computation.
This concrete implementation of Future can be used, as shown here, to pass values between threads. Both halves implement Send and are owned independently, so they can be handed to separate parts of the program. It is generally better, however, to reach for combinators or other base futures where possible rather than using oneshot directly.
The Stream equivalent of oneshot is the channel function. It also returns two halves: a receiver, which implements Stream and yields the values pushed into the channel, and a sender.

The Sender half is notable: when a value is sent, it returns a future that resolves back into the Sender once the value has been consumed. This provides backpressure, so that a producer cannot get far ahead of the consumer.
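A minimal sketch of this pattern, based on the channel API in the futures::stream module as of the version this tutorial targets (the blocking wait calls are only for demonstration):

extern crate futures;

use std::thread;
use futures::{finished, Future};
use futures::stream::{channel, Stream};

fn main() {
    let (tx, rx) = channel::<i32, ()>();

    // Producer thread: each send consumes the sender and returns a future that
    // yields it back once the value has been accepted, providing backpressure.
    thread::spawn(move || {
        let mut tx = tx;
        for i in 0..10 {
            // ok().unwrap() avoids requiring Debug on the send error type.
            tx = tx.send(Ok(i)).wait().ok().unwrap();
        }
    });

    // Consumer: fold the stream of values into a sum.
    let sum = rx.fold(0, |acc, x| finished::<i32, ()>(acc + x));
    assert_eq!(sum.wait().unwrap(), 45);
}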
Returning futures
When working with futures, one of the first things you are likely to need is to return a Future from a function. As with the Iterator trait, however, this is not entirely straightforward. There are several options:
- trait objects (Box);
- custom types;
- named types;
- impl Trait.

First, you can always return a boxed trait object:
fn foo() -> Box<Future<Item = u32, Error = io::Error>> {
    // ...
}
The upside of this approach is that it is easy to write down and easy to create: almost any future can be boxed and returned this way. To make it more convenient, the boxed method and the BoxFuture type alias (for Box<Future + Send>) are provided:
fn foo() -> BoxFuture<u32, u32> {
    finished(1).boxed()
}
The downside is that constructing the future requires a runtime allocation: the Box has to be allocated and the future placed into it. Note, however, that this is the only allocation here; while the future runs, no further allocations are made. The cost can also be kept small by calling boxed only at the end of a long chain of futures (as in the first example above), so the Box is paid for just once.
If you do not want to return a Box, another option is to wrap the computation in your own type and implement Future for it.
Example:
struct MyFuture {
    inner: Oneshot<i32>,
}

fn foo() -> MyFuture {
    let (tx, rx) = oneshot();
    // ...
    MyFuture { inner: rx }
}

impl Future for MyFuture {
    // ...
}
Here the MyFuture type implements the Future trait. Internally this future wraps an Oneshot<i32>, but any other composition of futures could be stored there instead.
The upside of this approach is that it does not require a Box and remains zero-cost. The downside is that implementing Future for MyFuture by hand takes some boilerplate, and all of the state the computation needs has to live inside MyFuture. In places this can become unwieldy; over time, the futures crate should provide tools that make it easier.
The last option is to name the type of the returned future explicitly:
fn add_10<F>(f: F) -> Map<F, fn(i32) -> i32>
    where F: Future<Item = i32>,
{
    fn do_map(i: i32) -> i32 {
        i + 10
    }
    f.map(do_map)
}
Here we spell out the exact type of the returned future without a Box and without defining a custom future type. The downside is that such types quickly become unwieldy as chains of combinators grow. Note also that a function pointer (fn(i32) -> i32) had to be used instead of a closure, because a closure's type cannot be named. For simple cases, though, this works well.
impl Trait
On nightly Rust, the impl Trait language feature can be used to return a future.
Example:
fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error>
    where F: Future<Item = i32>,
{
    f.map(|i| i + 10)
}
Here the return type says only that the function returns "something that implements Future" with the given Item and Error types; the concrete type of the returned future is not written out.
The advantage over Box is that there is no allocation and no dynamic dispatch, while the return type stays just as easy to write down.
The downside is that impl Trait is not yet available on stable Rust. The good news is that once it stabilizes, crates that return boxed futures should be able to replace Box with impl Trait in a backwards-compatible way.
Task
Until now we have talked a lot about building computations by constructing futures, but we have barely touched on how they are actually run. Earlier, when discussing the poll method, we said that if poll returns NotReady, the current task will be notified once progress is possible; but where did that task come from in the first place, and who calls poll?

Enter the Task.

In the futures crate, a Task is what drives a computation built from futures. Any particular future may be just a small piece of a larger computation; in the "Hello, World!" example above, all of the futures together made up a single task. So a task represents one end-to-end computation, described by a future that may itself be composed of many smaller futures.

A task is created when a future is spawned, and it is the task that then calls poll on that future until the computation is finished. Spawning is done with methods such as CpuPool::spawn or Handle::spawn; after a spawn, you no longer call poll yourself.
Task and Future
It is worth being clear about the difference between Task and Future in the futures crate: a Task is not a Future. Rather, a task owns the future that it is polling to completion; futures themselves are only descriptions of computations. The Task is the longer-lived object that holds the state of one running computation.

One consequence of futures being owned by their task is that while a future is waiting, all of the state it needs lives inside the task, and the task together with its future can be moved around, for example between threads, as a single unit.
Futures and 'static
Because tasks, along with their futures, can migrate in this way, futures are generally required to be 'static: they cannot hold references to data on the stack. This leaves two common strategies for giving futures access to data:
- pass ownership of the data from future to future along the chain, with each future owning it while it needs it;
- use reference counting (Arc / Rc), wrapping shared state in Arc (or in Arc<Mutex> when it needs to be mutated), and so on.

Neither option is always convenient, which is where task-local data comes in.
Task-local data
A future is owned by exactly one Task at a time, and it is that task that calls poll. Building on this, the futures crate provides an API for data that is stored inside the Task itself and shared by all futures running on that task. It has two key pieces:
- the task_local! macro, analogous to thread_local!: it declares a value that is lazily initialized the first time a Task accesses it, and each Task gets its own independent copy;
- within a task, that data can be accessed from any of its futures without Rc, locking, or other synchronization, because only one future of a task is polled at a time.
That's it!

A question for readers: how would you translate the term "future"?
Source: https://habr.com/ru/post/310234/