/**
 * Hystrix command whose result is the greeting {@code "Hello <name>!"}.
 *
 * NOTE(review): the constructor and the {@code name} / {@code random} members are not
 * part of this snippet - presumably they are declared in the full class; confirm.
 */
public class CommandHelloWorld extends HystrixCommand<String> {
    @Override
    protected String run() throws InterruptedException {
        // NOTE(review): the original comment text on these lines was lost in extraction,
        // and it is unclear whether the sleep below was commented out - confirm.
        // Pauses for a random delay (bound 500) before answering; presumably simulates real work.
        Thread.sleep(random.nextInt(500));
        return "Hello " + name + "!";
    }
}

// Builds a command for the given string, runs it via execute() and returns its result.
String executeCommand(String str) {
    // NOTE(review): the original inline comment was lost; execute() is presumably the
    // blocking entry point of the command - confirm against the Hystrix docs.
    return new CommandHelloWorld(str).execute();
}
/**
 * Collects everything the observable emits into a list, blocking the caller
 * until the list is available. A one-second timeout is applied to the source.
 *
 * @param observable source of strings
 * @return all emitted strings, in emission order
 */
static List<String> toList(Observable<String> observable) {
    Observable<String> guarded = observable.timeout(1, TimeUnit.SECONDS);
    Observable<List<String>> collected = guarded.toList();
    return collected.toBlocking().single();
}
/**
 * Creates a command for the given string and exposes it through {@code observe()}
 * (logged as the "hot" variant).
 *
 * @param str argument passed to the command
 * @return observable of the command result
 */
static Observable<String> executeCommand(String str) {
    LOG.info("Hot Hystrix command created: {}", str);
    CommandHelloWorld command = new CommandHelloWorld(str);
    return command.observe();
}

/**
 * Creates a command for the given string and exposes it through {@code toObservable()}
 * (logged as the "cold" variant).
 *
 * @param str argument passed to the command
 * @return observable of the command result
 */
static Observable<String> executeCommandDelayed(String str) {
    LOG.info("Cold Hystrix command created: {}", str);
    CommandHelloWorld command = new CommandHelloWorld(str);
    return command.toObservable();
}
/**
 * Runs one command per number in 1..6 through {@code flatMap} and logs every result.
 */
public void testNaive() {
    List<Integer> numbers = IntStream.range(1, 7).boxed().collect(Collectors.toList());
    Observable<Integer> inputs = Observable.from(numbers);
    Observable<String> greetings = inputs.flatMap(number -> executeCommand(number.toString()));

    for (String greeting : toList(greetings)) {
        LOG.info("List element: {}", greeting);
    }
}
/**
 * Same pipeline as the naive test, but with 49 inputs (1..49): one command is
 * created per number through {@code flatMap} and every result is logged.
 */
public void testStupid() {
    List<Integer> numbers = IntStream.range(1, 50).boxed().collect(Collectors.toList());
    Observable<Integer> inputs = Observable.from(numbers);
    Observable<String> greetings = inputs.flatMap(number -> executeCommand(number.toString()));

    for (String greeting : toList(greetings)) {
        LOG.info("List element: {}", greeting);
    }
}
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 1 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 2 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 3 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 4 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 5 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 6 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 7 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 8 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 9 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 10 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 11 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5 [hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2 [hystrix-ExampleGroup-9] INFO org.silentpom.CommandHelloWorld - Command start: 9 [hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6 [hystrix-ExampleGroup-10] INFO org.silentpom.CommandHelloWorld - Command start: 10 [hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3 [main] ERROR org.silentpom.RxHystrixTest - Ooops com.netflix.hystrix.exception.HystrixRuntimeException: CommandHelloWorld could not be queued for execution and no fallback available.
/**
 * Maps the numbers 1..49 to commands via {@code executeCommandDelayed}, groups them
 * into windows of 7, and processes the windows one after another with
 * {@code concatMap}; inside a window the commands are merged with {@code flatMap}.
 * Every result is logged.
 */
public void testWindow() {
    List<Integer> numbers = IntStream.range(1, 50).boxed().collect(Collectors.toList());

    Observable<Observable<String>> commands = Observable.from(numbers)
            .map(number -> executeCommandDelayed(number.toString()));
    Observable<Observable<Observable<String>>> batches = commands.window(7);
    Observable<String> results = batches.concatMap(batch -> batch.flatMap(command -> command));

    for (String result : toList(results)) {
        LOG.info("List element: {}", result);
    }
}
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 20 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 21 [hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3 [hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 7 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4 [hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1 [hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 3 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 6 [hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 2 [hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 7 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 1 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 5 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 4 [hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 11 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 12
// static Future<String> executeCommandDelayed(String str) { LOG.info("Direct Hystrix command created: {}", str); return new CommandHelloWorld(str).queue(); } // , static String executeCommand(String str) { LOG.info("Direct Hystrix command created: {}", str); return new CommandHelloWorld(str).execute(); }
/**
 * Creates a Future-returning command for every number in 1..49, collects the
 * futures into a list, and logs each future's {@code toString()} (not its result).
 */
public void testStupid() {
    List<Future<String>> pending = IntStream.range(1, 50)
            .boxed()
            .map(value -> executeCommandDelayed(value.toString()))
            .collect(Collectors.toList());

    for (Future<String> future : pending) {
        LOG.info("List element (FUTURE): {}", future.toString());
    }
}
[main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 2 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 3 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 4 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 5 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 6 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 7 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 8 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 9 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 10 [main] INFO org.silentpom.stream.ParallelAsyncServiceTest - Direct Hystrix command created: 11 [main] ERROR org.silentpom.stream.ParallelAsyncServiceTest - Ooops com.netflix.hystrix.exception.HystrixRuntimeException: CommandHelloWorld could not be queued for execution and no fallback available.
/**
 * Wraps the blocking {@code executeCommand} call with {@code service.parallelWarp},
 * applies it to the numbers 1..49, waits for all futures through
 * {@code service.waitStream}, and logs every result.
 */
public void testSmart() {
    Function<Integer, Future<String>> submitter =
            service.parallelWarp(value -> executeCommand(value.toString()));
    Stream<Future<String>> pending = IntStream.range(1, 50).boxed().map(submitter);
    List<String> results = service.waitStream(pending).collect(Collectors.toList());

    for (String result : results) {
        LOG.info("List element: {}", result);
    }
}
// threadSize = 7 public ParallelAsyncService(int threadSize) { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("parallel-async-thread-%d").build(); // , executorService = Executors.newFixedThreadPool(threadSize, namedThreadFactory); } /** * Maps user function T -> Ret to function T -> Future<Ret>. Adds task to executor service * @param mapper user function * @param <T> user function argument * @param <Ret> user function result * @return function to future */ public <T, Ret> Function<T, Future<Ret>> parallelWarp(Function<T, Ret> mapper) { return (T t) -> { LOG.info("Submitting task to inner executor"); Future<Ret> future = executorService.submit(() -> { LOG.info("Sending task to hystrix"); return mapper.apply(t); }); return future; }; } /** * waits all futures in stream and rethrow exception if occured * @param futureStream stream of futures * @param <T> type * @return stream of results */ public <T> Stream<T> waitStream(Stream<Future<T>> futureStream) { List<Future<T>> futures = futureStream.collect(Collectors.toList()); // wait all futures one by one. for (Future<T> future : futures) { try { future.get(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } throw new RuntimeException(e); } } // all futures have completed, it is safe to call get return futures.stream().map( future -> { try { return future.get(); } catch (Exception e) { e.printStackTrace(); return null; // } } );
[main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 18 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 19 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 20 [main] INFO org.silentpom.RxHystrix - Cold Hystrix command created: 21 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 4 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command start: 1 [hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command start: 2 [hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command start: 3 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command start: 5 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 6 [hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 7 [hystrix-ExampleGroup-2] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 2 [hystrix-ExampleGroup-5] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 5 [hystrix-ExampleGroup-3] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 3 [hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 7 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 6 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 4 [hystrix-ExampleGroup-1] INFO org.silentpom.CommandHelloWorld - Command calculation finished: 1 [hystrix-ExampleGroup-4] INFO org.silentpom.CommandHelloWorld - Command start: 11 [hystrix-ExampleGroup-6] INFO org.silentpom.CommandHelloWorld - Command start: 12 [hystrix-ExampleGroup-8] INFO org.silentpom.CommandHelloWorld - Command start: 8 [hystrix-ExampleGroup-7] INFO org.silentpom.CommandHelloWorld - Command start: 13 [hystrix-ExampleGroup-9] INFO org.silentpom.CommandHelloWorld - Command start: 9
Source: https://habr.com/ru/post/337864/
All Articles