
Parallel data loading with time constraints

Sometimes we need to fetch data from several remote sources without waiting too long. For example, when loading weather data or currency rates, we can poll several services and display the results from all those that responded within a given period of time.



If too few services respond within this period, we can allow additional time for loading.
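So, we operate with three basic parameters:
  1. the main period allotted for loading the data (LoadDataPeriod);
  2. the minimum number of results we want to receive (MinResultsCount);
  3. the additional waiting period granted when too few sources have responded (EmergencyPeriod).
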
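Before turning these parameters into a reusable class, their interplay can be sketched in a few lines. This is only an illustration under stated assumptions: the names TimedLoadSketch and LoadAsync are hypothetical, the time limits use Task.WhenAny with Task.Delay, and the extra period here ends when every loader has finished or the time runs out (it does not stop early once the minimum count is reached).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TimedLoadSketch
{
    // Hypothetical helper: runs every loader in parallel and returns
    // whatever has arrived when the waiting time is over.
    public static async Task<T[]> LoadAsync<T>(
        IEnumerable<Func<T>> loaders,
        TimeSpan loadDataPeriod,
        int minResultsCount,
        TimeSpan emergencyPeriod)
    {
        var results = new ConcurrentQueue<T>();

        // Each loader runs on the thread pool and stores its own result.
        Task[] tasks = loaders
            .Select(loader => Task.Run(() => results.Enqueue(loader())))
            .ToArray();
        Task all = Task.WhenAll(tasks);

        // Main period: stop waiting when everything finished or time ran out.
        await Task.WhenAny(all, Task.Delay(loadDataPeriod));

        // Extra period: granted only when too few sources have answered so far.
        if (!all.IsCompleted && results.Count < minResultsCount)
        {
            await Task.WhenAny(all, Task.Delay(emergencyPeriod));
        }

        // Loaders that are still running are simply ignored by this sketch.
        return results.ToArray();
    }
}
```

A call such as TimedLoadSketch.LoadAsync(loaders, TimeSpan.FromSeconds(2), 3, TimeSpan.FromSeconds(1)) would wait up to two seconds, and up to one more second if fewer than three results have arrived by then.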
To make the task easier, let's write a loader class. It is quite simple: the listing comes first, then the explanation:
AsyncDataLoader
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class AsyncDataLoader<T>
{
    /// <summary>
    /// Creates a loader with no extra waiting period and no minimum result count.
    /// </summary>
    public AsyncDataLoader()
    {
        EmergencyPeriod = TimeSpan.Zero;
        MinResultsCount = 0;
    }

    /// <summary>
    /// Creates a loader without an extra waiting period.
    /// </summary>
    /// <param name="dataLoaders">Functions that load the data.</param>
    /// <param name="loadDataPeriod">Period within which the data is expected to load.</param>
    public AsyncDataLoader(IEnumerable<Func<T>> dataLoaders, TimeSpan loadDataPeriod)
        : this(dataLoaders, loadDataPeriod, 0, TimeSpan.Zero)
    {
    }

    /// <summary>
    /// Creates a fully configured loader.
    /// </summary>
    /// <param name="dataLoaders">Functions that load the data.</param>
    /// <param name="loadDataPeriod">Period within which the data is expected to load.</param>
    /// <param name="minimalResultsCount">Minimum number of results wanted.</param>
    /// <param name="emergencyPeriod">Extra period granted when too few results have arrived.</param>
    public AsyncDataLoader(IEnumerable<Func<T>> dataLoaders, TimeSpan loadDataPeriod, int minimalResultsCount, TimeSpan emergencyPeriod)
    {
        DataLoaders = dataLoaders;
        LoadDataPeriod = loadDataPeriod;
        EmergencyPeriod = emergencyPeriod;
        MinResultsCount = minimalResultsCount;
    }

    /// <summary>
    /// Extra period granted when fewer than the minimum number of results arrived in the main period.
    /// </summary>
    public TimeSpan EmergencyPeriod { get; set; }

    /// <summary>
    /// Minimum number of results wanted.
    /// </summary>
    public int MinResultsCount { get; set; }

    /// <summary>
    /// Functions that load the data.
    /// </summary>
    public IEnumerable<Func<T>> DataLoaders { get; set; }

    /// <summary>
    /// Period within which the data is expected to load.
    /// </summary>
    public TimeSpan LoadDataPeriod { get; set; }

    /// <summary>
    /// Whether results equal to default(T) are skipped.
    /// </summary>
    public bool SkipDefaultResults { get; set; }

    /// <summary>
    /// Loads the data from all sources in parallel.
    /// </summary>
    /// <returns>The results received in time.</returns>
    public async Task<T[]> GetResultsAsync()
    {
        BlockingCollection<T> results = new BlockingCollection<T>();
        List<Task> tasks = new List<Task>();
        tasks.AddRange(DataLoaders.Select(handler => Task.Factory.StartNew(() =>
        {
            T result = handler.Invoke();
            results.Add(result);
        }, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness, TaskScheduler.Default)));

        bool isAllCompleted = true;
        try
        {
            using (CancellationTokenSource source = new CancellationTokenSource(LoadDataPeriod))
            {
                CancellationToken token = source.Token;
#if DEBUG
                token = CancellationToken.None; // no time limit while debugging
#endif
                await Task.Factory.ContinueWhenAll(tasks.ToArray(), (t) => { }, token);
            }
        }
        catch (OperationCanceledException) // Time is up? Not all tasks have finished.
        {
            isAllCompleted = false;
        }

        if (!isAllCompleted && EmergencyPeriod > TimeSpan.Zero) // grant the extra period
        {
            // Checks whether enough results have arrived.
            Func<bool> isReadyHandler = () => results.Count >= MinResultsCount;
            await WaitWhenReady(isReadyHandler, EmergencyPeriod);
        }

        if (SkipDefaultResults)
            return results.Where(r => !object.Equals(r, default(T))).ToArray();
        return results.ToArray();
    }

    /// <summary>
    /// Waits until the readiness check passes or the delay runs out.
    /// </summary>
    /// <param name="isReadyValidator">Function that checks whether the results are ready.</param>
    /// <param name="totalDelay">Total waiting time.</param>
    /// <param name="iterationsCount">Number of readiness checks.</param>
    private async Task WaitWhenReady(Func<bool> isReadyValidator, TimeSpan totalDelay, int iterationsCount = 7)
    {
        if (isReadyValidator())
            return;

        double milliseconds = totalDelay.TotalMilliseconds / iterationsCount;
        TimeSpan delay = TimeSpan.FromMilliseconds(milliseconds);
        for (int i = 0; i < iterationsCount; i++)
        {
            if (isReadyValidator())
                return;
            await Task.Delay(delay);
        }
    }
}


In the body of GetResultsAsync:
  1. We create a collection to store the results. The BlockingCollection class is safe to use from multiple threads;
  2. We wrap each handler in a separate task. All tasks are collected in a list; we tell the scheduler that they will run for a long time (TaskCreationOptions.LongRunning) and ask it to schedule them fairly, in the order they were queued (TaskCreationOptions.PreferFairness);
  3. We wait for all tasks to complete, subject to the time limit;
  4. If necessary, we allow additional time for loading the data;
  5. Before returning, we skip empty (default) results if SkipDefaultResults == true.
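One detail of step 5 is worth illustrating: default(T) is null for reference types, but 0 (or false) for value types, so a legitimate zero result is filtered out as well. The sketch below isolates the same predicate used in GetResultsAsync; the names DefaultFilterSketch, SkipDefaults and Demo are hypothetical and exist only for this example.

```csharp
using System;
using System.Linq;

public static class DefaultFilterSketch
{
    // Same predicate as in GetResultsAsync: drop items equal to default(T).
    public static T[] SkipDefaults<T>(T[] results) =>
        results.Where(r => !object.Equals(r, default(T))).ToArray();

    public static void Demo()
    {
        // Reference type: default(string) is null, so only the null entry is dropped.
        string[] names = SkipDefaults(new[] { "EUR", null, "USD" });

        // Value type: default(int) is 0, so real zero values are dropped too.
        int[] rates = SkipDefaults(new[] { 0, 74, 0 });

        // Nullable value type: default(int?) is null, so 0 survives and null is dropped.
        int?[] nullableRates = SkipDefaults(new int?[] { 0, null, 74 });

        Console.WriteLine(names.Length);         // 2
        Console.WriteLine(rates.Length);         // 1
        Console.WriteLine(nullableRates.Length); // 2
    }
}
```

If zero is a meaningful result for a value type, one option is to use a nullable type such as int? for T, so that only missing values are skipped.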

In debug builds, we forcibly disable the time limit so that the loader functions can be stepped through in the debugger without the wait being cancelled.
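A possible variation, not the approach used in the listing, is to decide at run time instead of at compile time: Debugger.IsAttached reports whether a debugger is currently attached, so the time limit stays active in debug builds that run without one. The helper name DebugAwareTimeout is hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class DebugAwareTimeout
{
    // Hypothetical helper: creates a token source whose deadline is
    // disabled only while a debugger is attached to the process.
    public static CancellationTokenSource Create(TimeSpan loadDataPeriod)
    {
        return Debugger.IsAttached
            ? new CancellationTokenSource()                // never cancels on its own
            : new CancellationTokenSource(loadDataPeriod); // cancels after the period
    }
}
```

The token of the returned source could then be passed to the waiting call in place of the token built under #if DEBUG; the caller remains responsible for disposing the source.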
References:

Source: https://habr.com/ru/post/245201/

