
Today I would like to tell you about a new package that has appeared in version 1.2, called spark.ml. It is designed to provide a single high-level API for machine learning algorithms that will help simplify the creation and customization, as well as the integration of several algorithms in a single pipeline or workflow. Now we have version 1.4.1, and the developers claim that the package came out of alpha, although many components are still labeled as Experimental or DeveloperApi.
Well, let's check that the new package can and how good it is.
First we need to get acquainted with the basic concepts introduced in spark.ml.
1.
ML Dataset - spark.ml uses for working with data
DataFrame from the spark.sql package. A DataFrame is a distributed collection in which data is stored as named columns. Conceptually, a DataFrame is equivalent to a table in a relational database or a type of data like frame in R or Python, but with richer optimization under the hood. (Examples and ways of working will be given below in the article).
')
2.
Transformer (modifier) ​​is just any algorithm that can convert one DataFrame into another. For example: any trained model is a modifier, because it converts a set of characteristics (features) into a prediction (prediction)
3.
Estimator (estimation algorithm) is an algorithm that can perform the conversion from a DataFrame to a Transformer. For example, any learning algorithm is also an evaluation algorithm, since it accepts a set of data for training and creates a trained model as its output.
4.
Pipeline - a pipeline that combines any number of modifiers and evaluation algorithms to create machine learning workflow.
5.
Param is a general type used by modifiers and estimation algorithms to set parameters.
According to the interface described, each Estimator must have a fit method that accepts a DataFrame and returns a Transformer. In turn, the Transformer must have a transform method that transforms one DataFrame to another.
In the course of
Scalable Machine Learning, in one of the laboratory works, teachers, talking about linear regression, solved the problem “about determining the year of creation of a song based on the audio characteristics”. It was implemented quite a lot of methods for data processing, and for assessing and finding the best model. This was done to acquaint students in more detail with the main processes in machine learning, but let's check how much spark.ml will make life easier for us.
In the laboratory, we were provided with already prepared and slightly cropped data. But since we are interested in going all the way, I propose to take a
raw data set . Each line of the form:
2007, 45.17809 46.34234 -40.65357 -2.47909 1.21253 -0.65302 -6.95536 -12.20040 17.02512 2.00002 -1.87785 9.85499 25.59837 1905.18577 3676.09074 1976.85531 913.11216 1957.52415 955.98525 942.72667 439.85991 591.66138 493.40770 496.38516 33.94285 -255.90134 -762.28079 -66.10935 -128.02217 198.12908 -34.44957 176.00397 -140.80069 -22.56380 12.77945 193.30164 314.20949 576.29519 -429.58643 -72.20157 59.59139 -5.12110 -182.15958 31.80120 -10.67380 -8.13459 -122.96813 208.69408 -138.66307 119.52244 -17.48938 75.58779 93.29243 85.83507 47.13972 312.85482 135.50478 -32.47886 49.67063 -214.73180 -77.83503 -47.26902 7.58366 -352.56581 -36.15655 -53.39933 -98.60417 -82.37799 45.81588 -16.91676 18.35888 -315.68965 -3.14554 125.45269 -130.18808 -3.06337 42.26602 -9.04929 26.41570 23.36165 -4.36742 -87.55285 -70.79677 76.57355 -7.71727 3.26926 -298.49845 11.49326 -89.21804 -15.09719
where the year goes first, then the 12 numbers are the average timbres, and the last 78 are the covariations of timbres.
First of all, we need to tighten this data into a DataFrame, but first we will slightly convert the data format:
val sc = new SparkContext("local[*]", "YearPrediction") val rawData: RDD[(Double, linalg.Vector, linalg.Vector)] = sc.textFile("data/YearPredictionMSD.txt") .map(_.split(',')) .map(x => ( x.head.toDouble, Vectors.dense(x.tail.take(12).map(_.toDouble)), Vectors.dense(x.takeRight(78).map(_.toDouble)) ))
Now each RDD element is a tuple containing a year and two characteristics vectors, to get a DataFrame, you need to perform another transformation:
val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val rawDF: DataFrame = labeledPointsRDD.toDF("label", "avg", "cov")
Notice that we created sqlContext and pulled up implicit conversion methods (in this case, we could write
import sqlContext.implicits.rddToDataFrameHolder
import sqlContext.implicits.rddToDataFrameHolder
) to use the toDF method. We also specified the column names, and now the data structure will look like this:
label | avg | cov -------|-----------------------------------------|--------------------------------------------- 2001 | [49.94357 21.47114 73.07750 8.74861... | [10.20556 611.10913 951.08960 698.11428... -------|-----------------------------------------|--------------------------------------------- 2007 | [50.57546 33.17843 50.53517 11.5521... | [44.38997 2056.93836 605.40696 457.4117...
The gradient method, which is used in linear regression, is sensitive to the variation of the characteristic values, so the data before training should be normalized or standardized. For these purposes, the spark.ml.feature package has two classes: StandardScaler and Normalizer.
import org.apache.spark.ml.feature.{Normalizer, StandardScalerModel, StandardScaler} val scalerAvg: StandardScalerModel = new StandardScaler() .setWithMean(true) .setWithStd(true) .setInputCol("avg") .setOutputCol("features")
Please note that StandardScaler is an Estimator, which means we need to call the fit method to get a Transformer, in this case StandardScalerModel. All classes that work with DataFrame have two general methods:
setInputCol - set the name of the column from which to read data
setOutputCol - specify the name of the column into which you want to write the converted data.
The differences in the result of the work of these classes in this case will be that the scaler will return data in the range from -1 to 1, and Normalizer in the range from 0 to 1. You can read more about the algorithms of work
here and
here .
We prepared the training sample (or rather, received modifiers, which we will use for data processing); now we need to create an estimation algorithm (Estimator), which will give us a trained model as a result. We set almost standard settings, at this stage they are not particularly interesting.
import org.apache.spark.ml.regression.LinearRegression val linReg = new LinearRegression() .setFeaturesCol("features") .setLabelCol("label") .setElasticNetParam(0.5) .setMaxIter(500) .setRegParam(1e-10) .setTol(1e-6)
Now we have everything we need to build a simple conveyor:
import org.apache.spark.ml.Pipeline val pipeline = new Pipeline().setStages(Array( normAvg, linReg ))
Pipeline has a setStages method that accepts an array of steps that will be performed in the specified order when entering a training set. Now, all we have to do is not to forget to divide the data into a training and test sample:
val splitedData = rawDF.randomSplit(Array(0.8, 0.2), 42).map(_.cache()) val trainData = splitedData(0) val testData = splitedData(1)
Let's launch the pipeline we created and evaluate the result of its work:
val pipelineModel = pipeline.fit(trainData) val fullPredictions = pipelineModel.transform(testData) val predictions = fullPredictions.select("prediction").map(_.getDouble(0)) val labels = fullPredictions.select("label").map(_.getDouble(0)) val rmseTest = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError > (2003.0,1999.6153819348176) (1997.0,2000.9207184703566) (1996.0,2000.4171327880172) (1997.0,2002.022142263423) (2000.0,1997.6327888556184) RMSE: 10,552024
At this stage, everything should be clear, note that we used the ready-made
RegressionMetrics class in which to evaluate the model, along with the other RMSE estimate already familiar to us, other basic estimates were also implemented.
Moving on: in the course of Scalable Machine Learning, we created new characteristics by converting the initial ones into a polynomial with degree 2. The developers of spark.ml took care of this: now it’s enough for us to create another modifier and add it to the pipeline; most importantly, in this process, do not get confused and correctly specify the name of the columns.
import org.apache.spark.ml.feature.PolynomialExpansion
So far, we have used only 12 characteristics for learning, but I remember that there were 78 more in the raw data, maybe let's try to combine them? And in this case, spark.ml has a
VectorAssembler solution. Once decided, let's do:
import org.apache.spark.ml.feature.VectorAssembler val assembler = new VectorAssembler() .setInputCols(Array("avg", "cov")) .setOutputCol("united") normAvg.setInputCol("united") val pipeline = new Pipeline().setStages(Array( assembler, normAvg, polynomAvg, linReg ))
With the preparation of the data, we figured out a little, but the question remained of selecting the optimal parameters for the algorithm, I really do not want to do it manually, and do not! For this purpose, the spark.ml class is implemented
CrossValidator . CrossValidator accepts the estimation algorithm (in our case, it is linReg), the set of parameters that we would like to test and the evaluation tool (when we evaluated the model manually, we used RMSE). CrossValidator starts its work by breaking the data set into several samples (k by default 3), randomly choosing a training and validation sample (the validation sample will be 1 / k in size from the original one). Then, for each set of parameters on each of the samples, the model will be trained, its effectiveness will be evaluated and the best model will be selected. It should be noted that the choice of a model through the CrossValidator is quite a time-consuming operation, but is statistically more reasonable than the heuristic manual selection.
For the convenience of creating a set of parameters in spark.ml, there is a ParamGridBuilder utility class, which we use:
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} val paramGrid: Array[ParamMap] = new ParamGridBuilder() .addGrid(linReg.maxIter, Array(5, 500)) .addGrid(linReg.regParam, Array(1e-15, 1e-10)) .addGrid(linReg.tol, Array(1e-9, 1e-6, 1e-3)) .addGrid(linReg.elasticNetParam, Array(0, 0.5, 1)) .build() val crossVal = new CrossValidator() .setEstimator(pipeline) .setEvaluator(new RegressionEvaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(3) val bestModel = crossVal.fit(trainData) > Best set of parameters: { linReg_3a964d0300fd-elasticNetParam: 0.5, linReg_3a964d0300fd-maxIter: 500, linReg_3a964d0300fd-regParam: 1.0E-15, linReg_3a964d0300fd-tol: 1.0E-9 } Best cross-validation metric: -10.47433119891316
Well, that's probably all that concerns linear regression, for the classification and clustering algorithms in spark.ml there is also a set of solutions that are ready to help you conveniently organize the workflow.
Materials used:
Official documentation
UCI Machine Learning Repository
Scalable Machine Learning