
Apache Spark as the core of the project. Part 1

Hello, colleagues.

Recently, Spark appeared on our project. During development we run into many difficulties and learn a lot of new things. I would like to systematize this knowledge for myself and, while I am at it, share it with others. So I decided to write a series of articles about using Apache Spark. This article is the first one, and it is introductory.

Quite a lot has already been written about Spark itself, including on Habr once and twice, so I will inevitably repeat myself a little.

Apache Spark is a framework for building applications for distributed data processing. Spark provides a software API for working with data that covers loading, saving, transforming and aggregating, plus many smaller conveniences, such as the ability to run locally for developing and debugging code.
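To make this a bit more concrete, here is a minimal sketch of loading, transforming, aggregating and saving data with the Java API in local mode; the file paths are made up for illustration:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LoadTransformSaveSketch {
    public static void main(String[] args) {
        // Local mode with two worker threads: handy for development and debugging
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("load-transform-save").setMaster("local[2]"));

        JavaRDD<String> lines  = sc.textFile("input.log");                       // loading (hypothetical path)
        JavaRDD<String> errors = lines.filter(l -> l.contains("ERROR"));         // transforming
        long count = errors.count();                                             // aggregating (an action)
        errors.saveAsTextFile("errors-out");                                     // saving (hypothetical path)

        System.out.println("error lines: " + count);
        sc.stop();
    }
}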
In addition, Spark is responsible for the distributed execution of your application. It distributes your code across all nodes in the cluster, breaks it into subtasks, builds an execution plan and monitors its progress. If a node fails and some subtask on it dies, that subtask will be restarted.
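How persistently Spark retries failed tasks can be tuned. A small illustration (the class name here is mine; only the standard spark.task.maxFailures setting is Spark's):

import org.apache.spark.SparkConf;

public class RetrySketch {
    public static void main(String[] args) {
        // spark.task.maxFailures: how many attempts a task gets before the whole job is failed (default: 4)
        SparkConf conf = new SparkConf()
            .setAppName("retry-sketch")
            .setMaster("local[2]")
            .set("spark.task.maxFailures", "8");
    }
}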

The flexibility of Spark lies in the fact that your applications can run under the control of different distributed resource managers:

- Spark Standalone (Spark's own cluster manager);
- Hadoop YARN;
- Apache Mesos.

Each of these systems has its own advantages, relevant for different tasks and requirements.
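In code, switching between them mostly comes down to the master URL you hand to Spark (or to spark-submit). A minimal sketch; the host names and ports are placeholders, and the YARN value shown is the Spark 1.x syntax:

import org.apache.spark.SparkConf;

public class MasterUrlSketch {
    public static void main(String[] args) {
        // The same application can target different cluster managers by changing the master URL
        SparkConf local      = new SparkConf().setAppName("demo").setMaster("local[2]");            // local JVM, 2 threads
        SparkConf standalone = new SparkConf().setAppName("demo").setMaster("spark://master:7077"); // Spark Standalone
        SparkConf mesos      = new SparkConf().setAppName("demo").setMaster("mesos://master:5050"); // Apache Mesos
        SparkConf yarn       = new SparkConf().setAppName("demo").setMaster("yarn-client");         // Hadoop YARN
    }
}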

Why is Spark becoming #1?


Let's see why Spark's popularity has been growing lately and why it has started to crowd out good old Hadoop MapReduce (hereinafter MR).

It's all about the new architectural approach, which gives Spark a big performance advantage over classic MR applications.

The point is this: MR was designed back in the 2000s, when memory was expensive and 64-bit systems had not yet taken over the world. So the developers of the time followed the only sensible path: they implemented the exchange of intermediate data through the hard disk (or, to be precise, through the distributed file system HDFS). That is, all intermediate data between the Map and Reduce phases was written out to HDFS. As a result, a lot of time was spent on disk I/O and on data replication between the nodes of the Hadoop cluster.

Spark appeared later and in completely different conditions. Now intermediate data is serialized and stored in RAM, and data exchange between nodes happens directly over the network, without unnecessary abstractions. It is worth saying that disk I/O is still used (at the shuffle stage), but far less intensively.
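The same in-memory idea is also exposed directly in the API: an RDD can be explicitly pinned in RAM so that repeated actions do not go back to disk. A minimal sketch, with a made-up input file:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class CachingSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("caching-sketch").setMaster("local[2]"));

        JavaRDD<String> lines = sc.textFile("users-visits.log"); // hypothetical input
        // Keep the deserialized RDD in RAM so the second action does not re-read the file
        lines.persist(StorageLevel.MEMORY_ONLY());

        long total = lines.count();                                          // first action: reads the file, fills the cache
        long habr  = lines.filter(l -> l.contains("habrahabr.ru")).count();  // served from memory

        System.out.println(total + " / " + habr);
        sc.stop();
    }
}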

In addition, initializing and launching Spark tasks is now much faster thanks to JVM optimizations. MapReduce starts a new JVM for every task, with all the ensuing consequences (loading all the JAR files, JIT compilation, etc.), whereas Spark keeps a running JVM on each node and manages task launches through RPC calls.
Finally, Spark operates on RDDs (Resilient Distributed Datasets), an abstraction more versatile than MapReduce. Although, to be fair, there is Cascading, a wrapper over MR designed to add flexibility.

On top of that, there is one more very important circumstance: Spark lets you develop applications not only for batch processing but also for working with data streams (stream processing), all within a unified approach and a single API (albeit with minor differences).

And what does this look like in code?


The Spark API is well documented on the official site, but for the sake of a complete story, let's briefly get to know it with an example that you can run locally:

For each user, calculate the total number of sites they visited, and return the top 10 in descending order.

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class UsersActivities {
    public static void main(String[] args) {
        final JavaSparkContext sc = new JavaSparkContext(
            new SparkConf()
                .setAppName("Spark user-activity")
                .setMaster("local[2]")                  // local mode, for development and debugging
                .set("spark.driver.host", "localhost")  // bind the driver to localhost
        );

        // In a real application the log would be loaded from a file (or HDFS):
        // sc.textFile("users-visits.log");
        // For the example, the RDD is built from an in-memory list via parallelize()
        List<String> visitsLog = Arrays.asList(
            "user_id:0000, habrahabr.ru",
            "user_id:0001, habrahabr.ru",
            "user_id:0002, habrahabr.ru",
            "user_id:0000, abc.ru",
            "user_id:0000, yxz.ru",
            "user_id:0002, qwe.ru",
            "user_id:0002, zxc.ru",
            "user_id:0001, qwe.ru"
            // add a few lines of your own :)
        );
        JavaRDD<String> visits = sc.parallelize(visitsLog);

        // Map each line to a pair: key (user_id), value (1 - a single visit)
        // (user_id:0000 : 1)
        JavaPairRDD<String, Integer> pairs = visits.mapToPair(
            (String s) -> {
                String[] kv = s.split(",");
                return new Tuple2<>(kv[0], 1);
            }
        );

        // Sum up the visits for each user_id
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
            (Integer a, Integer b) -> a + b
        );

        // Take the top 10 entries ordered by value
        List<Tuple2<String, Integer>> top10 = counts.takeOrdered(
            10, new CountComparator()
        );

        System.out.println(top10);
    }

    // Note: the comparator must be Serializable. Otherwise (e.g. with an anonymous class) you get
    // SparkException: Task not serializable
    // http://stackoverflow.com/questions/29301704/apache-spark-simple-word-count-gets-sparkexception-task-not-serializable
    public static class CountComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
        @Override
        public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
            return o2._2() - o1._2();
        }
    }
}
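If you run this locally, the output should be something like [(user_id:0000,3), (user_id:0002,3), (user_id:0001,2)]; the relative order of users with equal counts is not guaranteed.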

Yes, it is worth saying that the Spark API is available for Scala, Java and Python, although it was originally designed specifically for Scala. Be that as it may, we use Java 8 on our project and are quite happy with it; we see no point in moving to Scala for now.
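As a small teaser of that unified API (and of the next article), here is a minimal Spark Streaming sketch; the socket source, port and batch interval are arbitrary, but notice that the per-batch logic is the same mapToPair/reduceByKey pair as in the batch example above:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class StreamingSketch {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("streaming-sketch").setMaster("local[2]");
        // Micro-batches of 5 seconds
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Lines like "user_id:0000, habrahabr.ru" arriving over a TCP socket (arbitrary source)
        JavaReceiverInputDStream<String> visits = ssc.socketTextStream("localhost", 9999);

        // The same pipeline as in the batch example, applied to every micro-batch
        JavaPairDStream<String, Integer> pairs = visits.mapToPair(
            (String s) -> new Tuple2<>(s.split(",")[0], 1)
        );
        JavaPairDStream<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

        counts.print(); // print the counts for each micro-batch

        ssc.start();
        ssc.awaitTermination();
    }
}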

In the next article we will look at stream processing in detail: why it is needed, how it is used in our project, and what SparkSQL is.

Source: https://habr.com/ru/post/271375/

