📜 ⬆️ ⬇️

Scalding: a reason to switch from Java to Scala



In this article I will talk about Twitter Scalding - a framework for describing the processing of data in Apache Hadoop. I'll start from afar, with the history of frameworks over Hadoop. Then I will give an overview of the possibilities of Scalding. In conclusion, I’ll show examples of code that can be understood by those who know Java, but are barely familiar with Scala.

Interesting? Go!

Easier than MapReduce


When the MapReduce paradigm was in its infancy, it was a breakthrough step to simplify the development of distributed computing. However, it soon came to realize that it was very tiring to write mappers and reducer manually. To speed up development, high-level superstructures appeared above Map / Reduce - Pig, Hive and Cascading, and then others. Let us dwell on the latter.
')
Cascading is a Java-framework for describing data processing, the so-called. workflow. After the description, Cascading analyzes the workflow like the query analyzers in the DBMS, builds the execution plan as a sequence of map / reduce tasks and sends them to the Hadoop cluster, independently managing their launch and intermediate data. Unfortunately, Cascading operates with rather low-level abstractions; therefore, for a long time, in terms of popularity, it lost to other data processing mechanisms.

From this situation, there was a successful way out. Twitter adapted Cascading to fit its needs and wrapped its abstractions in traditional Scala tools. This is how Scalding - Scala framework on top of Cascading was born. Here you can make a digression and talk about Scala.

Lyrical digression about Scala
Scala is complicated . On the topic of its applicability in industrial development are terrible battles . In this sensitive issue, I will probably join the conservative supporters of Java. But I have to admit that there are things that Scala turns out to do better than in other languages, namely, to process data streams and build interflow interaction. For javista not familiar with Scala, I should note that working with collections , inline processing and functionalism are done in Scala easily and naturally. The Java Stream API and java.util.functional familiar to you are a dull pale copy of standard Scala tools.

So, the attempt to apply the standard approaches of Scala to the description of the workflow of data processing was crowned with success, and Scalding has a chance to catch up with Hive, Pig and their many new-fashioned analogues. It turned out so good that for the sake of it it even makes sense to learn Scala, which we now do.

Introduction to Scalding


Now I deliberately miss all that concerns the internal structure of Scalding and Cascading. We assume that this is a black box with a nice interface that counts some data for us. If everything works out, then there will be another article about the internal structure of these frameworks.

For those who are not familiar with Scala
A type declaration goes through the colon after the name of a variable or function.

A tuple is a few objects held together. A classic example of a tuple is Pair, Triple, etc. In Scala, they are part of the language. Tuples are written in brackets, separated by commas.

Generics are written in square brackets, not in the corner ones.

val longConstant: Long = 0L // final long longConstant = 0L; var list: List[Int] // List<Integer> list; (String, Int) // Pair<String, Integer> 


Flat operations


The main concept in Scalding is Pipe . Pipe is a pipeline from which data flows to the programmer. In fact, this is similar to the Stream from Java 8. The first implementation of Pipe did not support typing, but it did not last long. Fans of strict typing came up with TypedPipe - a pipeline with objects of a strictly specified type, generic, in terms of Java.

For TypedPipe, some standard stream operations are defined — map , flatMap , filter , limit, and others. These are all flat stream operations; theoretically, they can be effectively performed with unlimited parallelism and on any amount of data.

Data in TypedPipe should be read from somewhere. For this, in Scalding there is a Source - data source. Its only purpose is to spawn Pipe or TypedPipe. There are several ready-made sources, most of them are reading from files in various formats, but there is also the ability to read from an arbitrary iterator (and, therefore, from a collection in memory) and, of course, the ability to determine its sources. Importantly, the same Cascading and Scalding code works on both the Hadoop cluster and local data, and it is very convenient for testing.

When all operations are done, it is time to save the data. Sink, the final part of the pipeline, is responsible for writing to the disk in Scalding. Sinks are similar to Source's, often the same class that implements two interfaces.

Grouping operations


MapReduce allows us to reorganize the stream represented by TypedPipe. First of all, it is the groupBy grouping operation , which groups records from the entire stream by key, analogous to GROUP BY in SQL. After grouping, TypedPipe [V] takes on a special form Grouped [K, V] , over which additional operations become available.

First, using the mapGroup and mapValuesStream methods, you can get Grouped [K, V] elements in the form of a pair from the key K, which was grouped, and an iterator for all V values ​​that came to this key. Any functions of the Scala collections are applicable to the iterator by values. But usually this is not even required, because Grouped has many shortcut functions that cover the most popular cases.

Secondly, Grouped can be sorted by the sortBy operation. After that, the mapGroup, mapValuesStream and all their derivatives are also applicable to it.

Thirdly, Grouped [K, V1] can be joined (join) with another Grouped [K, V2]. Here the same rules work as in relational databases - leftJoin , rightJoin , join (inner), outerJoin (full) are available. The output is Grouped [K, (V1, V2)].

It is worth noting that when an ungrouped stream contains TypedPipe [(K, V)] pairs, the hashJoin operation can be applied to it. It is similar to the usual Grouped.join, but is done in memory. This works well for enriching data from small directories, but for large tables it can lead to OOM.

Grouped can be converted back to TypedPipe with toTypedPipe, keys or values ​​operations. The first will save both the key and the value, the rest will return one thing.

Scalding by example


Now, after reviewing the main features of the framework, let's see how this works, using an example.

Suppose we are an RTB site, and we have a history of our users clicking on URLs on observed sites. The story is presented in a huge TSV file with three columns - URL, Timestamp and UserId.

We also have a layout of sites on topics. We do not have a lot of sites, maximum, thousands. All markup is placed in a small TSV file with columns - Domain and Topic.

We want to understand how often the user switches between topics. To do this, we need to leave in the history of clicks only those events when the user navigates from the site of one subject to another site.

Let's write the code that will do this transformation for us. Launch infrastructure will not be considered. If interested, the full sample code is available on github .

In Scala, you can set aliases for types. This is convenient because will allow us to distinguish one String from another in type declarations.

 type Domain = String type UserId = String type Topic = String type Url = String type Timestamp = Long 

Now we will declare classes from the domain model:

 case class Click(url: Url, ts: Timestamp, userId: UserId) case class SiteInfo(domain: Domain, topic: Topic) 

Case class in Scala is a convenient way to describe classes for immutable values. A constructor, getters, and other similar code are automatically generated from it.

Let's read the table with clicks:

 val clicksPipe: TypedPipe[Click] = TypedPipe.from(TypedTsv[(Url, Timestamp, UserId)](pathToClicks)) .map(tuple => Click.tupled(tuple)) 

Here we announced the source - typed TSV with columns of type (String, Long, UserId). Then we wrapped this source in TypedPipe. Further, for convenience, we converted tuples from three columns (Url, Timestamp, UserId) into objects of the class Click.

It turned out TypedPipe [Click].

Let's leave only domains from urlov.

 def url2domain(url: Url): Domain = { return new URL(url).getHost } val domainsPipe: TypedPipe[Click] = clicksPipe .map(click => click.copy(url = url2domain(click.url))) 

Let's read the directory where the domains are divided by topics, and immediately group it in a form suitable for hashJoin.

 val sitesGroupByDomain: Grouped[Domain, SiteInfo] = TypedPipe.from(TypedTsv[(Domain, Topic)](pathToSites)) .map(tuple => SiteInfo.tupled(tuple)) .groupBy(siteInfo => siteInfo.domain) 

Add to the flow of clicks information about the subjects of sites. To do this, join the stream of clicks from the directory of domains.

 val clicksWithSiteInfo: TypedPipe[(Domain, (Click, SiteInfo))] = domainsPipe .map(click => (click.url, click)) .hashJoin(sitesGroupByDomain) 

Group the flow of clicks by users and sort by click timestamp. In addition, we are no longer interested in information about the domain, just enough information about the subject of the site. To do this, we introduce an auxiliary class that reflects the user's active interest in the topic at a time.

 case class TopicActivity(topic: Topic, ts: Timestamp, userId: UserId) val topicActivityStreamPerUser: SortedGrouped[UserId, TopicActivity] = clicksWithSiteInfo .map(tuple => { val (domain, (click, siteInfo)) = tuple TopicActivity(siteInfo.topic, click.ts, click.userId) }) .groupBy(activity => activity.userId) .sortBy(activity => activity.ts) 

The most difficult moment - in the stream of user activities you need to catch the moments of switching topics. To catch switching we will write a function on Scala in Java-style. It accumulates the result in an ArrayBuffer (analogous to an ArrayList), which can potentially lead to OOM on very long stories.

 def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = { val result = ArrayBuffer[TopicActivity]() var firstTs = 0l var lastTopic = null.asInstanceOf[Topic] for (activity <- activities) { if (firstTs == 0l || lastTopic != activity.topic) { result.append(activity) firstTs = activity.ts lastTopic = activity.topic } } result.toIterator } val firstTopicActivitiesPipe: TypedPipe[TopicActivity] = topicActivityStreamPerUser .mapGroup((userId, activities) => topicSwitches(userId, activities)) .values 

Only the first activities of each interest remained in the stream. By them it is possible to trace how the user's interest focus changed over time. It remains to write the result to the file.

 firstTopicActivitiesPipe .map(activity => (activity.topic, activity.ts, activity.userId)) .write(TypedTsv(args.required("output"))) 

That's all. We have described a nontrivial transformation of data literally in 40 lines.

The final code in scala-way


If you follow the canonical scala-way, then the code will turn out even shorter. In addition, you can rewrite the search function switching between topics from an iterative approach to a functional one by removing the use of the buffer. Now the process does not fall even at the endless entrance. In theory…

 def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = { activities.scanLeft(Helper())((helper, activity) => { if (helper.topic.isEmpty || helper.topic.get != activity.topic) { Helper(Some(activity.topic), activity.ts, true) } else { Helper(helper.topic, helper.firstTs, false) } }).filter(_.firstVisit).map(helper => TopicActivity(helper.topic.get, helper.firstTs, userId)) } TypedPipe.from(TypedTsv[(Url, Timestamp, UserId)](pathToClicks)) .map(tuple => Click.tupled(tuple)) .map(click => click.copy(url = new URL(click.url).getHost)) .map(click => (click.url, click)) .hashJoin( TypedPipe.from(TypedTsv[(Domain, Topic)](pathToSites)) .map(tuple => SiteInfo.tupled(tuple)) .groupBy(_.domain) ) .map({case (_, (click, siteInfo)) => TopicActivity(siteInfo.topic, click.ts, click.userId)}) .groupBy(_.userId) .sortBy(_.ts) .mapGroup(topicSwitches) .values .write(TypedTsv(outputPath)) 

In the following articles, I will deal with the organization of the streaming code and its testing. And, in the end, I will tell you how it all works from the inside.

Disclaimer
I wrote the example code as clear as possible to a Java programmer, avoiding any magic transformations and not saving on bytes. This is to show that you can add a little Scala to ETL processes quickly and painlessly. The code is not optimal. If you know a way to write more effectively - you are well done.



Resources


Full github example code
Scalding wiki
Book “Programming MapReduce with Scalding”

Source: https://habr.com/ru/post/273611/


All Articles