
Behavior Analysis with Apache Spark

This article covers using Apache Spark to analyze behavioral factors on a site with very heavy traffic. Behavioral factors are often taken into account to increase a resource's conversion rate. The Internet also makes it quick and easy to collect and analyze a huge amount of very diverse statistical information. Code samples are shown along the way, together with some tips from the author's personal experience.



Behavioral factors are one of the most effective ways to determine the value of a document. In essence, they are the variables on which the decision about the rating of a particular entity is based. If there is relatively little information, it can be presented in a human-friendly graphical form (a histogram, matrix, heat map, regression plot, or hierarchical clustering dendrogram). For example, the R programming language, which is very often used in data analysis, makes it quite convenient to draw graphs (plot) or display matrices (mosaicplot, image, persp, contour). R also has many useful functions built in (descriptive statistics, combinatorics, probability theory), not to mention libraries for more complex analysis (for example, random forest).



However, the main limitation of a human assessor is productivity. A person cannot evaluate a huge amount of information in a short period of time. On the other hand, there is a wide range of tasks that cannot be done without a person, because some things are almost impossible to evaluate algorithmically.



If there is so much information that it does not fit into the memory of even a very powerful single machine (one node), the processing is distributed across several nodes. Apache Spark is one of the most popular frameworks for distributed data processing. Some tasks can be performed with literally one command (a single line of code in the console):

sc.textFile(path).map(s => (s, 1)).reduceByKey((a, b) => a + b).saveAsTextFile(pathSave) 


This fragment of Scala code counts events (lines) in a log where each line contains exactly one event name (unique identifier). After running it, we get a directory with several text files containing the results. A report on the job is available in a convenient web interface (http://127.0.0.1:4040/jobs/) with detailed information, including a visual representation of the job.



After the data is loaded from a text file (textFile), all lines are represented as a special distributed dataset, an RDD (Resilient Distributed Dataset). Various operations can now be performed on this data. It is important to note that transformations (the term used in the original documentation) do not modify the dataset, which is immutable, but create a new one. Moreover, Spark does not actually execute these operations until they are needed, that is, until one of the commands from the list of actions is called.
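
The deferred execution described above can be illustrated without Spark. The sketch below is only an analogy in plain Java: `java.util.stream` pipelines are also lazy, so the `map` step does not run until a terminal operation (the counterpart of a Spark action) is called. The class name `LazyPipelineDemo` and the call counter are illustrative assumptions and are not part of Spark's API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: Java streams, like Spark transformations, are evaluated lazily.
class LazyPipelineDemo {
    // Counts how many times the mapping function has actually been invoked.
    static final AtomicInteger mapCalls = new AtomicInteger();

    // Builds the pipeline; nothing is computed here (comparable to a transformation).
    static Stream<Integer> lengths(List<String> lines) {
        return lines.stream().map(s -> {
            mapCalls.incrementAndGet();
            return s.length();
        });
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("view", "click", "view");
        Stream<Integer> pipeline = lengths(lines);
        System.out.println("map calls before terminal operation: " + mapCalls.get());

        // The terminal operation triggers the work (comparable to an action).
        List<Integer> result = pipeline.collect(Collectors.toList());
        System.out.println("map calls after terminal operation: " + mapCalls.get());
        System.out.println("result: " + result);

        // The source list is untouched, just as an RDD is never modified in place.
        System.out.println("source: " + lines);
    }
}
```

The practical consequence in Spark is similar: building a long chain of transformations is cheap, and the cost is paid only when an action such as saveAsTextFile or collect is invoked.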


In this particular case, the dataset is transformed with map, which applies a function to each element and creates a new RDD. The line becomes the key and the number one is used as the value. The next transformation, reduceByKey, reduces the elements of the dataset by key. The given function (a + b) sums the values for each key. As a result, only unique keys remain, and their values are the number of times each key occurred in the original dataset. To simplify the example even further, recall countByValue, which makes this task quite trivial.
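
To make the map and reduceByKey steps concrete, here is a minimal single-machine sketch in plain Java. It is not Spark code: it only mirrors the same logic with `Collectors.toMap`, where each line is mapped to the value 1 and values with equal keys are merged by summation. The class name `EventCounter` and the sample log are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Single-machine sketch of the "pair each line with 1, then sum per key" idea.
class EventCounter {
    static Map<String, Long> countEvents(List<String> lines) {
        return lines.stream()
                .collect(Collectors.toMap(
                        s -> s,          // key: the event name (the whole line)
                        s -> 1L,         // value: one occurrence
                        Long::sum,       // merge function, the same as (a, b) => a + b
                        TreeMap::new));  // sorted map, only to get stable output
    }

    public static void main(String[] args) {
        // Hypothetical log: one event name per line.
        List<String> log = Arrays.asList("view", "click", "view", "buy", "view");
        System.out.println(countEvents(log)); // prints {buy=1, click=1, view=3}
    }
}
```

The merge function plays the role of the reducer: it must be associative, which is what allows Spark to combine partial sums from different partitions in any order.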



But what if you need to perform real cluster analysis using the k-means method? For this there is the spark.mllib package, which contains many ready-made algorithms (including, but not limited to, clustering, linear regression, classification, collaborative filtering, decision trees, random forests, and gradient boosting).

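    import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
    import org.apache.spark.mllib.linalg.Vectors

    // Each line of the CSV file holds the numeric features of one observation, separated by ";"
    val lines = sc.textFile(pathCsv)
    val data = lines.map(s => Vectors.dense(s.split(";").map(_.toDouble))).cache()

    // Split the data into 3 clusters, using at most 20 iterations
    val clusters = KMeans.train(data, 3, 20)

    // Show the cluster centers
    clusters.clusterCenters.mkString("\n")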


I would especially like to note that how accurately the number of clusters is chosen affects the accuracy of the algorithm. It is therefore better to clean and check the data in advance.
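
One common way to check the chosen number of clusters is to compare the within-cluster sum of squared distances for several values of k (MLlib's KMeansModel offers computeCost for a similar measurement on an RDD). The plain-Java sketch below only shows what that number means. The class name `ClusterCost`, the sample points, and the hand-picked centers are all hypothetical; it does not run k-means itself.

```java
// Plain-Java sketch: within-cluster sum of squared distances for given centers.
class ClusterCost {
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    // Each point contributes the squared distance to its nearest center.
    static double cost(double[][] points, double[][] centers) {
        double total = 0.0;
        for (double[] p : points) {
            double best = Double.POSITIVE_INFINITY;
            for (double[] c : centers) {
                best = Math.min(best, squaredDistance(p, c));
            }
            total += best;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical data: two visually obvious groups of points.
        double[][] points = {{1, 1}, {1, 2}, {8, 8}, {9, 8}};
        double[][] oneCenter = {{4.75, 4.75}};         // mean of all points
        double[][] twoCenters = {{1, 1.5}, {8.5, 8}};  // mean of each group
        System.out.println("k=1 cost: " + cost(points, oneCenter));
        System.out.println("k=2 cost: " + cost(points, twoCenters));
    }
}
```

A sharp drop in cost when moving from k to k+1, followed by only small improvements, suggests that k+1 is a reasonable choice for the data at hand.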



Association rules are among the most common methods for analyzing behavioral factors. They are very often used to find typical patterns of actions, for example a typical market basket in an online store. The idea is to estimate the probability that some items occur in a set given the presence of others. For example, if there is a flashlight in the basket, we can guess that there are batteries for it as well. Of course, this applies not only to purchases but to any other patterned actions. Here is an example of such an analysis:

    import org.apache.spark.mllib.fpm.AssociationRules
    import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset

    // Frequent itemsets, each with the number of times it occurs
    val freqItemsets = sc.parallelize(Seq(
      new FreqItemset(Array("milk"), 31L),
      new FreqItemset(Array("red", "milk", "fantazia"), 84L),
      new FreqItemset(Array("milk", "fantazia"), 89L),
      new FreqItemset(Array("lemon"), 49L),
      new FreqItemset(Array("red", "milk", "lemon"), 14L),
      new FreqItemset(Array("green", "lemon"), 25L)
    ))

    val results = new AssociationRules().setMinConfidence(0.5).run(freqItemsets)

    results.collect().foreach { rule =>
      println(rule.antecedent.mkString(",") + " -> " +
        rule.consequent.mkString(",") + " // " + rule.confidence)
    }
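
The confidence printed for each rule is the frequency of the combined itemset divided by the frequency of the antecedent. The plain-Java sketch below shows that arithmetic on its own. The class name `RuleConfidence` and the flashlight and battery counts are hypothetical and chosen only to be internally consistent; they are not taken from the example above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: confidence(A -> B) = freq(A and B together) / freq(A).
class RuleConfidence {
    static double confidence(Map<Set<String>, Long> freq,
                             Set<String> antecedent,
                             Set<String> consequent) {
        Set<String> union = new HashSet<>(antecedent);
        union.addAll(consequent);
        Long freqAntecedent = freq.get(antecedent);
        Long freqUnion = freq.get(union);
        if (freqAntecedent == null || freqUnion == null || freqAntecedent == 0L) {
            throw new IllegalArgumentException("Frequency of an itemset is missing");
        }
        return (double) freqUnion / freqAntecedent;
    }

    public static void main(String[] args) {
        // Hypothetical counts: 40 baskets with a flashlight, 30 of them also with a battery.
        Map<Set<String>, Long> freq = new HashMap<>();
        freq.put(new HashSet<>(Arrays.asList("flashlight")), 40L);
        freq.put(new HashSet<>(Arrays.asList("flashlight", "battery")), 30L);

        double c = confidence(freq,
                new HashSet<>(Arrays.asList("flashlight")),
                new HashSet<>(Arrays.asList("battery")));
        System.out.println("flashlight -> battery // " + c); // 30 / 40 = 0.75
    }
}
```

Because every basket that contains the combined itemset also contains the antecedent, a consistent set of frequencies always yields a confidence between 0 and 1, and a rule is kept only if it reaches the minimum confidence threshold.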


The analysis of behavioral factors also covers various aspects of people's activity in social networks. For such tasks it is more convenient to represent the data as a graph. Fortunately, there is a component called GraphX, whose purpose is to simplify the distributed processing of graphs. Its use can be shown with the example of computing PageRank for sites. First we define the vertices and edges of the graph, and then perform a few of the most elementary operations on them:

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // Vertices: (id, (site URL, number of visits))
    val v = Array(
      (1L, ("www.1.com", 10)),
      (2L, ("www.2.com", 20)),
      (3L, ("www.3.com", 30)),
      (4L, ("www.4.com", 40)),
      (5L, ("www.5.com", 50)),
      (6L, ("www.6.com", 0))
    )

    // Edges: (source id, destination id, attribute)
    val e = Array(
      Edge(1L, 5L, 1),
      Edge(2L, 5L, 2),
      Edge(3L, 5L, 3),
      Edge(4L, 5L, 4),
      Edge(5L, 1L, 5)
    )

    // Build the graph
    val graph: Graph[(String, Int), Int] = Graph(sc.parallelize(v), sc.parallelize(e))

    // Select the vertices (sites) with more than 35 visits
    graph.vertices.filter { case (id, (url, visits)) => visits > 35 }.collect().mkString("\n")

    // Print every link together with the attributes of both endpoints
    for (triplet <- graph.triplets.collect()) {
      print(s"Link from ${triplet.srcAttr._1} (visits = ${triplet.srcAttr._2}) ")
      println(s"to ${triplet.dstAttr._1} (visits = ${triplet.dstAttr._2})")
    }

    // Compute PageRank (tolerance 0.001, reset probability 0.4)
    graph.pageRank(0.001, 0.4).vertices.collect().foreach { site =>
      println("Site Id = " + site._1 + ", PR = " + site._2)
    }
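
To show what the PageRank step computes, here is a simplified single-machine sketch in plain Java on the same six sites and five links. It is an assumption-laden illustration, not GraphX's implementation: it runs a fixed number of iterations instead of stopping at a tolerance, uses the unnormalized update rank = resetProb + (1 - resetProb) * (sum of incoming contributions), and its numbers may differ from GraphX output. The class name `SimplePageRank` and the 0-based vertex indices are choices made for the sketch.

```java
import java.util.Arrays;

// Simplified PageRank sketch: fixed iteration count, unnormalized ranks.
class SimplePageRank {
    // edges[i] = {source, destination}; vertices are numbered 0..n-1.
    static double[] rank(int n, int[][] edges, double resetProb, int iterations) {
        int[] outDegree = new int[n];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0);
        for (int it = 0; it < iterations; it++) {
            // Each vertex splits its current rank evenly among its outgoing links.
            double[] incoming = new double[n];
            for (int[] e : edges) {
                incoming[e[1]] += rank[e[0]] / outDegree[e[0]];
            }
            for (int v = 0; v < n; v++) {
                rank[v] = resetProb + (1.0 - resetProb) * incoming[v];
            }
        }
        return rank;
    }

    public static void main(String[] args) {
        // Site k from the example is stored at index k - 1.
        int[][] edges = {{0, 4}, {1, 4}, {2, 4}, {3, 4}, {4, 0}};
        double[] pr = rank(6, edges, 0.4, 50);
        for (int v = 0; v < pr.length; v++) {
            System.out.println("Site Id = " + (v + 1) + ", PR = " + pr[v]);
        }
    }
}
```

In this sketch site 5 ends up with the highest rank because four sites link to it, site 1 comes second because it is linked from site 5, and sites without incoming links keep only the reset value.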


The Apache Spark framework has APIs for popular programming languages (Scala, Python, and Java). There is an opinion that Java code for Spark is somewhat cumbersome, and for relatively simple tasks I do not really want to write a whole Java application. But Java can be used when necessary. Connecting Spark to an application should not cause much difficulty; nothing was needed apart from importing the libraries:

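    // Imports for the Spark 1.x Java API
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    // Point, path, savePath and jsonPath are assumed to be defined elsewhere in the project.
    public class Run {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("HABR");
            JavaSparkContext ctx = new JavaSparkContext(conf);

            /**
             * Example 1. Mapping each line of a text file to a point
             */
            JavaRDD<Point> points = ctx.textFile(path).map(new Function<String, Point>() {
                public Point call(String line) {
                    Double t = Double.parseDouble(line);
                    Double x = 5.5 * (Math.cos(t) + Math.cos(1.1 * t) / 1.1);
                    Double y = 5.5 * (Math.sin(t) - Math.sin(1.1 * t) / 1.1);
                    return new Point(x, y);
                }
            });
            points.saveAsTextFile(savePath);

            /**
             * Example 2. Using SQLContext
             */
            String sql = "SELECT * FROM sites WHERE type = 'b' AND id IN (1,2)";
            SQLContext sqlContext = new SQLContext(ctx);
            DataFrame sites = sqlContext.read().json(jsonPath);
            sites.show();
            sites.registerTempTable("sites");
            DataFrame results = sqlContext.sql(sql);
            results.show();
        }
    }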


The logic for calculating the rating depends heavily on the project. General indicators (views, bounces, time on site, viewing depth, repeat visits) are not always more informative than specially tracked events. In my own practice, I usually followed a fairly ordinary path. During data preparation (the site is written in Yii 2, and a Java service is responsible for populating it), all documents were processed in order to fill the database. The document rating was calculated at this processing stage, and the finished result was written to the database.



As you can see, there is no need to perform the calculations dynamically (on every page refresh). Moreover, on sites with a huge load even caching (Memcached, Redis, Tarantool) will not help. All calculations therefore have to be done in advance, and the PHP application should be used only for display (the model reads the precomputed rating from the database and the controller passes it to the view). By the way, on the site side (in its database) the rating is best stored as an integer. Here is another very simple schematic example for clarity.



Create a migration for Yii 2:

    use yii\db\Migration;

    class m160401_134629_doc extends Migration
    {
        public function up()
        {
            $this->createTable('{{%doc}}', [
                'id' => $this->primaryKey(),
                'name' => $this->string(150)->notNull()->unique(),
                'content' => $this->text()->notNull(),
                'rating' => $this->integer()->notNull()->defaultValue(0),
                'created_at' => $this->integer()->notNull(),
                'updated_at' => $this->integer()->notNull(),
            ], 'CHARACTER SET utf8 COLLATE utf8_unicode_ci ENGINE=InnoDB');
        }

        public function down()
        {
            $this->dropTable('{{%doc}}');
        }
    }


And after it, the model:

    namespace common\models;

    use Yii;

    /**
     * This is the model class for table "{{%doc}}".
     *
     * @property integer $id
     * @property string $name
     * @property string $content
     * @property integer $rating
     * @property integer $created_at
     * @property integer $updated_at
     */
    class Doc extends \yii\db\ActiveRecord
    {
        /**
         * @inheritdoc
         */
        public static function tableName()
        {
            return '{{%doc}}';
        }

        /**
         * @inheritdoc
         */
        public function rules()
        {
            return [
                [['name', 'content', 'created_at', 'updated_at'], 'required'],
                [['content'], 'string'],
                [['rating', 'created_at', 'updated_at'], 'integer'],
                [['name'], 'string', 'max' => 150],
                [['name'], 'unique'],
            ];
        }

        /**
         * @inheritdoc
         */
        public function attributeLabels()
        {
            return [
                'id' => 'ID',
                'name' => 'Name',
                'content' => 'Content',
                'rating' => 'Rating',
                'created_at' => 'Created At',
                'updated_at' => 'Updated At',
            ];
        }

        /**
         * @param integer $limit
         * @return Doc[]
         */
        public static function getDocs($limit = 10)
        {
            return static::find()->orderBy('rating DESC')->limit($limit)->all();
        }
    }


Naturally, the real project already had ready-made functionality for displaying a list of certain entities sorted by rating, along with indexes, which I have not described in the example. A simple controller looked like the one shown below:

    namespace frontend\controllers;

    use Yii;
    use common\models\Doc;
    use yii\web\Controller;

    class DocController extends Controller
    {
        /**
         * List of documents (top 10)
         *
         * @return string|\yii\web\Response
         */
        public function actionIndex()
        {
            $limit = 10;
            $cacheTime = 60;
            // Cache the query result for $cacheTime seconds
            $docs = Doc::getDb()->cache(function ($db) use ($limit) {
                return Doc::getDocs($limit);
            }, $cacheTime);
            if (empty($docs)) {
                // Return the redirect; without "return" the action would go on to render the view
                return $this->goHome();
            }
            return $this->render('index', ['docs' => $docs]);
        }
    }


There were no difficulties in changing the code on the site either. Why would there be? The main work is done outside the website: the data loaded into the site already contains the field precomputed by Spark. Between us, the real project used the Scala programming language to write very compact code for Spark.



Of course, if the task is simple, there is no point in implementing very complex algorithms. For preparing a moderate amount of data, a very simple service will do. Let's look at a schematic prototype in Java that uses no third-party libraries. The interface takes a document object and returns a number (Double) that reflects the rating level.

    public interface IRating {
        Double rating(Doc doc);
    }


Accordingly, we add several classes that calculate the rating in different ways. To avoid artificial complexity and stay within the scope of this article, we add a simplifying assumption: let there be two types of document ratings on the site. For the first type, the rating is derived from the number of views (visits) of the document:

    public class MainRating implements IRating {
        @Override
        public Double rating(Doc doc) {
            return (Math.log(doc.getVisits()) * 2);
        }
    }


For the second type, we use another made-up formula:

    public class AdditionalRating implements IRating {
        @Override
        public Double rating(Doc doc) {
            return (doc.getEvents() * 0.5) + Math.random();
        }
    }


Next, we will create an abstract class for the future factory:

    public abstract class IRatingFactory {
        public abstract IRating getRatingType(String type);
    }


And let us not forget the factory itself:

    public class RatingFactory extends IRatingFactory {
        @Override
        public IRating getRatingType(String type) {
            switch (type) {
                case "main":
                    return new MainRating();
                case "additional":
                    return new AdditionalRating();
                default:
                    return null;
            }
        }
    }
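
The pieces above can be wired together roughly as follows. This is a self-contained sketch under stated assumptions, not the author's service: the article does not show the Doc class, so a minimal hypothetical one is defined here; the factory is simplified to a single class; and the `SCALE` factor and the `storedRating` helper are illustrative choices for turning the Double into the integer stored in the site database.

```java
// Hypothetical minimal document class; the article does not show Doc.
class Doc {
    private final long visits;
    private final long events;

    Doc(long visits, long events) {
        this.visits = visits;
        this.events = events;
    }

    long getVisits() { return visits; }
    long getEvents() { return events; }
}

interface IRating {
    Double rating(Doc doc);
}

class MainRating implements IRating {
    @Override
    public Double rating(Doc doc) {
        return Math.log(doc.getVisits()) * 2;
    }
}

class AdditionalRating implements IRating {
    @Override
    public Double rating(Doc doc) {
        return (doc.getEvents() * 0.5) + Math.random();
    }
}

// Simplified here to one class; the article derives it from an abstract factory.
class RatingFactory {
    IRating getRatingType(String type) {
        switch (type) {
            case "main":
                return new MainRating();
            case "additional":
                return new AdditionalRating();
            default:
                return null;
        }
    }
}

class RatingDemo {
    // Assumption: keep two decimal places when converting to an integer column.
    static final int SCALE = 100;

    static int storedRating(RatingFactory factory, String type, Doc doc) {
        IRating strategy = factory.getRatingType(type);
        if (strategy == null) {
            // The factory returns null for unknown types, so fail with a clear message.
            throw new IllegalArgumentException("Unknown rating type: " + type);
        }
        return (int) Math.round(strategy.rating(doc) * SCALE);
    }

    public static void main(String[] args) {
        Doc doc = new Doc(100, 7);
        RatingFactory factory = new RatingFactory();
        // ln(100) * 2 is about 9.21, so the stored value is 921.
        System.out.println("main: " + storedRating(factory, "main", doc));
        // The second formula includes Math.random(), so its value varies between runs.
        System.out.println("additional: " + storedRating(factory, "additional", doc));
    }
}
```

Note that the first formula yields negative infinity for a document with zero visits, so a real service would need to decide how to handle documents that have not been viewed yet.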


I really do not want to resort to value judgments. Still, Apache Spark made a very good impression on me. It played an important role in building a very high-performance statistics system. Moreover, using it in production pleasantly surprised me with its efficiency and simplicity. But this is my personal opinion, which I only want to share, not impose. In any case, fairly detailed documentation and several English-language books make it possible to learn its main features quickly.



For several years I wanted to find some almost perfect tool. Honestly, it took me a long time to get rid of this naive idea. Now I hold a different belief: there is no “magic pill”. The effectiveness of each tool depends on the specific situation. Many books and articles have already been written on this topic, but after years of reading them daily I have taken away just one thought: it is very rarely possible to design a huge statistical system or web resource in one go. More often it is an evolutionary process in which new modules are born using new technologies, and the “victims of refactoring” that did not survive natural selection pass into history. The world is not static, is it? I wish your systems continuous and constructive development.

Source: https://habr.com/ru/post/280774/


