
Apache Spark - advantages, disadvantages, wishes

I have long wanted to share my impressions of Apache Spark, and then this article by Pivotal employee Robert Bennett, published quite recently, on June 26, 2018, caught my eye.



This will not be a translation, but rather my own impressions of and comments on the topic.



What makes Spark popular?



Quote:

It's easy to see why Apache Spark is so popular. It does its processing in memory and in a distributed fashion, which is particularly useful for machine learning algorithms. Other tools may have to write intermediate results to disk and read them back, which can make iterative algorithms painfully slow.

To begin with, most of this is not entirely true. In memory? Well, yes, Spark will try, but what is said here about other tools applies to Spark just as well. In the end, memory, CPU cores and the network are limited resources, so sooner or later any tool runs into their limits.



In a sense, Spark is no more in-memory than classic map-reduce. One way or another, the data still has to either sit on disk (which, among other things, lets us reliably survive failures without restarting the computation from scratch) or be transmitted over the network (shuffle and the like). I am not saying that you, as a programmer, cannot call persist and keep intermediate results in memory or on disk if you suddenly want to. But will you want to keep them in memory if there is, say, a terabyte of data? I doubt it.


I would rather say that, unlike other tools (which usually means classic map-reduce), Spark lets you think a little less about the optimal use of resources and does more of that optimization for you. And the final speed, in the end, depends mostly on the skill of the person writing the program.



Further, the author lists the qualities of Spark that he considers the best:



Attractive API and lazy execution (Appealing APIs and Lazy Execution)



In general, I agree with this. As a development tool Spark is much more convenient than classic map-reduce, and somewhat more convenient than tools like Apache Crunch and the rest of the, so to speak, "second" generation. It is also somewhat more flexible than, say, Hive, and is not limited to SQL as such.



Lazy execution is not always a good thing. Sometimes it would be better if, say, a mismatch between the Hive schema and the Dataset schema were diagnosed not after all the data has been processed but a little earlier, so that everything fails at startup rather than a couple of hours or a day in.



Easy Conversion



Here the author mainly means conversions between Spark and Python/Pandas structures. That is far from my area, so I will not comment; perhaps I will say a little about pySpark below.



Easy Transformations



Another asset of Spark is the broadcast, a map-side join mechanism: a small table is shipped to every node, so the large table does not need to be shuffled at all. This also helps mitigate problems from skew, where a handful of hot keys would otherwise swallow most of the effort.

I do not know how things are in Python, but in our parts a map-side join is easily done either by hand or with any of the Crunch-style tools. I do not see any particular advantage here; plenty of tools can do it, Hive for example. Given the de facto absence of indexes in the Hadoop ecosystem, a map-side join is probably one of the main join optimization techniques in general.
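
To make the discussion concrete, here is a minimal sketch of a broadcast (map-side) join with the DataFrame API. The paths, tables and the country_code column are hypothetical; the only Spark-specific piece is the broadcast hint from org.apache.spark.sql.functions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-join-sketch").getOrCreate()

// A large fact table and a small dimension table (hypothetical paths).
val events    = spark.read.parquet("/data/events")
val countries = spark.read.parquet("/data/countries")

// The broadcast hint ships the small table to every executor,
// so the join happens map-side, without shuffling the large table.
val enriched = events.join(broadcast(countries), Seq("country_code"))

enriched.write.parquet("/data/events_enriched")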



The API for transformations is quite convenient, although not homogeneous. Say, the "old" RDD API, while probably a bit more flexible, at the same time leaves more room for mistakes, especially if you are working not with classes of a fixed structure (Java Beans) but with Row and a flexible data structure. A discrepancy between the real and the expected schema is quite common in that case.



As for the Dataset API, I would say it is very good. After some practice it is quite possible to write everything with it as easily as in SQL, supplementing it with your own UDFs where more flexibility is needed. The UDFs themselves are easier to write than for Hive; difficulties arise only when returning complex data structures from them (arrays, maps, structs), and mostly in Java, because the structures Spark expects are the Scala ones.



Say, I was able to quite easily use, in the form of a UDF, such things as the Java port of pymorphy2, or a geocoder. In essence, all you need is to initialize your UDF correctly, keeping Spark's serialization peculiarities in mind.
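
As an illustration of that last point, here is a minimal Scala sketch of such a UDF. Normalizer and its dictionary are hypothetical stand-ins for something heavy like a morphology library or a geocoder client, initialized lazily so that each executor builds its own copy instead of dragging it through serialization from the driver.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("udf-sketch").getOrCreate()
import spark.implicits._

// Hypothetical heavy state; `lazy` defers creation to the JVM where it is first used.
object Normalizer {
  lazy val dictionary: Map[String, String] = Map("spb" -> "Saint Petersburg")
  def normalize(city: String): String = dictionary.getOrElse(city.toLowerCase, city)
}

val normalizeCity = udf((city: String) => Normalizer.normalize(city))

Seq("SPB", "Moscow").toDF("city")
  .select(normalizeCity($"city").as("city_norm"))
  .show()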



The Spark ML API, on the other hand, looks as if it was designed by completely different people. That does not mean it is bad; it is just different.



Open Source Community



Spark has a massive open-source community behind it. The community improves the core software and contributes practical add-on packages. For example, a team has developed a natural language processing library for Spark, where previously one would have had to fall back on something like the Natural Language Toolkit.

There is nothing to add here. The community really is big, skilled and friendly, and it writes a huge number of extensions for Spark.



Let us leave the next passage, about slow UDFs, on the conscience of a Pythonista: Scala/Java UDFs are not slow at all, and at the same time very convenient.



What I would add myself:



Development in different languages



Probably one of the reasons for the popularity is the support for several development languages (Scala, Java, Python and R). By and large, the APIs for the different languages are about equally convenient, but I would not call this support ideal. Say, when launching your Spark application you immediately choose between Java/Scala and Python, and you cannot combine languages in one launch. So integration between the pySpark parts of an application (in which ML or NLP pieces are often written) and the Java/Scala parts is really only possible through files and databases, or through something like Kafka, REST, and other such options.



Streaming



Spark Streaming (not to be confused with Hadoop Streaming, which is something entirely different) is another attractive part of Spark's capabilities. Described in one sentence, it is the processing of streaming data, for example from Kafka, ZeroMQ, etc., with the same means as data taken from a database.



The beauty is precisely that the means are the same, i.e. you practically do not have to change anything in the program to start processing data from Kafka. Neither map-reduce, nor Crunch, nor Cascading will let you pull off such a trick.
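
A minimal sketch of what that looks like with Structured Streaming (the broker, topic, paths and schema are made up): the transformation itself is an ordinary DataFrame function and does not care whether its input came from files or from Kafka.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("streaming-sketch").getOrCreate()

// The business logic is just a DataFrame-to-DataFrame function.
def countByUser(events: DataFrame): DataFrame =
  events.groupBy("user").count()

// Batch: read once from files (hypothetical path, records have a `user` field).
val batchResult = countByUser(spark.read.json("/data/clicks"))

// Streaming: the same function over a Kafka topic (hypothetical broker and topic).
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "clicks")
  .load()
  .select(get_json_object(col("value").cast("string"), "$.user").as("user"))

val query = countByUser(stream).writeStream
  .outputMode("complete")
  .format("console")
  .start()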



Disadvantages



Everyone has their shortcomings. What problems can you run into when working with Spark?



Cluster management



Spark is notoriously difficult to tune and maintain. That does not mean it cannot work well, but if your cluster is not expertly managed, the benefits can be negated; it is also very common to have to accommodate many concurrent users.

Did anyone promise otherwise? Actually, I already wrote above that everything is fine and easy in exactly one case: either your problem is not very large, or you have resources to spare; in other words, the task is not too demanding.



In other cases, which are obviously the majority, Spark applications need to be tweaked, tuned and maintained.

Do you go with fixed or dynamic memory allocation? How many of the cluster's cores do you let Spark use? How much memory does each executor get? How many partitions should Spark use when it shuffles data? Getting all of these settings right for real workloads is difficult.

Take what would seem the relatively simple task of choosing the number of executors. In principle, knowing something about your data, you can calculate this number with confidence. But when the resources are used not only by you, everything becomes much more fun. And if your process also includes calls to other applications, then ...
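
Just to show the kind of knobs being argued about, here is a sketch of a session configured by hand; every value is illustrative only and would have to be picked per cluster and per workload.

import org.apache.spark.sql.SparkSession

// Illustrative values only; the "right" ones depend on the cluster, the data
// and whoever else is competing for the same resources.
val spark = SparkSession.builder()
  .appName("tuning-sketch")
  .config("spark.dynamicAllocation.enabled", "false") // or "true", and drop the fixed count below
  .config("spark.executor.instances", "8")
  .config("spark.executor.cores", "4")
  .config("spark.executor.memory", "6g")
  .config("spark.sql.shuffle.partitions", "400")      // how many partitions a shuffle produces
  .getOrCreate()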



For example, I have an application part of whose functionality is reverse geocoding, which is handled by a separate ArcGIS server. ArcGIS has only 4 cores at its disposal, while the Hadoop cluster where Spark runs has dozens of nodes. In the end, if we give Spark just 8 executors, the ArcGIS CPU load curve jumps to 100% and stays there for the couple of hours the application runs. And if we shift this task to Spark itself (having rewritten the application code beforehand), the running time drops by a couple of orders of magnitude, because we can now throw the cluster's resources at this task too.



That is, we often have a bottleneck somewhere that either has a fixed amount of resources allocated to it or has its resources managed in some other way that Spark cannot influence. Expecting Spark to optimize the use of those resources would accordingly be naive.



Debugging



It's true, though expected. We have a distributed parallel system, and debugging and monitoring it is a non-trivial task. Spark UI solves the observation problem to some extent, and Spark Metrics measures performance, but try, say, attaching a debugger to the running application: you know neither the host it runs on nor a port that is free to connect to. The same metrics that are easy to obtain, for example, from JMX for a normal application must, in the distributed case, first be shipped over the network and only then collected. Yes, all of this is relatively painful.



Poor UDF performance in PySpark (Slowness of PySpark UDFs)



Well, what can I say? What they fought for is what they ran into (c). As far as I understand, a Python UDF means the data is converted twice between the application and the UDF, simply because Python is still an alien language for the Spark JVM ecosystem and the UDF is executed outside of it.



Here only one piece of advice can be given: do not write in Python, write in Scala/Java. Clearly this advice is not always desirable or feasible, but I am afraid that only Graal will solve this problem globally, once its Python implementation is brought up to industrial level.



Difficult to guarantee the maximum level of parallelism (Hard-to-Guarantee Maximal Parallelism)



It is difficult to ensure that Spark parallelizes computations as much as possible. It does not always scale up on its own; you may have to tell it how many executors to use. Also, Spark divides RDDs (Resilient Distributed Datasets) and DataFrames into partitions, which are the smallest chunks of work an executor takes. If you set too few partitions, there may not be enough chunks of work to keep all executors busy. Also, fewer partitions means larger partitions, which can cause executors to run out of memory.

If only it were that simple. Let's start with the simple part: the launch parameters have to be chosen for each particular cluster. A prod cluster can have an order of magnitude more nodes and several times more memory available on each. Settings picked for the dev cluster will likely be too small when running on prod. All of this gets even more complicated once you take the cluster's current task load into account. In general, allocating cluster resources is an optimization problem, rather nontrivial, with no single correct solution.



If there are few partitions, concurrency is insufficient. And if there are too many, the size of each may fall below some notional lower limit, such as the size of an HDFS block. Since every task costs resources just to launch, there is obviously a lower bound on task size below which you should not go, because the overhead grows faster than the useful work.
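
In code this trade-off is managed explicitly, roughly as in the sketch below; the numbers and paths are illustrative, and spark is assumed to be an existing session.

// Illustrative only: the right partition count depends on data size and cluster.
val df = spark.read.parquet("/data/events")

// Too few partitions -> idle executors; too many -> tiny tasks whose launch
// overhead outweighs the useful work.
val wide   = df.repartition(400)   // full shuffle into 400 partitions
val narrow = wide.coalesce(50)     // merge partitions without a full shuffle

println(narrow.rdd.getNumPartitions)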



A simple example is an application that needs a significant amount of reference data. In the case of a "normal" map-reduce job on Hadoop, we usually deliver the code to the data, i.e. we copy our application plus the Spark pieces to the cluster nodes where our files live. Reference data, however, behaves more like a map-side join: it has to be delivered along with the code. And suddenly the amount of data delivered to each node grows by a couple of orders of magnitude: it was, say, 10 megabytes (a small Spark application, not counting Spark itself), and it becomes, say, 20 gigabytes (a very real case; the reference data needed to normalize addresses, phone numbers and so on easily reaches that volume). There it is, the price of excessive parallelism.



There is probably some natural number of partitions, determined by the number of blocks into which our input file is divided, taking the replication factor into account. That number is likely close to optimal from the point of view of reading the data. That is, if the file has three blocks and each block has copies on 2 nodes of the cluster, we can run the processing in 6 parallel threads, handling each replica on its own node. Of course, Spark takes these parameters into account when allocating resources dynamically.



Unfortunately or fortunately, Spark is not a cluster resource scheduler; that is, for example, Yarn's job. So Spark simply may not have enough information to plan the use of all resources optimally.



Not very good integration with Hive



On the one hand, Spark works great with Hive data and metadata. I would say that most of the applications I have come across work this way, and so do mine. But it is not without annoying problems. Say, if you try to use Spark's partitionBy and bucketBy facilities, it is very likely that Hive will not see the results of your work, and all you get is a vague warning somewhere in the logs.
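
For reference, this is the kind of write I mean (df is an existing DataFrame; table and column names are hypothetical): Spark happily produces the data, while Hive may simply not recognize the bucketed layout, and Spark confines itself to a warning in the logs.

// Hypothetical table and columns, written with Spark's own partitioning and bucketing.
df.write
  .partitionBy("dt")
  .bucketBy(32, "user_id")
  .sortBy("user_id")
  .format("parquet")
  .saveAsTable("analytics.events_bucketed")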



Compatibility



Unfortunately, my experience on this topic is rather negative. We ran into multiple problems when trying to run applications on clusters where the Spark version differed from the expected one. Developing on Spark 2.2.0, we had problems running on both 2.1 and 2.3.



For example, in our case Spark for some reason could not find one of the codecs (namely snappy) when running on version 2.3. That is not a serious problem if you need to write data (you can specify a codec when writing and pick any, including uncompressed), but if you need to read something that is snappy-compressed, you are clearly out of luck.



Perhaps some of the problems were caused by mistakes in how the cluster was installed, but that does not make things much easier. Still, it seems to me that migration between minor versions ought to be smoother.



And alas, Spark does not officially support installing two different versions of the same line (say, 2.2 and 2.3) side by side on one cluster.



The ugly side



API Awkwardness



The API can make life unnecessarily hard. For example, consider working with array columns in Spark.

I would not say that working with arrays is all that terrible. Some inconvenience comes from the fact that the Spark API was originally written in Scala, which has its own collection types, so when working from Java you have to convert to the Scala ones. Beyond that, if you are able to write a UDF, you are able to do whatever you like with arrays. Oh, right, in Python things are bad with UDFs, I keep forgetting.



Not very convenient and not very efficient? Yes, that can happen. The brand-new Spark 2.4 tries to address this by introducing higher-order functions for working with complex structures (thereby avoiding the explode/collect round trip).
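
A small sketch of the difference (the data is made up, and spark is assumed to be an existing session): the first variant is the old explode/collect dance, the second uses the transform higher-order function that appears in Spark 2.4.

import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical DataFrame with an array column `xs`.
val df = Seq((1, Seq(1, 2, 3)), (2, Seq(10, 20))).toDF("id", "xs")

// Pre-2.4 style: explode, transform row by row, collect back into an array.
val viaExplode = df
  .select(col("id"), explode(col("xs")).as("x"))
  .withColumn("x", col("x") * 2)
  .groupBy("id")
  .agg(collect_list("x").as("xs"))

// Spark 2.4 style: a higher-order function keeps the array in place.
val viaTransform = df.selectExpr("id", "transform(xs, x -> x * 2) AS xs")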



In my opinion, a much more awkward side of the API is that, looking at the code, it is not always obvious which part will be executed on the driver and which part on the other nodes. At the same time, the mechanism for distributing code across nodes implies its serialization (in one form or another), and the code that runs on executors must be serializable. Dealing with serialization errors, you can learn a lot of new and interesting things about your code :).
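
Here is a hedged sketch of the classic trap (the class and its client are made up): referencing a field of the enclosing object from inside a transformation quietly pulls the whole object into the closure that Spark has to serialize.

import org.apache.spark.sql.SparkSession

class GeoJob(spark: SparkSession) {
  // Stand-in for a non-serializable resource living on the driver.
  private val client: AutoCloseable = new AutoCloseable { def close(): Unit = () }

  def run(): Unit = {
    import spark.implicits._
    val ids = spark.range(0, 100)

    // This lambda runs on executors. Touching `client` (a field of `this`) would force
    // Spark to serialize the whole GeoJob instance -> "Task not serializable".
    // val broken = ids.map(id => client.hashCode + id)

    // Copying what you need into a local val keeps the closure small and serializable.
    val base = 42L
    ids.map(id => id + base).count()
  }
}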



Classloaders



Unfortunately, the issue of isolating application code from Spark's own code is not well solved. The same, by the way, applies to classic map-reduce applications on Hadoop. The Hadoop code uses some ancient versions of libraries such as Google Guava, and its other dependencies are, frankly, far from fresh either. If you recall that the Guava authors like to break backward compatibility in their API by removing deprecated methods, you get a thoroughly silly picture: you write your code against the latest Guava, run it, and it crashes, either because you actually end up with the (much older) Guava version from Hadoop and your code cannot find the new methods, or because Hadoop crashes, being incompatible with the new version. This is, unfortunately, a fairly typical problem that probably every second developer has run into. The Apache HttpComponents library is another example of the same problem.



SQL without bind variables



Alas, the typical code for executing a query looks like this:



val sqlDF = spark.sql("SELECT * FROM people WHERE id = 1")



The API provides no way to define the query as id = ? and bind parameters on each execution. Fine, let's say the authors do not care about SQL injection, but developers still have to substitute the parameters into the query themselves, so escaping special characters is entirely on us. For the sake of objectivity, Hive suffers in exactly the same way: a parameterized query cannot be defined there either.
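
So you end up writing something like the hypothetical helper below yourself; sqlLiteral is not part of any API, and its escaping is deliberately naive, just to show where the responsibility lands.

// Hypothetical helper: there are no bind variables, so quoting and escaping
// string literals is entirely on the caller.
def sqlLiteral(value: Any): String = value match {
  case s: String => "'" + s.replace("'", "''") + "'" // naive escaping, illustration only
  case n: Number => n.toString
  case other     => throw new IllegalArgumentException(s"unsupported literal: $other")
}

val id = 1
val sqlDF = spark.sql(s"SELECT * FROM people WHERE id = ${sqlLiteral(id)}")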



What is even funnier, for JDBC sources you formally cannot write a query at all: you can only specify a table, not columns. Informally, you can write something like (select a, b, c from d) t instead of a table name, but whether that will work in all cases, nobody will tell you for sure.
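
The informal trick looks roughly like this (the connection details are invented, and, as said above, whether a given source accepts a subquery in place of a table is not guaranteed):

val jdbcDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/shop")  // hypothetical connection
  .option("dbtable", "(SELECT a, b, c FROM d) t")        // a subquery where a table name is expected
  .option("user", "reader")
  .option("password", "secret")
  .load()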



Lack of Maturity and Feature Completeness



Hmm. Another man's soul is a mystery, as they say.

A sequential, unique index column is helpful for some types of analysis. Spark has a function for this, but according to the documentation it does not guarantee that the IDs are consecutive; for a truly sequential index you have to drop down to Spark's older RDD format.
I do not understand claims like this. The source code is available, and one can quite easily glance into it, or at least read the comments:



Returns monotonically increasing 64-bit integers.

The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.


In other words, this function simply takes the partition number and appends a per-partition record counter to it.
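
A quick way to see it for yourself (spark is assumed to be an existing session):

import org.apache.spark.sql.functions.{monotonically_increasing_id, spark_partition_id}

// The id encodes the partition number in the upper bits, so ids are increasing
// and unique, but not consecutive across partitions.
val numbered = spark.range(0, 10).repartition(3)
  .withColumn("part", spark_partition_id())
  .withColumn("id64", monotonically_increasing_id())

numbered.orderBy("id64").show()
// Within one partition the ids are consecutive; between partitions there are large gaps.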






Spark 2.4





A few words about the recently released Spark 2.4 and what it brings. The headline additions are barrier execution mode, higher-order functions for working with complex types (arrays and maps), built-in Avro support, and experimental Scala 2.12 support.





Barrier execution mode is a new way of scheduling: all tasks of a stage are launched together (and, on failure, restarted together) as a group, which is quite unlike Spark's usual map-reduce-style scheduling of independent tasks. The main motivation is embedding distributed machine learning frameworks, which need all their workers up at the same time, into Spark jobs.



The API for this is new and still marked experimental. And since on a Yarn-managed cluster it is not Spark that hands out the resources, Spark by itself cannot guarantee that slots for all the barrier tasks will be available at once.



Avro



Spark 2.4 adds built-in support for the Avro format. Among other things, the so-called "logical types" (previously a sore spot) are supported: Decimal, Date, Time, Duration and so on.



Before this, one had to rely either on Hive's Avro support (or the Hive libraries bundled with Spark) or on the external spark-avro package, so built-in Avro handling is a welcome simplification.






Scala 2.12 (experimental)



Among other things, this promises better interoperability with Java code, in particular with Java 8 lambdas.

Source: https://habr.com/ru/post/329838/


