Apache Spark in production projects: survival experience

We offer you material based on Alexander Serbul's talk at the BigData Conference. As the author and speaker, I have lightly edited the text and added some up-to-date thoughts and current problems, so I hope the post gives you both practical knowledge and food for thought about where to go next with it. So, into battle!


BigData


In my understanding, BigData is something crazy: the term exists, but there are no brains inside it. On the one hand, administrators puzzle over where to put a significant amount of information. On the other hand, it is high-load development: you need to build a cluster and find hardcore, very experienced developers. On top of that, you need higher mathematics: differential calculus, linear algebra, probability theory, machine learning, graphs and learning on them, logistic regression, linear discriminant analysis, and so on. How do you survive in this situation, which tools do you take, which of them are still raw (unfortunately, most), and how do you bring money to the company? After all, that is the main purpose of using BigData, and everything else is populism.

MapReduce


MapReduce is a paradigm, proposed by Google, of folding (reducing) functions in order to run large and complex queries over data. It allows information to be processed in parallel, in the spirit of data parallelism. Ideally, every algorithm you use with big (and not only big) data should be reworked for MapReduce. But nobody does that for free ;-) In an old book on your grandmother's shelf, or on the web, you will quickly find a good K-means clustering algorithm, and it will work reliably. But suddenly, right before a release or when there is more data than you expected, you discover that this algorithm does not work in "parallel mode" and does not run through MapReduce: it can load only one processor core. So you urgently have to invent another algorithm that can run in parallel and figure out how to express it in the MapReduce paradigm. Not everyone manages this; it is the realm of Computer Science (although sometimes, even without PhD-level knowledge, you can rework an algorithm for MapReduce with the strong-coffee-and-whiteboard method).
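
To make the paradigm less abstract, here is a minimal single-machine sketch in plain Java. It involves neither Hadoop nor Spark, and the class and method names are made up for illustration. The map step turns each line into (word, 1) pairs, the shuffle step groups the pairs by key, and the reduce step folds each group into one number. Because every key is reduced independently of the others, the groups could be handed to different workers.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a single-process imitation of the map -> shuffle -> reduce steps.
class MapReduceSketch {

    // Map phase: each line becomes (word, 1) pairs, independently of all other lines.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Shuffle phase: group all emitted values by key.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return groups;
    }

    // Reduce phase: fold the values of one key into a single result.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> g : shuffle(pairs).entrySet()) {
            result.put(g.getKey(), reduce(g.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("spark beats hadoop", "hadoop writes files", "spark")));
    }
}
```

An algorithm that cannot be split into such independent per-key pieces is exactly the kind that ends up loading a single core.
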
In our projects we started with algorithms on the Hadoop MapReduce platform, but then switched to Spark, because it turned out to be more sensible and practical. Classic Hadoop MapReduce works very simply: it completes a task and puts the result in a file. Took, completed, put. Spark takes the data, performs all the tasks, and only then unloads the result. In my opinion, and not only mine, Hadoop MapReduce is, to put it mildly, an outdated conglomerate that is constantly and convulsively trying to change, which is why developers and system administrators constantly have to relearn and business has to play Russian roulette with "raw technologies". But ... we have almost no choice (yes, we looked at Apache Storm, but it comes from a completely different area: task-parallel computing).
To be honest, no alternative to Apache Spark is in sight yet. It is the most active open-source project in the Apache ecosystem, and it is also a model that others imitate: look at Prajna from Microsoft, for example.

You can throw Apache Tez back at me or find something small in the Apache zoo, but believe me, to reduce risk it is better to use mainstream technologies that evolve in step with the market.

Somewhere nearby, not quite from this area but interesting, there are also Apache Giraph and TensorFlow; look at them if you really want to. And ask yourself: is this a task-parallel or a data-parallel technology? Then everything will become clear.

What tasks we use Spark for


This is how we use parallel data processing technologies. Our MySQL servers run hundreds of thousands of databases of client companies, with staff ranging from a handful to thousands of employees. Spark is mainly used in the personal recommendations service, which was described in one of the previous publications.

We collect events (order views, additions to the cart, order payments), process them, put them into Amazon Kinesis, process them with workers, save them in NoSQL (DynamoDB), and finally send them to Spark to build models and analytics.

At first glance the scheme looks unnecessarily complicated, but gradually you come to understand why all of it is needed. This is the minimum set of tools necessary for survival.

We upload the results to Apache Mahout and get specific recommendations for the client at the output. The total volume of events recorded by the recommendation service reaches tens of millions per day.

So far we are intensively using and developing collaborative filtering algorithms, but we see roughly the following roadmap for our algorithms:
• Multimodality
• Content-based recommendations
• Clustering
• Machine learning, deep learning
• Targeting

Now, more than ever, multimodality is valued, i.e. using several different algorithms to answer one question (in our case, producing a personal recommendation). Just launching a service based on Apache Mahout is not enough; it will give you no edge over competitors. Nobody is surprised by collaborative filtering today. You also need to take into account the user's tag cloud, built up as they browsed the store and received information. Clustering allows more flexible targeting of information. And, of course, you cannot do without machine learning here. Deep learning is, in simple terms, “quality” machine learning, implying a very detailed study of the problem by the machine and, often, the use of a multilayer recurrent neural network. Applied properly, it helps raise conversion even further and work with customers more effectively.

The flip side of diversity


There are many software environments, tools and products for developing and analyzing data on the market today. Thanks to open source for that (yes, it is full of raw, buggy stuff, but there are some great solutions)! On the one hand, diversity is an undoubted benefit. On the other hand, the development process can become rather chaotic and confused. For example, you first try Apache Pig; when something does not work, you turn to Apache Hive. You look around, play with it, and start writing in Spark SQL. When queries start to crash, you rush to Impala (and there it is even worse). On the verge of despair, some curse the world of BigData and return to the good old RDBMS. Sometimes one gets the impression that a lot of toys have been created for “specialists”, often by the very same “specialists”. But business does not understand all this searching; it needs to earn money and it demands concrete results on time.
Today, perhaps, only Apache Hive can be considered a classic and reliable tool for SQL queries over distributed data, just as HDFS is a classic among cluster file systems (yes, of course there are also GlusterFS and Ceph). Nevertheless, many are switching to Spark SQL, because this framework was written (as one would like to believe) with the interests of business in mind. HBase, Cassandra, Mahout and Spark MLlib are also used more and more. However, demanding that every developer and/or sysadmin be fluent in all these tools is foolish. It is a profanation. The technologies are deep, with a bunch of settings, and each one requires about a month of deep immersion. Do not rush: quality will inevitably suffer in the race for quantity.

What to read


First of all, I recommend that everyone who works, or intends to start working, with parallel algorithms and MapReduce read the book “Mining of Massive Datasets”, which is freely available. It must be read several times, with a notebook and a pencil, otherwise you will end up with a mess in your head. At first, most likely nothing will be clear (I only began to get it on the third attempt). But this is one of the basic books on algorithms for working with big data and, importantly, it is accessible to engineers who do not have a black belt in mathematics. In particular, chapter 2.3.6 is devoted to relational algebra and to ways of projecting its operations onto MapReduce. It is very useful material: essentially these are ready-made recipes for developers, and it is enough to implement them carefully.
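
As a taste of what "projecting relational algebra onto MapReduce" means, here is a minimal single-machine sketch of a natural join R(a, b) ⋈ S(b, c) in plain Java. It is not taken from the book verbatim and does not use Hadoop or Spark; the class name, the tagging scheme and the sample data are assumptions made for illustration. The map step keys every tuple by the join attribute and tags it with its relation of origin, and the reduce step pairs every R-tuple with every S-tuple that landed under the same key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: natural join R(a, b) JOIN S(b, c) as map -> group by key -> reduce.
class JoinSketch {

    // Map phase for R: key the tuple by b and tag it as coming from R.
    static void mapR(String[] r, Map<String, List<String[]>> groups) {
        groups.computeIfAbsent(r[1], k -> new ArrayList<>()).add(new String[] {"R", r[0]});
    }

    // Map phase for S: key the tuple by b and tag it as coming from S.
    static void mapS(String[] s, Map<String, List<String[]>> groups) {
        groups.computeIfAbsent(s[0], k -> new ArrayList<>()).add(new String[] {"S", s[1]});
    }

    // Reduce phase: for one value of b, pair every R-tuple with every S-tuple.
    static List<String> reduce(String b, List<String[]> tagged) {
        List<String> out = new ArrayList<>();
        for (String[] left : tagged) {
            if (!left[0].equals("R")) {
                continue;
            }
            for (String[] right : tagged) {
                if (right[0].equals("S")) {
                    out.add(left[1] + "," + b + "," + right[1]);
                }
            }
        }
        return out;
    }

    static List<String> join(List<String[]> r, List<String[]> s) {
        Map<String, List<String[]>> groups = new TreeMap<>(); // stands in for the shuffle
        for (String[] t : r) {
            mapR(t, groups);
        }
        for (String[] t : s) {
            mapS(t, groups);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String[]>> g : groups.entrySet()) {
            result.addAll(reduce(g.getKey(), g.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: R(user, product) and S(product, category).
        List<String[]> r = Arrays.asList(new String[] {"alice", "p1"}, new String[] {"bob", "p2"});
        List<String[]> s = Arrays.asList(new String[] {"p1", "books"}, new String[] {"p3", "toys"});
        System.out.println(join(r, s));
    }
}
```

Note that the reduce step holds all tuples of one key at once, so a very popular key can become a memory problem on a real cluster.
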

When reading literature full of mathematical details, remember this joke and smile :-)
 Two men are flying in a balloon, get caught in fog and lose their way.  Suddenly
 they are pushed down towards the ground and they see a man.  One of them shouts
 down: "Where are we?". The man, after some thought, answers: "You are in
 a balloon ...". With the next gust of wind the balloon is carried high up.
 - Is he an idiot?
 - No, a mathematician.
 - ??????
 - First, he thought before answering.  Second, he gave an absolutely
 exact answer.  And third, that answer is completely useless.


Apache Spark goodies


• DAG (directed acyclic graph) vs Hadoop MapReduce vs Hadoop Streaming. You can write a large SQL query consisting of several chained MapReduce operations, and it will execute correctly. Streaming is implemented much better in Spark than in Hadoop: it is far more convenient to use and often works more efficiently thanks to caching data in memory.
• Spark Programming Guide + API. The documentation is very sensible and useful.
• Can be programmed in Java. C++ used to be considered a difficult language, but Scala is much ... no, not more difficult, rather, in my opinion, more highbrow. Scala is a cool language, despite a certain academic mustiness and its unnatural kinship with the bug-eyed living dead like Haskell. I really love Scala, but it can drive you crazy, and the compilation time leaves much to be desired, for you and for your children. So, if you wish, you can connect to Spark from Java, from Python and from R.
• Convenient abstraction: Resilient Distributed Dataset (RDD). A beautiful, simply divine concept from the world of functional programming that lets you parallelize files of enormous size: hundreds of gigabytes, or even terabytes.
• Convenient collections: filter, map, flatMap. Spark's convenient approach is collections and operations on them: filter, map, flatMap. Where does it come from? From functional programming, which is now actively preached in Scala (and not only there).
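
The same three operations exist on ordinary Java 8 streams, so they are easy to try without a cluster. The sketch below is only an analogy on a local collection: Spark's RDD methods have the same names but their own signatures, and the class and method names here are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: filter, map and flatMap on a local Java 8 stream, not on a Spark RDD.
class CollectionOpsSketch {

    static List<String> longWordsUpperCased(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" "))) // one line -> many words
                .filter(word -> word.length() > 3)               // keep words longer than 3 characters
                .map(String::toUpperCase)                        // one word -> one transformed word
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints [VIEW, ORDER, CART, ORDER]
        System.out.println(longWordsUpperCased(Arrays.asList("view order", "add to cart", "pay order")));
    }
}
```

Each step only describes what to do with one element, which is what makes this style easy to spread over many machines.
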

Java 7 and Spark


Historically, we write for Spark in Java 7 rather than Java 8. Unfortunately, Java 7 has no decent support for function objects, so we are forced into masochism and create objects like PairFunction, Tuple2, TupleN. In general, be prepared: when Java 7 is integrated with Scala, you get terribly unreadable code. I exaggerate a little, of course, but everything is so mixed up in it that you want to put on glasses with 13 lenses.
JavaRDD<String> combinedRowsOrdered = combinedRows.mapToPair(new PairFunction<String, String, String>() {
    …
    public Tuple2<String, String> call(String row) {
        …
        return new Tuple2<String, String>…

If you do not want to dive into the jungle of Scala, then better use Java 8: the code is more readable and shorter.
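
To show the difference without pulling in Spark, here is a small sketch in which a local interface, PairFn, stands in for Spark's PairFunction and Map.Entry plays the role of Tuple2. Both stand-ins, as well as the "key, tab, value" row format, are assumptions for illustration. The same transformation is written first as a Java 7 anonymous class and then as a Java 8 lambda.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Illustrative only: runs without Spark; PairFn and Map.Entry are local stand-ins.
class LambdaSketch {

    // Stand-in shaped like Spark's PairFunction: one input, one key/value pair out.
    interface PairFn<T, K, V> {
        Map.Entry<K, V> call(T t);
    }

    // Java 7 style: an anonymous inner class around two lines of logic.
    static final PairFn<String, String, String> JAVA7 = new PairFn<String, String, String>() {
        @Override
        public Map.Entry<String, String> call(String row) {
            String[] parts = row.split("\t", 2);
            return new SimpleEntry<String, String>(parts[0], parts[1]);
        }
    };

    // Java 8 style: the same logic as a lambda.
    static final PairFn<String, String, String> JAVA8 = row -> {
        String[] parts = row.split("\t", 2);
        return new SimpleEntry<>(parts[0], parts[1]);
    };

    public static void main(String[] args) {
        System.out.println(JAVA7.call("user42\tviewed"));
        System.out.println(JAVA8.call("user42\tviewed"));
    }
}
```

With a real Spark job the saving repeats on every map, filter and reduce call, which is where the readability gain comes from.
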

More about Scala


The name Scala comes from the English "scalable". It is a language for developing large component-based applications, created by mathematicians. Personally I have the impression that Scala is a trend: the trend of reviving functional programming (hello again, Haskell, F#, ...). Somehow “suddenly” it turned out (although scientists had guessed it much earlier) that processing data arrays in the data-parallel paradigm is more convenient in a functional style, wow! :-) Spark actively uses Scala Actors (hello, Erlang). They let you write simple, readable, single-threaded code that runs on a large number of servers. You get rid of the risks of multi-threaded programming, which you are forced to deal with in Java: it is difficult, slow and expensive (but cool). Besides, the complexity of multi-threaded programming breeds many errors. And thanks to actors, life “suddenly” becomes simpler.

Spark cluster


For deployment in Amazon, Spark offers us a script called spark-ec2. It downloads half of the Berkeley repository, does something in Ruby under the hood (hello, Japan) and installs a bunch of software. The resulting system is rather fragile and sensitive to changes in machine configuration. There are also complaints about logging and about updating system components.
For a while we lived with the spark-ec2 script, but it turned out to be better to write a Spark installer ourselves. In addition, our own installer can take into account the ability to connect new spot machines.
All this is painful for us, because we do not have a large staff of system administrators; we are mostly programmers. If we had 30 system administrators, I would say: “Guys, I will program in Scala, and you, please, stay up at night and look after the Spark clusters.” A much more attractive option turned out to be the combination of Spark and Elastic MapReduce. Colleagues also praise the Spark solutions from Cloudera and Hortonworks; maybe they will be useful to you too.

Amazon EMR


Amazon offers us not to waste time and to deploy a Spark cluster in its cloud using the Elastic MapReduce service. Almost everything works out of the box there. Spark is integrated into the YARN cluster, there is a lot of software, there is monitoring, you can add machines, scale HDFS, resize the cluster, and increase or decrease the number of task machines by means of spot machines. Spot machines in Amazon are 5-10 times cheaper. We always use them because it is convenient, cheap and fast.

Spark in EMR is professionally integrated with S3. That is the right thing to do, because S3 is where you will most likely store your files in Amazon. We compared storing large files in S3 and in HDFS, and the access speed turned out to be about the same. So why bother with HDFS and suffer with a cluster file system if there is a ready-made service? Also, in Elastic MapReduce you can forward the web admin panels of the Spark / Hadoop internals through an SSH tunnel and get used to them (although I never did).

Amazon EMR Cost


It turns out a little more expensive than ordinary machines; the difference is about 10%. In return you get a lot of bundled, though slightly buggy, software (Hue is the buggiest) and the ability to scale the cluster. And you do not even need an admin: you, the developers, are kings and gods there.

Types of machines


There are three types of machines here:
• Master machines, which control the entire cluster. Spark Master is installed on them.
• Core machines, on which the file system, HDFS, is deployed. There may be several of them. However, it is recommended only to increase the number of core machines and never to reduce it, otherwise data is lost.
• Task machines, which are used for everything else. These are ordinary Spark servers on which the workers run. The number of spot machines can be changed freely, up to a fleet of hundreds of machines.

Software


• Spark. In earlier versions of the Amazon images, spark.dynamicAllocation.enabled was not yet supported, so you had to tell Spark yourself how many machines a task needs. If the cluster is partially idle, Spark will not pick up the remaining machines; you have to hard-code how many machines it needs, which is inconvenient. But starting with AMI 4 this feature works.
• Hadoop / YARN / HDFS. YARN clusters, like Oracle, have a huge number of settings, and, properly speaking, you need an admin who knows them very well. But, despite the pain, Hadoop clusters solve their tasks reliably. What I dislike most about YARN and Hadoop is the mess with logging. Everything is logged, the settings for logging levels are scattered across different parts of the cluster internals, and so the volume of logs grows very quickly. And there is no decent solution to this problem. It would be nice to learn from the good old inhabitants of Unix, for example from MySQL and Apache.
• Ganglia. This is time-series software that draws graphs for various metrics: load, number of tasks, etc. It helps to get an idea of the cluster status. A handy thing, but it has drawbacks: “dead” spot machines keep hanging around and clutter the graphs.
• Hive. This is support for SQL commands that works on files in HDFS and S3. A good tool, but sometimes its capabilities are not enough. We use it, but when we need more, we go to Spark and solve problems directly with relational algebra.
• Pig. We do not use it, so I find it difficult to give any assessment.
• HBase. A NoSQL option; we are not using it yet.
• Impala. A very interesting thing that deserves a separate post. So far it looks like raw software, so use it at your own risk.
• Hue. This is an admin panel for BigData, written in Python. Its GUI lets you bring Impala, HBase, Pig and Hive together. That is, you can set up your own analyst's corner on the web :-) I used it for a week; it began to fail and hang, and then stopped opening at all. In short, unfinished software.

The main Spark problems we ran into


Out-of-memory crashes


What is Map? We take something, split it by keys, and scatter the keys across the cluster. Algorithmically, nothing should crash here.
What is Reduce? It is when the data grouped under one key is collected in one worker. If you program it well, you can pass all the values of one key to the reducer in portions, and nothing will crash. But in practice it turned out that Spark could crash at various points: sometimes the buffer was too small for serialization, sometimes the worker ran out of memory. In my opinion, this is the main problem. But it can be tuned. Our Spark no longer crashes, although we achieved this with the help of magic.

Be sure to set a reasonable executor memory: --executor-memory 20G, --conf spark.kryoserializer.buffer.mb=256, --conf spark.default.parallelism=128, --conf spark.storage.memoryFraction=0.1
KryoSerializer lets you store objects in a compact serialized form (spark.serializer org.apache.spark.serializer.KryoSerializer). Without it they consume much more memory. I also do not recommend lowering spark.default.parallelism below 128, otherwise jobs can often crash for lack of memory. As for memoryFraction, we do not use caching, hence the low value.

Uploading results


Suppose you need to unload data from Spark into a model. If the volume is large, it will take a very long time.
• Set --driver-memory 10G: keep in mind that you are unloading through the driver.
• When you use collect(), the entire result is gathered in the driver's memory, and the driver may crash. Therefore I recommend toLocalIterator(). Alas, its performance is very, very low. So we had to write our own code to build the partitions. If anyone is interested, I can tell more.
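
The trade-off between collecting everything and iterating partition by partition can be sketched in plain Java, without Spark. This is not our production code: the class name is made up, and each "partition" is just a hypothetical supplier that returns a list when asked. The iterator pulls the next partition only when the current one is exhausted, so the driver side never holds more than one partition at a time, which is the idea behind toLocalIterator().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Illustrative only: lazily walks "partitions" one at a time instead of collecting them all.
class PartitionIteratorSketch implements Iterator<String> {

    private final Iterator<Supplier<List<String>>> partitions;
    private Iterator<String> current = Collections.emptyIterator();
    int fetched = 0; // how many partitions have been pulled so far

    PartitionIteratorSketch(List<Supplier<List<String>>> partitions) {
        this.partitions = partitions.iterator();
    }

    @Override
    public boolean hasNext() {
        // Pull the next partition only when the current one is exhausted.
        while (!current.hasNext() && partitions.hasNext()) {
            current = partitions.next().get().iterator();
            fetched++;
        }
        return current.hasNext();
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    public static void main(String[] args) {
        // Hypothetical partitions; in Spark they would live on the workers.
        List<Supplier<List<String>>> parts = new ArrayList<>();
        parts.add(() -> Arrays.asList("rec1", "rec2"));
        parts.add(() -> Arrays.asList("rec3"));

        PartitionIteratorSketch it = new PartitionIteratorSketch(parts);
        while (it.hasNext()) {
            System.out.println(it.next() + " (partitions fetched so far: " + it.fetched + ")");
        }
    }
}
```

The price of this memory bound is that partitions arrive strictly one after another, which fits the low throughput mentioned above.
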

Logging


This configuration is the only thing that helped us cope with the logging problem:
 export SPARK_LOCAL_DIRS="/mnt/spark,/mnt2/spark"
 export SPARK_WORKER_DIR="/mnt/spark_worker"
 export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=1800 -Dspark.worker.cleanup.appDataTtl=259200"

 # Worker executor stdout/stderr logs
 spark.executor.logs.rolling.maxRetainedFiles 2
 spark.executor.logs.rolling.strategy time
 spark.executor.logs.rolling.time.interval daily


Conclusion


I hope it was both useful and interesting. Parallel algorithms on MapReduce are entering our lives more and more actively. There are few of them and people are still searching for them, but there seems to be no other way (well, maybe some things can be computed faster on Apache Giraph and TensorFlow, through the task-parallel paradigm). The classic platform, Hadoop MapReduce, is giving way to Apache Spark, a platform written in a functional style, in a modern language and with modern mathematics. Most likely you will be forced to get to know, at least at the level of terms, the inhabitants of the Hadoop zoo: Hive, Pig, HDFS, Presto, Impala. But constant learning is everything to us, and to stay ahead of competitors you need to know more, write faster and think more clearly. Good luck to everyone and Happy New Year!

Source: https://habr.com/ru/post/273279/

