
Nonlinear regression in Apache Spark: building it ourselves



When solving signal-processing problems, raw data are often approximated with a regression model. By structure, such models fall into three types: linear, reducible to linear, and nonlinear. In Spark ML, the machine learning module of Apache Spark, the first two types are covered by the LinearRegression and GeneralizedLinearRegression classes, respectively. Training nonlinear models is not available in the standard library and requires a custom implementation.

First we briefly review the theoretical foundations of building nonlinear models, and then move on to the practical development of a Spark ML extension.

A bit of math


Training nonlinear models is more difficult than training linear ones. The difficulties may stem from the presence of more than one extremum and/or a "ravine"-shaped response surface. The main incentive to use nonlinear functions is the possibility of obtaining more compact models. It is also worth noting that many analytical equations in physics and engineering are nonlinear from the start, so using the corresponding models may simply be dictated by the problem.
A fairly wide range of tools exists for training nonlinear models; the choice depends on the specific form of the function, the presence and type of constraints, and so on. This article uses the widely applied quadratic error function and the Gauss-Newton method, a first-order algorithm of the quasi-Newton type. In most cases this algorithm converges reasonably well.

The iteration step in the Gauss-Newton method is determined by the relation:

$d = -(J^T J)^{-1} J^T r$, where $J$ is the Jacobian matrix and $r$ is the residual column vector with $r_i = y_i - f(w; x_i)$.

Logically, this formula consists of two parts: an approximation of the Hessian matrix, $H \approx J^T J$, and an approximation of the gradient, $\nabla f \approx J^T r$.

The number of rows of the Jacobian matrix is determined by the number of training examples $n$, and the number of columns by the size of the weight vector $m$. As shown in [1], the approximation of the Hessian matrix can be computed by reading the Jacobian row by row and multiplying each row by itself (an outer product); the resulting $n$ matrices of size $m \times m$ only need to be summed. This rearrangement of operations does not change the total computational complexity, but it allows us to avoid loading the entire Jacobian into memory and to run the computation in parallel. The gradient approximation is computed in a similar way: only $n$ vectors of length $m$ need to be summed. Inverting the resulting Hessian matrix is inexpensive thanks to its relatively small size. To ensure convergence of the algorithm, the positive definiteness of the computed matrix $J^T J$ must be monitored, which is done by computing its eigenvalues and eigenvectors.
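To make this rearrangement concrete, here is a minimal Breeze sketch (the function and all names are illustrative, not part of the final code) that accumulates the Hessian and gradient approximations row by row, without ever materializing the whole Jacobian:

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

// jacobianRows(i) is the i-th row of J (length m) evaluated at the current weights,
// residuals(i) is r_i = y_i - f(w; x_i); both names are illustrative.
def approximateHessianAndGradient(jacobianRows: Seq[BDV[Double]],
                                  residuals: Seq[Double],
                                  m: Int): (BDM[Double], BDV[Double]) = {
  val hessian = BDM.zeros[Double](m, m)   // accumulates J^T J
  val gradient = BDV.zeros[Double](m)     // accumulates J^T r
  for ((row, r) <- jacobianRows.zip(residuals)) {
    hessian += row * row.t                // outer product of a single row of the Jacobian
    gradient += row * r                   // this example's contribution to J^T r
  }
  (hessian, gradient)
}

In a distributed setting the loop body becomes the per-record step of an aggregation, which is exactly what the RDD-based loss function does later in the article.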

In articles [2, 3], a general scheme for applying the approach described above to Apache Spark was proposed. The only shortcoming of those works, in my opinion, is the lack of a clear connection to the existing Spark ML API. We will try to fill this gap in the next section.

Implementing the Spark ML API


For a successful implementation of nonlinear models we need to understand the structure and purpose of the base classes. The Spark machine learning API exists in two versions: 1.x lives in the mllib package, 2.x in the ml package. The documentation of the Spark ML module [4] states that the transition from API version 1.x to 2.x is intended to enable embedding into Pipelines and working with the typed DataFrame structure. Our example uses the newer class structure, but if necessary it can be ported to the old one without much effort.

Important Spark API Classes



Regressor class extension


The org.apache.spark.ml.regression.NonLinearRegression class extends the Regressor contract and, as the result of training, returns an instance of NonLinearRegressionModel. To create a nonlinear model you need to provide a unique string identifier and the model's kernel function, a NonLinearFunction. The optional parameters are the maximum number of training iterations, the initial approximation of the coefficient vector, and the required accuracy. Nonlinear functions often have many extrema, and choosing the initial approximation based on a priori knowledge of how a particular kernel function behaves helps steer the search towards the global extremum. Note the use of the ready-made implementation of the limited-memory Broyden-Fletcher-Goldfarb-Shanno algorithm (LBFGS) from the Breeze library [5]. The model training code is shown below.

Code: org.apache.spark.ml.regression.NonLinearRegression#train
override protected def train(dataset: Dataset[_]): NonLinearRegressionModel = {
  // set instance weight to 1 if not defined the column
  val instanceWeightCol: Column =
    if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
  val instances: RDD[Instance] = dataset.select(
    col($(labelCol)).cast(DoubleType), instanceWeightCol, col($(featuresCol))).rdd.map {
    case Row(label: Double, weight: Double, features: Vector) =>
      Instance(label, weight, features)
  }

  // persists dataset if defined any storage level
  val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
  if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

  val costFunc = new SquaresLossFunctionRdd(kernel, instances)
  val optimizer = new LBFGS[BDV[Double]]($(maxIter), 10, $(tol))

  // checks and assigns the initial coefficients
  val initial = {
    if (!isDefined(initCoeffs) || $(initCoeffs).length != kernel.dim) {
      if ($(initCoeffs).length != kernel.dim)
        logWarning(s"Provided initial coefficients vector (${$(initCoeffs)}) not corresponds " +
          s"with model dimensionality equals to ${kernel.dim}")
      BDV.zeros[Double](kernel.dim)
    } else
      BDV($(initCoeffs).clone())
  }

  val states = optimizer.iterations(new CachedDiffFunction[BDV[Double]](costFunc), initial)

  val (coefficients, objectiveHistory) = {
    val builder = mutable.ArrayBuilder.make[Double]
    var state: optimizer.State = null
    while (states.hasNext) {
      state = states.next
      builder += state.adjustedValue
    }
    // checks is method failed
    if (state == null) {
      val msg = s"${optimizer.getClass.getName} failed."
      logError(msg)
      throw new SparkException(msg)
    }
    (Vectors.dense(state.x.toArray.clone()).compressed, builder.result)
  }

  // unpersists the instances RDD
  if (handlePersistence) instances.unpersist()

  // produces the model and saves training summary
  val model = copyValues(new NonLinearRegressionModel(uid, kernel, coefficients))
  val (summaryModel: NonLinearRegressionModel, predictionColName: String) =
    model.findSummaryModelAndPredictionCol()
  val trainingSummary = new NonLinearRegressionSummary(
    summaryModel.transform(dataset), predictionColName, $(labelCol), $(featuresCol),
    objectiveHistory, model)
  model.setSummary(trainingSummary)
}


The code of the train method can be divided into three parts: obtaining the training examples from the dataset; instantiating the loss function and finding the optimal solution; and saving the coefficient vector and training results in the model instance.
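For illustration, here is a rough sketch of the caller's side: a simple exponential-decay kernel and a training call. It assumes that NonLinearFunction declares only dim, eval and grad with the signatures used by the loss functions shown below; the constructor arguments and setter names in the usage fragment (setInitCoeffs, setMaxIter) are guesses based on the parameters listed above, not the exact API of the published code.

import breeze.linalg.{DenseVector => BDV}

// A hypothetical kernel: f(w; x) = w0 * exp(-w1 * x0)
class ExpDecayKernel extends NonLinearFunction {
  override def dim: Int = 2

  override def eval(w: BDV[Double], x: BDV[Double]): Double =
    w(0) * math.exp(-w(1) * x(0))

  override def grad(w: BDV[Double], x: BDV[Double]): BDV[Double] = {
    val e = math.exp(-w(1) * x(0))
    BDV(e, -w(0) * x(0) * e)  // partial derivatives with respect to w0 and w1
  }
}

// Hypothetical usage (constructor and setter names are assumptions):
// val nlr = new NonLinearRegression("nlr-example", new ExpDecayKernel())
//   .setInitCoeffs(Array(1.0, 0.1))
//   .setMaxIter(100)
// val model = nlr.fit(training)  // training: DataFrame with "label" and "features" columns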

RegressionModel class extension


The implementation of the org.apache.spark.ml.regression.NonLinearRegressionModel class is fairly trivial. The predict method uses the kernel function to evaluate the model at a point:

override protected def predict(features: Vector): Double = {
  kernel.eval(coefficients.asBreeze.toDenseVector, features.asBreeze.toDenseVector)
}

The only Spark ML API requirement worth special attention is model serializability. The necessary behaviour is provided in the companion object by extending the abstract classes org.apache.spark.ml.util.{MLReader, MLWriter} [6]. The state of a trained model consists of two parts: the coefficient vector and the kernel function. The coefficient vector is handled by existing machinery, but the kernel function is trickier: it cannot be stored directly in a DataFrame, although there are several alternatives.

For simplicity, binary serialization of the function into a Base64 string was chosen. Its drawbacks are that the resulting string is not human-readable and that versioning of the implementations has to be supported.
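The idea can be illustrated with a small sketch based on standard Java serialization and java.util.Base64 (the helper names are illustrative and do not reproduce the actual writer/reader code):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

// Serializes any Serializable kernel function into a Base64 string that can be stored in a DataFrame cell
def kernelToBase64(kernel: Serializable): String = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(kernel)
  out.close()
  Base64.getEncoder.encodeToString(bytes.toByteArray)
}

// Restores the kernel from the stored string; the caller must know the expected type
def kernelFromBase64(encoded: String): AnyRef = {
  val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(encoded)))
  try in.readObject() finally in.close()
}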

A much more promising approach is to store the function in symbolic form, following the example of formula objects from the stats package in R, for example log(y) ~ a + log(x). This approach is more complex than the first one, but it solves several problems: the representation is human-readable, and different parser versions can deserialize it while preserving backward compatibility. The main difficulty here is developing a sufficiently flexible parser for symbolic function expressions.

Another potentially useful improvement is the ability to choose the step used for the numerical differentiation of the kernel function. It would not significantly affect the complexity of saving the model.
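As an illustration of what such an improvement might look like, a central-difference gradient with a configurable step h could be written roughly as follows (a sketch only, not part of the current implementation):

import breeze.linalg.{DenseVector => BDV}

// Numerical gradient of a kernel f(weights, features) with respect to the weights,
// using central differences with a user-selectable step h
def numericalGrad(f: (BDV[Double], BDV[Double]) => Double,
                  weights: BDV[Double], features: BDV[Double], h: Double = 1e-6): BDV[Double] = {
  val grad = BDV.zeros[Double](weights.length)
  for (i <- 0 until weights.length) {
    val wPlus = weights.copy;  wPlus(i) += h
    val wMinus = weights.copy; wMinus(i) -= h
    grad(i) = (f(wPlus, features) - f(wMinus, features)) / (2.0 * h)
  }
  grad
}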

Implementing the quadratic loss function


The final element required for training is the loss function. Our example uses a quadratic loss function in two implementations. In one, the training examples are given as a Breeze matrix; in the other, as a Spark RDD[Instance] structure. The first implementation is easy to understand (it uses matrix expressions directly) and is suitable for small training sets; it serves as our test reference.

Code: org.apache.spark.ml.regression.SquaresLossFunctionBreeze
package org.apache.spark.ml.regression

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

/**
 * Breeze implementation for the squares loss function.
 *
 * @param fitmodel concrete model implementation
 * @param xydata   labeled data combined into the matrix:
 *                 the first n columns consist of feature values, the (n+1)-th column contains labels
 */
class SquaresLossFunctionBreeze(val fitmodel: NonLinearFunction, xydata: BDM[Double])
  extends SquaresLossFunction {

  /** The number of instances. */
  val instanceCount: Int = xydata.rows

  /** The number of features. */
  val featureCount: Int = xydata.cols - 1

  /** Feature matrix. */
  val X: BDM[Double] = xydata(::, 0 until featureCount)

  /** Label vector. */
  val y: BDV[Double] = xydata(::, featureCount)

  /**
   * The model dimensionality (the number of weights).
   *
   * @return dimensionality
   */
  override def dim: Int = fitmodel.dim

  /**
   * Evaluates loss function value and the gradient vector.
   *
   * @param weights weights
   * @return (loss function value, gradient vector)
   */
  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
    val r: BDV[Double] = diff(weights)
    (0.5 * (r.t * r), gradient(weights))
  }

  /**
   * Calculates a positive definite approximation of the Hessian matrix.
   *
   * @param weights weights
   * @return Hessian matrix approximation
   */
  override def hessian(weights: BDV[Double]): BDM[Double] = {
    val J: BDM[Double] = jacobian(weights)
    posDef(J.t * J)
  }

  /**
   * Calculates the Jacobian matrix.
   *
   * @param weights weights
   * @return the Jacobian
   */
  def jacobian(weights: BDV[Double]): BDM[Double] = {
    val gradData = (0 until instanceCount) map { i => fitmodel.grad(weights, X(i, ::).t).toArray }
    BDM(gradData: _*)
  }

  /**
   * Calculates the difference vector between the label and the approximated values.
   *
   * @param weights weights
   * @return difference vector
   */
  def diff(weights: BDV[Double]): BDV[Double] = {
    val diff = (0 until instanceCount) map (i => fitmodel.eval(weights, X(i, ::).t) - y(i))
    BDV(diff.toArray)
  }

  /**
   * Calculates the gradient vector.
   *
   * @param weights weights
   * @return gradient vector
   */
  def gradient(weights: BDV[Double]): BDV[Double] = {
    val J: BDM[Double] = jacobian(weights)
    val r = diff(weights)
    2.0 * (J.t * r)
  }
}


The second implementation is designed to run in a distributed environment. It uses the RDD.treeAggregate function, which allows the algorithm to be expressed in the map-reduce style.

Code: org.apache.spark.ml.regression.SquaresLossFunctionRdd
package org.apache.spark.ml.regression

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.rdd.RDD

/**
 * Spark RDD implementation for the squares loss function.
 *
 * @param fitmodel concrete model implementation
 * @param xydata   RDD with instances
 */
class SquaresLossFunctionRdd(val fitmodel: NonLinearFunction, val xydata: RDD[Instance])
  extends SquaresLossFunction {

  /**
   * The model dimensionality (the number of weights).
   *
   * @return dimensionality
   */
  override def dim: Int = fitmodel.dim

  /**
   * Evaluates loss function value and the gradient vector.
   *
   * @param weights weights
   * @return (loss function value, gradient vector)
   */
  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
    val bcW: Broadcast[BDV[Double]] = xydata.context.broadcast(weights)

    val (f: Double, grad: BDV[Double]) = xydata.treeAggregate((0.0, BDV.zeros[Double](dim)))(
      seqOp = (comb, item) => (comb, item) match {
        case ((loss, oldGrad), Instance(label, _, features)) =>
          val featuresBdv = features.asBreeze.toDenseVector
          val w: BDV[Double] = bcW.value
          val prediction = fitmodel.eval(w, featuresBdv)
          val addedLoss: Double = 0.5 * math.pow(label - prediction, 2)
          val addedGrad: BDV[Double] = 2.0 * (prediction - label) * fitmodel.grad(w, featuresBdv)
          (loss + addedLoss, oldGrad + addedGrad)
      },
      combOp = (comb1, comb2) => (comb1, comb2) match {
        case ((loss1, grad1: BDV[Double]), (loss2, grad2: BDV[Double])) =>
          (loss1 + loss2, grad1 + grad2)
      })
    (f, grad)
  }

  /**
   * Calculates a positive definite approximation of the Hessian matrix.
   *
   * @param weights weights
   * @return Hessian matrix approximation
   */
  override def hessian(weights: BDV[Double]): BDM[Double] = {
    val bcW = xydata.context.broadcast(weights)

    val hessian: BDM[Double] = xydata.treeAggregate(new BDM[Double](dim, dim, Array.ofDim[Double](dim * dim)))(
      seqOp = (comb, item) => (comb, item) match {
        case (oldHessian, Instance(_, _, features)) =>
          val grad = fitmodel.grad(bcW.value, features.asBreeze.toDenseVector)
          val subHessian: BDM[Double] = grad * grad.t
          oldHessian + subHessian
      },
      combOp = (comb1, comb2) => (comb1, comb2) match {
        case (hessian1, hessian2) => hessian1 + hessian2
      })

    posDef(hessian)
  }
}


Building the project


To simplify development and testing, pom.xml was borrowed from the original Spark ML project with minor modifications. We pin the Spark version to one of the released ones, in our case 2.0.1. Note that our POM inherits from org.apache.spark:spark-parent_2.11:2.0.1, which saves us from configuring the Maven plugins from scratch.

To run tests that require a SparkContext, we add org.apache.spark:spark-mllib_2.11:2.0.1:test-jar to the test dependencies; the traits org.apache.spark.mllib.util.MLlibTestSparkContext and org.apache.spark.ml.util.TempDirectory are then mixed into the appropriate test classes. Extensions of the Suite classes from the org.apache.spark package that help set up the context, for example SparkFunSuite, can also be useful for testing. A sketch of such a test is shown below.
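As a rough example of how these pieces fit together, a test class might look like this. It is a sketch only: it assumes that MLlibTestSparkContext exposes the spark session, that the trained model exposes a coefficients vector, and it reuses the hypothetical ExpDecayKernel from the earlier sketch.

package org.apache.spark.ml.regression

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.mllib.util.MLlibTestSparkContext

// Skeleton test; SparkFunSuite and MLlibTestSparkContext come from the test-jar dependencies above
class NonLinearRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {

  test("fits a simple nonlinear kernel") {
    // Tiny synthetic dataset: label is roughly exp(-x); illustrative values only
    val df = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0)),
      (0.37, Vectors.dense(1.0)),
      (0.14, Vectors.dense(2.0))
    )).toDF("label", "features")

    val model = new NonLinearRegression("nlr-test", new ExpDecayKernel()).fit(df)
    assert(model.coefficients.size === 2)  // assumes the model exposes its coefficient vector
  }
}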

In lieu of a conclusion


There are several points that are not covered in this article but would be extremely interesting to study further.


At the moment I do not have enough information on these aspects, but I would be grateful for any sources you can share.

The full code can be viewed on GitHub.

Full and comprehensive testing of this solution is still to be done, so please treat this material as a concept and a starting point for discussing improvements.

Corrections and suggestions are best sent via private messages; the comments are better suited for discussion.

Thank you all for your attention.

References


  1. ieeexplore.ieee.org/document/5451114
  2. www.nodalpoint.com/nonlinear-regression-using-spark-part-1-nonlinear-models
  3. www.nodalpoint.com/non-linear-regression-using-spark-part2-sum-of-squares
  4. spark.apache.org/docs/latest/ml-guide.html
  5. github.com/scalanlp/breeze
  6. jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-mllib/spark-mllib-pipelines-persistence.html

Source: https://habr.com/ru/post/316988/

