CDRS (Apache Cassandra driver written in Rust) is my own open source project, which I decided to develop after discovering a shortage of Cassandra drivers in the Rust ecosystem.
That is not to say there are none at all. Some exist, but one part of them are packages abandoned at the Hello World stage, and the other part is probably the only binding to the DataStax driver written in C++.
CDRS, for its part, fully implements version 4 of the protocol specification in pure Rust.
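To give a feel for what implementing the protocol involves, here is a minimal std-only sketch of the 9-byte frame header that version 4 of the native protocol places in front of every request. The `Opcode` and `FrameHeader` names are hypothetical and chosen for illustration; they are not CDRS's actual types.

```rust
/// A few request opcodes, with values taken from the protocol v4 specification.
#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(u8)]
pub enum Opcode {
    Startup = 0x01,
    Options = 0x05,
    Query = 0x07,
    Prepare = 0x09,
    Execute = 0x0A,
    Register = 0x0B,
}

/// Hypothetical model of a request frame header (not a CDRS type).
pub struct FrameHeader {
    pub flags: u8,
    pub stream: i16,
    pub opcode: Opcode,
    pub body_len: u32,
}

impl FrameHeader {
    /// Request frames of protocol v4 carry the version byte 0x04.
    pub const REQUEST_VERSION: u8 = 0x04;

    /// Layout: version (1) | flags (1) | stream (2) | opcode (1) | length (4),
    /// with multi-byte fields in big-endian order.
    pub fn encode(&self) -> [u8; 9] {
        let mut out = [0u8; 9];
        out[0] = Self::REQUEST_VERSION;
        out[1] = self.flags;
        out[2..4].copy_from_slice(&self.stream.to_be_bytes());
        out[4] = self.opcode as u8;
        out[5..9].copy_from_slice(&self.body_len.to_be_bytes());
        out
    }
}
```

The message body (a query string, STARTUP options, and so on) follows this header, and `body_len` tells the server how many bytes to read.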
To include the driver in your project you need the usual steps.
First, add CDRS to the dependencies section of your Cargo.toml file:

    [dependencies]
    cdrs = "1.0.0-beta.1"

This will allow you to use a TCP connection without encryption.
If you intend to create an SSL-encrypted connection to your database, CDRS must be built with the "ssl" feature enabled:

    [dependencies]
    openssl = "0.9.6"

    [dependencies.cdrs]
    version = "1.0.0-beta.1"
    features = ["ssl"]

Second, add the crate to lib.rs:

    extern crate cdrs;

To establish an unencrypted connection, you will need the following modules:

    use cdrs::client::CDRS;
    use cdrs::authenticators::{NoneAuthenticator, PasswordAuthenticator};
    use cdrs::transport::TransportPlain;

If your cluster does not require password authentication, the connection can be established as follows:

    let authenticator = NoneAuthenticator;
    let addr = "127.0.0.1:9042";
    let tcp_transport = TransportPlain::new(addr).unwrap();

    // pass authenticator and transport into CDRS' constructor
    let client = CDRS::new(tcp_transport, authenticator);

    use cdrs::compression;
    // start session without compression
    let mut session = try!(client.start(compression::None));

To establish a connection that requires password authentication, use PasswordAuthenticator instead of NoneAuthenticator:
let authenticator = PasswordAuthenticator::new("user", "pass");
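
Cassandra's built-in password authentication receives the credentials as a SASL PLAIN token. The following std-only sketch shows how such a token is laid out. The `plain_auth_token` function is a hypothetical helper, and the assumption here is that a driver sends an equivalent byte sequence in its authentication response; it is not taken from the CDRS source.

```rust
/// Builds a SASL PLAIN token: authzid NUL authcid NUL password.
/// The authorization id is left empty, so the token starts with a NUL byte.
pub fn plain_auth_token(user: &str, password: &str) -> Vec<u8> {
    let mut token = Vec::with_capacity(user.len() + password.len() + 2);
    token.push(0); // empty authzid, then separator
    token.extend_from_slice(user.as_bytes());
    token.push(0); // separator between user name and password
    token.extend_from_slice(password.as_bytes());
    token
}
```

Because the token is not encrypted in any way, password authentication over a plain TCP transport exposes the credentials on the wire, which is one reason to prefer the TLS transport described next.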
Establishing a TLS connection is very similar to the process described in the previous step, except that you need a PEM certificate to create an SSL transport.

    use cdrs::client::CDRS;
    use cdrs::authenticators::PasswordAuthenticator;
    use cdrs::transport::TransportTls;
    use openssl::ssl::{SslConnectorBuilder, SslMethod};
    use std::path::Path;

    let authenticator = PasswordAuthenticator::new("user", "pass");
    let addr = "127.0.0.1:9042";

    // here needs to be a path of your SSL certificate
    let path = Path::new("./node0.cer.pem");
    let mut ssl_connector_builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap();
    ssl_connector_builder.builder_mut().set_ca_file(path).unwrap();
    let connector = ssl_connector_builder.build();
    let ssl_transport = TransportTls::new(addr, &connector).unwrap();

    // pass authenticator and SSL transport into CDRS' constructor
    let client = CDRS::new(ssl_transport, authenticator);

For easier management of existing connections, CDRS contains a ConnectionManager, which is essentially an adapter for r2d2.

    use std::thread;
    use cdrs::connection_manager::ConnectionManager;
    //...

    let config = r2d2::Config::builder()
        .pool_size(3)
        .build();
    let transport = TransportPlain::new(ADDR).unwrap();
    let authenticator = PasswordAuthenticator::new(USER, PASS);
    let manager = ConnectionManager::new(transport, authenticator, Compression::None);

    let pool = r2d2::Pool::new(config, manager).unwrap();

    for _ in 0..20 {
        let pool = pool.clone();
        thread::spawn(move || {
            let conn = pool.get().unwrap();
            // use the connection
            // it will be returned to the pool when it falls out of scope.
        });
    }

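
The remark that a connection "will be returned to the pool when it falls out of scope" describes a guard object whose `Drop` implementation hands the connection back. Below is a minimal std-only sketch of that idea. `Pool` and `Pooled` are hypothetical types written for this illustration and are much simpler than r2d2, which among other things waits for a free connection instead of returning `None`.

```rust
use std::ops::Deref;
use std::sync::{Arc, Mutex};

/// Hypothetical minimal pool: a shared stack of idle connections.
pub struct Pool<T> {
    idle: Arc<Mutex<Vec<T>>>,
}

impl<T> Clone for Pool<T> {
    /// Clones share the same set of connections, like `pool.clone()` above.
    fn clone(&self) -> Self {
        Pool { idle: Arc::clone(&self.idle) }
    }
}

/// Guard that gives its connection back to the pool when dropped.
pub struct Pooled<T> {
    conn: Option<T>,
    idle: Arc<Mutex<Vec<T>>>,
}

impl<T> Pool<T> {
    pub fn new(conns: Vec<T>) -> Self {
        Pool { idle: Arc::new(Mutex::new(conns)) }
    }

    /// Returns None when every connection is checked out.
    pub fn get(&self) -> Option<Pooled<T>> {
        let conn = self.idle.lock().unwrap().pop()?;
        Some(Pooled { conn: Some(conn), idle: Arc::clone(&self.idle) })
    }

    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl<T> Deref for Pooled<T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.conn.as_ref().expect("connection is present until drop")
    }
}

impl<T> Drop for Pooled<T> {
    fn drop(&mut self) {
        // Leaving scope pushes the connection back onto the idle stack.
        if let Some(conn) = self.conn.take() {
            self.idle.lock().unwrap().push(conn);
        }
    }
}
```

With a pool of three connections and twenty threads, as in the example above, most threads have to wait their turn; the guard ensures that a connection becomes available again as soon as a thread is done with it.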
To use lz4 or snappy compression, it is enough to pass the desired compression type to the session constructor:

    // session without compression
    let mut session_res = client.start(compression::None);

    // session with lz4 compression
    let mut session_res = client.start(compression::Lz4);

    // session with snappy compression
    let mut session_res = client.start(compression::Snappy);

CDRS then tells the cluster that it is ready to receive data compressed with the selected algorithm. Decompression happens automatically and requires no further action from the developer.
Requests to the Cassandra server are executed only within an existing session, that is, after the authentication method, compression, and transport type have been chosen.
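
In protocol v4 the compression choice is announced in the options of the STARTUP message, which are encoded as a string map. The std-only sketch below shows that encoding. `put_string` and `startup_body` are hypothetical helpers written for this illustration, not functions of CDRS.

```rust
/// Encodes a [string]: a 2-byte big-endian length followed by UTF-8 bytes.
fn put_string(out: &mut Vec<u8>, s: &str) {
    out.extend_from_slice(&(s.len() as u16).to_be_bytes());
    out.extend_from_slice(s.as_bytes());
}

/// Encodes the body of a STARTUP message as a [string map]:
/// a 2-byte pair count, then each key and value as a [string].
/// `compression` is None for an uncompressed session,
/// or Some("lz4") / Some("snappy").
pub fn startup_body(compression: Option<&str>) -> Vec<u8> {
    let mut options = vec![("CQL_VERSION", "3.0.0")];
    if let Some(algorithm) = compression {
        options.push(("COMPRESSION", algorithm));
    }

    let mut out = Vec::new();
    out.extend_from_slice(&(options.len() as u16).to_be_bytes());
    for (key, value) in options {
        put_string(&mut out, key);
        put_string(&mut out, value);
    }
    out
}
```

Once the server accepts the STARTUP message, frame bodies in both directions may be compressed with the negotiated algorithm, which is why the driver can handle decompression without involving the caller.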
To execute a query, you must create a Query object, which at first glance may seem excessive for simple queries, since it contains many parameters that are rarely needed.
For this reason, a builder was created that simplifies configuring a query. For example, for a simple 'USE my_namespace;' the following is enough:

    let create_query: Query = QueryBuilder::new("USE my_namespace;").finalize();
    let with_tracing = false;
    let with_warnings = false;

    let switched = session.query(create_query, with_tracing, with_warnings).is_ok();

To create a new table in the Cassandra cluster, as before, you must first configure a Query and then run it:

    use std::default::Default;
    use cdrs::query::{Query, QueryBuilder};
    use cdrs::consistency::Consistency;

    let mut create_query: Query = QueryBuilder::new(
        "CREATE TABLE keyspace.authors ( id int, name text, messages list<text>, PRIMARY KEY (id) );")
        .consistency(Consistency::One)
        .finalize();
    let with_tracing = false;
    let with_warnings = false;

    let table_created = session.query(create_query, with_tracing, with_warnings).is_ok();

As for the CQL syntax for creating a table, specialized resources such as the DataStax documentation cover it more completely.
Suppose that our database contains a table of authors and that each author has a list of messages, stored in a list column. In Rust terms, an author would look like this:

    struct Author {
        pub name: String,
        pub messages: Vec<String>
    }

The query itself can be executed via the Session::query method, as was done when creating the table. The CQL in this case would be something like 'SELECT * FROM keyspace.authors;'. If the table contains data about some authors, we can try to map that data to a collection of Rust structures, such as 'Vec<Author>':

    //...
    use cdrs::error::{Result as CResult};

    let res_body = parsed.get_body();
    let rows = res_body.into_rows().unwrap();
    let authors: Vec<Author> = rows
        .iter()
        .map(|row| {
            // unwrap Option<CResult<T>>
            let name: String = row.get_by_name("name").unwrap().unwrap();
            let messages: Vec<String> = row
                // unwrap Option<CResult<T>>, where T implements AsRust
                .get_by_name("messages").unwrap().unwrap()
                .as_rust().unwrap();
            // field names must match the Author struct defined above
            Author { name: name, messages: messages }
        })
        .collect();

When mapping the results, you should pay attention to the following traits:
IntoRustByName. In simple terms, this trait is implemented for complex Cassandra types such as a row (which, strictly speaking, is not a separate type defined in the specification, but whose internal structure resembles a User Defined Type) and UDT. Roughly speaking, get_by_name looks up a "property" by its name and, if it finds one, returns the result of converting that property to a Rust type or to a CDRS type such as List, Map, or UDT. These types in turn map to the corresponding data types defined in the specification.
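
The two layers of the `Option<Result<T>>` return value, "was the column found" and "did the conversion succeed", can be modelled with a minimal std-only sketch. `Row`, `FromCql`, and this `get_by_name` are hypothetical stand-ins written for illustration; the real CDRS traits and signatures may differ.

```rust
use std::collections::HashMap;

/// Hypothetical conversion trait standing in for the driver's own.
pub trait FromCql: Sized {
    fn from_cql(bytes: &[u8]) -> Result<Self, String>;
}

impl FromCql for String {
    /// CQL text is UTF-8, so decoding can fail on invalid bytes.
    fn from_cql(bytes: &[u8]) -> Result<Self, String> {
        String::from_utf8(bytes.to_vec()).map_err(|e| e.to_string())
    }
}

impl FromCql for i32 {
    /// CQL int is a 4-byte big-endian two's complement integer.
    fn from_cql(bytes: &[u8]) -> Result<Self, String> {
        if bytes.len() != 4 {
            return Err(format!("int needs 4 bytes, got {}", bytes.len()));
        }
        Ok(i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
    }
}

/// Hypothetical row: raw column bytes keyed by column name.
pub struct Row {
    columns: HashMap<String, Vec<u8>>,
}

impl Row {
    pub fn new(columns: Vec<(&str, Vec<u8>)>) -> Self {
        Row {
            columns: columns
                .into_iter()
                .map(|(name, bytes)| (name.to_string(), bytes))
                .collect(),
        }
    }

    /// None: no such column. Some(Err(..)): column found, conversion failed.
    pub fn get_by_name<T: FromCql>(&self, name: &str) -> Option<Result<T, String>> {
        self.columns.get(name).map(|bytes| T::from_cql(bytes))
    }
}
```

In application code it is usually better to match on both layers than to chain `unwrap()` calls, since a missing column and a type mismatch tend to call for different error messages.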
Sometimes it is convenient to first prepare a complex query once, and then execute it several times with different data at different times. Prepare & Execute is perfect for this.

    // prepare just once
    let insert_table_cql = " insert into user_keyspace.users (user_name, password, gender, session_token, state) values (?, ?, ?, ?, ?)";
    let prepared = session.prepare(insert_table_cql.to_string(), true, true)
        .unwrap()
        .get_body()
        .into_prepared()
        .unwrap();

    // execute later, possibly several times, with different values
    let v: Vec<Value> = vec![Value::new_normal(String::from("john").into_bytes()),
                             Value::new_normal(String::from("pwd").into_bytes()),
                             Value::new_normal(String::from("male").into_bytes()),
                             Value::new_normal(String::from("09000").into_bytes()),
                             Value::new_normal(String::from("FL").into_bytes())];
    let execution_params = QueryParamsBuilder::new(Consistency::One).values(v).finalize();

    // without tracing and warnings
    let executed = session.execute(prepared.id, execution_params, false, false);

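
Each bound value travels to the server as a length-prefixed byte string. The std-only sketch below shows the protocol v4 encoding of such values. `WireValue` and `encode_values` are hypothetical names for this illustration, and the assumption is that `Value::new_normal` in the example above produces the equivalent of the `Normal` case.

```rust
/// A query parameter as it appears on the wire in protocol v4.
pub enum WireValue {
    Normal(Vec<u8>),
    Null,
    NotSet,
}

impl WireValue {
    /// [value] = 4-byte big-endian signed length, then that many bytes.
    /// A length of -1 means null and -2 means "not set".
    pub fn encode(&self) -> Vec<u8> {
        match self {
            WireValue::Normal(bytes) => {
                let mut out = (bytes.len() as i32).to_be_bytes().to_vec();
                out.extend_from_slice(bytes);
                out
            }
            WireValue::Null => (-1i32).to_be_bytes().to_vec(),
            WireValue::NotSet => (-2i32).to_be_bytes().to_vec(),
        }
    }
}

/// Encodes positional values: a 2-byte count followed by each [value].
pub fn encode_values(values: &[WireValue]) -> Vec<u8> {
    let mut out = (values.len() as u16).to_be_bytes().to_vec();
    for value in values {
        out.extend(value.encode());
    }
    out
}
```

Text columns are UTF-8, which is why `String::into_bytes` is enough to produce the payload for the string parameters in the example above; other column types need their own binary encoding.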
It also makes sense to combine Prepare & Batch to execute several prepared queries at once. The simplest example of Batch can also be found in the examples.
In addition to all of the above, CDRS lets you subscribe to the events published by the server.

    // subscribe to the same event type that is filtered for below
    let (mut listener, stream) = session.listen_for(vec![SimpleServerEvent::TopologyChange]).unwrap();

    thread::spawn(move || listener.start(&Compression::None).unwrap());

    let topology_changes = stream
        // inspects all events in a stream
        .inspect(|event| println!("inspect event {:?}", event))
        // filter by event's type: topology changes
        .filter(|event| event == &SimpleServerEvent::TopologyChange)
        // filter by event's specific information: new node was added
        .filter(|event| {
            match event {
                &ServerEvent::TopologyChange(ref event) => {
                    event.change_type == TopologyChangeType::NewNode
                },
                _ => false
            }
        });

    println!("Start listen for server events");

    for change in topology_changes {
        println!("server event {:?}", change);
    }

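
The listener-plus-stream arrangement can be modelled with a background thread that feeds a channel, while the consumer filters the receiving iterator. The following std-only sketch assumes nothing about CDRS itself: `Event`, `ChangeType`, `new_nodes`, and `demo` are hypothetical names, and the events are hard-coded instead of coming from a server.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug, Clone, PartialEq)]
pub enum ChangeType {
    NewNode,
    RemovedNode,
}

/// Hypothetical, simplified server events.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    TopologyChange { change: ChangeType, addr: String },
    SchemaChange { keyspace: String },
}

/// Keeps only "new node" topology events and yields the node address.
pub fn new_nodes<I: Iterator<Item = Event>>(events: I) -> impl Iterator<Item = String> {
    events.filter_map(|event| match event {
        Event::TopologyChange { change: ChangeType::NewNode, addr } => Some(addr),
        _ => None,
    })
}

/// Simulates a listener thread that pushes events into a channel.
pub fn demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        tx.send(Event::SchemaChange { keyspace: "ks".into() }).unwrap();
        tx.send(Event::TopologyChange {
            change: ChangeType::NewNode,
            addr: "10.0.0.5".into(),
        })
        .unwrap();
        tx.send(Event::TopologyChange {
            change: ChangeType::RemovedNode,
            addr: "10.0.0.2".into(),
        })
        .unwrap();
        // tx is dropped here, which ends the receiving iterator
    });

    // The receiver blocks until an event arrives or the sender is gone.
    let found: Vec<String> = new_nodes(rx.into_iter()).collect();
    producer.join().unwrap();
    found
}
```

With a real server the sending side never finishes on its own, so the consuming loop runs for as long as the connection stays open.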
For a complete list of events, it is best to refer to the specification itself, as well as to the driver documentation.
In the future, there are plans to use events for smart load balancing.
Source: https://habr.com/ru/post/321594/