In this example, we will walk through creating and running a standard MapReduce job in HDInsight, Microsoft's cloud implementation of Hadoop.
In the previous example, we created a 3-node Hadoop cluster and uploaded an abstract log in a loosely structured format, which we will now process. The log is, in general, a large text file (in our concrete example it is small, but this does not affect the demonstration of the principle) containing lines with the attributes TRACE, DEBUG, INFO, WARN, ERROR, and FATAL. Our elementary task is to count the number of lines with each attribute, i.e. how many times a WARN occurred, how many times an ERROR, and so on.
In SQL terms, we need a COUNT() ... GROUP BY over the attribute field. Of course, there is no such field, since the file is not a table but a set of lines with a textual description of the problem, in which a substring with the attribute name occurs. We have to run through all the lines, extract the attribute substring, and sum up the occurrences. Simply put, from
2012-02-03 18:35:34 SampleClass6 [INFO] everything normal for id 577725851
2012-02-03 18:35:34 SampleClass4 [FATAL] system problem at id 1991281254
2012-02-03 18:35:34 SampleClass3 [DEBUG] detail for id 1304807656
2012-02-03 18:35:34 SampleClass3 [WARN] missing id 423340895
2012-02-03 18:35:34 SampleClass5 [TRACE] verbose detail for id 2082654978
2012-02-03 18:35:34 SampleClass0 [ERROR] incorrect id 1886438513
...
Script 1
we want to get something like
[TRACE] 10
[DEBUG] 20
[INFO] 30
[WARN] 555
[ERROR] 777
[FATAL] 1
Script 2
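Before distributing anything, it may help to see the same COUNT() ... GROUP BY done on a single machine. The following is only a minimal sketch in plain Java, not the article's Hadoop code: the class name LogLevelCount and the method countLevels are hypothetical, and the regular expression is an assumption based on the six attributes listed above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: a single-machine equivalent of COUNT(*) ... GROUP BY attribute.
class LogLevelCount {
    // Assumed pattern: the first bracketed attribute such as [INFO] or [WARN].
    private static final Pattern LEVEL =
        Pattern.compile("\\[(TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\\]");

    // Returns attribute -> number of lines containing it (keys sorted alphabetically).
    static Map<String, Integer> countLevels(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = LEVEL.matcher(line);
            if (m.find()) {
                // Add 1 to the running total for this attribute.
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "2012-02-03 18:35:34 SampleClass6 [INFO] everything normal for id 577725851",
            "2012-02-03 18:35:34 SampleClass4 [FATAL] system problem at id 1991281254",
            "2012-02-03 18:35:34 SampleClass3 [WARN] missing id 423340895");
        countLevels(lines).forEach((level, n) -> System.out.println("[" + level + "] " + n));
    }
}
```

This works as long as the whole log fits on one machine; MapReduce splits the same extraction and summation across nodes.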
The idea behind the MapReduce model is very simple. Given a distributed system, which is what a Hadoop cluster is, the common task is divided (Map) into parallel subtasks. As noted in the previous example, a file to be processed, when stored in the Hadoop file system, is split into fragments across the nodes transparently to the user. Theoretically, these nodes can be distributed geographically, i.e. located in different places. To minimize the cost of transferring data between data centers (or simply between individual nodes), Hadoop takes data locality into account: each subtask works with its own piece of data. In our case the cluster is modest, with only 3 nodes. The subtasks will run on the same nodes where the data fragments are located. The results of the subtasks are then aggregated by the Reduce functions into a single result returned to the user. In other words, each node produces its own partial result, for example, the first one:
[TRACE] 1
[DEBUG] 2
[INFO] 3
...
Script 3
the second:
[TRACE] 9
[DEBUG] 5
[INFO] 7
...
Script 4
from which the total result of Script 2 will be compiled in the end. This is the general idea of parallel processing, which has long been implemented in traditional relational database servers (e.g., Oracle RAC, Microsoft SQL Server Parallel Data Warehouse) and in relational cloud data services (e.g., federated databases in Windows Azure SQL Database, previously known as sharding in SQL Azure). In this case, however, we are dealing not with relational data but with a loosely structured input format; therefore, instead of SQL scripts, we will have to write the functions that play the Map and Reduce roles ourselves.
The idea of MapReduce is implemented in various languages. For example, the free open-source Apache Hadoop project uses Java for this purpose. Since Microsoft HDInsight is compatible with Apache Hadoop, we will also use the Java language and the org.apache.hadoop.mapreduce package.
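The combination of per-node partial results into a total, described above, can be sketched in plain Java. This is an illustration only, using the partial counts from Script 3 and Script 4 as input; the class name PartialMerge and the method mergePartials are hypothetical, and real Hadoop does this merging inside the framework rather than through such a helper.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: summing partial per-node counts into one result.
class PartialMerge {
    // Adds up the counts for each attribute across all partial results.
    static Map<String, Integer> mergePartials(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new LinkedHashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((level, n) -> total.merge(level, n, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        // Partial result of the first node (the values shown in Script 3).
        Map<String, Integer> first = new LinkedHashMap<>();
        first.put("TRACE", 1);
        first.put("DEBUG", 2);
        first.put("INFO", 3);

        // Partial result of the second node (the values shown in Script 4).
        Map<String, Integer> second = new LinkedHashMap<>();
        second.put("TRACE", 9);
        second.put("DEBUG", 5);
        second.put("INFO", 7);

        mergePartials(List.of(first, second))
            .forEach((level, n) -> System.out.println("[" + level + "] " + n));
    }
}
```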
First, the Map class derived from Mapper is implemented.
The Mapper class converts the original set of key/value pairs into an intermediate one. In our case, the input values are the lines of the text log file: the value parameter of type Text of the map method. Inside the method, we look for square brackets in each value, pull out what is between them, and compare it with the constant set of attributes that we initially put in the pattern variable. If it matches (if (matcher.matches())), we form the output key/value pair. The key is the attribute substring TRACE / DEBUG / ... (the Text variable logLevel), and the value is 1. The value is held in the accumulator variable of type IntWritable, which we initialized to one in the constructor. IntWritable is a wrapper around the Java int type that implements the Writable interface, because Hadoop uses its own serialization format. We will add up these ones in the Reduce function to count the number of occurrences of each attribute. Hadoop groups the intermediate (output) values by output key. At the mapping stage, you can pre-aggregate with setCombinerClass to reduce the amount of data transmitted to the Reducer. This example does not use that feature. The Reporter class (the last parameter of the map method) is designed to report the status and progress of execution, update counters, etc. Our simple example does not use it either.
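The logic of the map step described above can be sketched without Hadoop on the classpath. In this sketch a plain String stands in for Text, an Integer for IntWritable, and a List of pairs for the output collector; these substitutions, the class name MapStepSketch, and the exact regular expression are assumptions made for illustration, not the article's actual Mapper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical plain-Java stand-in for the Map class (no Hadoop types).
class MapStepSketch {
    // Assumed constant set of attributes that a bracketed substring is compared with.
    private static final Pattern pattern =
        Pattern.compile("TRACE|DEBUG|INFO|WARN|ERROR|FATAL");

    // Emits one (logLevel, 1) pair for a log line, or nothing if no attribute is found.
    static void map(String value, List<Map.Entry<String, Integer>> output) {
        int open = value.indexOf('[');
        int close = value.indexOf(']', open + 1);
        if (open < 0 || close < 0) {
            return; // no bracketed substring in this line
        }
        String logLevel = value.substring(open + 1, close);
        Matcher matcher = pattern.matcher(logLevel);
        if (matcher.matches()) {
            output.add(Map.entry(logLevel, 1)); // key = attribute, value = 1
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> output = new ArrayList<>();
        map("2012-02-03 18:35:34 SampleClass3 [WARN] missing id 423340895", output);
        map("2012-02-03 18:35:34 SampleClass0 [ERROR] incorrect id 1886438513", output);
        map("a line without any attribute", output);
        output.forEach(pair -> System.out.println(pair.getKey() + "\t" + pair.getValue()));
    }
}
```

Note that the mapper does no counting itself: it only tags each line with its attribute and a one, leaving the summation to the reduce step.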
The Reduce class derived from Reducer solves the inverse problem. It collects the intermediate mapping results and aggregates them, in this case performing the COUNT() mentioned above, since the GROUP BY over the keys (including sorting) was already performed during mapping. The input types (Text, IntWritable) for Reduce must correspond to the output types of Map. While merging the results at the Reduce stage, Hadoop performs a secondary sort, since the results obtained from different mappers may contain the same keys. Thus, the input to the reduce method is a set of records, each consisting of a key and a collection of corresponding values. For example, one of the records will be TRACE (the key) and a collection of as many ones as the occurrences of this attribute found by the mapper instances. It remains for us to run through the collection and sum the ones in the count variable. To the OutputCollector we write the usual key/value pair, only here the value is the result of the aggregation by key.
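The reduce step can be sketched in the same plain-Java style. The groupByKey method below is only a stand-in for the grouping and sorting that Hadoop performs itself between the two stages, and reduce returns the sum instead of writing it to an OutputCollector; the class name ReduceStepSketch and both method signatures are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java stand-in for the Reduce class (no Hadoop types).
class ReduceStepSketch {
    // Stand-in for Hadoop's grouping: key -> collection of values, sorted by key.
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                   .add(pair.getValue());
        }
        return grouped;
    }

    // Runs through the collection of ones for a single key and sums them.
    static int reduce(String key, Iterator<Integer> values) {
        int count = 0;
        while (values.hasNext()) {
            count += values.next();
        }
        return count;
    }

    public static void main(String[] args) {
        // Intermediate pairs as they might arrive from several mapper instances.
        List<Map.Entry<String, Integer>> pairs = List.of(
            Map.entry("WARN", 1), Map.entry("INFO", 1), Map.entry("WARN", 1));
        groupByKey(pairs).forEach((key, values) ->
            System.out.println(key + "\t" + reduce(key, values.iterator())));
    }
}
```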
The main() method creates a Hadoop job from the Map and Reduce classes and runs it. The JobConf object forms the job specification. The code is packaged into a JAR file that Hadoop distributes across the cluster. Instead of explicitly specifying the file name, you can pass the enclosing class containing the executable code (MapReduceTest) to the JobConf constructor, and Hadoop will use it to find the corresponding JAR file. The setOutputKeyClass() and setOutputValueClass() methods set the output types for the Map and Reduce functions. As a rule, they coincide, i.e. Map emits the same types as Reduce. If they differ, the output types of the Map function can be specified with the setMapOutputKeyClass() and setMapOutputValueClass() methods. Which class does the Map and which the Reduce is set, as you might guess, with the setMapperClass() and setReducerClass() methods. It remains to specify the input and output formats. This is done with the setInputFormat() and setOutputFormat() methods. In this case this step could have been omitted, because the text format is the default. Finally, you need to specify the paths to the source data and the results with the static methods FileInputFormat.setInputPaths() and FileOutputFormat.setOutputPath(). We will pass the file names as command-line arguments. As the method name suggests, there may be several input files. The input may also be a directory, in which case all the files in it are taken, or a file name pattern. The location for the result files is a directory. It must not exist beforehand, otherwise an error occurs at run time: a kind of safeguard so that one job does not overwrite the results of another. You can delete the directory with the hadoop fs -rmr command.
Putting this together, we get the following code:
Script 5
Let's connect to the Hadoop cluster via Remote Desktop, as shown in the previous article, and save this code to the file MapReduceTest.java in, say, the same d:\Temp. The Java binaries in HDInsight are located in C:\apps\java\bin, but Hadoop does not know about this. It makes sense to open the Hadoop command line window (
D:\Windows\system32\cmd.exe /k pushd "c:\apps\dist\hadoop-1.1.0-SNAPSHOT" && "c:\apps\dist\hadoop-1.1.0-SNAPSHOT\bin\hadoop.cmd"
; for convenience, there is a shortcut to it on the HDInsight desktop) and add this path to the %PATH% environment variable:
set PATH=%PATH%;C:\apps\java\bin
Script 6
Go to the d:\Temp directory and compile the Java file into bytecode class files. The -encoding switch was needed because I saved MapReduceTest.java in Unicode encoding.
javac -encoding UNICODE -classpath C:\apps\dist\hadoop-1.1.0-SNAPSHOT\hadoop-core-*.jar d:\Temp\MapReduceTest.java
Script 7
In d:\Temp, the file MapReduceTest.class was created, along with MapReduceTest$Map.class and MapReduceTest$Reduce.class, which correspond to the nested classes. Build the archive:
jar -cvf MapReduceTest.jar *.class
Script 8
Pic1
The Java archive MapReduceTest.jar was created in the current d:\Temp directory. Run the job:
hadoop jar MapReduceTest.jar MapReduceTest Sample1/input/Sample.log Sample1/output
Script 9
Pic2
Here, Sample1/input/Sample.log is the log file to be processed, uploaded from the local d:\Temp directory to the HDFS directory Sample1/input (see Figure 5 of the previous article). Last time I forgot to point out that before uploading you must explicitly create the HDFS input directory (hadoop fs -mkdir Sample1/input/) and only then put the file into it (hadoop fs -put d:\Temp\Sample.log Sample1/input/). If you try to upload the file without first creating the directory, the directory is created but the file is not loaded into it, as hadoop fs -ls Sample1/input/ shows.
In the meantime, the job has completed successfully. In the HDFS output directory Sample1/output, a results file was generated containing the number of occurrences of each attribute in the log, as requested:
hadoop fs -cat Sample1/output/part-00000
Script 10
Pic.3