
Small code for big data, or Apache Spark in 3 days

Even if the Giraffe was wrong,
it is not the Giraffe who is to blame,
but the one who shouted from the branches:
"The Giraffe is big, he knows better!" (C)

I needed to get up to speed quickly with Apache Spark, a technology built for working with Big Data. While figuring it out I relied heavily on habrahabr, so I'll try to repay that information debt by sharing my experience.

Namely: installing the system from scratch, configuring it, and writing the code that solves a data processing task: building a model that estimates the probability of a bank client's bankruptcy from a set of features such as loan amount, rate, etc.
Big data is supposedly everywhere, yet for some reason it is not easy to find the place where you can actually get your hands on it. At first I tried the Ambari option, but on my Windows 7 machine errors occurred while configuring the network bridge. In the end I went with a pre-configured virtual machine from Cloudera (CDH). You simply install VirtualBox, launch the downloaded file, specify the main parameters (memory, location), and five minutes later the venerable Apache Hadoop genie is eagerly awaiting your instructions.

A few words on why Spark. As far as I understand, the key difference from the original MapReduce is that data is kept in memory instead of being flushed to disk, which speeds things up many times over. But perhaps even more important are its built-in statistical functions and its convenient interface for loading and processing data.

Next, the actual code for the following task. We have some truly big data (my hand gets quite tired scrolling through these 2000 lines) in the following format:



The assumption is that default is somehow related to the other parameters (except the first one: we have no complaints about the respected Ivanov1 ... N), and we need to build a linear regression model. Before starting, I should mention that this is my first Java code; I work as an analyst, and this is also my first time launching Eclipse, configuring Maven, and so on. So don't expect exquisite miracles: below is a head-on solution to the problem, done in the way that for some reason happened to work. Let's go:

1. Create a Spark session. An important point: all of this works only with version 2.0.0, whereas the CDH distribution ships with v1.6. So you need to upgrade, otherwise you will get an exception at startup.

    import org.apache.spark.sql.SparkSession;

    SparkSession ss = SparkSession
            .builder()
            .appName("Bankrupticy analyser")
            .getOrCreate();

2. Load the data into a special type, JavaRDD. Essentially it is something like a List in C#; at least that is how I explained it to myself. The library can read many formats, but to start with, a regular CSV file will do.

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    JavaRDD<Client> peopleRDD = ss.read()
            .textFile(filename)
            .javaRDD()
            .map(new Function<String, Client>() {
                public Client call(String line) throws Exception {
                    String[] parts = line.split(",");   // split the CSV line into fields
                    Client client = new Client();
                    client.setName(parts[0]);           // client name (not used as a feature)
                    client.setYearOfBirth(Double.parseDouble(parts[1]));
                    client.setAmount(Double.parseDouble(parts[2]));
                    client.setTerm(Double.parseDouble(parts[3]));
                    client.setRate(Double.parseDouble(parts[4]));
                    client.setPaid(Double.parseDouble(parts[5]));
                    client.setStatus(Double.parseDouble(parts[6]));   // target status flag (1 or 0)
                    return client;
                }
            });

Here Client is an ordinary class with our attributes (it can be found in the project files via the link at the end of the post).
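The Client class itself is not shown in the post. As a rough sketch only, a bean along the following lines would match the setters used in step 2. The field types, the `fromCsv` helper, and the sample line in `main` are assumptions made for illustration, not the author's actual code. Spark's `createDataFrame(rdd, beanClass)` derives columns from the bean's getters, so every attribute gets a getter/setter pair, and the class is Serializable so that instances can travel inside an RDD.

```java
import java.io.Serializable;

// Hypothetical sketch of the Client bean; the real class ships with the project archive.
// When used with Spark, declare it public in its own Client.java file.
class Client implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private double yearOfBirth;
    private double amount;
    private double term;
    private double rate;
    private double paid;
    private double status;   // target flag, stored as 1 or 0

    public Client() { }      // beans need a public no-argument constructor

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public double getYearOfBirth() { return yearOfBirth; }
    public void setYearOfBirth(double yearOfBirth) { this.yearOfBirth = yearOfBirth; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
    public double getTerm() { return term; }
    public void setTerm(double term) { this.term = term; }
    public double getRate() { return rate; }
    public void setRate(double rate) { this.rate = rate; }
    public double getPaid() { return paid; }
    public void setPaid(double paid) { this.paid = paid; }
    public double getStatus() { return status; }
    public void setStatus(double status) { this.status = status; }

    // Hypothetical helper that mirrors the parsing done inside the map() call of step 2.
    public static Client fromCsv(String line) {
        String[] parts = line.split(",");
        Client client = new Client();
        client.setName(parts[0]);
        client.setYearOfBirth(Double.parseDouble(parts[1]));
        client.setAmount(Double.parseDouble(parts[2]));
        client.setTerm(Double.parseDouble(parts[3]));
        client.setRate(Double.parseDouble(parts[4]));
        client.setPaid(Double.parseDouble(parts[5]));
        client.setStatus(Double.parseDouble(parts[6]));
        return client;
    }

    public static void main(String[] args) {
        // Made-up sample line, only to exercise the parser.
        Client c = Client.fromCsv("Ivanov1983,1983,100000,12,15.5,20000,1");
        System.out.println(c.getName() + " status=" + c.getStatus());
    }
}
```

Keeping the parsing in one static method would also make it possible to unit-test the CSV handling without starting a Spark session.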

3. Create the dataset needed for data normalization. Without normalization, fitting the linear regression model by gradient descent does not work. At first I tried to use StandardScalerModel (fit -> transform), but I ran into problems with data types, apparently because of a version mismatch. So for now I got by with a workaround: a SELECT over the data that performs the normalization inline:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> clientDF = ss.createDataFrame(peopleRDD, Client.class);
    clientDF.createOrReplaceTempView("client");
    // Min-max scaling: every numeric column is mapped to (min - x) / (min - max)
    Dataset<Row> scaledData = ss.sql(
            "SELECT name, (minYearOfBirth - yearOfBirth) / (minYearOfBirth - maxYearOfBirth),"
            + "(minAmount - amount) / (minAmount - maxAmount),"
            + "(minTerm - term) / (minTerm - maxTerm),"
            + "(minRate - rate) / (minRate - maxRate),"
            + "(minPaid - paid) / (minPaid - maxPaid),"
            + "(minStatus - status) / (minStatus - maxStatus) "
            + "FROM client CROSS JOIN "
            + "(SELECT min(yearOfBirth) AS minYearOfBirth, max(yearOfBirth) AS maxYearOfBirth,"
            + "min(amount) AS minAmount, max(amount) AS maxAmount,"
            + "min(term) AS minTerm , max(term) AS maxTerm,"
            + "min(rate) AS minRate, max(rate) AS maxRate,"
            + "min(paid) AS minPaid, max(paid) AS maxPaid,"
            + "min(status) AS minStatus, max(status) AS maxStatus "
            + "FROM client)").cache();
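What the SELECT above does to each column can be illustrated in plain Java. The sketch below is not part of the project; the class and method names are made up. It uses the form (x - min) / (max - min), which is algebraically the same as the query's (min - x) / (min - max), and it adds a guard for constant columns, which is an assumption of this sketch and not something the query handles.

```java
import java.util.Arrays;

// Plain-Java illustration of the min-max scaling performed by the SQL in step 3.
final class MinMaxSketch {

    // Maps every value of one column into the range [0, 1].
    static double[] scale(double[] values) {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        for (double v : values) {
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        double[] scaled = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            // A constant column has min == max and would mean dividing by zero;
            // this sketch maps such a column to 0 instead.
            scaled[i] = (min == max) ? 0.0 : (values[i] - min) / (max - min);
        }
        return scaled;
    }

    public static void main(String[] args) {
        double[] rates = {10.0, 15.0, 20.0};   // made-up interest rates
        System.out.println(Arrays.toString(scale(rates)));   // prints [0.0, 0.5, 1.0]
    }
}
```

After scaling, all features share the same range, so one step size can suit every weight during gradient descent.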

4. The model accepts data as a JavaRDD of LabeledPoint; here we also pack the client's name alongside each point. That is acceptable for a nice printout in a test version. In real life you of course should not do this, although in general it may be needed for other purposes.

    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.mllib.regression.LabeledPoint;
    import scala.Tuple2;

    JavaRDD<Row> rowData = scaledData.javaRDD();   // Dataset to JavaRDD
    JavaRDD<Tuple2<String, LabeledPoint>> parsedData = rowData.map(
            new Function<Row, Tuple2<String, LabeledPoint>>() {
                public Tuple2<String, LabeledPoint> call(Row row) {
                    int last = row.length();
                    String cname = row.getString(0);          // first column: client name
                    double label = row.getDouble(last - 1);   // last column: status (the label)
                    double[] v = new double[last];            // v[0] is never set and stays 0
                    for (int i = 1; i < last - 1; i++)        // copy the normalized features
                        v[i] = row.getDouble(i);
                    v[last - 1] = 1;                          // constant term for the intercept
                    return new Tuple2<String, LabeledPoint>(
                            cname, new LabeledPoint(label, Vectors.dense(v)));
                }
            });

5. Extract just the LabeledPoint data for training the model:

    JavaRDD<LabeledPoint> parsedDataToTrain = parsedData.map(
            new Function<Tuple2<String, LabeledPoint>, LabeledPoint>() {
                public LabeledPoint call(Tuple2<String, LabeledPoint> namedTuple) {
                    return namedTuple._2();   // _2() is the second element of Tuple2<String, LabeledPoint>
                }
            });
    parsedData.cache();

6. Create the model itself:

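    import org.apache.spark.mllib.regression.LinearRegressionModel;
    import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

    int numIterations = 200;
    double stepSize = 2;
    final LinearRegressionModel model = LinearRegressionWithSGD.train(
            JavaRDD.toRDD(parsedDataToTrain), numIterations, stepSize);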
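To give a feel for what training by gradient descent means, here is a minimal plain-Java sketch. It is plain full-batch gradient descent on the squared error with a constant step size, not a reproduction of what MLlib does internally. The class name, the synthetic data, and the chosen step size are all assumptions of this example. The constant 1 in the last column plays the same role as `v[last - 1] = 1` in step 4: its weight becomes the intercept.

```java
// Plain-Java sketch of least-squares gradient descent; NOT MLlib's implementation.
final class GradientDescentSketch {

    // Dot product of weights and features: the score a linear model assigns to one row.
    static double predict(double[] weights, double[] features) {
        double sum = 0.0;
        for (int i = 0; i < weights.length; i++) {
            sum += weights[i] * features[i];
        }
        return sum;
    }

    // Full-batch gradient descent on the mean squared error with a constant step size.
    static double[] train(double[][] features, double[] labels,
                          int numIterations, double stepSize) {
        int n = features.length;
        int d = features[0].length;
        double[] weights = new double[d];   // start from all zeros
        for (int iter = 0; iter < numIterations; iter++) {
            double[] gradient = new double[d];
            for (int row = 0; row < n; row++) {
                double error = predict(weights, features[row]) - labels[row];
                for (int j = 0; j < d; j++) {
                    gradient[j] += error * features[row][j] / n;
                }
            }
            for (int j = 0; j < d; j++) {
                weights[j] -= stepSize * gradient[j];
            }
        }
        return weights;
    }

    public static void main(String[] args) {
        // Synthetic data generated from y = 2x + 1; the second column is the constant 1.
        double[][] x = { {0.0, 1.0}, {0.25, 1.0}, {0.5, 1.0}, {0.75, 1.0}, {1.0, 1.0} };
        double[] y = { 1.0, 1.5, 2.0, 2.5, 3.0 };
        double[] w = train(x, y, 2000, 0.5);
        // The weights should approach slope 2 and intercept 1.
        System.out.println("slope=" + w[0] + " intercept=" + w[1]);
    }
}
```

With unscaled inputs such as raw loan amounts, the same step size would make the updates for some weights explode while others barely move, which is one way to see why step 3 normalizes the data first.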

7. And now the main work, plus the result:

    import java.text.NumberFormat;

    final NumberFormat nf = NumberFormat.getInstance();   // number format for the printed scores
    nf.setMaximumFractionDigits(2);
    JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
            new Function<Tuple2<String, LabeledPoint>, Tuple2<Double, Double>>() {
                public Tuple2<Double, Double> call(Tuple2<String, LabeledPoint> namedTuple) {
                    double prediction = model.predict(namedTuple._2().features());
                    // print the client's name, the predicted score, and the actual status
                    System.out.println(namedTuple._1() + " got the score " + nf.format(prediction)
                            + ". The real status is " + nf.format(namedTuple._2().label()));
                    return new Tuple2<Double, Double>(prediction, namedTuple._2().label());
                }
            });

8. Finally, compute the mean squared error (from the pairs produced in step 7):

    import org.apache.spark.api.java.JavaDoubleRDD;

    double MSE = new JavaDoubleRDD(valuesAndPreds.map(
            new Function<Tuple2<Double, Double>, Object>() {
                public Object call(Tuple2<Double, Double> pair) {
                    return Math.pow(pair._1() - pair._2(), 2.0);   // squared error of one client
                }
            }).rdd()).mean();
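The same calculation without Spark is just an average of squared differences. The sketch below is an illustration with made-up names. It feeds in four (score, status) pairs copied from the rounded output printed by this program, so its result (about 0.213) covers only those four rows and is not expected to match the training error over all 2000 rows.

```java
// Plain-Java sketch of the mean squared error from step 8.
final class MseSketch {

    // Each pair is {prediction, label}.
    static double meanSquaredError(double[][] pairs) {
        double sum = 0.0;
        for (double[] pair : pairs) {
            double diff = pair[0] - pair[1];
            sum += diff * diff;
        }
        return sum / pairs.length;
    }

    public static void main(String[] args) {
        // Rounded scores and statuses of Ivanov1983 ... Ivanov1986 from the program output.
        double[][] pairs = { {0.57, 1.0}, {0.54, 1.0}, {-0.08, 0.0}, {0.33, 1.0} };
        System.out.println("MSE of the four rows = " + meanSquaredError(pairs));
    }
}
```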

In this case, the output will look like this:

Ivanov1983 got the score 0.57. The real status is 1
Ivanov1984 got the score 0.54. The real status is 1
Ivanov1985 got the score -0.08. The real status is 0
Ivanov1986 got the score 0.33. The real status is 1
Ivanov1987 got the score 0.78. The real status is 1
Ivanov1988 got the score 0.63. The real status is 1
Ivanov1989 got the score 0.63. The real status is 1
Ivanov1990 got the score 0.03. The real status is 0
Ivanov1991 got the score 0.57. The real status is 1
Ivanov1992 got the score 0.26. The real status is 0
Ivanov1993 got the score 0.07. The real status is 0
Ivanov1994 got the score 0.17. The real status is 0
Ivanov1995 got the score 0.83. The real status is 1
Ivanov1996 got the score 0.31. The real status is 0
Ivanov1997 got the score 0.48. The real status is 0
Ivanov1998 got the score 0.16. The real status is 0
Ivanov1999 got the score 0.36. The real status is 0
Ivanov2000 got the score -0.04. The real status is 0
16/11/21 21:36:40 INFO Executor: Finished task 0.0 in stage 176.0 (TID 176). 3194 bytes result sent to driver
16/11/21 21:36:40 INFO TaskSetManager: Finished task 0.0 in stage 176.0 (TID 176) in 432 ms on localhost (1/1)
16/11/21 21:36:40 INFO TaskSchedulerImpl: Removed TaskSet 176.0, whose tasks have completed, from pool
16/11/21 21:36:40 INFO DAGScheduler: ResultStage 176 (mean at App.java:242) finished in 0.433 s
16/11/21 21:36:40 INFO DAGScheduler: Job 175 finished: mean at App.java:242, took 0.452851 s
Training Error = 0.11655428630639536

Now it makes sense to compare it with the analytical solution in Excel:



As you can see, the results are very close; the model turned out to be usable, and you can now run it on a test sample. The project code with the source data can be downloaded here.

In general, I would like to note that the hype around big data seems rather excessive (much like this). It seems to me that what is more valuable is not the volume of the data but how exactly it is processed. That is, any combination of TF-IDF, a neural network, and ALS can give an amazing result if you can work creatively on a limited volume. The problem is probably that managers can secure budgets with the magic words "Big Data", whereas spending resources on research goals requires too long a planning horizon for an ordinary company.

To be clear about where I stand: the zoo of the Hadoop ecosystem (Hive, Pig, Impala, etc.) is magnificent. I myself have been developing a distributed computing system based on neural networks (simultaneous execution of multi-threaded applications on multiple servers with synchronization and aggregation of results) for macroeconomic modeling, and I have a rough idea of how many pitfalls lie on this path. Yes, there are tasks where these technologies have no alternative: for example, primitive but streaming online processing of wild data volumes (say, some analysis of the traffic of cellular subscribers in Moscow). Here Apache Storm or Spark Streaming can work miracles.

But if we have a data set covering a million customers per year, then randomly sampling every 10th (or even every 100th) record to build some scoring model will give almost the same result as the full data set. In other words, data mining has gone from belle of the ball to stepdaughter, although this is most likely temporary. The hype will subside, but the experimental approaches now being tested on Hadoop clusters will spread, and those who are first to recognize the potential of exploring "small" data will come out on top.
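The sampling argument can be checked on a toy example. The sketch below is only an illustration on synthetic data: the 30% default rate, the seed, and all names are assumptions, and it compares a single statistic (the share of defaults), not a full scoring model. It estimates that share once from a million simulated clients and once from a random sample of roughly every 10th client.

```java
import java.util.Random;

// Toy comparison of a statistic computed on all records and on a ~10% random sample.
final class SamplingSketch {

    // Returns {rate over the whole population, rate over the random sample}.
    static double[] compareRates(int population, double trueRate,
                                 double sampleFraction, long seed) {
        Random random = new Random(seed);
        long allDefaults = 0;
        long sampledDefaults = 0;
        long sampledCount = 0;
        for (int i = 0; i < population; i++) {
            boolean isDefault = random.nextDouble() < trueRate;       // synthetic client
            boolean inSample = random.nextDouble() < sampleFraction;  // keep about every 10th
            if (isDefault) {
                allDefaults++;
            }
            if (inSample) {
                sampledCount++;
                if (isDefault) {
                    sampledDefaults++;
                }
            }
        }
        return new double[] {
            (double) allDefaults / population,
            (double) sampledDefaults / sampledCount
        };
    }

    public static void main(String[] args) {
        double[] rates = compareRates(1_000_000, 0.3, 0.1, 42L);
        System.out.println("full data: " + rates[0] + ", 10% sample: " + rates[1]);
    }
}
```

The two numbers typically differ only in the third decimal place, because the sampling error shrinks with the square root of the sample size and not with the size of the full data set.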

Source: https://habr.com/ru/post/316088/

