
Parallel Hystrix: improving the performance of distributed applications

About a year ago, our team rewrote the backend of a little-known application with 5 million users on top of Hystrix's "latency and fault tolerance". This significantly increased the reliability of the application when downstream systems fail or slow down (there are about 10 of them, which is not much for a serious system), gave us a wonderful tool (Hystrix Dashboard) for monitoring the load on external systems (now we know exactly which one is the bottleneck), and let us tune the sizes of the various pools in the application. However, one problem remained: the long-running processing of individual heavy requests. Solving it is what this article is about.

Why adding Hystrix was easy


All these benefits were achieved with minimal effort and very small code changes. Hystrix provides a synchronous programming model, so only the small piece of code that calls the HTTP clients was moved from application threads to Hystrix thread pools, with as few problems as possible (I do not believe these problems can be avoided completely).

Consider a simple example: the CommandHelloWorld command, which returns very important data with a delay, and the client code that uses it.

public class CommandHelloWorld extends HystrixCommand<String> {

    private static final Random random = new Random();
    private final String name;

    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String run() throws InterruptedException {
        // emulate a slow call to an external system:
        // the very important data arrives with a random delay
        Thread.sleep(random.nextInt(500));
        return "Hello " + name + "!";
    }
}

// client code
String executeCommand(String str) {
    // synchronous call; execution time is limited by Hystrix
    return new CommandHelloWorld(str).execute();
}

It turned out to be a very simple solution with controlled execution time. Of course, over time any initial advantage turns into a drawback. Some clients need a list of 200 elements, and to build each element you have to make one (or more) calls to an external system (sometimes to more than one system). Not every external system supports batch requests, and not all code is easy to parallelize without mistakes. Spending 20 ms per round trip to an external system, we end up with a hefty 4 seconds for the whole request, during which the user suffers and the Tomcat thread pool does not get its thread back, while the Hystrix thread pool sits almost idle. Time to fix that.
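To make the problem concrete, here is a minimal sketch of such sequential handling (the buildResponse name and the list of keys are illustrative, not part of the demo):

// Illustrative only: sequential execution of Hystrix commands on the request thread.
// With ~20 ms per external call, 200 elements add up to roughly 4 seconds.
List<String> buildResponse(List<String> keys) {
    List<String> result = new ArrayList<>();
    for (String key : keys) {
        // each call blocks the Tomcat thread until the external system answers
        result.add(new CommandHelloWorld(key).execute());
    }
    return result;
}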

How we use Hystrix


Almost the entire application was built according to the following pattern:


Of course, java streams theoretically offer parallel operation, but in practice they give extremely poor control over the thread pools in which the work is done, so we decided not to use that. We needed a clear solution that would not break the original Pattern and would not overload the team. As a result, two working solutions were proposed: one based on reactivex.io and one built on top of java streams.

The demo can be found on GitHub; all working examples live in the tests.

Option 1. Add reactivity


It should be noted that internally Hystrix uses the reactive (that is a term, not a claim that it is fast) reactivex.io library of "asynchronous data sources". This article is not a guide to that vast topic, but one elegant solution will be shown. Since there is no well-established translation for the term observable, I will simply call it a source. So, wanting not to break the Pattern, we proceed as follows:


On the one hand, the last step is not as simple as it looks. But since we know that all external calls are controlled by Hystrix, and therefore strictly limited in time, and the application's own code does essentially nothing, we may assume that the source will either emit its value or have its exception thrown by Hystrix before the timeout expires (1 second). Therefore we use a simple collector function:

static List<String> toList(Observable<String> observable) {
    return observable.timeout(1, TimeUnit.SECONDS)
            .toList()
            .toBlocking()
            .single();
}


We will create commands and sources with two functions, using the native Hystrix API:

/**
 * Creates a hot source: the command starts executing immediately
 */
static Observable<String> executeCommand(String str) {
    LOG.info("Hot Hystrix command created: {}", str);
    return new CommandHelloWorld(str).observe();
}

/**
 * Creates a cold source: the command starts only on subscription
 */
static Observable<String> executeCommandDelayed(String str) {
    LOG.info("Cold Hystrix command created: {}", str);
    return new CommandHelloWorld(str).toObservable();
}

Now let's consider the simple case of processing a list of 6 items:

public void testNaive() {
    List<Integer> source = IntStream.range(1, 7).boxed().collect(Collectors.toList());

    Observable<String> observable = Observable.from(source)
            .flatMap(elem -> executeCommand(elem.toString()));

    toList(observable).forEach(el -> LOG.info("List element: {}", el));
}

Everything runs nicely in parallel in roughly 500 ms, and the program logs confirm that the Hystrix threads are used simultaneously. As a side effect, the items come back in an arbitrary order; such is the price of reactivity.
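If the caller does need the original order, one possible workaround (a sketch of my own, not part of the demo, using java.util.AbstractMap and java.util.Map in addition to the demo's imports) is to carry each source element through the pipeline together with its result and sort after collecting:

// Sketch: keep each result paired with its source element, then restore the order.
static List<String> toOrderedList(List<Integer> source) {
    return Observable.from(source)
            .flatMap(elem -> executeCommand(elem.toString())
                    .map(result -> new AbstractMap.SimpleEntry<Integer, String>(elem, result)))
            .timeout(1, TimeUnit.SECONDS)
            .toList()
            .toBlocking()
            .single()
            .stream()
            .sorted(Map.Entry.comparingByKey())
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());
}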

Let's try to increase the size of the list to 49 and get a predictable failure:

public void testStupid() {
    List<Integer> source = IntStream.range(1, 50).boxed().collect(Collectors.toList());

    Observable<String> observable = Observable.from(source)
            .flatMap(elem -> executeCommand(elem.toString()));

    toList(observable).forEach(el -> LOG.info("List element: {}", el));
}

Here is the amusing output in the log:

[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 1
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 2
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 3
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 4
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 5
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 6
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 7
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 8
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 9
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 10
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 11
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5
[hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2
[hystrix-ExampleGroup-9] INFO org.silentpom.CommandHelloWorld - Command start: 9
[hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6
[hystrix-ExampleGroup-10] INFO org.silentpom.CommandHelloWorld - Command start: 10
[hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3
[main] ERROR org.silentpom.RxHystrixTest - Ooops
com.netflix.hystrix.exception.HystrixRuntimeException: CommandHelloWorld could not be queued for execution and no fallback available.

The thing is that by default Hystrix creates a pool of 10 threads with a zero-length queue. When we subscribe to the source, all of its elements are emitted very quickly, instantly overflowing the entire pool; even creating a cold source did not help. That is not the Hystrix we need. We have to reasonably limit the greed of a single client.
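For reference, those defaults live in HystrixThreadPoolProperties and can be changed per command group. The snippet below is a hypothetical variant of CommandHelloWorld (not part of the demo) that enlarges the pool and allows a queue; the article deliberately keeps the defaults and limits the client instead:

// Hypothetical variant: configure the thread pool instead of limiting the client.
public class CommandHelloWorldBigPool extends HystrixCommand<String> {
    private final String name;

    public CommandHelloWorldBigPool(String name) {
        super(Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(20)                     // default is 10 threads
                        .withMaxQueueSize(50)                 // default is no queue
                        .withQueueSizeRejectionThreshold(50)));
        this.name = name;
    }

    @Override
    protected String run() {
        return "Hello " + name + "!";
    }
}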

The solution turned out to be quite simple:

  1. To begin with, we do not use flatMap, so that subscribing to the source does not create all the commands at once. Instead, the map method gives us a double source (a source of sources).
  2. Then we group these sources with the window method and get a triple source!
  3. Now it is time to strictly order the triple source: release the groups one by one with the concatMap method.
  4. The double sources inside each group are computed in parallel with the flatMap method.

The code turned out to be surprisingly compact, although it took quite a while to understand how it works:

public void testWindow() {
    List<Integer> source = IntStream.range(1, 50).boxed().collect(Collectors.toList());

    Observable<String> observable = Observable.from(source)
            .map(elem -> executeCommandDelayed(elem.toString()))
            .window(7)
            .concatMap(window -> window.flatMap(x -> x));

    toList(observable).forEach(el -> LOG.info("List element: {}", el));
}

Let's see a fragment of the logs:

[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 20
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 21
[hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3
[hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 7
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4
[hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1
[hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 3
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 6
[hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 2
[hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 7
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 1
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 5
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 4
[hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 11
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 12

The application logs show that the launching of new commands paused after the 7th command had started. The result is a very small code change that lets us run requests to external systems with the required degree of parallelism without clogging the entire pool.
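For completeness: later releases in the RxJava 1.x line have a flatMap overload with a maxConcurrent argument that achieves the same effect in a single operator. A sketch, assuming such a version is available (it is not what the demo uses):

public void testMaxConcurrent() {
    List<Integer> source = IntStream.range(1, 50).boxed().collect(Collectors.toList());

    Observable<String> observable = Observable.from(source)
            .map(elem -> executeCommandDelayed(elem.toString())) // cold sources, nothing runs yet
            .flatMap(obs -> obs, 7);                             // at most 7 inner subscriptions at a time

    toList(observable).forEach(el -> LOG.info("List element: {}", el));
}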

Option 2. To hell with reactivity! We have executors


Few people really understand Rx. Even those who claim otherwise: ask them to describe the code above in their own words. But Java 8 already has streams and futures, and Hystrix can return futures natively, so let's try to build the parallel processing with them. We will create futures like this:

// creates a Future: the command is queued in the Hystrix thread pool right away
static Future<String> executeCommandDelayed(String str) {
    LOG.info("Direct Hystrix command created: {}", str);
    return new CommandHelloWorld(str).queue();
}

// synchronous execution: blocks the caller until the result is ready
static String executeCommand(String str) {
    LOG.info("Direct Hystrix command created: {}", str);
    return new CommandHelloWorld(str).execute();
}

We try to process a list of 49 items:
public void testStupid() {
    IntStream.range(1, 50).boxed()
            .map(value -> executeCommandDelayed(value.toString()))
            .collect(Collectors.toList())
            .forEach(el -> LOG.info("List element (FUTURE): {}", el.toString()));
}

And again we hit a familiar problem:

[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 2
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 3
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 4
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 5
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 6
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 7
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 8
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 9
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 10
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 11
[main] ERROR org.silentpom.stream.ParallelAsyncServiceTest - Ooops
com.netflix.hystrix.exception.HystrixRuntimeException: CommandHelloWorld could not be queued for execution and no fallback available.

In this case the order in which the commands are created is controlled by the spliterator, that is, it is completely opaque and therefore dangerous. We will solve the problem in two steps, keeping the familiar style of working with streams:

1) Map the stream of source data to a stream of futures, while controlling the number of actually running tasks. For this we need an intermediate executor with the required degree of parallelism.
2) Turn the stream of futures into a stream of values, keeping in mind that every future is ultimately executed by Hystrix and therefore has a guaranteed completion time.

To implement the first step we write a separate parallelWarp operator that wraps a user-defined function; for the second step we write a waitStream function that takes a stream and returns a stream:

public void testSmart() {
    service.waitStream(
            IntStream.range(1, 50).boxed().map(
                    service.parallelWarp(value -> executeCommand(value.toString()))
            )
    ).collect(Collectors.toList())
     .forEach(el -> LOG.info("List element: {}", el));
}

The result looks almost like ordinary stream code. Let's see what is under the hood; this is the last code snippet for today:
// in the tests, threadSize = 7
public ParallelAsyncService(int threadSize) {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("parallel-async-thread-%d").build();
    // an ordinary fixed pool; its size sets the degree of parallelism
    executorService = Executors.newFixedThreadPool(threadSize, namedThreadFactory);
}

/**
 * Maps user function T -> Ret to function T -> Future<Ret>. Adds task to executor service
 * @param mapper user function
 * @param <T> user function argument
 * @param <Ret> user function result
 * @return function to future
 */
public <T, Ret> Function<T, Future<Ret>> parallelWarp(Function<T, Ret> mapper) {
    return (T t) -> {
        LOG.info("Submitting task to inner executor");
        Future<Ret> future = executorService.submit(() -> {
            LOG.info("Sending task to hystrix");
            return mapper.apply(t);
        });
        return future;
    };
}

/**
 * waits for all futures in the stream and rethrows the exception if one occurred
 * @param futureStream stream of futures
 * @param <T> type
 * @return stream of results
 */
public <T> Stream<T> waitStream(Stream<Future<T>> futureStream) {
    List<Future<T>> futures = futureStream.collect(Collectors.toList());

    // wait for all futures one by one
    for (Future<T> future : futures) {
        try {
            future.get();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(e);
        }
    }

    // all futures have completed, it is safe to call get
    return futures.stream().map(future -> {
        try {
            return future.get();
        } catch (Exception e) {
            e.printStackTrace();
            return null; // unreachable: every future has already completed above
        }
    });
}

The waitStream method is very simple; only the error handling bloats it. The parallelWarp operator is extremely simple too and surely has a special name among adherents of functional programming. New Hystrix commands are created only by the internal executor, which has exactly the degree of parallelism we need. Proof from the logs:

[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 18
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 19
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 20
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 21
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1
[hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2
[hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6
[hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 7
[hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 2
[hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 5
[hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 3
[hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 7
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 6
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 4
[hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 1
[hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 11
[hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 12
[hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8
[hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 13
[hystrix-ExampleGroup-9] INFO org.silentpom.CommandHelloWorld - Command start: 9

With this approach we needed an additional thread pool in front of each Hystrix thread pool, but the output list preserves its order. Which approach will win in the application, time will tell.
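One assumed lifecycle detail that the snippets above do not show: since ParallelAsyncService owns a fixed thread pool, in a real application it should be created once, shared, and shut down when the application stops, for example with a method like this:

// Assumed addition, not part of the demo: release the inner executor's threads on shutdown.
public void shutdown() {
    executorService.shutdown();
}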

Once again, all the examples can be found in the tests on GitHub. I would be glad to hear your criticism.

Source: https://habr.com/ru/post/337864/

