
Working with Aerospike in Scala using macro magic



In our big data department, part of the data is stored in Aerospike. It has many consumers, including two applications written in Scala whose interaction with the database will keep expanding as business requirements grow. The only decent driver available to us was the Java client listed on the database's own site, aerospike.com ( http://www.aerospike.com/docs/client/java ). Converting Scala data types (especially hierarchical ones) to the corresponding Aerospike types produces a large amount of boilerplate. To avoid it, we need an interface that is more convenient and at the same time type-safe.


Engineers do not like writing the same code over and over, and try to simplify and automate repetitive work. Such problems are often solved with code generation, so we decided to write our own library for working with Aerospike using macros.



A little bit about Aerospike


Aerospike is a distributed schema-less key-value database that works on the principle of a hash table. It is actively used in our bank for building distributed caches and for tasks that require low response times. The database is easy to install and administer, which simplifies adopting and supporting it.


About the storage model: each record key is associated with a namespace and a setName, and the data itself is stored in so-called bins. Values can be of various types: Integer, String, Bytes, Double, List, Map, Sorted Map, GeoJSON. Interestingly, the type of a bin is not fixed: having written, say, an Integer, you can later overwrite it with a value of any other type. Drivers written for this database contain a fair amount of code for serializing values of the external model into the internal one.
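
To make the storage model concrete, here is a minimal in-memory sketch. It is not the Aerospike API: `StorageModelSketch`, `RecordKey`, `put` and `get` are hypothetical names invented for illustration. It only shows that a record is addressed by namespace, set name and key, and that a bin can be overwritten with a value of a different type.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the storage model; not the Aerospike API.
object StorageModelSketch {
  // A record is addressed by (namespace, setName, key).
  final case class RecordKey(namespace: String, setName: String, key: String)

  // Each record holds named bins; a bin value is deliberately untyped (Any).
  private val records = mutable.Map.empty[RecordKey, Map[String, Any]]

  // Writes a bin, replacing any previous value regardless of its type.
  def put(key: RecordKey, binName: String, value: Any): Unit = {
    val bins = records.getOrElse(key, Map.empty[String, Any])
    records.update(key, bins + (binName -> value))
  }

  // Reads a single bin of a record, if both the record and the bin exist.
  def get(key: RecordKey, binName: String): Option[Any] =
    records.get(key).flatMap(_.get(binName))
}
```

Writing `42` to a bin and then writing `"forty-two"` to the same bin is accepted without complaint, which is exactly why a typed layer on top is useful.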


Creating the DSL


Let's look at simple examples of how we designed our DSL, why we decided to use macros, and what came of it all.


With limited time (interaction with this database is only a small part of the project), it is hard to write an entire client that implements the protocol. It would also take more effort to maintain. So we settled on writing a wrapper around the existing client.


The basis is Aerospike Java Client version 3.3.1 (available at www.aerospike.com; the source code is on GitHub). A considerable part of its methods operate on keys and bins from the com.aerospike.client package. The Java client supports both synchronous and asynchronous modes. We use the asynchronous com.aerospike.client.async.AsyncClient. The easiest way to create it:


 val client = new AsyncClient(new AsyncClientPolicy, hosts.map(new Host(_, port)): _*) 

where hosts is a List[String] containing the hosts of your database, and port is an Int (default 3000).


If you pass invalid hosts or the wrong port when creating the client, the driver throws an error, because it checks the connection during the constructor call:


    scala> new AsyncClient(new AsyncClientPolicy, List().map(new Host(_, port)): _*)
    com.aerospike.client.AerospikeException$Connection: Error Code 11: Failed to connect to host(s):

Type mapping between the DSL, the Java client, and the database


| Scala        | Java Client  | Aerospike |
|--------------|--------------|-----------|
| Int          | IntegerValue | Integer   |
| Long         | LongValue    | Integer   |
| String       | StringValue  | String    |
| Boolean      | BooleanValue | Integer   |
| Float        | FloatValue   | Double    |
| Double       | DoubleValue  | Double    |
| Seq          | ListValue    | List      |
| Map          | MapValue     | Map       |
| Char         | StringValue  | String    |
| Short        | IntegerValue | Integer   |
| Byte         | IntegerValue | Integer   |
| HList        | MapValue     | Map       |
| case class T | MapValue     | Map       |

The table shows that many similar conversions lie ahead. Nobody wants to write them all by hand every time.
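
As a rough illustration of the table above, the mapping can be written as one runtime function. This is only a sketch: `TypeMappingSketch` and `storageType` are hypothetical names, and the HList and case class rows are left out because they need shapeless or reflection.

```scala
// Hypothetical helper that mirrors the mapping table; not part of any driver.
object TypeMappingSketch {
  // Returns the name of the Aerospike storage type for a Scala value.
  def storageType(value: Any): String = value match {
    case _: Int | _: Long | _: Short | _: Byte | _: Boolean => "Integer"
    case _: Float | _: Double                               => "Double"
    case _: String | _: Char                                => "String"
    case _: Seq[_]                                          => "List"
    case _: Map[_, _]                                       => "Map"
    case other =>
      throw new IllegalArgumentException(s"No mapping for ${other.getClass.getName}")
  }
}
```

A function like this discovers unsupported types only at runtime, which is the weakness that the compile-time approach described next is meant to remove.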


The first thought was reflection, but the runtime variant does not suit us: it is slow and expensive. That leaves compile-time reflection, which lets us generate converters and get error messages at compile time.


In the interface methods of our DSL, for any operation on the database we pass only the concrete key and bin values, and macros do all the conversions. The main idea was to get rid of the boilerplate and spare the user from studying Aerospike's internal data structure in depth. We determined in advance the most suitable storage option for each type of value being written.


Consider one of the most common operations with Aerospike: adding a record and then reading it by key. We will use the put method. First we need functions that convert values of certain types into the driver's internal models: keys into com.aerospike.client.Key and bins into com.aerospike.client.Bin.
Let the key be a String, and let the various services write bins of types String, Int, and Boolean.


Write the key conversion function:


    import com.aerospike.client.Key
    import com.aerospike.client.Value.StringValue

    def createStringKey(namespace: String, setName: String, value: String): Key =
      new Key(namespace, setName, new StringValue(value))

and bins, respectively:


    import com.aerospike.client.Bin
    import com.aerospike.client.Value.{IntegerValue, StringValue, BooleanValue}

    def createStringBin(name: String, value: String): Bin =
      new Bin(name, new StringValue(value))

    def createIntBin(name: String, value: Int): Bin =
      new Bin(name, new IntegerValue(value))

    def createBooleanBin(name: String, value: Boolean): Bin =
      new Bin(name, new BooleanValue(value))

The signature of the method we need in the Java library (there are several overloads; we take the one with the fewest parameters):


 public void put(WritePolicy policy, Key key, Bin... bins) throws AerospikeException; 

This means that calls using this library will look like this:


    import com.aerospike.client.policy.WritePolicy

    client.put(new WritePolicy,
      createStringKey("namespace", "setName", "keyValue1"),
      Seq(createStringBin("binName1", "binValue1"), createStringBin("binName2", "binValue2")): _*)

    client.put(new WritePolicy,
      createStringKey("namespace", "setName", "keyValue2"),
      Seq(createIntBin("binName1", 2), createIntBin("binName2", 4)): _*)

    client.put(new WritePolicy,
      createStringKey("namespace", "setName", "keyValue3"),
      Seq(createBooleanBin("binName1", true), createBooleanBin("binName2", false)): _*)

Not too pretty, right? Let's try to simplify:


    def createKey[T](ns: String, sn: String, value: T): Key = {
      val key = value match {
        case s: String  => new StringValue(s)
        case i: Int     => new IntegerValue(i)
        case b: Boolean => new BooleanValue(b)
        case _          => throw new Exception("Not implemented")
      }
      new Key(ns, sn, key)
    }

    def createBin[T](name: String, value: T): Bin = {
      value match {
        case s: String  => new Bin(name, new StringValue(s))
        case i: Int     => new Bin(name, new IntegerValue(i))
        case b: Boolean => new Bin(name, new BooleanValue(b))
        case _          => throw new Exception("Not implemented")
      }
    }

    def putValues[K, B](client: AsyncClient, namespace: String, setName: String,
                        keyValue: K, bins: Seq[(String, B)])
                       (implicit wPolicy: WritePolicy): Unit = {
      client.put(wPolicy, createKey(namespace, setName, keyValue),
        bins.map(b => createBin(b._1, b._2)): _*)
    }

Now we need to get rid of the createKey and createBin functions, so let's add some implicits magic.


We need helper objects that, based on the types of the input data, produce the corresponding driver models:


    KeyWrapper: [K => Key]
    BinWrapper: [B => Bin]

Now all the logic can be gathered in one method:


    case class SingleBin[B](name: String, value: B)

    def putValues[K, B](client: AsyncClient, key: K, value: SingleBin[B])
                       (implicit kC: KeyWrapper[K], bC: BinWrapper[B], wPolicy: WritePolicy): Unit =
      client.put(wPolicy, kC(key), bC(value))

where WritePolicy is a container object holding various write parameters. We will use the default one, created with new WritePolicy.


Obviously, the most straightforward option would be to write out the wrappers for every type by hand. But why do that when we know exactly how each instance will be created? This is where macros come in handy.
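
For comparison, the hand-written variant could look roughly like the sketch below. To keep it self-contained, `Value`, `StringValue`, `IntegerValue` and `Key` are simplified stand-ins for the driver classes, the namespace and set name are hard-coded placeholders, and `toKey` is a hypothetical helper. None of this is the library's actual code.

```scala
object HandWrittenWrappersSketch {
  // Simplified stand-ins for the driver's Value and Key classes (assumptions).
  sealed trait Value
  final case class StringValue(v: String) extends Value
  final case class IntegerValue(v: Int) extends Value
  final case class Key(namespace: String, setName: String, value: Value)

  // Type class: knows how to turn a K into a driver Key.
  trait KeyWrapper[K] {
    def apply(k: K): Key
  }

  object KeyWrapper {
    // One instance per supported type, each written by hand.
    implicit val stringKey: KeyWrapper[String] = new KeyWrapper[String] {
      def apply(k: String): Key = Key("namespace", "setName", StringValue(k))
    }
    implicit val intKey: KeyWrapper[Int] = new KeyWrapper[Int] {
      def apply(k: Int): Key = Key("namespace", "setName", IntegerValue(k))
    }
  }

  // The compiler picks the instance from the static type of `key`.
  def toKey[K](key: K)(implicit kw: KeyWrapper[K]): Key = kw(key)
}
```

With this shape, `toKey("user-1")` and `toKey(42)` compile, while `toKey(1.5)` is rejected at compile time because no `KeyWrapper[Double]` is in scope. The cost is one nearly identical instance per type, which is the repetition a macro can generate instead.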


The simplest option is to describe how each kind of converter is created using quasiquotes. Let's start with the keys:


    trait KeyWrapper[KT] {
      val namespace: String = ""
      val setName: String = ""

      def apply(k: KT): Key

      def toValue(v: KT): Value = v match {
        case b: Int     => new IntegerValue(b)
        case b: String  => new StringValue(b)
        case b: Boolean => new BooleanValue(b)
        case _          => throw new Exception("not implemented")
      }
    }

    object KeyWrapper {

      implicit def materialize[T](implicit dbc: DBCredentials): KeyWrapper[T] = macro impl[T]

      def impl[T: c.WeakTypeTag](c: Context)(dbc: c.Expr[DBCredentials]): c.Expr[KeyWrapper[T]] = {
        import c.universe._
        val tpe = weakTypeOf[T]

        val ns = reify(dbc.splice.namespace)
        val sn = reify(dbc.splice.setname)

        val imports =
          q"""
             import com.aerospike.client.{Key, Value}
             import collection.JavaConversions._
             import com.aerospike.client.Value._
             import scala.collection.immutable.Seq
             import ru.tinkoff.aerospikescala.domain.ByteSegment
             import scala.util.{Failure, Success, Try}
           """

        c.Expr[KeyWrapper[T]] {
          q"""
             $imports
             new KeyWrapper[$tpe] {
               override val namespace = $ns
               override val setName = $sn
               def apply(k: $tpe): Key = new Key(namespace, setName, toValue(k))
             }
           """
        }
      }
    }

where DBCredentials contains the namespace and setName.


Thus, we can write a service method for which the converters are generated automatically at compile time.




With bins, the situation is somewhat more complicated. We must read back values stored in the database, which were previously converted to Aerospike's internal format. To do this, we use the simplest of the driver's methods:


 public Record get(Policy policy, Key key) throws AerospikeException; 

where the return value is:


 public Record( Map<String,Object> bins, int generation, int expiration ) 

and the data we need is in Map<String,Object> bins. Here lies a problem (see the type mapping table). Since our goal is to generate converters at compile time and return a value of the same type that was written earlier, we need to know in advance how to write the function that extracts the value we need from what the database returns. In addition, the values in bins arrive as java.util types, so we will need converters to the corresponding scala.collection types.
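
A small sketch of the read side, under stated assumptions: `FetchSketch`, `fetchInt`, `fetchBoolean` and `binsToScala` are hypothetical names, and the only facts relied on are that bins arrive as a `java.util.Map<String,Object>` and that Aerospike integers come back as `java.lang.Long`. It uses `scala.collection.JavaConverters`, which matches the Scala 2.11 era of this article; on Scala 2.13 and later it is deprecated in favor of `scala.jdk.CollectionConverters`.

```scala
import scala.collection.JavaConverters._

// Hypothetical helpers illustrating the read path; not the library's code.
object FetchSketch {
  // Aerospike's Integer type comes back from the driver as java.lang.Long.
  def fetchInt(any: Any): Option[Int] = any match {
    case v: java.lang.Long => Some(v.intValue())
    case _                 => None
  }

  // Booleans are stored as integers, so 1 is read back as true.
  def fetchBoolean(any: Any): Option[Boolean] = any match {
    case v: java.lang.Long => Some(v.longValue() == 1L)
    case _                 => None
  }

  // Converts the java.util.Map of bins into an immutable Scala Map.
  def binsToScala(bins: java.util.Map[String, AnyRef]): Map[String, Any] =
    bins.asScala.toMap
}
```

The same stored value `1` yields `Some(1)` through `fetchInt` and `Some(true)` through `fetchBoolean`, so the target type has to be known by the caller; the database alone cannot tell which one was written.
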
Now let's write the bin converter:


    trait BinWrapper[BT] {

      import com.aerospike.client.Value._
      import com.aerospike.client.{Bin, Record, Value}
      import scala.collection.JavaConversions._
      import scala.collection.immutable.Map
      import scala.reflect.runtime.universe._

      type Singleton = SingleBin[BT]
      type Out = (Map[String, Option[BT]], Int, Int)

      def apply(one: Singleton): Bin = {
        if (one.name.length > 14)
          throw new IllegalArgumentException("Current limit for bean name is 14 characters")
        else new Bin(one.name, toValue(one.value))
      }

      def toValue(v: BT): Value = v match {
        case b: Int     => new IntegerValue(b)
        case b: String  => new StringValue(b)
        case b: Boolean => new BooleanValue(b)
        case _          => throw new Exception("not implemented")
      }

      def apply(r: Record): Out = {
        val outValue: Map[String, Option[BT]] = {
          val jMap = r.bins.view collect {
            case (name, bt: Any) => name -> fetch(bt)
          }
          jMap.toMap
        }
        if (outValue.values.isEmpty && r.bins.nonEmpty)
          throw new ClassCastException(
            s"Failed to cast ${weakTypeOf[BT]}. Please, implement fetch function in BinWrapper")
        else (outValue, r.generation, r.expiration)
      }

      def fetch(any: Any): Option[BT]
    }

The apply method takes a Record as its parameter; everything up to parsing the type of the value itself can be generalized here. The implementation of this method is easier to generate with macros:


    object BinWrapper {

      implicit def materialize[T]: BinWrapper[T] = macro materializeImpl[T]

      def materializeImpl[T: c.WeakTypeTag](c: blackbox.Context): c.Expr[BinWrapper[T]] = {
        import c.universe._

        val tpe = weakTypeOf[T]
        val singleton = weakTypeOf[SingleBin[T]]
        val out = weakTypeOf[(Map[String, Option[T]], Int, Int)]
        val tpeSt = q"${tpe.toString}"

        val fetchValue = tpe match {
          case t if t =:= weakTypeOf[String] =>
            q"""override def fetch(any: Any): Option[$tpe] = any match {
                  case v: String => Option(v)
                  case oth => scala.util.Try(oth.toString).toOption
                }
             """
          case t if t =:= weakTypeOf[Boolean] =>
            q"""override def fetch(any: Any): Option[$tpe] = any match {
                  case v: java.lang.Long => Option(v == 1)
                  case _ => None
                }
             """
          case t if t =:= weakTypeOf[Int] =>
            q"""override def fetch(any: Any): Option[$tpe] = any match {
                  case v: java.lang.Long => Option(v.toInt)
                  case oth => scala.util.Try(oth.toString.toInt).toOption
                }
             """
          case t if t.toString.contains("HNil") || t.toString.contains("HList") =>
            q"""override def fetch(any: Any): Option[$tpe] = any match {
                  case m: java.util.HashMap[Any, Any] =>
                    val newList = castHListElements(m.asScala.values.toList, $tpeSt)
                    newList.toHList[$tpe]
                  case oth => None
                }
             """
          case _ => q""""""
        }

        val imports =
          q"""
             import java.util.{List => JList, Map => JMap}
             import com.aerospike.client.{Bin, Record, Value}
             import com.aerospike.client.Value.{BlobValue, ListValue, MapValue, ValueArray}
             import scala.collection.JavaConversions._
             import scala.collection.JavaConverters._
             import shapeless.{HList, _}
             import shapeless.HList.hlistOps
             import syntax.std.traversable._
             ....
           """

        c.Expr[BinWrapper[T]] {
          q"""
             $imports
             new BinWrapper[$tpe] {
               override def apply(one: $singleton): Bin = {
                 if (one.name.length > 14)
                   throw new IllegalArgumentException("Current limit for bean name is 14 characters")
                 else new Bin(one.name, toValue(one.value))
               }

               override def apply(r: Record): $out = {
                 val outValue: Map[String, Option[$tpe]] = {
                   val jMap = r.bins.view collect {
                     case (name, bt: Any) =>
                       val res = fetch(bt)
                       if (res.isEmpty && r.bins.nonEmpty) throwClassCast($tpeSt)
                       else name -> res
                   }
                   jMap.toMap
                 }
                 (outValue, r.generation, r.expiration)
               }

               $fetchValue
             }
           """
        }
      }
    }

The macros have done all the work for us: instances of all the required converters are generated automatically, and method calls contain only the key and bin values themselves.




Quasiquotes are easy to work with: predictable behavior, no pitfalls. It is important to remember that with this approach, all libraries used by the code inside the quasiquotes must be imported in the file where the macro is used. That is why I added the imports parameter to both converters right away, so as not to copy the set of imports into every file.


Now we have everything to write a service wrapper:


    class SpikeImpl(client: IAsyncClient) {

      def putValue[K, B](key: K, value: SingleBin[B])
                        (implicit kC: KeyWrapper[K], bC: BinWrapper[B]): Unit = {
        val wPolicy = new WritePolicy
        client.put(wPolicy, kC(key), bC(value))
      }

      def getByKey[K, B](k: K)
                        (implicit kC: KeyWrapper[K], bC: BinWrapper[B]): Option[B] = {
        val policy = new Policy
        val record = client.get(policy, kC(k))
        bC.apply(record)._1.headOption.flatMap(_._2)
      }
    }

Now we can check how our service works:


    import shapeless.{HList, _}
    import shapeless.HList.hlistOps
    import scala.reflect.macros.blackbox._
    import scala.language.experimental.macros

    object HelloAerospike extends App {

      val client = new AsyncClient(new AsyncClientPolicy, hosts.map(new Host(_, port)): _*)
      val database = new SpikeImpl(client)
      implicit val dbc = DBCredentials("namespace", "setName")

      database.putValue("key", SingleBin("binName", 123 :: "strValue" :: true :: HNil))

      val hlistBin = database.getByKey[String, Int :: String :: Boolean :: HNil]("key")
        .getOrElse(throw new Exception("Failed to get bin value"))

      println("hlistBin value = " + hlistBin)
    }

Run it and look in the database:


    Mac-mini-administrator-5:~ MarinaSigaeva$ ssh user@host
    user@host's password:
    Last login: Wed Nov 23 19:41:56 2016 from 1.1.1.1
    [user@host ~]$ aql
    Aerospike Query Client
    Version 3.9.1.2
    Copyright 2012-2016 Aerospike. All rights reserved.
    aql> select * from namespace.setName
    +------------------------------------------+
    | binName                                  |
    +------------------------------------------+
    | MAP('{"0":123, "1":"strValue", "2":1}')  |
    +------------------------------------------+
    1 row in set (0.049 secs)
    aql>

The data has been written. Now let's see what the application printed to the console:


    [info] Compiling 1 Scala source to /Users/Marina/Desktop/forks/playground/target/scala-2.11/classes...
    [info] Running HelloAerospike
    hlistBin value = 123 :: strValue :: true :: HNil
    [success] Total time: 0 s, completed 23.11.2016 20:01:44
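
The aql output above suggests that the HList was stored as a map keyed by element position, with the Boolean written as an integer. The sketch below imitates that encoding with a plain `List[Any]` instead of a shapeless HList. `IndexedMapSketch` and `encode` are hypothetical names, and the library's actual encoder is not shown in this article, so treat this as a guess at the shape of the data rather than its implementation.

```scala
// Hypothetical encoder that imitates the stored shape seen in the aql output.
object IndexedMapSketch {
  // Encodes positional values as a map keyed by "0", "1", ...;
  // booleans become 1 or 0, all other values are kept as they are.
  def encode(values: List[Any]): Map[String, Any] =
    values.zipWithIndex.map {
      case (b: Boolean, i) => i.toString -> (if (b) 1L else 0L)
      case (v, i)          => i.toString -> v
    }.toMap
}
```

Reading such a map back into `Int :: String :: Boolean :: HNil` needs the expected element types, which is why the type parameter has to be supplied to `getByKey`.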

For Scala developers, this solution may be more intuitive than the Java library. The code of the current DSL is published on GitHub with a detailed how-to and a book, which will be expanded. In light of recent events (the release of Scala 2.12), there is an opportunity for interesting experiments with scala-meta. I hope this experience will be useful to you in solving similar problems.



Source: https://habr.com/ru/post/317742/

