📜 ⬆️ ⬇️

Hadoop, part 3: Pig, data processing

des-48-5

In a previous publication, we discussed in detail the process of collecting data using the specialized tool Flume. But in order to fully work with information, it is not enough just to collect and save it: it needs to be processed and something necessary and useful is extracted from it.

Hadoop uses MapReduce technology to process data.

')

MapReduce technology



Story



Data processing in Hadoop is performed using MapReduce technology. Initially, this technology was developed by Google in 2004.

Google developers Jeffrey Dean and Sanjay Gemavat in 2004 published an article in which they proposed the following solution for handling large amounts of raw data (indexed documents, query logs, etc.): a huge array of information is divided into parts, and the processing of each of These parts are charged to a separate server. As a rule, data are processed on the same servers where they are stored, which allows speeding up the processing process and avoiding unnecessary data movement between servers. After that, the results are combined into a single whole.

The Google specialists in the article mentioned above limited themselves to a description of the basic algorithms, not stopping at the implementation details. However, this information turned out to be enough for Hadoop developers to create their own MapReduce framework.

Today it is used in many well-known web projects - Yahoo !, Facebook, Last.Fm, and others.
Consider the architecture and operation of Hadoop MapReduse in more detail.

Architecture and working principles



The architecture of MapReduce is built on the principle of “master-slave” (master-workers). The main role is JobTracker, which distributes tasks to subordinate nodes of the cluster and monitors their execution.

Architecture and working principles

Data processing is divided into the following steps:
  1. Application launch: transfer of application code to the main (master) and subordinate nodes (workers);
  2. The wizard assigns specific tasks (Map or Reduce) and distributes the parts of the input data to computing nodes (workers);
  3. Map nodes read the input data assigned to them and begin processing them;
  4. Map nodes locally store intermediate results: each node saves the result to local disks;
  5. Reduce-nodes read intermediate data from Map-nodes and perform Reduce data processing;
  6. Reduce-nodes save the final results in the output files, usually in HDFS.


Creating applications for MapReduce is quite laborious. Writing all the functions, compiling and packaging takes a lot of time. To facilitate the work of the company Yahoo! developed a specialized tool called Pig, which increases the level of abstraction in data processing.

Pig


Pig consists of two parts:


The Pig script includes a series of operations (transformations) that must be applied to the input data to get the output data. These operations describe the data stream, which is then converted (compiled) by the Pig execution environment into an executable representation and launched for execution. In the internal implementation, Pig transforms the transformations into a series of MapReduce tasks.

Initially, the Pig was created to work from the console (the Grunt Shell shell). In the implementation from Cloudera, Pig is handled via a simple and convenient web interface. You can open it through the already familiar to us interface Hue http: // [node_in__which_installed_Hue]: 8888 / pig /

Pig

The web interface includes a full-fledged editor (there is even an automatic substitution of operators) and a script manager. With it, you can save scripts directly in Hue, run them, view a list of running tasks, results and run logs.

Test task


As a test task, we will process the access logs of our repository for a certain day (day). Calculate the following parameters:

Below is a script that solves the problem. Immediately it should be noted that this script (like all scripts in Pig) is not executed line by line, as in interpreted languages. The Pig compiler parses dependencies and sets up data streams. Script compilation starts from the end, that is, from the STORE command. For data, after processing of which there is no save command, no tasks will be created and the data itself will not even be read. This allows you to write a script in a fairly arbitrary form, all the work on optimization, determining the order of execution and parallelization will take on Pig.

The full listing of the script will look like this:
  records = LOAD '/ log / flume / events / 14-02-20 /' USING PigStorage ('\ t')
 AS (
 date: chararray,
 clientip: chararray,
 clientport: chararray,
 proto: chararray,
 statuscode: int,
 bytes: int,
 sq: chararray,
 bq: chararray,
 request: chararray);

 count_total = FOREACH (GROUP records ALL) GENERATE COUNT (records);

 count_ip = FOREACH (GROUP records BY clientip) GENERATE group AS ip, COUNT (records) AS cnt;
 top_ip = ORDER count_ip BY cnt DESC;

 filtered_req = FILTER records BY statuscode == 200 OR statuscode == 206;
 count_req = FOREACH (GROUP filtered_req BY request) GENERATE group AS req, COUNT (filtered_req) AS cnt, SUM (filtered_req.bytes) AS bytes;
 top_req = ORDER count_req BY bytes DESC;

 % declare DT `date +% y% m% dT% H% M`
 STORE count_total INTO '$ DT / count_total';
 STORE top_ip INTO '$ DT / top_ip';
 STORE top_req INTO '$ DT / top_req'; 


It consists of three parts: data loading, processing and saving. This order is common to most tasks. In some cases, solving problems may include additional steps — for example, generating data (for example, structured artificial data for testing an algorithm) or storing intermediate results of calculations. A detailed description of the syntax, data types and operators can be found in the official documentation.

Consider each stage in more detail.

Loading


  records = LOAD '/ log / flume / events / 14-02-20 /' USING PigStorage ('\ t')
 AS (
 date: chararray,
 clientip: chararray,
 clientport: chararray,
 proto: chararray,
 statuscode: int,
 bytes: int,
 sq: chararray,
 bq: chararray,
 request: chararray); 


We use web server logs as input. For a better understanding of further processing, we give an example of input data:

  07 / Dec / 2013: 20: 05: 13 95.153.193.56 37877 http 200 1492030 0 0 GET /745dbda3-894e-43aa-9146-607f19fe4428.mp3 HTTP / 1.1
 08 / Dec / 2013: 15: 00: 28 178.88.91.180 13600 http 200 4798 0 0 GET /public/cars/bmw7l/down.png HTTP / 1.1
 08 / Dec / 2013: 15: 00: 29 193.110.115.45 64318 http 200 1594 0 0 GET /K1/img/top-nav-bg-default.jpg HTTP / 1.1 


First, consider the data model and terminology. The main object in Pig Latin is the “ relation ”. It is with relationships that all language operators work. In the form of relationships are input and output.

Each relation is a collection of similar objects - “ tuples ” (tuples). Analogs in the database: a tuple is a string, a relation is a table.

Tuples, respectively, consist of numbered or named objects - " fields ", arbitrary base types (number, string, Boolean, etc.).

So, in Pig Latin, the result of any operator is a relation, which is a collection of tuples.
The LOAD statement creates a records relationship from files in HDFS from the '/ log / flume / events / 14-02-20 /' directory, using the standard PigStorage interface (also note that the separator in the files is the tab character '\ t'). Each line of the files will appear in a tuple relation. The AS section assigns to the fields in the tuple the types and names by which it will be more convenient for us to refer to them.

Treatment


Calculate the total number of entries in the logs using the COUNT operator. Before this, it is necessary to combine all the rows in records into one group with FOREACH and GROUP operators.

  count_total = FOREACH (GROUP records ALL) GENERATE COUNT (records);
 count_ip = FOREACH (GROUP records BY clientip) GENERATE group AS ip, COUNT (records) AS cnt;
 top_ip = ORDER count_ip BY cnt DESC; 


Translated from Pig Latin into a natural language, the following script looks like this: for each record (FOREACH), from records grouped together (GROUP ALL), count records in records (GENERATE COUNT).

Now count the number of requests from unique addresses. In our tuples with respect to records, the clientip field contains the IP addresses from which requests were made. Group the tuples in records by the clientip field and define a new relation consisting of two fields:


Next, we define another top_ip relation consisting of the same data as count_ip, but sorted by the cnt field by the ORDER operator. Thus, in top_ip we will have a list of client IP addresses from which requests most often occurred. In the future, we can bind this data to GEO-IP and see in which cities and countries our storage is most popular =)

  filtered_req = FILTER records BY statuscode == 200 OR statuscode == 206;
 count_req = FOREACH (GROUP filtered_req BY request) GENERATE group AS req, COUNT (filtered_req) AS cnt, SUM (filtered_req.bytes) AS bytes;
 top_req = ORDER count_req BY bytes DESC; 


After that, we calculate the number of successful requests for each URL, as well as the total amount of data downloaded for each URL. To do this, we first use the filter operator FILTER, selecting only successful requests with HTTP codes 200 OK and 206 Partial Content. This statement defines the new filtered_req from the records relationship by filtering it across the statuscode field.

Further, similarly to the calculation of IP addresses, we count the number of unique URLs, grouping the records for requests by the request field. We are also interested in the transferred data volume for each URL: it can be calculated using the SUM operator, which adds the bytes fields in the grouped records of the filtered_req relationship.

Now we will sort by the field bytes, defining a new relation top_req.

Saving results



  % declare DT `date +% y% m% dT% H% M`
 STORE count_total INTO '$ DT / count_total';
 STORE top_ip INTO '$ DT / top_ip';
 STORE top_req INTO '$ DT / top_req'; 


It is preferable to save the results of each script execution to a separate directory, the name of which includes the date and time of execution. To do this, you can use the function to call an arbitrary shell command directly from the Pig script (you need to write it in backquotes). In the example, the result of the date command is entered in the variable DT, which is then substituted in the data storage path. Save the results with the STORE command: each relationship is in its own directory.

You can view the output through the file manager in Hue; By default, the path in HDFS is relative to the home directory of the user running the script.

File browser

Information on the results of the tasks will be displayed in the Pig logs as follows:
  http: // cdh3: 8888 / pig / # logs / 1100715
 Input (s):
 Successfully read 184442722 records (32427523128 bytes) from: "/ log / flume / events / 14-02-20"

 Output (s):
 Successfully stored 1 records (10 bytes) in: "hdfs: // cdh3: 8020 / user / admin / 140225T1205 / count_total"
 Successfully stored 8168550 records (1406880284 bytes) in: "hdfs: // cdh3: 8020 / user / admin / 140225T1205 / top_req"
 Successfully stored 2944212 records (49039769 bytes) in: "hdfs: // cdh3: 8020 / user / admin / 140225T1205 / top_ip"

 Counters:
 Total records written: 11112763
 Total bytes written: 1455920063 


Report from Oozie:

  Last Modified Tue, Feb 25, 2014 00:22:00
 Start Time Tue, Feb 25, 2014 00:05:16
 Created Time Tue, Feb 25, 2014 00:05:15
 End Time Tue, Feb 25, 2014 00:22:00 


From the above logs, it is clear that during the execution of the test task, more than 180 million records were processed with a total volume of more than 32 GB. The whole processing procedure took about 15 minutes.

During the active phase of the Map, 22 processor cores and 91GB of RAM were used. For a small cluster consisting of three servers five years ago, this result can be considered quite good.

As mentioned above, Pig creates MapReduce tasks during script execution and sends them to the MR cluster for execution. This process is graphically shown in the statistics graphs in the Cloudera Manager control panel:

Home - Cloudera Manager 1

Activities: mapreduce1

  1. Stage Map: The processors and disks at each node are busy processing their pieces of data.
  2. Stage Reduce: the results obtained in the first stage are transmitted over the network and merged.
  3. At the third stage, the results are saved in the file system (a jump in HDFS is visible on the graph).


On the graphs you can see that the solution of the problem included two passes of MapReduc. During the first pass, unique records were counted, and during the second, sorting. These procedures cannot be parallelized and executed in a single pass, since the second procedure works with the results of the first.

Conclusion


In this article, we talked about the architecture of MapReduce, and also discussed the features of its work using the Pig tool. Writing programs for MapReduce is a very difficult task that requires a special approach. All the difficulties, however, are compensated by power, scalability and high speed processing of huge amounts of arbitrary data.

In the near future we plan to continue the cycle of articles on Hadoop. The next post will be about working with the Impala database.

Readers who are not able to comment on posts on Habré, we invite to our blog .

Source: https://habr.com/ru/post/215307/


All Articles