In recent years, reactive programming in general, and
ReactiveX technology in particular, is becoming increasingly popular among developers. Some are already actively using all the advantages of this approach, while others have only “heard something”. For my part, I will try to help you imagine how some of the concepts of reactive programming can change the view on the seemingly familiar things.
There are two fundamentally different ways of organizing large systems: in accordance with the objects and states that live in the system, and in accordance with the data flows that pass through it. The reactive programming paradigm implies the ease of expressing data streams, as well as the propagation of changes due to these streams. For example, in imperative programming, the assignment operation means the finiteness of the result, whereas in the reactive value it will be recalculated when new input data is received. The flow of values ​​passes through the system a series of transformations that are necessary to solve a specific task. Threading allows the system to be expandable and asynchronous, and the correct response to errors that occur is fault tolerant.
ReactiveX is a library that allows you to create asynchronous and event-oriented programs that use observed sequences. It extends the Observer pattern to support data sequences, adds operators for their declarative connection, eliminating the need to take care of synchronization and thread security, shared data structures, and non-blocking I / O.
One of the main differences of the ReactiveX library from functional reactive programming is that it operates not with continuously changing, but discrete values ​​that are “emitted” for a long time.
')
It is worth telling a little about what Observer, Observable, Subject are. The Observable model is a data source and allows you to handle asynchronous event streams in a similar way with the one you use for data collections, such as arrays. And all this instead of callbacks, which means that the code is more readable and less prone to errors.
In ReactiveX, the Observer subscribes to the Observable and subsequently responds to an element or sequence of elements that it sends. Every Observer subscribed to Observable calls Observer.on_next () on each element of the data stream, after which both Observer.on_complete () and Observer.on_error () can be called. Often an Observable is used in such a way that it does not begin to give data until someone subscribes to it. These are the so-called “lazy calculations” - values ​​are calculated only when there is a need for them.
There are tasks for which you need to connect the Observer and Observable to receive event messages and report them to your subscribers. For this, there is a Subject that has, in addition to the standard, several more implementations:
- ReplaySubject has the ability to cache all the data entered into it, and when a new subscriber appears, give up all this sequence from the beginning, working further as usual.
- BehaviorSubject stores the last value, by analogy with ReplaySubject giving it to the appeared subscriber. When created, it gets the default value that each new subscriber will receive if the last value has not yet been.
- AsyncSubject also stores the last value, but does not give the data until the entire sequence is completed.
Observable and Observer are just the beginning of ReactiveX. They do not carry all the power that the operators allow to transform, merge, manipulate the sequences of elements that are given to the Observable.
In the ReactiveX documentation, the
description of the operators includes the use of the Marble Diagram. For example, here’s how these diagrams represent Observable and their transformations:

Looking at the diagram below, it is easy to understand that the map operator transforms the elements given to Observable by applying a function to each of them.

A good illustration of ReactiveX features is the RSS aggregator application. Here there is a need for asynchronous data loading, filtering and transformation of values, maintaining the current state by periodic updates.
In this article, examples for representing the core principles of ReactiveX are written using the
rx library for the Python programming language. This is, for example, the abstract implementation of the observer:
class Observer(metaclass=ABCMeta): @abstractmethod def on_next(self, value): return NotImplemented @abstractmethod def on_error(self, error): return NotImplemented @abstractmethod def on_completed(self): return NotImplemented
Our application in real time will exchange messages with the browser via web sockets. The ability to easily implement this provides
Tornado .
The program starts with the launch of the server. When the browser accesses the server, a web socket is opened.
Code import json import os import feedparser from rx import config, Observable from rx.subjects import Subject from tornado.escape import json_decode from tornado.httpclient import AsyncHTTPClient from tornado.platform.asyncio import AsyncIOMainLoop from tornado.web import Application, RequestHandler, StaticFileHandler, url from tornado.websocket import WebSocketHandler asyncio = config['asyncio'] class WSHandler(WebSocketHandler): urls = ['https://lenta.ru/rss/top7', 'http://wsrss.bbc.co.uk/russian/index.xml'] def open(self): print("WebSocket opened")
To process a request entered by a user, a Subject is created, when he subscribes to which he sends the default value (in our case, an empty string), and then sends what the user has entered and satisfies the conditions: length 0 or greater than 2, the value has changed.
Observable is also provided for periodic news updates, which gives a value every 60s.
interval_obs = Observable.interval( 60000
These two streams are connected by the combine_latest operator, Observable is embedded in the chain to get the list of news. After that a subscription is created for this Observable, the whole chain starts working only at this moment.
It is necessary to elaborate on what “Observable to get the list of news”. From the url list for receiving news, we create a data stream whose elements come into a function, where using the HTTP client Tornado AsyncHTTPClient, asynchronous data loading takes place for each urls list item. They also create a data stream, which is filtered by a query entered by the user. From each stream, we take 5 news items that lead to the desired format for sending to the frontend.
Code def get_rss(self, rss_url): http_client = AsyncHTTPClient() return http_client.fetch(rss_url, method='GET') def get_data(self, query):
After the output data stream has been formed, its subscriber begins to receive data element by element. The send_response function sends the received values ​​to the frontend, which adds the news to the list.
def send_response(x): self.write_message(json.dumps(x)) def on_error(ex): print(ex)
File feeder.js
Code ws.onmessage = function(msg) { var value = JSON.parse(msg.data); if (value === "clear") {$results.empty(); return;}
Thus, a push-technology is implemented, in which data comes from the server to the frontend, which only sends a query entered by the user to search for news.
As a conclusion, I propose to think about what kind of implementation would be obtained with the usual approach using callbacks instead of Observable, without the ability to easily merge data streams, without the ability to instantly send data to the consumer frontend and with the need to track changes in the query string. Among the Python-developers, the technology is practically not widely distributed yet, but I already see several possibilities to apply it on current projects.
An example of using ReactiveX for Python can be found in the github
repository with the RSS aggregator demo project.