📜 ⬆️ ⬇️

Principles of reactive programming using a simple RSS aggregator using ReactiveX for Python


In recent years, reactive programming in general, and ReactiveX technology in particular, is becoming increasingly popular among developers. Some are already actively using all the advantages of this approach, while others have only “heard something”. For my part, I will try to help you imagine how some of the concepts of reactive programming can change the view on the seemingly familiar things.

There are two fundamentally different ways of organizing large systems: in accordance with the objects and states that live in the system, and in accordance with the data flows that pass through it. The reactive programming paradigm implies the ease of expressing data streams, as well as the propagation of changes due to these streams. For example, in imperative programming, the assignment operation means the finiteness of the result, whereas in the reactive value it will be recalculated when new input data is received. The flow of values ​​passes through the system a series of transformations that are necessary to solve a specific task. Threading allows the system to be expandable and asynchronous, and the correct response to errors that occur is fault tolerant.

ReactiveX is a library that allows you to create asynchronous and event-oriented programs that use observed sequences. It extends the Observer pattern to support data sequences, adds operators for their declarative connection, eliminating the need to take care of synchronization and thread security, shared data structures, and non-blocking I / O.

One of the main differences of the ReactiveX library from functional reactive programming is that it operates not with continuously changing, but discrete values ​​that are “emitted” for a long time.
')
It is worth telling a little about what Observer, Observable, Subject are. The Observable model is a data source and allows you to handle asynchronous event streams in a similar way with the one you use for data collections, such as arrays. And all this instead of callbacks, which means that the code is more readable and less prone to errors.

In ReactiveX, the Observer subscribes to the Observable and subsequently responds to an element or sequence of elements that it sends. Every Observer subscribed to Observable calls Observer.on_next () on each element of the data stream, after which both Observer.on_complete () and Observer.on_error () can be called. Often an Observable is used in such a way that it does not begin to give data until someone subscribes to it. These are the so-called “lazy calculations” - values ​​are calculated only when there is a need for them.

observer

There are tasks for which you need to connect the Observer and Observable to receive event messages and report them to your subscribers. For this, there is a Subject that has, in addition to the standard, several more implementations:


Observable and Observer are just the beginning of ReactiveX. They do not carry all the power that the operators allow to transform, merge, manipulate the sequences of elements that are given to the Observable.

In the ReactiveX documentation, the description of the operators includes the use of the Marble Diagram. For example, here’s how these diagrams represent Observable and their transformations:

observable

Looking at the diagram below, it is easy to understand that the map operator transforms the elements given to Observable by applying a function to each of them.

map

A good illustration of ReactiveX features is the RSS aggregator application. Here there is a need for asynchronous data loading, filtering and transformation of values, maintaining the current state by periodic updates.

In this article, examples for representing the core principles of ReactiveX are written using the rx library for the Python programming language. This is, for example, the abstract implementation of the observer:

class Observer(metaclass=ABCMeta): @abstractmethod def on_next(self, value): return NotImplemented @abstractmethod def on_error(self, error): return NotImplemented @abstractmethod def on_completed(self): return NotImplemented 

Our application in real time will exchange messages with the browser via web sockets. The ability to easily implement this provides Tornado .

The program starts with the launch of the server. When the browser accesses the server, a web socket is opened.

Code
 import json import os import feedparser from rx import config, Observable from rx.subjects import Subject from tornado.escape import json_decode from tornado.httpclient import AsyncHTTPClient from tornado.platform.asyncio import AsyncIOMainLoop from tornado.web import Application, RequestHandler, StaticFileHandler, url from tornado.websocket import WebSocketHandler asyncio = config['asyncio'] class WSHandler(WebSocketHandler): urls = ['https://lenta.ru/rss/top7', 'http://wsrss.bbc.co.uk/russian/index.xml'] def open(self): print("WebSocket opened") #       def on_message(self, message): obj = json_decode(message) #  ,   user_input self.subject.on_next(obj['term']) def on_close(self): #   Observable;      observable self.combine_latest_sbs.dispose() print("WebSocket closed") class MainHandler(RequestHandler): def get(self): self.render("index.html") def main(): AsyncIOMainLoop().install() port = os.environ.get("PORT", 8080) app = Application([ url(r"/", MainHandler), (r'/ws', WSHandler), (r'/static/(.*)', StaticFileHandler, {'path': "."}) ]) print("Starting server at port: %s" % port) app.listen(port) asyncio.get_event_loop().run_forever() 


To process a request entered by a user, a Subject is created, when he subscribes to which he sends the default value (in our case, an empty string), and then sends what the user has entered and satisfies the conditions: length 0 or greater than 2, the value has changed.
 # Subject   Observable,  Observer self.subject = Subject() user_input = self.subject.throttle_last( 1000 #        ).start_with( '' #         ).filter( lambda text: len(text) == 0 or len(text) > 2 ).distinct_until_changed() #     

Observable is also provided for periodic news updates, which gives a value every 60s.

 interval_obs = Observable.interval( 60000 #     60 (  ) ).start_with(0) 

These two streams are connected by the combine_latest operator, Observable is embedded in the chain to get the list of news. After that a subscription is created for this Observable, the whole chain starts working only at this moment.

 # combine_latest  2       # ,        self.combine_latest_sbs = user_input.combine_latest( interval_obs, lambda input_val, i: input_val ).do_action( #      #        lambda x: send_response('clear') ).flat_map( #    Observable    self.get_data ).subscribe(send_response, on_error) #  ;         

It is necessary to elaborate on what “Observable to get the list of news”. From the url list for receiving news, we create a data stream whose elements come into a function, where using the HTTP client Tornado AsyncHTTPClient, asynchronous data loading takes place for each urls list item. They also create a data stream, which is filtered by a query entered by the user. From each stream, we take 5 news items that lead to the desired format for sending to the frontend.

Code
 def get_rss(self, rss_url): http_client = AsyncHTTPClient() return http_client.fetch(rss_url, method='GET') def get_data(self, query): # Observable    url return Observable.from_list( self.urls ).flat_map( #   url  Observable,    lambda url: Observable.from_future(self.get_rss(url)) ).flat_map( #   ,    Observable lambda x: Observable.from_list( feedparser.parse(x.body)['entries'] ).filter( #          lambda val, i: query in val.title or query in val.summary ).take(5) #    5    url ).map(lambda x: {'title': x.title, 'link': x.link, 'published': x.published, 'summary': x.summary}) #       


After the output data stream has been formed, its subscriber begins to receive data element by element. The send_response function sends the received values ​​to the frontend, which adds the news to the list.

 def send_response(x): self.write_message(json.dumps(x)) def on_error(ex): print(ex) 

File feeder.js

Code
  ws.onmessage = function(msg) { var value = JSON.parse(msg.data); if (value === "clear") {$results.empty(); return;} // Append the results $('<li><a tabindex="-1" href="' + value.link + '">' + value.title +'</a> <p>' + value.published + '</p><p>' + value.summary + '</p></li>' ).appendTo($results); $results.show(); } 


Thus, a push-technology is implemented, in which data comes from the server to the frontend, which only sends a query entered by the user to search for news.

As a conclusion, I propose to think about what kind of implementation would be obtained with the usual approach using callbacks instead of Observable, without the ability to easily merge data streams, without the ability to instantly send data to the consumer frontend and with the need to track changes in the query string. Among the Python-developers, the technology is practically not widely distributed yet, but I already see several possibilities to apply it on current projects.

An example of using ReactiveX for Python can be found in the github repository with the RSS aggregator demo project.

Source: https://habr.com/ru/post/310824/


All Articles