The image shows MapReduce as it is implemented in Qt:
QFuture<T> QtConcurrent::mappedReduced(const Sequence &sequence, MapFunction mapFunction, ReduceFunction reduceFunction /*...*/) T QtConcurrent::blockingMappedReduced(const Sequence &sequence, MapFunction mapFunction, ReduceFunction reduceFunction /*...*/)
Faced with the fact that colleagues at work do not know about MapReduce in Qt Concurrent. As Goethe said: "What we do not understand, we do not own it." Under the cut there will be a little bit about the Map, about the Reduce, about the Fork – join model and an example of solving a simple task with the help of MapReduce.
The problem was taken from the Internet as it is:
Write a console program that searches for the maximum element in the array with 1000000000 elements.
MapReduce consists of the highest order functions map and reduce . A higher order function is a function that takes other functions as arguments.
Map applies a function to each item in the list, returning a list of results. In C ++, this can be described via std :: transform :
std::list<int> list{ 1, 2, 3, 4, 5, 6 }; std::list<int> newList(list.size(), 0); std::transform(list.begin(), list.end(),newList.begin(), [](int v){ return v*2; }); for(auto i: newList){ std::cout<<i<<" "; }
Wikipedia defines: a higher order function that transforms a data structure to a single atomic value using a given function. If simple, reduce accumulates a set of elements (list, vector, etc.).
In C ++, this can be described via std::for_each
and a functional object
struct Max{ Max():value(std::numeric_limits<int>::min()){ } void operator()(int val){ value = std::max(value, val); } int value; }; struct Sum{ Sum(): value(0){ } void operator()(int val){ value += val; } int value; }; //... std::list<int> list{ 1, 2, 3, 4, 5, 6 }; const auto max = std::for_each(list.begin(), list.end(), Max()); const auto sum = std::for_each(list.begin(), list.end(), Sum()); std::cout<<"Max:"<<max.value<<std::endl; std::cout<<"Sum:"<<sum.value<<std::endl;
How to solve the puzzle through MapReduce may be unclear. There should be a look, maybe there is some theory? There is a fork-join parallel computing model. At the heart of it:
Picture showing the model (taken from wikipedia ). Something similar to the image at the beginning. MapReduce in Qt is a fork-join model implementation.
For such a problem, the standard solution is to take a vector, divide it into several non-intersecting segments, find a local maximum in the segments, and at the end combine the result. On std :: thread, it will be something like this:
using DataSet = std::vector<int>; const size_t DATASET_SIZE = 1000000000; struct Task { size_t first; size_t last; DataSet& data; int localMaximum; }; using Tasks = std::vector<Task>; void max(Task& task) { int localMax = task.data[task.first]; for(size_t item = task.first; item < task.last; ++item) { localMax = std::max(localMax, task.data[item]); } task.localMaximum = localMax; } DataSet data(DATASET_SIZE); //... const auto threadCount = std::thread::hardware_concurrency(); const auto taskSize = data.size()/threadCount; Tasks tasks; size_t first = 0; size_t last = taskSize; // for(size_t i = 0; i < threadCount; ++i) { tasks.push_back(Task{first, last, data, 0}); first+=taskSize; last = std::min(last+taskSize, data.size()); } // std::vector<std::thread> threads; for(auto& task: tasks) { threads.push_back(std::thread(max, std::ref(task))); } // for(auto& thread: threads) { thread.join(); } int Max = tasks[0].localMaximum; for(const auto& task: tasks) { Max = std::max(Max, task.localMaximum); }
Having broken the task into subtasks, you can compactly write through QtConcurrent :: blockingMappedReduced
using DataSet = std::vector<int>; const size_t DATASET_SIZE = 1000000000; struct Task { size_t first; size_t last; DataSet& data; }; int mapMax(const Task& task) { int localMax = task.data[task.first]; for(size_t item = task.first; item < task.last; ++item) { localMax = std::max(localMax, task.data[item]); } return localMax; } void reduceMax(int& a, const int& b) { a = std::max(a, b); } using Tasks = std::vector<Task>; //... const auto threadCount = std::thread::hardware_concurrency(); const auto taskSize = data.size()/threadCount; Tasks tasks; size_t first = 0; size_t last = taskSize; for(size_t i = 0; i < threadCount; ++i) { tasks.push_back(Task{first, last, data, 0}); first+=taskSize; last = std::min(last+taskSize, data.size()); } int Max = QtConcurrent::blockingMappedReduced(tasks, mapMax, reduceMax);
What you should pay attention to here:
#include <QtCore/QtDebug> #include <QtCore/QElapsedTimer> #include <QtCore/QCoreApplication> #include <QtConcurrent/QtConcurrent> #include <cstdlib> #include <thread> #include <vector> #include <algorithm> using DataSet = std::vector<int>; const size_t DATASET_SIZE = 1000000000; struct Task { size_t first; size_t last; DataSet& data; }; int mapMax(const Task& task) { int localMax = task.data[task.first]; for(size_t item = task.first; item < task.last; ++item) { localMax = std::max(localMax, task.data[item]); } return localMax; } void reduceMax(int& a, const int& b) { a = std::max(a, b); } using Tasks = std::vector<Task>; int main(int argc, char *argv[]) { std::srand(unsigned(std::time(0))); QCoreApplication a(argc, argv); DataSet data(DATASET_SIZE); for(size_t i = 0; i < data.size(); ++i) { data[i] = std::rand(); } QElapsedTimer timer; timer.start(); const auto threadCount = std::thread::hardware_concurrency(); const auto taskSize = data.size()/threadCount; Tasks tasks; size_t first = 0; size_t last = taskSize; for(size_t i = 0; i < threadCount; ++i) { tasks.push_back(Task{first, last, data}); first+=taskSize; last = std::min(last+taskSize, data.size()); } timer.start(); const auto Max = QtConcurrent::blockingMappedReduced(tasks, mapMax, reduceMax); qDebug() << "Maximum" << Max << "time" <<timer.elapsed() << "milliseconds"; return 0; }
Source: https://habr.com/ru/post/311090/