Time(sec)        Price($)  Volume  Event Type    Direction
---------------  --------  ------  ------------  ---------
34203.011926972  598.68    10      submission    ask
34203.011926973  594.47    15      submission    bid
34203.011926974  594.49    20      submission    bid
34203.011926981  597.68    30      submission    ask
34203.011926991  594.47    15      execution     ask
34203.011927072  597.68    10      cancellation  ask
34203.011927082  599.88    12      submission    ask
34203.011927097  598.38    11      submission    ask
.v2
, v3
and v5
respectively. A set of v5 is needed to keep track of the accumulating difference in price and volume of shares between ask and bid. Then, taking into account the value of the data in previous periods, we calculate the parameters of the time-sensitive category. You can read more about feature calculations in the original article. MeanPriceMove
, which can be Stationary
, Up
or Down
(will not change, grow, decrease). trait Label[L] extends Serializable { label => def apply(current: OrderBook, future: OrderBook): Option[L] } sealed trait MeanPriceMove object MeanPriceMove { case object Up extends MeanPriceMove case object Down extends MeanPriceMove case object Stationary extends MeanPriceMove } object MeanPriceMovementLabel extends Label[MeanPriceMove] { private[this] val basicSet = BasicSet.apply(BasicSet.Config.default) def apply(current: OrderBook, future: OrderBook): Option[MeanPriceMove] = { val currentMeanPrice = basicSet.meanPrice(current) val futureMeanPrice = basicSet.meanPrice(future) val cell: Cell[MeanPriceMove] = currentMeanPrice.zipMap(futureMeanPrice) { (currentMeanValue, futureMeanValue) => if (currentMeanValue == futureMeanValue) MeanPriceMove.Stationary else if (currentMeanValue > futureMeanValue) MeanPriceMove.Down else MeanPriceMove.Up } cell.toOption } }
OrderBook
consists of two sorted tables, where the key is the price, and the value is the number of shares. case class OrderBook(symbol: String, buy: TreeMap[Int, Int] = TreeMap.empty, sell: TreeMap[Int, Int] = TreeMap.empty)
Cell
type from the Framian library to represent the extracted attribute values: Value
, NA
or NM
.OrderBook
data, the latter requires the creation of the OrdersTrail
table, which is essentially a time window over the raw, unformatted order log. sealed trait BasicAttribute[T] extends Serializable { self => def apply(orderBook: OrderBook): Cell[T] def map[T2](f: T => T2): BasicAttribute[T2] = new BasicAttribute[T2] { def apply(orderBook: OrderBook): Cell[T2] = self(orderBook).map(f) } }
/**
 * An attribute whose value is computed from a single [[OrderBook]].
 *
 * @tparam T type of the value carried by the resulting `Cell`
 */
sealed trait TimeInsensitiveAttribute[T] extends Serializable { source =>

  /**
   * Evaluates this attribute against the given order book.
   *
   * @param orderBook order book to read the attribute from
   * @return the attribute value wrapped in a `Cell`
   */
  def apply(orderBook: OrderBook): Cell[T]

  /**
   * Derives a new attribute that evaluates this one and then passes the
   * resulting cell through `Cell.map` with `f`.
   *
   * @param f transformation applied to the extracted value
   * @tparam T2 value type of the derived attribute
   * @return an attribute yielding the transformed cell
   */
  def map[T2](f: T => T2): TimeInsensitiveAttribute[T2] =
    new TimeInsensitiveAttribute[T2] {
      def apply(orderBook: OrderBook): Cell[T2] = {
        val extracted = source.apply(orderBook)
        extracted.map(f)
      }
    }
}
/**
 * An attribute whose value is computed from an orders trail, i.e. a
 * `Vector` of [[OpenBookMsg]] messages.
 *
 * @tparam T type of the value carried by the resulting `Cell`
 */
trait TimeSensitiveAttribute[T] extends Serializable { source =>

  /**
   * Evaluates this attribute against the given orders trail.
   *
   * @param ordersTrail order messages the attribute is computed from
   * @return the attribute value wrapped in a `Cell`
   */
  def apply(ordersTrail: Vector[OpenBookMsg]): Cell[T]

  /**
   * Derives a new attribute that evaluates this one and then passes the
   * resulting cell through `Cell.map` with `f`.
   *
   * @param f transformation applied to the extracted value
   * @tparam T2 value type of the derived attribute
   * @return an attribute yielding the transformed cell
   */
  def map[T2](f: T => T2): TimeSensitiveAttribute[T2] =
    new TimeSensitiveAttribute[T2] {
      def apply(ordersTrail: Vector[OpenBookMsg]): Cell[T2] = {
        val extracted = source.apply(ordersTrail)
        extracted.map(f)
      }
    }
}
/**
 * Basic attribute set: ask/bid price levels and the mean of the best ask and
 * best bid, all read directly from an [[OrderBook]].
 *
 * @param config configuration this set was created with
 */
class BasicSet private[attribute] (val config: BasicSet.Config) extends Serializable {

  /**
   * Price at the `i`-th ask level, taken as the `i`-th key of `orderBook.sell`
   * in iteration order (ascending for the default `Int` ordering of the map).
   *
   * @param orderBook order book to read from
   * @param i         1-based level index
   * @return the price, or the non-value produced by `Cell.fromOption(None)`
   *         when `i < 1` or the book has fewer than `i` ask levels
   */
  private[attribute] def askPrice(orderBook: OrderBook)(i: Int): Cell[Int] =
    Cell.fromOption {
      // Levels are 1-based. Without this guard, `drop(i - 1)` with i <= 0 drops
      // nothing and would silently report the level-1 price, whereas bidPrice
      // yields no value for the same input.
      if (i >= 1) orderBook.sell.keySet.drop(i - 1).headOption
      else None
    }

  /**
   * Price at the `i`-th bid level, taken as the `i`-th key from the end of
   * `orderBook.buy` (i.e. the `i`-th highest for the default `Int` ordering).
   *
   * @param orderBook order book to read from
   * @param i         1-based level index
   * @return the price, or the non-value produced by `Cell.fromOption(None)`
   *         when `i < 1` or the book has fewer than `i` bid levels
   */
  private[attribute] def bidPrice(orderBook: OrderBook)(i: Int): Cell[Int] =
    Cell.fromOption {
      val bidPrices = orderBook.buy.keySet
      // Skip everything except the last `i` keys; the head of the remainder is
      // the i-th key counted from the end.
      if (i >= 1 && bidPrices.size >= i) bidPrices.drop(bidPrices.size - i).headOption
      else None
    }

  /** Wraps a plain function over an order book as a [[BasicAttribute]]. */
  private def attribute[T](f: OrderBook => Cell[T]): BasicAttribute[T] =
    new BasicAttribute[T] {
      def apply(orderBook: OrderBook): Cell[T] = f(orderBook)
    }

  /** Attribute yielding the price at the `i`-th (1-based) ask level. */
  def askPrice(i: Int): BasicAttribute[Int] = attribute(askPrice(_)(i))

  /** Attribute yielding the price at the `i`-th (1-based) bid level. */
  def bidPrice(i: Int): BasicAttribute[Int] = attribute(bidPrice(_)(i))

  /**
   * Attribute yielding the arithmetic mean of the level-1 ask and level-1 bid
   * prices, combined with `Cell.zipMap`.
   */
  val meanPrice: BasicAttribute[Double] = {
    val ask1 = askPrice(1)
    val bid1 = bidPrice(1)
    // Built with the local `attribute` helper, like askPrice/bidPrice above.
    attribute { orderBook =>
      ask1(orderBook).zipMap(bid1(orderBook)) { (ask, bid) =>
        (ask.toDouble + bid.toDouble) / 2
      }
    }
  }
}
LabeledPointsExtractor
: class LabeledPointsExtractor[L: LabelEncode] { def labeledPoints(orders: Vector[OpenBookMsg]): Vector[LabeledPoint] = { log.debug(s"Extract labeled points from orders log. Log size: ${orders.size}") // ... } }
// Labeled-points extractor configured with three basic features (level-1 ask
// price, level-1 bid price and mean price), labeled with MeanPriceMovementLabel.
val extractor = {
  import com.scalafi.dynamics.attribute.LabeledPointsExtractor._

  // Each `+=` is invoked on the result of the previous one, exactly as in an
  // infix `a += b += c` chain (which associates to the left).
  val withFeatures = LabeledPointsExtractor.newBuilder()
    .+=(basic(_.askPrice(1)))
    .+=(basic(_.bidPrice(1)))
    .+=(basic(_.meanPrice))

  // NOTE(review): the meaning of the 1 millisecond duration is defined by
  // LabeledPointsExtractor.Config, which is not shown here; confirm.
  val extractorConfig = LabeledPointsExtractor.Config(1.millisecond)

  withFeatures.result(symbol, MeanPriceMovementLabel, extractorConfig)
}
Extractor
prepare the labeled points using the MeanPriceMovementLabel
with three features: the ask price, the bid price and the mean price. EQY_US_NYSE_BOOK_20130403
to train the model and EQY_US_NYSE_BOOK_20130404
to check the correctness of the work. object DecisionTreeDynamics extends App with ConfiguredSparkContext with FeaturesExtractor { private val log = LoggerFactory.getLogger(this.getClass) case class Config(training: String = "", validation: String = "", filter: Option[String] = None, symbol: Option[String] = None) val parser = new OptionParser[Config]("Order Book Dynamics") { // .... } parser.parse(args, Config()) map { implicit config => val trainingFiles = openBookFiles("Training", config.training, config.filter) val validationFiles = openBookFiles("Validation", config.validation, config.filter) val trainingOrderLog = orderLog(trainingFiles) log.info(s"Training order log size: ${trainingOrderLog.count()}") // Configure DecisionTree model val labelEncode = implicitly[LabelEncode[MeanPriceMove]] val numClasses = labelEncode.numClasses val categoricalFeaturesInfo = Map.empty[Int, Int] val impurity = "gini" val maxDepth = 5 val maxBins = 100 val trainingData = trainingOrderLog.extractLabeledData(featuresExtractor(_: String)) val trainedModels = (trainingData map { case LabeledOrderLog(symbol, labeledPoints) => log.info(s"$symbol: Train Decision Tree model. Training data size: ${labeledPoints.count()}") val model = DecisionTree.trainClassifier(labeledPoints, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins) val labelCounts = labeledPoints.map(_.label).countByValue().map { case (key, count) => (labelEncode.decode(key.toInt), count) } log.info(s"$symbol: Label counts: [${labelCounts.mkString(", ")}]") symbol -> model }).toMap val validationOrderLog = orderLog(validationFiles) log.info(s"Validation order log size: ${validationOrderLog.count()}") val validationData = validationOrderLog.extractLabeledData(featuresExtractor(_: String)) // Evaluate model on validation data and compute training error validationData.map { case LabeledOrderLog(symbol, labeledPoints) => val model = trainedModels(symbol) log.info(s"$symbol: Evaluate model on validation data. 
Validation data size: ${labeledPoints.count()}") log.info(s"$symbol: Learned classification tree model: $model") val labelAndPrediction = labeledPoints.map { point => val prediction = model.predict(point.features) (point.label, prediction) } val trainingError = labelAndPrediction.filter(r => r._1 != r._2).count().toDouble / labeledPoints.count log.info(s"$symbol: Training Error = " + trainingError) } } }
ORCL
ticker: ORCL: Train Decision Tree model. Training data size: 64064 ORCL: Trained model in 3740 millis ORCL: Label counts: [Stationary -> 42137, Down -> 10714, Up -> 11213] ORCL: Evaluate model on validation data. Validation data size: 54749 ORCL: Training Error = 0.28603262160039455
Application code on GitHub
Source: https://habr.com/ru/post/259803/
All Articles