📜 ⬆️ ⬇️

Reactive Stream Log Processing with RxJava - Part 1

image
Reactive log stream processing with RxJava - Part l


In the previous post, the author considered the cases of using the ELK stack and collecting logs.
Taking into account the movement towards microservices and containerization of applications, centralized processing of logs and their storage becomes a de facto standard .


Maybe we should try to take the next step and more actively use the information obtained in order to find the causes of a number of problems long before they appear. *


Footnote - The streams and data streams in this translation are interchangeable words. Also, the word log can mean a log, although in most cases we use a different meaning in the text.


If we viewed the event log as a data stream of what is happening in real time on your system, it would be very interesting to analyze the real-time data and all possible uses, for example, to detect fraudulent behavior by aggregating various information flows directly during the "attack" , and immediately block the attacker instead of "traditionally" collecting log data and investigating after the incident.


Or another example, we can filter ( filter ) only those events that correspond to a certain type of events, group ( group by ) them by the common key as userID and calculate the total number in the time window, getting the number of events of this type that the user performs in a certain period of time.


failedLogStream() .window(5,TimeUnit.SECONDS) .flatMap(window -> window .groupBy(propertyStringValue("remoteIP")) .flatMap(grouped -> grouped .count() .map( failedLoginsCount -> { final String remoteIp = grouped.getKey(); return new Pair<>(remoteIp, failedLoginsCount); })) ) .filter(pair -> pair.get > 10) .forEach(System.out::println); 

We can initiate requests in other systems and work with their answers as data streams, to which we can subscribe and apply several familiar operators to work with streams (data streams) that are represented in the reactive streams frameworks.


Learning a new development paradigm


It would be nice to make out what reactive programming of streams is , for this we don’t need to deploy something big, such as Kafka Streams Spark or Flink .


Reactive programming is non-blocking , event - driven applications that scale even with a small number of threads with load opposition (a feedback mechanism in which the amount of data from manufacturers does not exceed the number of data received by consumers).


The biggest topic Spring5 brings will be Support for Reactive Programming . The new spring-web-reactive module is a framework similar to spring-web-mvc , which will allow to give asynchronous (non-blocking) responses for REST services and a reactive web client, which implies the possibility of using this solution for microservice architecture. The concept of reactive streams is not specific to Spring, since there is a general specification of reactive-streams-jvm agreed upon by most of the reactive frameworks (it may not have an identical name for it yet, but the concept should be simple enough to become a replacement for frameworks).


Historically, the model of reactive streams was represented by Rx.NET, and then ported to java using Netflix, with the name RxJava. At the same time, the concept has also been successfully implemented in other languages, called Reactive EXtensions . Since then, companies have moved in the same direction as the specification of jet streams. Now RxJava , since it was a pioneer, needs significant refactoring (code rewriting) - accordingly, version 2.x better meets the specification, and while Spring reactor is still a beginner, it will not be difficult for the company to rewrite the implementation according to the specification. We recommend reading more about how they are interrelated.


Doug Lea said that he wants to include jet streams in the java.util.concurrent.Flow object, which means that jet streams will be supplied as part of Java 9 .


Performance Benefits


Also, another fashionable word now is microservice architecture with the obligatory ability to make requests for many different services. Ideally, it is best to perform non-blocking requests, without waiting for the next request to receive the entire response. Think, instead of waiting for the moment when some service returns you a large list of results, it may be worthwhile at the same time when you receive the first fragment to send a new request to another system.


Never block


If we consider a response from a remote request as a Stream (Stream-data stream), a subscription to which triggers an action when an answer is received, instead of blocking the stream that is waiting for its response, we can use a smaller number of streams in general, which, in turn, , will reduce the cost of resources (for example, processor time for context switching between threads and memory for each stack of threads).


Thus, the use of reactive programming will allow us to process more event logs than usual on a standard hardware.


Example: a service, such as Gmail, needs to display user emails. However, emails, in turn, can have many people in a copy (CC). It would be cool to display a photo for those users who are in your contacts, which means calling REST - ContactService.


It turns out like this:


 Future<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails(); List<Mail> emails = emailsFuture.get(); //   //  ,        //    ,      ? Future<List<Contacts>> contacts = getContactsForEmails(emails); for(Mail mail : emails) { streamRenderEmails(mail, contacts); //push() emails  } 

Part of the problem was solved with the arrival of support for reactive programming in Java 8 with the Completable Future (with its thenCompose, thenCombine, thenAccept and 50 more methods, although this does not negate the fact that you need to remember everything they do, but it does not help in reading code).


 CompletableFuture<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails(); CompletableFuture<List<Contact>> emailsFuture .thenCompose(emails -> getContactsForEmails(emails)) //     List<Mail> .thenAccept(emailsContactsPair -> streamRenderEmails(emailsContactsPair.getKey(), emailsContactsPair.getValue())) 

We can switch to Iterator instead of List, and at the same time there are no methods to perform any action when new values ​​appear. In SQL, there is such a possibility, for example, ResultSet (in which you can execute rs.next ()) instead of loading all data into memory.


 public interface Iterator<E> { /** *  {@code true},     . */ boolean hasNext(); /** *    . */ E next(); } 

But we still need to constantly ask, "Do you have a new meaning?"


 Iterable<Mail> emails = mailstoreService.getUnreadEmails(); Iterator<Mail> emailsIt = emails.iterator(); while(emailsIt.hasNext()) { Mail mail = emailsIt.next(); //            if(mail != null) { .... } } 

What we need is a reactive iterator, a data type that can subscribe and perform an action as soon as a new value is received. This is where reactive stream programming begins.


So what is Stream?


Everything is a stream


Stream is simply a sequence of events arranged in time ( event X occurs after event Y, so events do not compete with each other ).


Stream is modeled so that it releases 0..N events and one of two terminal operations :



We can describe it visually with the help of ' marble diagrams '.


Marble diagram for Observable


Thus, we can imagine that the stream is everything, and not just the event log. Even a single value can be expressed as a Stream issuing a value, followed by an event of completion.


Endless stream - a stream that releases events, but without a single terminal event (completion | error).


RxJava defines the Observable (Observable) data type for Stream modeling a type event. In Spring Reactor, it is equal to the type Flux .

Observable is a stream of temperatures taken at various intervals.

Observable is a stream of products purchased in our web store.

Observable is a single user (User), returned on request to the database.

  public Observable<User> findByUserId(String userId) {...} //  Single    public Single<User> findByUserId(String userId) {...} 

But Observable is just a data type, so, as is the case with the Publish / Subscriber (Publish / Subscriber) design template, we need a Subscriber (Subscriber) to handle 3 types of events

  Observable<CartItem> cartItemsStream = ...; Subscriber<CartItem> subscriber = new Subscriber<CartItem>() { @Override public void onNext(CartItem cartItem) { System.out.println("Cart Item added " + cartItem); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.printStackTrace(); } }; cartItemsStream.subscribe(subscriber); 

Reactive operators


But this is just a part of Stream, but until now we have not used anything unusual, just the classic Observer design pattern.


The Reactive (Reactive) part means that we can define some Function (operators - functions) that will be executed when stream triggers an event.


This means that another stream will be created ( immutable stream), to which we can sign another operator, etc.


 Observable<CartItem> filteredCartStream = cartStream.filter(new Func1<CartItem, Boolean>() { @Override public Boolean call(CartItem cartItem) { return cartItem.isLaptop(); } }); Observable<Long> laptopCartItemsPriceStream = filteredCartStream.map(new Func1<CartItem, Long>() { @Override public Long call(CartItem cartItem) { try { return priceService.getPrice(cartItem.getId()); } catch(PriceServiceException e) { thrown new RuntimeException(e); } } }); 

Since the operators (methods) of the Observable class (filter, map, groupBy, ...) return Observable, this means that we can use a chain of operators to combine them with the lambda syntax and write something beautiful.


 Observable<BigDecimal> priceStream = cartStream .filter((cartItem) -> cartItem.isLaptop()). .map((laptop) -> { try { return priceService.getPrice(cartItem.getId()); } catch(PriceServiceException e) { thrown new RuntimeException(e); } }); 

Note that above, when priceStream is created, nothing happens - priceService.getPrice() not called until there is an element passing through the chain of operators. This means that we created a similarity of the plan through an rx-operator, how the managed data will go down the chain (the signing is registered).


When asked to explain reactive programming, they usually jokingly give an example of Excel sheets, where the columns contain formulas that are called when a cell is updated, which in turn updates another cell, which, in turn, updates another one and so on along the chain.


Just like an rx-operator who does nothing, these formulas simply control the data and each one gets its chance to do something until the data goes down the chain.


To better understand how events travel along with a chain of operators, I found a useful analogy, in the example of moving from one house to another, the movers act as operators, with whom things move from your house - this is how Thomas Nild portrayed.


His example with the code:


 Observable<Item> mover1 = Observable.create(s -> { while (house.hasItems()) { s.onNext(house.getItem()); } s.onCompleted(); }); Observable<Item> mover2 = mover1.map(item -> putInBox(item)); Subscription mover3 = mover2.subscribe(box -> putInTruck(box), () -> closeTruck()); //    OnCompleted() 


"Loader 1 on the one hand is an Observable source. It creates outliers by taking things out of the house. It calls Loader 2 with the onNext() method, which performs a map() operation. When its onNext() method is called, it takes a thing and shifts to the box. Then he calls Loader 3, the final Subscriber (subscriber), with the onNext() method, which loads the box in the car. "


The magic of RxJava is a large set of available operators, but your job is to combine them all together to control the flow of data.



Many Stream operators help to compile a glossary of terms for actions performed with streams that can be implemented in popular languages ​​(RxJava, RxJS, Rx.NET, etc) from among the ReactiveX framework (Reactive Extensions).


These concepts should be known even when using different frameworks for working with jet streams, such as Spring Reactor (in the hope of having some operators common to these frameworks).


So far, we have only seen simple operators, such as filtering:


** Filter **


Which only pass elements that fall under the condition of the filter (one loader will carry only those things that cost less than $ 100, instead of transferring everything at once to another loader)


However, there are operators that can break the stream into many separate streams - Observable<Observable<T>> (Stream streams) are such operators as groupBy


> ** group by **


  Observable<Integer> values = Observable.just(1,4,5,7,8,9,10); Observable<GroupedObservable<String, Integer>> oddEvenStream = values.groupBy((number) -> number % 2 == 0 ? "odd":"even"); Observable<Integer> remergedStream = Observable.concat(oddEvenStream); remergedStream.subscribe(number -> System.out.print(number +" ")); 

 // //1 5 7 9 4 8 10 

and a fairly simple concat operator, which again creates a single stream from even and odd streams, and establishes a subscription to it.
> ** Concat **


We see that the concat operator waits for the end of the stream before adding another one, again creating one stream. Thus, odd numbers are displayed first.


We also have the ability to combine together many streams, such as the zip operator
> ** Zip operator **


Zip is named so not because it works as an archiver, but rather because it, like lightning (on a jacket), combines events from two stream-s.


> Lightning (Latch)


It takes one event from one stream and connects it with an event from another (making a pair). Once this is done, he applies the gluing operator before going further down the chain.


PS: it works for more streams.


So, even if one stream releases events faster, then the listener will see only the combined event, which will be released from the slower stream.


Having the ability to "wait" for a response from the many remote calls we receive from streaming s is actually very useful.


On the other hand, the combineLatest operator combineLatest not wait for a couple of events to be released, but instead uses the latest released events from the slower stream, before applying the glue function and passing it further along the chain.


> Combine latest


Moving towards thinking based on the push approach


Let's look at a few examples of how Observable s are actually created. The longest way to create:


  log("Before create Observable"); Observable<Integer> someIntStream = Observable .create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { log("Create"); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onComplete(); log("Completed"); } }); log("After create Observable"); log("Subscribing 1st"); someIntStream.subscribe((val) -> LOGGER.info("received " + val)); //    // (for onError and onComplete)     , -  log("Subscribing 2nd"); someIntStream.subscribe((val) -> LOGGER.info("received " + val)); 

Events are sent to the subscriber as soon as he subscribes .
Not that we use such a construct, we just passed a new ObservableOnSubscribe object, which demonstrates what to do when someone signs to it.


Until we subscribe to Observable , there is no output and nothing happens, the data does not move.


When someone signs up, the call() method is call() and 3 messages are pushed down the chain, followed by a signal that stream has ended.


Above, we have subscribed twice, the code inside the call(...) method will also be called twice. So he effectively re-sends the same values ​​as soon as someone signs up and then gets the following values ​​to the output:


 mainThread: Before create Observable mainThread: After create Observable mainThread: Subscribing 1st mainThread: Create mainThread: received 3 mainThread: received 4 mainThread: received 5 mainThread: Completed mainThread: Subscribing 2nd mainThread: Create mainThread: received 3 mainThread: received 4 mainThread: received 5 mainThread: Completed 

It is important to note that rx operators do not necessarily mean multithreading. RxJava does not enforce default contention between Observable and Subscriber . Therefore, all calls occur on the " main " thread.


This type of Observable , which begins to spread when someone is signed, is called cold observables (cold observers). Another type is hot observables (hot observers), they can release events, even when no one subscribes to them.



Subjects are such a special kind of Observable , which is also an Observer (like Subscriber - which decides that it can push data (by onNext() ) to them) and make implementing hot Observables easier. There are also many implementations, like ReplaySubject , that store selected events in the buffer and replay them by subscription (of course, you can specify a buffer size to prevent the OutOfMemory error) while PublishSubject only misses events that occurred after signing.
And of course, there are many static methods for creating Observables and from other sources.


 Observable.just("This", "is", "something") Observable.from(Iterable<T> collection) Observable.from(Future<T> future) -    ,  `future`  

Adding to our ELK stack a RabbitMQ emitter of data sent via push


By tradition, working with the ELK stack, we use ElasticSearch to query the event log data, so we can say that they are in a pull-based polling style.


Can we instead have push-based, where we are going to inform 'immediately' when an event appears in the log, to further reduce the response time to the event, from the moment it occurred and before we start to process it.


One of the many possible solutions can be RabbitMq , as a solution experienced in battles with a very good reputation for its performance, for its ability to process a huge number of messages. Despite this, Logstash already supports the RabbitMQ plugin (there is also another FluentD plugin) so that we can easily integrate it into our existing ELK stack and write logs to ElasticSearch and RabbitMQ.


Perhaps you remember that Logstash can behave like a controller, and choose how it works, and where to send / save logged events. This means that we can filter the events that we want to process or indicate where to send them, for example, to other RabbitMQ queues.


It is even possible to directly send data to RabbitMQ through the Logback Appender, if you want to omit the use of Logstash .


By the way: the so-called AmqpAppender is so AmqpAppender a rather specific implementation of RabbitMQ AMQP (with AMQP protocol version 0-9-1, 0-9).


For example, ActiveMQ (while also supporting AMQP connector) seems to implement the protocol version AMQP 1.0, while the spring-amqp library with protocol versions 0-9-1, 0-9, which are quite different from 1.0), so you may encounter errors on type 'org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from client using unsupported AMQP attempted'


However, our solution was to use logstash-logback-encoder and send formatted JSON with an event log to Logstash . We will redirect the logstash output to the RabbitMQ (exchange) exchange point.


We will use docker-compose to start the logstash-rabbitmq cluster .
You can clone the repository


docker-compose -f docker-compose-rabbitmq.yml up
and then you can use
./event-generate.sh
to generate a certain number of random events that will be sent to logstash .


, , , logstash . rabbitmq-output-plugin , :


 output { rabbitmq { exchange => logstash exchange_type => direct host => rabbitmq key => my_app } } 

RabbitMQ JMS , AMQP , .


amqp


(exchange) .


'routing-key', , . , ' logstash. '


AMQP . Spring c RabbitMq


  @Bean ConnectionFactory connectionFactory() { return new CachingConnectionFactory(host, port); } @Bean RabbitAdmin rabbitAdmin() { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); rabbitAdmin.declareQueue(queue()); rabbitAdmin.declareBinding(bindQueueFromExchange(queue(), exchange())); return rabbitAdmin; } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean Queue queue() { return new Queue(queueName, false); } DirectExchange exchange() { return new DirectExchange("logstash"); } private Binding bindQueueFromExchange(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("my_app"); } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, new MessageConverter() { public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { throw new RuntimeException("Unsupported"); } public String fromMessage(Message message) throws MessageConversionException { try { return new String(message.getBody(), "UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException("UnsupportedEncodingException"); } } }); messageListenerAdapter.setDefaultListenerMethod("receive"); //the method in our Receiver class return messageListenerAdapter; } @Bean Receiver receiver() { return new Receiver(); } 

'logstash', 'my_app'. MessageListenerAdapter , 'receive' Receiver , .


, , hot observable , , , PublishSubject .


 public class Receiver { private PublishSubject<JsonObject> publishSubject = PublishSubject.create(); public Receiver() { } /** * Method invoked by Spring whenever a new message arrives * @param message amqp message */ public void receive(Object message) { log.info("Received remote message {}", message); JsonElement remoteJsonElement = gson.fromJson ((String) message, JsonElement.class); JsonObject jsonObj = remoteJsonElement.getAsJsonObject(); publishSubject.onNext(jsonObj); } public PublishSubject<JsonObject> getPublishSubject() { return publishSubject; } } 

, SimpleMessageListenerContainer , ( ). Observable , ( onNext , onComplete , onError ):


 //     Observable.create(s -> { // Thread A new Thread(() -> { s.onNext("one"); s.onNext("two"); }).start(); // Thread B new Thread(() -> { s.onNext("three"); s.onNext("four"); }).start(); }); //     //  Observable<String> obs1 = Observable.create(s -> { // Thread A new Thread(() -> { s.onNext("one"); s.onNext("two"); }).start(); }); Observable<String> obs2 = Observable.create(s -> { // Thread B new Thread(() -> { s.onNext("three"); s.onNext("four"); }).start(); }); Observable<String> c = Observable.merge(obs1, obs2); 

Observable.serialize() Subject.toSerialized() , 1 Thread ListenerContainer , . , Subjects , . .


And now you can look at the code and the repository , as a continuation of this long Part II post (Part 2) or go to the Rx Playground there you will find more examples.
Link to translator's site


')

Source: https://habr.com/ru/post/329894/


All Articles