⬆️ ⬇️

The quickest introduction to Reactive Programming

The purpose of this article is to show by example why reactive programming is needed, how it relates to functional programming, and how it can be used to write declarative code that can be easily adapted to new requirements. In addition, I want to do this as briefly and simply as possible with an example close to the real one.



Take the following task:

There is a service with REST API and endpoint /people . When a POST request to this endpoint is created, a new entity is created. Write a function that accepts an array of objects of the form { name: 'Max' } and creates a set of entities using the API (in English, this is called a batch operation).



Let's solve this problem in an imperative style:



 const request = require('superagent') function batchCreate(bodies) { const calls = [] for (let body of bodies) { calls.push( request .post('/people') .send(body) .then(r => r.status) ) } return Promise.all(calls) } 


For comparison, let's rewrite this piece of code in a functional style. For simplicity, the functional style will be understood as:



  1. Using functional primitives ( .map , .filter , .reduce ) instead of imperative loops ( for , while )
  2. The code is organized in "pure" functions - they depend only on their arguments and do not depend on the state of the system


Functional code:



 const request = require('superagent') function batchCreate(bodies) { const calls = bodies.map(body => request .post('/people') .send(body) .then(r => r.status) ) return Promise.all(calls) } 


We received a piece of code of the same size and it is worth admitting that it is not clear why this piece is better than the previous one.

In order to understand how the second piece of code is better - you need to start changing the code, imagine that a new requirement has appeared for the original task:

The service that we call has a limit on the number of requests in a period of time: in one second one client can execute no more than five requests. Performing more requests will cause the service to return a 429 HTTP error (too many requests).



In this place, it is probably worth stopping and trying to solve the problem yourself,% username%



Let's take our functional code as a basis and try to change it. The main problem of "pure" functional programming is that it does not "know" anything - about the runtime environment and input-output (in English there is a side effect for this), but in practice we are constantly working with them.

To fill this gap, Reactive Programming comes to the rescue - a set of approaches trying to solve the problem of side effects. The most famous implementation of this paradigm is the Rx library, which uses the concept of reactive streams



What is reactive streams? In short, this is an approach that allows you to apply functional primitives (.map, .filter, .reduce) to something distributed over time.



For example, we transmit a set of commands over the network - we do not need to wait until we receive the entire set, we represent it as a reactive stream and can work with it. Here there are two more important concepts:





The purpose of this article is to find easy ways, therefore, we will take the Highland library, which tries to solve the same problem as Rx, but is much easier to learn. The idea behind it is simple: let's take Node.js streams as a basis and we will “pour” data from one Stream into another.



Let’s get started: let's start with a simple one — let's make our code “reactive” without adding new functionality



 const request = require('superagent') const H = require('highland') function batchCreate(bodies) { return H(bodies) .flatMap(body => H(request .post('localhost:3000/people') .send(body) .then(r => r.status) ) ) .collect() .toPromise(Promise) } 


What you should pay attention to:





Now let's try to implement our requirement:



 const request = require('superagent') const H = require('highland') function batchCreate(bodies) { return H(bodies) .flatMap(body => H(request .post('localhost:3000/people') .send(body) .then(r => r.status) ) ) .ratelimit(5, 1000) .collect() .toPromise(Promise) } 


Thanks to the concept of backpressure, this is just one line of .ratelimit in this paradigm. In Rx, this takes up about the same amount of space .



Well, that's all, I am interested in your opinion:





PS: here you can find another one of my articles about Reactive Programming



')

Source: https://habr.com/ru/post/427467/



All Articles