
When solving signal processing problems, raw data is often approximated by a regression model. By structure, such models fall into three types: linear, reducible to linear, and non-linear. In Spark ML, the machine learning module of Apache Spark, the first two types are covered by the LinearRegression and GeneralizedLinearRegression classes, respectively. Training non-linear models is not provided by the standard library and requires custom development.
First we briefly review the theoretical foundations of building non-linear models, and then move on to the practical development of a Spark ML extension.
A bit of math
Training non-linear models is harder than training linear ones, mainly because the response surface may have more than one extremum and/or a "ravine" shape. The main incentive to use non-linear functions is the possibility of obtaining more compact models. Note also that many analytical equations from physics and engineering are inherently non-linear, so the use of the corresponding models may simply be forced.
For training non-linear models there is a fairly wide range of tools; the choice depends on the specific form of the function, the presence and type of constraints, and so on. This article uses the common quadratic error function and the Gauss-Newton method, a first-order algorithm of quasi-Newton type. In most cases it has fairly good convergence.
The iteration step in the Gauss-Newton method is determined by the relation

$$\Delta w = -\left(J^T J\right)^{-1} J^T r,$$

where $J$ is the Jacobian matrix and $r$ is the column vector of residuals, $r_i = f(w, x_i) - y_i$.

This formula logically consists of two parts: the approximation of the Hessian matrix, $H \approx J^T J$, and the approximation of the gradient, $\nabla \approx J^T r$.
The number of rows of the Jacobian matrix equals the number of training examples $n$; the number of columns equals the length of the weight vector $m$. As shown in [1], the approximation of the Hessian matrix can be computed by reading the Jacobian one row at a time and multiplying each row by its own transpose; the resulting $m \times m$ matrices then only need to be summed. This rearrangement of operations does not change the total computational complexity, but it avoids loading the entire Jacobian matrix into memory and allows the computation to run in parallel. The approximation of the gradient is computed similarly: only the summation of $n$ vectors of length $m$ is required. Inverting the resulting Hessian matrix is not expensive because of its relatively small size. To ensure convergence of the algorithm, the computed matrix $J^T J$ must be kept positive definite, which is implemented through the calculation of its eigenvalues and eigenvectors.
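In symbols, with $J_i$ denoting the $i$-th row of the Jacobian, $r_i$ the $i$-th residual, and $\lambda_k$, $v_k$ the eigenvalues and eigenvectors of $J^T J$ (the lower bound $\varepsilon$ is an assumed small constant), the scheme above amounts to

$$J^T J = \sum_{i=1}^{n} J_i^T J_i, \qquad J^T r = \sum_{i=1}^{n} r_i\, J_i^T, \qquad \widetilde{H} = \sum_{k=1}^{m} \max(\lambda_k, \varepsilon)\, v_k v_k^T .$$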
In articles [2, 3], a general scheme for applying the approach described above to Apache Spark was proposed. The only flaw in those works, in my opinion, is the lack of a clear link to the existing Spark ML API. We will try to fill this gap in the next section.
Implementing the Spark ML API
For a successful implementation of non-linear models, we need to understand the structure and purpose of the base classes. The Spark machine learning API comes in two versions: 1.x, located in the mllib package, and 2.x, located in the ml package. The documentation for the Spark ML module [4] states that the move from API version 1.x to 2.x is meant to enable embedding into Pipelines and working with the typed DataFrame structure. In our example we use the newer class structure, but if necessary it can easily be ported to the old one.
Important Spark API Classes
- The class org.apache.spark.ml.feature.Instance describes a single training example: the real-valued label, the example weight, and the feature vector.
- org.apache.spark.ml.regression.{Regressor, RegressionModel} are the key classes to extend. The first is the model builder, the second is the trained model.
- The trait we introduce, org.apache.spark.ml.regression.NonLinearFunction, defines the contract of a non-linear function whose weight vector is to be fitted. The contract has only three methods: eval returns the value of the function at a point, grad returns the gradient vector, and dim returns the dimension of the model (the length of the weight vector). A sketch of this trait is shown after the list.
- The Breeze linear algebra library [5] takes care of matrix operations and provides ready-made implementations of optimization routines. To use the Gauss-Newton algorithm, or any other first-order algorithm, we need to mix the breeze.optimize.DiffFunction trait into our loss function. Its calculate method must, for a given coefficient vector, return a tuple of two values: the value of the loss function and its gradient at that point.
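A minimal sketch of how such a trait could be declared; the exact signatures are not quoted in the article, so the types below are inferred from the loss-function code shown later and should be treated as an assumption:

```scala
package org.apache.spark.ml.regression

import breeze.linalg.DenseVector

// Contract of a non-linear kernel function f(weights, features);
// Serializable so it can be shipped to executors and persisted with the model.
trait NonLinearFunction extends Serializable {
  // Value of the function at the given point.
  def eval(weights: DenseVector[Double], features: DenseVector[Double]): Double
  // Gradient of the function with respect to the weights at the given point.
  def grad(weights: DenseVector[Double], features: DenseVector[Double]): DenseVector[Double]
  // Dimension of the model, i.e. the length of the weight vector.
  def dim: Int
}
```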
Regressor class extension
The org.apache.spark.ml.regression.NonLinearRegression class extends the Regressor contract and, as a result of training, returns an instance of NonLinearRegressionModel. To create a non-linear model you need to supply a unique string identifier and the kernel function, a NonLinearFunction. The optional parameters are: the maximum number of training iterations, the initial approximation of the coefficient vector, and the required accuracy. Non-linear functions often have many extrema, and choosing an initial approximation based on a priori knowledge of the behaviour of the particular kernel function helps direct the search towards the global extremum. It is worth noting the use of the ready-made implementation of the limited-memory Broyden-Fletcher-Goldfarb-Shanno algorithm (L-BFGS) from the Breeze library. The model training code is shown below.
Code org.apache.spark.ml.regression.NonLinearRegression#train

```scala
override protected def train(dataset: Dataset[_]): NonLinearRegressionModel = { /* ... */ }
```

The code of the train method can be divided into three parts: extracting the training examples from the dataset; initializing the loss function and finding the optimal solution; and storing the coefficient vector and the training results in the model instance.
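A minimal sketch of what this method might look like, following the three steps just listed. The field and parameter names (kernel, maxIter, tol) and the NonLinearRegressionModel constructor are assumptions for illustration, not the author's exact code:

```scala
// Imports assumed at the top of the file:
// import breeze.linalg.{DenseVector => BDV}
// import breeze.optimize.{CachedDiffFunction, LBFGS}
// import org.apache.spark.ml.feature.Instance
// import org.apache.spark.ml.linalg.{Vector, Vectors}
// import org.apache.spark.rdd.RDD
// import org.apache.spark.sql.{Dataset, Row}
// import org.apache.spark.sql.functions.col

override protected def train(dataset: Dataset[_]): NonLinearRegressionModel = {
  // 1. Obtain the training examples from the dataset.
  val instances: RDD[Instance] = dataset
    .select(col($(labelCol)), col($(featuresCol)))
    .rdd
    .map { case Row(label: Double, features: Vector) => Instance(label, 1.0, features) }

  // 2. Initialize the quadratic loss and search for the optimum with LBFGS from Breeze.
  val loss = new SquaresLossFunctionRdd(kernel, instances)
  val optimizer = new LBFGS[BDV[Double]](maxIter = $(maxIter), m = 10, tolerance = $(tol))
  val initial = BDV.zeros[Double](kernel.dim) // or the user-supplied initial approximation
  val optimum = optimizer.minimize(new CachedDiffFunction(loss), initial)

  // 3. Store the fitted coefficient vector and the kernel in the model instance.
  val model = new NonLinearRegressionModel(uid, kernel, Vectors.dense(optimum.toArray))
  copyValues(model.setParent(this))
}
```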
RegressionModel class extension
The implementation of the org.apache.spark.ml.regression.NonLinearRegressionModel class is rather trivial. The predict method uses the kernel function to compute the value at a point:

```scala
override protected def predict(features: Vector): Double = {
  kernel.eval(coefficients.asBreeze.toDenseVector, features.asBreeze.toDenseVector)
}
```
The only Spark ML API requirement worth paying attention to is that the model must be serializable. The necessary behaviour is provided in the companion object by extending the abstract classes org.apache.spark.ml.util.{MLReader, MLWriter} [6]. The state of a trained model consists of two parts: the coefficient vector and the kernel function. Saving the coefficient vector is a solved problem; the kernel function is somewhat trickier. It cannot be saved directly in a DataFrame, but there are several alternatives.
For simplicity, binary serialization of the function into a Base64 string was chosen. Its drawbacks are that the resulting string is not human-readable and that versioning of the implementations has to be supported.
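As an illustration of this approach, a pair of hypothetical helpers could look like the sketch below; it assumes the kernel trait extends java.io.Serializable:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

// Hypothetical helpers: round-trip a kernel function through a Base64 string
// so it can be stored in a DataFrame column next to the coefficient vector.
object KernelSerializer {
  def toBase64(kernel: NonLinearFunction): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(kernel) finally out.close()
    Base64.getEncoder.encodeToString(bytes.toByteArray)
  }

  def fromBase64(encoded: String): NonLinearFunction = {
    val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(encoded)))
    try in.readObject().asInstanceOf[NonLinearFunction] finally in.close()
  }
}
```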
A much more promising approach is to store the function in symbolic form, in the spirit of formula objects from the stats package in R, for example log(y) ~ a + log(x). This is more complicated than the first option, but it solves a number of problems: the representation is human-readable, and different parser versions can deserialize it while preserving backward compatibility. The significant difficulty here is developing a sufficiently flexible parser for symbolic function expressions.
A useful improvement might be the ability to choose the step for numerical differentiation of the kernel function; this does not significantly affect the complexity of saving the model.
Implementing the quadratic loss function
The last element required for training is the loss function. In our example the quadratic loss function comes in two implementations. In the first, the training examples are given as a Breeze matrix; in the second, as a Spark RDD[Instance]. The first implementation is easy to understand (it uses matrix expressions directly) and is suitable for small training sets; it serves as our test bench.
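Both implementations extend a common SquaresLossFunction trait that mixes in breeze.optimize.DiffFunction. Its exact contents are not quoted in the article, so the sketch below is an assumption reconstructed from the methods the two implementations use (dim, hessian, posDef):

```scala
package org.apache.spark.ml.regression

import breeze.linalg.{diag, eigSym, DenseMatrix => BDM, DenseVector => BDV}
import breeze.optimize.DiffFunction

// Common contract of the quadratic loss: a Breeze DiffFunction plus a
// Hessian approximation and a positive-definiteness correction.
trait SquaresLossFunction extends DiffFunction[BDV[Double]] {
  // Length of the weight vector.
  def dim: Int

  // Approximation of the Hessian matrix, J^T * J.
  def hessian(weights: BDV[Double]): BDM[Double]

  // Clamp the eigenvalues from below to keep the matrix positive definite.
  def posDef(m: BDM[Double], eps: Double = 1e-8): BDM[Double] = {
    val es = eigSym(m)
    val correctedValues = es.eigenvalues.map(v => math.max(v, eps))
    es.eigenvectors * diag(correctedValues) * es.eigenvectors.t
  }
}
```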
Code org.apache.spark.ml.regression.SquaresLossFunctionBreeze

```scala
package org.apache.spark.ml.regression

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

class SquaresLossFunctionBreeze(val fitmodel: NonLinearFunction, xydata: BDM[Double])
  extends SquaresLossFunction {

  // Number of training examples and number of features.
  val instanceCount: Int = xydata.rows
  val featureCount: Int = xydata.cols - 1

  // Feature matrix and label vector extracted from the combined data matrix.
  val X: BDM[Double] = xydata(::, 0 until featureCount)
  val y: BDV[Double] = xydata(::, featureCount)

  override def dim: Int = fitmodel.dim

  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
    val r: BDV[Double] = diff(weights)
    (0.5 * (r.t * r), gradient(weights))
  }

  override def hessian(weights: BDV[Double]): BDM[Double] = {
    val J: BDM[Double] = jacobian(weights)
    posDef(J.t * J)
  }

  // Jacobian matrix: one row per training example, one column per weight.
  def jacobian(weights: BDV[Double]): BDM[Double] = {
    val gradData = (0 until instanceCount) map { i =>
      fitmodel.grad(weights, X(i, ::).t).toArray
    }
    BDM(gradData: _*)
  }

  // Residual vector: prediction minus label for every example.
  def diff(weights: BDV[Double]): BDV[Double] = {
    val diff = (0 until instanceCount) map (i => fitmodel.eval(weights, X(i, ::).t) - y(i))
    BDV(diff.toArray)
  }

  // Gradient approximation built from the Jacobian and the residuals.
  def gradient(weights: BDV[Double]): BDV[Double] = {
    val J: BDM[Double] = jacobian(weights)
    val r = diff(weights)
    2.0 * J.t * r
  }
}
```
The second implementation is designed to run in a distributed environment. The calculation uses the RDD.treeAggregate function, which allows the algorithm to be implemented in the Map-Reduce style.
Code org.apache.spark.ml.regression.SquaresLossFunctionRdd

```scala
package org.apache.spark.ml.regression

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.Instance
import org.apache.spark.rdd.RDD

class SquaresLossFunctionRdd(val fitmodel: NonLinearFunction, val xydata: RDD[Instance])
  extends SquaresLossFunction {

  override def dim: Int = fitmodel.dim

  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
    val bcW: Broadcast[BDV[Double]] = xydata.context.broadcast(weights)
    // Accumulate the loss and the gradient over all partitions in one pass.
    val (f: Double, grad: BDV[Double]) = xydata.treeAggregate((0.0, BDV.zeros[Double](dim)))(
      seqOp = (comb, item) => (comb, item) match {
        case ((loss, oldGrad), Instance(label, _, features)) =>
          val featuresBdv = features.asBreeze.toDenseVector
          val w: BDV[Double] = bcW.value
          val prediction = fitmodel.eval(w, featuresBdv)
          val addedLoss: Double = 0.5 * math.pow(label - prediction, 2)
          val addedGrad: BDV[Double] = 2.0 * (prediction - label) * fitmodel.grad(w, featuresBdv)
          (loss + addedLoss, oldGrad + addedGrad)
      },
      combOp = (comb1, comb2) => (comb1, comb2) match {
        case ((loss1, grad1: BDV[Double]), (loss2, grad2: BDV[Double])) =>
          (loss1 + loss2, grad1 + grad2)
      })
    (f, grad)
  }

  override def hessian(weights: BDV[Double]): BDM[Double] = {
    val bcW = xydata.context.broadcast(weights)
    // Sum the per-example outer products grad * grad^T to approximate J^T * J.
    val hessian: BDM[Double] = xydata.treeAggregate(
      new BDM[Double](dim, dim, Array.ofDim[Double](dim * dim)))(
      seqOp = (comb, item) => (comb, item) match {
        case (oldHessian, Instance(_, _, features)) =>
          val grad = fitmodel.grad(bcW.value, features.asBreeze.toDenseVector)
          val subHessian: BDM[Double] = grad * grad.t
          oldHessian + subHessian
      },
      combOp = (comb1, comb2) => (comb1, comb2) match {
        case (hessian1, hessian2) => hessian1 + hessian2
      }
    )
    posDef(hessian)
  }
}
```
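Putting the pieces together, an end-to-end usage could look like the hypothetical sketch below. The exponential kernel, the constructor form and the setMaxIter setter are assumptions made for illustration; only the general shape follows the class descriptions above:

```scala
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.{NonLinearFunction, NonLinearRegression}
import org.apache.spark.sql.SparkSession

// Hypothetical kernel: f(w, x) = w0 * exp(w1 * x).
class ExpKernel extends NonLinearFunction {
  override def dim: Int = 2
  override def eval(w: BDV[Double], x: BDV[Double]): Double =
    w(0) * math.exp(w(1) * x(0))
  override def grad(w: BDV[Double], x: BDV[Double]): BDV[Double] =
    BDV(math.exp(w(1) * x(0)), w(0) * x(0) * math.exp(w(1) * x(0)))
}

object NonLinearRegressionExample extends App {
  val spark = SparkSession.builder().master("local[*]").appName("NonLinearRegressionExample").getOrCreate()
  import spark.implicits._

  // Toy training set generated from y = 2 * exp(0.5 * x).
  val train = (1 to 20).map { i =>
    val x = i / 10.0
    (2.0 * math.exp(0.5 * x), Vectors.dense(x))
  }.toDF("label", "features")

  val nlr = new NonLinearRegression(new ExpKernel) // assumed constructor
    .setMaxIter(100)                               // assumed setter
  val model = nlr.fit(train)
  println(model.coefficients)

  spark.stop()
}
```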
Build project
To simplify development and testing, pom.xml was borrowed from the original Spark ML project with slight modifications. We pin the Spark version to one of the released ones, in our case 2.0.1. It is worth noting that our POM file inherits from org.apache.spark:spark-parent_2.11:2.0.1, which saves us from re-declaring the Maven plug-in configuration.
To run tests that require a SparkContext, we add org.apache.spark:spark-mllib_2.11:2.0.1:test-jar to the test dependencies; the traits org.apache.spark.mllib.util.MLlibTestSparkContext and org.apache.spark.ml.util.TempDirectory are mixed into the corresponding test classes. Extensions of the Suite classes from the org.apache.spark package, for example SparkFunSuite, can also be useful for testing, as they help with setting up the context.
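A test skeleton using these helpers might look as follows; the suite, the tolerance and the ExpKernel kernel (from the usage sketch above) are illustrative assumptions:

```scala
package org.apache.spark.ml.regression

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.mllib.util.MLlibTestSparkContext

class NonLinearRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {

  test("fit recovers the coefficients of a known exponential model") {
    // Synthetic data generated from y = 2 * exp(0.5 * x).
    val data = (1 to 50).map { i =>
      val x = i / 10.0
      (2.0 * math.exp(0.5 * x), Vectors.dense(x))
    }
    val df = spark.createDataFrame(data).toDF("label", "features")

    val model = new NonLinearRegression(new ExpKernel).fit(df)

    assert(math.abs(model.coefficients(0) - 2.0) < 0.1)
    assert(math.abs(model.coefficients(1) - 0.5) < 0.1)
  }
}
```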
In lieu of a conclusion
There are several points not covered in this article whose study would be extremely interesting:
- using a weighted training set;
- applying constraints on the optimization domain, both soft (regularization) and hard (boundary conditions);
- evaluating statistical indicators of the model (confidence intervals of the coefficients, significance, and so on).
At the moment I do not have enough information on these aspects, but I will be grateful for any sources you can share.
The full code can be viewed on Github.
Full and comprehensive testing of this solution is still to be done, so please treat the material as a concept and a topic for discussing improvements.
For remarks and suggestions, private messages are preferable; the comments are better suited for discussion.
Thank you all for your attention.
Used materials
- [1] ieeexplore.ieee.org/document/5451114
- [2] www.nodalpoint.com/nonlinear-regression-using-spark-part-1-nonlinear-models
- [3] www.nodalpoint.com/non-linear-regression-using-spark-part2-sum-of-squares
- [4] spark.apache.org/docs/latest/ml-guide.html
- [5] github.com/scalanlp/breeze
- [6] jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-mllib/spark-mllib-pipelines-persistence.html