// Illustrative pipeline (article example): read a file on an I/O queue,
// parse the bytes on a parser queue, then deliver the result to the UI on
// the main queue.  Each stage hops to the next via async, so no stage
// blocks its caller.
// NOTE(review): `file_data`/`parsed_data` are assigned inside [=] value
// captures — this is article pseudo-code, not compilable as-is.
file_io_queue.async([=]{
    file_data = get_data_from_file( file_name );
    parser_queue.async([=]{
        parsed_data = parse( file_data );
        main_queue.async([=]{
            update_ui_with_new_data( parsed_data ) ;
        });
    });
});
// Article note: the code above reads as absolutely linear, synchronous
// code.  It describes the logic of how the data transformations occur; it
// does not matter which thread reads the file or which one parses it —
// only the sequence of operations matters.  The snippet can be invoked
// 100500 times for 100500 files.
namespace dispatch {

// A task is any callable taking no arguments and returning nothing.
typedef std::function<void ()> function;

// Lightweight handle describing a priority "lane" for tasks.  Task storage
// and threading live in the shared thread pool; a queue object only carries
// the priority used to route tasks to the pool.
struct queue {
    typedef long priority;

    // Priority of this queue; higher values are served first by the pool.
    const queue::priority queue_priority;

    // Returns a handle to the main-thread queue (its tasks run on the UI
    // thread, not on pool workers).
    static std::shared_ptr<queue> main_queue();

    // Enqueues a task under this queue's priority (overridden by main_queue).
    virtual void async(function) const;

    queue(queue::priority priority) : queue_priority(priority) {};

    // FIX: queue is used polymorphically (main_queue derives from it and is
    // handed out as shared_ptr<queue>), so the base needs a virtual
    // destructor for safe polymorphic destruction.
    virtual ~queue() = default;
};

}
// Forwards a task to the shared thread pool, tagged with this queue's
// priority; the pool decides which worker thread eventually runs it.
void queue::async(dispatch::function task) const {
    thread_pool::shared_pool()->push_task_with_priority(task, this->queue_priority);
};
struct queue_impl{ const queue::priority priority; std::queue<function> tasks; bool is_running; queue_impl(queue::priority priority): priority(priority){}; }; struct thread_pool{ thread_pool(); static std::shared_ptr<thread_pool>& shared_pool(); // thread_pool virtual ~thread_pool(); bool stop; typedef std::shared_ptr<queue_impl> queue_ptr; void push_task_with_priority(const function&, queue::priority);// bool get_free_queue(queue_ptr*) const; // , void start_task_in_queue(const queue_ptr&); // void stop_task_in_queue(const queue_ptr&); // std::mutex mutex; // std::map<queue::priority, queue_ptr> queues; // std::mutex main_thread_mutex; std::queue<dispatch::function> main_queue; std::condition_variable condition; std::vector<std::thread> threads; // , dispatch::function main_loop_need_update; void add_worker(); // };
// Scans queues from highest to lowest priority (std::map keeps keys sorted
// ascending, hence the reverse walk) and reports the first queue not
// currently being served by a worker.  Caller must hold `mutex`.
bool thread_pool::get_free_queue(queue_ptr* out_queue) const {
    for (auto it = queues.rbegin(); it != queues.rend(); ++it) {
        if (!it->second->is_running) {
            *out_queue = it->second;
            return true;
        }
    }
    return false;
}
// Enqueues `task` under `priority`, lazily creating the queue and growing
// the worker pool, then wakes one idle worker.
void thread_pool::push_task_with_priority(const function& task, queue::priority priority){
    {
        std::unique_lock<std::mutex> lock(mutex); // guards queues and threads
        // Look up the queue for this priority; operator[] default-constructs
        // a null queue_ptr on first use, which we then fill in.
        auto queue = queues[priority];
        if (!queue){
            queue = std::make_shared<dispatch::queue_impl>(priority);
            queues[priority] = queue;
        }
        queue->tasks.push(task);
        // Size the pool: at least 2 threads, capped at hardware concurrency,
        // growing roughly logarithmically with the number of live queues.
        unsigned max_number_of_threads = std::max<unsigned>(std::thread::hardware_concurrency(), 2);
        unsigned number_of_threads_required = round(log(queues.size()) + 1);
        while (threads.size() < std::min<unsigned>(max_number_of_threads, number_of_threads_required)){
            add_worker();
        }
    }
    condition.notify_one(); // wake one waiting worker, outside the lock
}
void thread_pool::stop_task_in_queue(const queue_ptr& queue){ { std::unique_lock<std::mutex> lock(mutex); // . - queue->is_running = false; if ( queue->tasks.size() ==0 ){ queues.erase(queues.find(queue->queue_priority)); } } condition.notify_one(); // , }
// Spawns one worker thread that loops forever: wait for an idle queue with
// work, claim its front task, run it outside the lock, release the queue.
void thread_pool::add_worker(){
    threads.push_back(std::thread([=]{
        dispatch::function task;
        thread_pool::queue_ptr queue;
        while(true){
            {
                std::unique_lock<std::mutex> lock(mutex);
                // Sleep until shutdown or an unserved queue has tasks
                // (loop guards against spurious wakeups).
                while(!stop && !get_free_queue(&queue))
                    condition.wait(lock);
                if(stop)
                    return; // pool is shutting down
                task = queue->tasks.front();
                queue->tasks.pop();
                // NOTE(review): start_task_in_queue is invoked while `mutex`
                // is held; its body is not visible here — confirm it does
                // not lock `mutex` itself (stop_task_in_queue does, and is
                // accordingly called only after the lock is released).
                start_task_in_queue(queue);
            }
            task();                    // run the task without holding the lock
            stop_task_in_queue(queue); // release the queue; may wake a worker
        }
    }));
}
// Queue subclass whose async() routes tasks to the application's main
// thread rather than to the worker pool.  Priority 0 is nominal: the main
// queue never competes with pool queues.
struct main_queue : queue{
    virtual void async(dispatch::function task) const override;
    main_queue(): queue(0) {};
};

// Hands out a main-thread queue handle.  The handle carries no state of its
// own, so constructing a fresh instance per call is equivalent to caching
// a shared one.
std::shared_ptr<queue> queue::main_queue(){
    std::shared_ptr<dispatch::main_queue> handle = std::make_shared<dispatch::main_queue>();
    return handle; // implicit upcast to shared_ptr<queue>
}
// Enqueues a task for the main thread and pings the host application (via
// the registered callback) so it schedules a drain of the main queue.
void main_queue::async(dispatch::function task) const {
    auto pool = thread_pool::shared_pool();
    std::unique_lock<std::mutex> lock(pool->main_thread_mutex);
    pool->main_queue.push(task);
    // No callback registered yet means the host drains the queue on its own
    // schedule.  NOTE(review): the callback runs while main_thread_mutex is
    // held — it must not re-enter main_queue::async or process_main_loop.
    if (pool->main_loop_need_update != nullptr)
        pool->main_loop_need_update();
}
void process_main_loop() { auto pool = thread_pool::shared_pool(); std::unique_lock<std::mutex> lock(pool->main_thread_mutex); while (!pool->main_queue.empty()) { auto task = pool->main_queue.front(); pool->main_queue.pop(); task(); } }
// Drains the dispatch library's main queue; always entered on the UI
// thread via performSelectorOnMainThread below.
-(void)dispatchMainThread{
    dispatch::process_main_loop();
}

- (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions{
    // Register the "main queue has work" callback.  NOTE(review):
    // dispatch::set_main_loop_process_callback is not defined in the
    // visible code — presumably it stores the block into the pool's
    // main_loop_need_update; confirm against the library source.
    // waitUntilDone:NO keeps the callback non-blocking, which matters
    // because it is invoked while the pool's main_thread_mutex is held.
    dispatch::set_main_loop_process_callback([=]{
        [self performSelectorOnMainThread:@selector(dispatchMainThread) withObject:nil waitUntilDone:NO];
    });
    return YES;
}
void main_loop(dispatch::function main_loop_function);

// Emulates a UI run loop for non-Cocoa hosts: each iteration re-posts
// `main_loop_function` to the main queue and then drains that queue on the
// calling thread, spinning until the pool is asked to stop.
void main_loop(dispatch::function main_loop_function){
    auto main_queue = queue::main_queue();
    while (!thread_pool::shared_pool()->stop) {
        main_queue->async(main_loop_function);
        process_main_loop();
    }
}
auto main_thread_id = std::this_thread::get_id(); for (unsigned task = 0; task < 6; ++task) for (unsigned priority = 0; priority < 6; ++priority){ dispatch::queue(priority).async([=]{ assert(std::this_thread::get_id() != main_thread_id); std::string task_string = std::to_string(task); std::string palceholder(1+priority*5, ' '); dispatch::queue::main_queue()->async([=]{ assert(std::this_thread::get_id() == main_thread_id); std::cout << palceholder << task_string << std::endl; }); }); }
0 1 0 0 2 1 1 3 2 2 4 3 3 5 4 4 0 5 5 1 0 0 2 1 1 3 2 2 4 3 3 5 4 4 5 5
// Demo: 20 rounds of background work, each hopping back to the main thread
// to report progress and refresh the UI.
for (int i = 0; i < 20; ++i){
    dispatch::queue(dispatch::QUEUE_PRIORITY::DEFAULT).async([=]{
        // Background stage — must not run on the main thread.
        NSAssert(![NSThread isMainThread], nil);
        std::string first_string = std::to_string(i);
        dispatch::queue::main_queue()->async([=]{
            // Completion stage — must run on the main thread.
            NSAssert([NSThread isMainThread], nil);
            std::string second_string = std::to_string(i+1);
            std::cout << first_string << " -> " << second_string << std::endl;
            [self.tableView reloadData]; // UI work; legal because we are on the main thread
        });
    });
}
Source: https://habr.com/ru/post/186200/
All Articles