
The problem of preserving context in asynchronous programming in Scala

At some point in a project, you need to track the progress of an operation, or to collect and store information about it. An operation context, such as the context of a client session, serves this purpose well. If you are curious how to do this without serious side effects, read on.

In the Java world, each operation is often (but not always) performed in its own thread. That makes things simple: you can put the context in a ThreadLocal and read it at any point during the operation:

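    class Context {
        public static final ThreadLocal<Context> global = new ThreadLocal<>();
    }

    // Somewhere in the code that starts the operation
    Context context = new Context(...);
    Context.global.set(context);
    try {
        someService.someMethod();
    } finally {
        // Clear the value so it does not leak into the next operation on this thread
        Context.global.remove();
    }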

In Scala, things are often not so simple: in the course of an operation the thread can change repeatedly, as it does in heavily asynchronous applications. The ThreadLocal approach no longer works (the same is true in Java when threads are switched, of course).
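To make the failure concrete, here is a minimal sketch. The names `ThreadLocalDemo`, `operationId` and `readFromWorker` are hypothetical and not part of the article's project. A value is set on the calling thread, and then read from a Future that runs on a separate worker thread.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ThreadLocalDemo {
  // Hypothetical holder for the id of the current operation
  val operationId: ThreadLocal[Option[String]] =
    ThreadLocal.withInitial[Option[String]](() => None)

  // Sets the id on the calling thread, then reads it back from a Future
  // that runs on a dedicated worker thread.
  def readFromWorker(): Option[String] = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    operationId.set(Some("op-1"))
    try Await.result(Future(operationId.get()), 5.seconds)
    finally {
      operationId.remove()
      pool.shutdown()
    }
  }
}
```

Under these assumptions `ThreadLocalDemo.readFromWorker()` returns `None`: the worker thread has its own copy of the thread-local, so it never sees the value set by the caller.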

The first thing that comes to mind is to pass the context as an implicit argument of the function:

    def foo(bar: Bar)(implicit context: Context)
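As an illustration of what this looks like across layers, here is a small sketch. `Repository`, `Service` and `LoggingRepository` are made-up names, and `Context` is reduced to a single field for brevity.

```scala
// Simplified stand-in for a context: only the operation id
final case class Context(operationId: String)

trait Repository {
  def load(id: Int)(implicit context: Context): String
}

class Service(repository: Repository) {
  // The service does not use the context itself,
  // but it must still declare it in order to pass it down.
  def describe(id: Int)(implicit context: Context): String =
    repository.load(id).toUpperCase
}

object LoggingRepository extends Repository {
  def load(id: Int)(implicit context: Context): String =
    s"item-$id@${context.operationId}"
}
```

Every method on the path from the entry point to the code that actually needs the context has to carry the extra parameter list, even when the method itself ignores it.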

But that clutters the service interfaces. After a bit of head-scratching, a rather simple idea came up: bind the context to the service objects themselves and pass it on to internal services as functions are called.

Suppose our context looks like this:

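    import scala.collection.concurrent.TrieMap

    // data holds arbitrary key-value information recorded during the operation
    class Context(val operationId: String, val data: TrieMap[String, String] = TrieMap.empty)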

Next, define traits that mark context-aware objects:

    trait ContextualObject {
      protected def context: Option[Context]
    }

    // An object whose context can be replaced, yielding a new instance
    trait ChangeableContextualObject[T <: ContextualObject] extends ContextualObject {
      def withContext(ctx: Option[Context]): T
    }

    // Mixin for objects that are created without a context
    trait EmptyContext { _: ContextualObject =>
      override protected val context: Option[Context] = None
    }

Now let's declare our services and their implementations:

    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.{Random, Success}

    // A service whose context can be replaced
    trait ServiceA extends ChangeableContextualObject[ServiceA] {
      def someSimpleOperation: Int
      def someLongOperation(implicit executionContext: ExecutionContext): Future[Int]
    }

    trait ServiceAImpl extends ServiceA {
      override def someSimpleOperation: Int = 1

      override def someLongOperation(implicit executionContext: ExecutionContext): Future[Int] = {
        Future(someSimpleOperation)
          .map { res =>
            // Record intermediate results in the context, if there is one
            context.foreach(_.data.put("ServiceA.step1", res.toString))
            res * Random.nextInt(10)
          }
          .map { res =>
            context.foreach(_.data.put("ServiceA.step2", res.toString))
            res - Random.nextInt(5)
          }
          .andThen {
            case Success(res) => context.foreach(_.data.put("ServiceA.step3", res.toString))
          }
      }

      // Creates a copy of the service bound to the given context
      override def withContext(ctx: Option[Context]): ServiceA = new ServiceAImpl {
        ctx.foreach(_.data.put("ServiceA.withContext", "true"))
        override protected def context: Option[Context] = ctx
      }
    }

    object ServiceAImpl {
      def apply(): ServiceAImpl = new ServiceAImpl with EmptyContext
    }

And a second service that uses the first one:

    trait ServiceB extends ChangeableContextualObject[ServiceB] {
      def someOperationWithoutServiceA: Int
      def someOperationWithServiceA(implicit executionContext: ExecutionContext): Future[Boolean]
    }

    /**
     * The implementations are traits rather than classes so that
     * EmptyContext can be mixed in and withContext can create
     * anonymous subclasses. This also fits the cake pattern.
     */
    trait ServiceBImpl extends ServiceB { self =>
      protected def serviceA: ServiceA

      override def someOperationWithoutServiceA: Int = 1

      override def someOperationWithServiceA(implicit executionContext: ExecutionContext): Future[Boolean] = {
        serviceA.someLongOperation.map {
          case res if res % 2 == 0 =>
            context.foreach(_.data.put("ServiceB.res", "even"))
            true
          case res =>
            context.foreach(_.data.put("ServiceB.res", "odd"))
            false
        }
      }

      override def withContext(ctx: Option[Context]): ServiceB = new ServiceBImpl {
        ctx.foreach(_.data.put("ServiceB.withContext", "true"))
        override protected val context: Option[Context] = ctx
        // lazy val: serviceA is rebound to the context only if it is actually used,
        // so operations that do not need it never create the copy
        override protected lazy val serviceA: ServiceA = self.serviceA.withContext(ctx)
      }
    }

    object ServiceBImpl {
      // An alternative to this factory method is a builder class:
      //   class Builder(val serviceA: ServiceA) extends ServiceBImpl with EmptyContext
      // which would be used as:
      //   new ServiceBImpl.Builder(serviceA)
      def apply(a: ServiceA): ServiceBImpl = new ServiceBImpl with EmptyContext {
        // The dependency is passed in directly, so a plain val is enough here
        override protected val serviceA: ServiceA = a
      }
    }

At the call site, the code looks like this:

    val context = new Context("opId")
    val serviceBWithContext = serviceB.withContext(Some(context))

    serviceBWithContext.someOperationWithoutServiceA
    context.data.get("ServiceB.withContext") // Some("true")
    context.data.get("ServiceA.withContext") // None

    serviceBWithContext.someOperationWithServiceA.andThen {
      case _ =>
        context.data.get("ServiceA.withContext") // Some("true")
        context.data.get("ServiceA.step1") // Some("1")
    }

It is all quite simple: the same context is available throughout the operation. Now it needs a real use. Suppose we recorded important information during the operation and want to log it. The simplest option is to create a logger for each context that attaches this information to every message it writes. But that does not cover logging that happens outside of your code (for example, in a third-party library).
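The logger-per-context option could be sketched roughly as follows. `ContextLogger` and its `sink` parameter are hypothetical: `sink` stands in for a call to a real logger, and `Context` is repeated here only to keep the example self-contained.

```scala
import scala.collection.concurrent.TrieMap

// Simplified stand-in for the article's Context
class Context(val operationId: String,
              val data: TrieMap[String, String] = TrieMap.empty)

// Hypothetical wrapper: `sink` stands in for a real logger call
class ContextLogger(context: Option[Context], sink: String => Unit) {
  def info(message: String): Unit = sink(message + suffix)

  // Renders the context parameters, or an empty string if there is no context
  private def suffix: String = context.fold("") { ctx =>
    // Sort the keys so that the output order is stable
    val params = ctx.data.readOnlySnapshot().toSeq.sortBy(_._1)
      .map { case (key, value) => s" $key=$value" }.mkString
    s" [operationId=${ctx.operationId}$params]"
  }
}
```

Only messages written through such a wrapper carry the context, which is exactly why it does not help with log calls made inside third-party code.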

To make the context available outside of our code, we put it in a ThreadLocal:

    object Context {
      val global: ThreadLocal[Option[Context]] = ThreadLocal.withInitial[Option[Context]](() => None)

      // Runs the operation with the given context
      def runWith[T](context: Context)(operation: => T): T = {
        runWith(Some(context))(operation)
      }

      // Runs the operation with the given optional context
      def runWith[T](context: Option[Context])(operation: => T): T = {
        val old = global.get()
        global.set(context)
        // Restore the previous value once the operation has finished
        try operation finally global.set(old)
      }
    }
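Saving and restoring the previous value is what makes nested calls safe. The sketch below repeats the same save, set and restore steps with a plain `Option[String]` instead of the article's `Context`, to stay self-contained. `CurrentOperation` and `NestingDemo` are illustrative names.

```scala
object CurrentOperation {
  private val current: ThreadLocal[Option[String]] =
    ThreadLocal.withInitial[Option[String]](() => None)

  def get: Option[String] = current.get()

  // Same save / set / restore sequence as runWith above
  def runWith[T](id: Option[String])(operation: => T): T = {
    val old = current.get()
    current.set(id)
    try operation finally current.set(old)
  }
}

object NestingDemo {
  // Returns what is visible inside the inner block, right after it,
  // and after the outer block has finished.
  def observe(): (Option[String], Option[String], Option[String]) = {
    val (inner, afterInner) = CurrentOperation.runWith(Some("outer")) {
      val in = CurrentOperation.runWith(Some("inner"))(CurrentOperation.get)
      (in, CurrentOperation.get)
    }
    (inner, afterInner, CurrentOperation.get)
  }
}
```

Here `NestingDemo.observe()` yields `(Some("inner"), Some("outer"), None)`: each block sees its own value, and the enclosing value comes back as soon as the inner block ends.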

If you use the logback-classic library for logging, you can write your own Layout that logs these parameters.

Possible implementation:

    import ch.qos.logback.classic.pattern.ThrowableProxyConverter
    import ch.qos.logback.classic.spi.ILoggingEvent
    import ch.qos.logback.core.LayoutBase
    import scala.collection.JavaConverters._

    class OperationContextLayout extends LayoutBase[ILoggingEvent] {
      private val separator: String = System.getProperty("line.separator")

      override def doLayout(event: ILoggingEvent): String = {
        val sb = new StringBuilder(256)
        sb.append(event.getFormattedMessage)
          .append(separator)
        appendContextParams(sb)
        appendStack(event, sb)
        sb.toString()
      }

      // Appends operationId and all recorded parameters of the current context
      private def appendContextParams(sb: StringBuilder): Unit = {
        Context.global.get().foreach { ctx =>
          sb.append("operationId=")
            .append(ctx.operationId)
          ctx.data.readOnlySnapshot().foreach {
            case (key, value) => sb.append(" ").append(key).append("=").append(value)
          }
          sb.append(separator)
        }
      }

      // Appends the full stack trace if the event carries an exception
      private def appendStack(event: ILoggingEvent, sb: StringBuilder): Unit = {
        if (event.getThrowableProxy != null) {
          val converter = new ThrowableProxyConverter
          converter.setOptionList(List("full").asJava)
          converter.start()
          sb.append(converter.convert(event))
        }
      }
    }


Possible config:

    <configuration>
      <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
          <layout class="operation.context.logging.OperationContextLayout" />
        </encoder>
      </appender>
      <root level="debug">
        <appender-ref ref="STDOUT" />
      </root>
    </configuration>


Now let's try to log something:

    def runWithoutA(): Unit = {
      val context = Some(createContext())
      val res = serviceB.withContext(context).someOperationWithoutServiceA
      Context.runWith(context) {
        // Result of someOperationWithoutServiceA: '1'
        // operationId=GPapC6JKmY ServiceB.withContext=true
        logger.info(s"Result of someOperationWithoutServiceA: '$res'")
      }
    }

    def runWithA(): Future[_] = {
      val context = Some(createContext())
      serviceB.withContext(context).someOperationWithServiceA.andThen {
        case _ =>
          Context.runWith(context) {
            // someOperationWithServiceA completed
            // operationId=XU1SGXPq1N ServiceB.res=even ServiceA.withContext=true ServiceB.withContext=true ServiceA.step1=1 ServiceA.step2=7 ServiceA.step3=4
            logger.info("someOperationWithServiceA completed")
          }
      }
    }

One question remains: what about external code that runs on an ExecutionContext? Nothing stops us from writing a wrapper for it:

Possible implementation of a wrapper:

    import scala.concurrent.ExecutionContext

    class ContextualExecutionContext(context: Option[Context], executor: ExecutionContext) extends ExecutionContext {
      // Every task runs with the context installed in the ThreadLocal
      override def execute(runnable: Runnable): Unit = executor.execute(() => {
        Context.runWith(context)(runnable.run())
      })

      override def reportFailure(cause: Throwable): Unit = {
        Context.runWith(context)(executor.reportFailure(cause))
      }
    }

    object ContextualExecutionContext {
      implicit class ContextualExecutionContextOps(val executor: ExecutionContext) extends AnyVal {
        def withContext(context: Option[Context]): ContextualExecutionContext =
          new ContextualExecutionContext(context, executor)
      }
    }
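The effect of such a wrapper can be checked in isolation. The sketch below is a reduced, self-contained variant of the same idea: the "context" is just an `Option[String]`, and `Current`, `PropagatingExecutionContext` and `PropagationDemo` are illustrative names, not part of the article's project.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Simplified stand-in: the "context" is just an operation id
object Current {
  val id: ThreadLocal[Option[String]] =
    ThreadLocal.withInitial[Option[String]](() => None)

  def runWith[T](value: Option[String])(operation: => T): T = {
    val old = id.get()
    id.set(value)
    try operation finally id.set(old)
  }
}

// Same idea as ContextualExecutionContext: install the value around each task
class PropagatingExecutionContext(value: Option[String], underlying: ExecutionContext)
    extends ExecutionContext {
  override def execute(runnable: Runnable): Unit =
    underlying.execute(new Runnable {
      override def run(): Unit = Current.runWith(value)(runnable.run())
    })

  override def reportFailure(cause: Throwable): Unit =
    underlying.reportFailure(cause)
}

object PropagationDemo {
  // Reads the thread-local from a worker thread, with and without the wrapper
  def observe(): (Option[String], Option[String]) = {
    val pool = Executors.newSingleThreadExecutor()
    val plain = ExecutionContext.fromExecutor(pool)
    val wrapped = new PropagatingExecutionContext(Some("op-1"), plain)
    try {
      val withWrapper = Await.result(Future(Current.id.get())(wrapped), 5.seconds)
      val withoutWrapper = Await.result(Future(Current.id.get())(plain), 5.seconds)
      (withWrapper, withoutWrapper)
    } finally pool.shutdown()
  }
}
```

With these definitions `PropagationDemo.observe()` gives `(Some("op-1"), None)`. The task submitted through the wrapper sees the value, and the next task on the same worker thread does not, because the wrapper restores the previous value after each task.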


Possible implementation of an external system:

    import org.slf4j.{Logger, LoggerFactory}
    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.Success

    class SomeExternalObject {
      val logger: Logger = LoggerFactory.getLogger(classOf[SomeExternalObject])

      def externalCall(implicit executionContext: ExecutionContext): Future[Int] = {
        Future(1).andThen {
          case Success(res) => logger.debug(s"external res $res")
        }
      }
    }

Let's try to make the call on our ExecutionContext:


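    import scala.concurrent.ExecutionContext.global
    import ContextualExecutionContext._

    def runExternal(): Future[_] = {
      val context = Some(createContext())
      implicit val executor: ExecutionContext = global.withContext(context)
      // external res 1
      // operationId=8Hf277SV7B
      someExternalObject.externalCall
    }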

That's the whole idea. The use of the context is not limited to logging: you can store anything in it, for example a snapshot of some state, if you want all services to work with the same data during the operation. And so on.
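As one possible reading of the snapshot idea, the sketch below pins a changing value in the context on first access so that every later reader within the same operation sees the same data. `RateSource` and `Rates` are invented for illustration, and `Context` is repeated to keep the example self-contained.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Simplified stand-in for the article's Context
class Context(val operationId: String,
              val data: TrieMap[String, String] = TrieMap.empty)

// Hypothetical source of a value that may change at any moment
object RateSource {
  val current = new AtomicInteger(10)
}

object Rates {
  // The first call within an operation stores the value in the context;
  // later calls with the same context reuse the stored value.
  def rateFor(context: Context): Int =
    context.data.getOrElseUpdate("rate", RateSource.current.get().toString).toInt
}
```

If the source changes from 10 to 20 in the middle of an operation, calls with the original context keep returning 10, while an operation that starts afterwards with a fresh context gets 20.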

If you would like to see context tracking for communication between actors, say so in the comments and I will extend the article. If you have ideas for a different implementation, write them in the comments as well; it will be interesting to read.

P.S. The source code of the project used in the article: github.com/eld0727/scala-operation-context
P.P.S. I am sure this approach can be applied in other languages that allow you to create anonymous classes; this is just one possible implementation in Scala.

Source: https://habr.com/ru/post/323682/

