
A quick-and-dirty Erlang cluster

To bump the count of articles about the “rare” language Erlang by one more, we will show how to knock together an Erlang cluster and run a parallelized calculation on it.


To follow along, the reader needs to know the basics of Erlang (without OTP), which, by the way, can be picked up on the spot without leaving the cozy confines of Habr, right here:



But let's not get ahead of ourselves. First, a ...


Lyrical digression


As the IT industry develops, very different programming languages each find their niche. Erlang was originally developed as a language for fault-tolerant systems, but it has won its place in today's world of distributed multiprocessor systems thanks to:



And while remaining:



Companies and projects that use Erlang in production include:



We won't hide from the reader that, in our opinion, the language also has weaknesses. Other authors have described them well, for example in “Erlang in Wargaming”.


Well, now let's do some programming, and write a function that does ...


Parallel calculation of function values (with a limit on the number of simultaneously running processes and a timeout for the calculation)


Map function

The signature of this function is modeled on lists:map:


    map(Fn, Items, WorkersN, Timeout) -> {ok, [FnResult]} | {error, timeout}, throws {'EXIT', Reason}

where:



Function implementation:


    -module(dmap_pmap).
    ...
    map(Fn, Items, WorkersN, Timeout) ->
        Total = length(Items),
        Context = #{fn => Fn,
                    items => Items,
                    results => #{},
                    counter => 1,
                    total => Total,
                    workers => 0,
                    workers_max => min(WorkersN, Total),
                    pids => #{},
                    timeout => Timeout},
        Self = self(),
        %% Run the loop in a separate process and wait for its reply.
        spawn(fun() -> Self ! map_loop(Context) end),
        Result = receive Any -> Any end,
        %% Turn the loop's reply into the public return value.
        case get_error(Result) of
            undefined -> {ok, Result};
            {'EXIT', Reason} -> throw({'EXIT', Reason});
            {error, timeout} -> {error, timeout}
        end.

Here are the main points in the code:



Map_loop function

The signature of this function: map_loop(Context) -> [FnResult]


Implementation:


    %% Clause 1: a worker slot is free and items remain, so start a worker.
    map_loop(#{counter := Counter, total := Total,
               workers := Workers, workers_max := WorkersMax,
               fn := Fn, items := Items, pids := PIDs} = Context)
      when Workers < WorkersMax, Counter =< Total ->
        Self = self(),
        Index = Counter,
        WorkerIndex = Workers + 1,
        PID = spawn(fun() ->
            WorkerPID = self(),
            io:fwrite("{Index, PID, {W, WMax}}: ~p~n",
                      [{Index, WorkerPID, {Workers + 1, WorkersMax}}]),
            Item = lists:nth(Index, Items),
            %% catch turns a crash in Fn into an {'EXIT', Reason} value.
            Self ! {Index, WorkerPID, catch Fn(Item, WorkerIndex)}
        end),
        Context2 = Context#{counter => Counter + 1,
                            workers => Workers + 1,
                            pids => PIDs#{PID => Index}},
        map_loop(Context2);
    %% Clause 2: workers are running, so wait for the next message.
    map_loop(#{workers := Workers, timeout := Timeout, pids := _PIDs} = Context)
      when Workers > 0 ->
        receive
            {Index, PID, {'EXIT', _Reason} = Result} when is_integer(Index) ->
                %% error case
                io:fwrite("got error: ~p~n", [{Index, PID, Result}]),
                Context2 = set_worker_result(PID, {Index, Result}, Context),
                Context3 = kill_workers(Context2, error),
                create_result(Context3);
            {Index, PID, Result} when is_integer(Index) ->
                %% ok case
                io:fwrite("got result: ~p~n", [{Index, PID, Result}]),
                Context2 = set_worker_result(PID, {Index, Result}, Context),
                map_loop(Context2)
        after Timeout ->
            %% timeout case
            io:fwrite("timeout: ~p~n", [#{context => Context}]),
            Context3 = kill_workers(Context, {error, timeout}),
            create_result(Context3)
        end;
    %% Clause 3: no workers are left, so build the final result.
    map_loop(#{workers := Workers, pids := PIDs} = Context)
      when Workers == 0, PIDs == #{} ->
        create_result(Context).

Let's go through the implementation:



Let's run through the private functions that we used here:



Full listing of functions can be viewed on GitHub: here
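For readers who want something self-contained to experiment with, here is a minimal sketch of the same idea: a map with a cap on concurrent workers and a timeout. It is not the dmap_pmap module from the article. The module name pmap_sketch, the helper kill_all/1 and the one-argument Fn are assumptions made for illustration, and the private helpers from the repository are not reproduced. As in map_loop, the timeout limits each wait for the next result rather than the whole run.

```erlang
-module(pmap_sketch).
-export([map/4]).

%% Hypothetical simplified variant, not the article's dmap_pmap module.
%% Runs Fn(Item) for every item with at most WorkersN workers at a time.
%% Returns {ok, Results} in input order or {error, timeout};
%% throws {'EXIT', Reason} if a worker crashes.
map(Fn, Items, WorkersN, Timeout) when WorkersN > 0 ->
    Ref = make_ref(),  % tags the messages that belong to this call
    Indexed = lists:zip(lists:seq(1, length(Items)), Items),
    loop(Fn, Indexed, WorkersN, Timeout, Ref, #{}, #{}).

%% A slot is free and items remain: start one more worker.
loop(Fn, [{Index, Item} | Rest], Free, Timeout, Ref, Pids, Results)
  when Free > 0 ->
    Parent = self(),
    Pid = spawn(fun() ->
        Parent ! {Ref, Index, self(), catch Fn(Item)}
    end),
    loop(Fn, Rest, Free - 1, Timeout, Ref, Pids#{Pid => Index}, Results);
%% Nothing left to start and nothing running: collect results in order.
loop(_Fn, [], _Free, _Timeout, _Ref, Pids, Results)
  when map_size(Pids) =:= 0 ->
    {ok, [maps:get(I, Results) || I <- lists:seq(1, map_size(Results))]};
%% Otherwise wait for the next worker message.
loop(Fn, Todo, Free, Timeout, Ref, Pids, Results) ->
    receive
        {Ref, _Index, _Pid, {'EXIT', Reason}} ->
            kill_all(Pids),
            throw({'EXIT', Reason});
        {Ref, Index, Pid, Result} ->
            loop(Fn, Todo, Free + 1, Timeout, Ref,
                 maps:remove(Pid, Pids), Results#{Index => Result})
    after Timeout ->
        kill_all(Pids),
        {error, timeout}
    end.

%% Kill every worker that is still registered as running.
kill_all(Pids) ->
    lists:foreach(fun(Pid) -> exit(Pid, kill) end, maps:keys(Pids)).
```

For example, pmap_sketch:map(fun(X) -> X * 2 end, [1, 2, 3], 2, 1000) returns {ok,[2,4,6]}. Unlike the article's version, this sketch runs the loop in the caller's process, so a message from a worker that was killed late may stay in the caller's mailbox.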


Testing

Now let's test our function a bit through the Erlang REPL.


1) Run the calculation for 2 workers, so that the result from the second worker comes earlier than from the first one:


>catch dmap_pmap:map(fun(1, _) -> timer:sleep(2000), 1; (2, _) -> timer:sleep(1000), 2 end, [1, 2], 2, 5000).


The last line of the output is the result of the calculation.


    {Index, PID, {W, WMax}}: {1,<0.1010.0>,{1,2}}
    {Index, PID, {W, WMax}}: {2,<0.1011.0>,{2,2}}
    got result: {2,<0.1011.0>,2}
    got result: {1,<0.1010.0>,1}
    {ok,[1,2]}

2) Run the calculation for 2 workers, so that the first worker crashes:


>catch dmap_pmap:map(fun(1, _) -> timer:sleep(100), erlang:exit(terrible_error); (2, _) -> timer:sleep(100), 2 end, [1, 2], 2, 5000).


    {Index, PID, {W, WMax}}: {1,<0.2149.0>,{1,2}}
    {Index, PID, {W, WMax}}: {2,<0.2150.0>,{2,2}}
    got error: {1,<0.2149.0>,{'EXIT',terrible_error}}
    kill: <0.2150.0>
    {'EXIT',terrible_error}

3) Run the calculation for 2 workers, so that the function calculation time exceeds the allowed timeout:


> catch dmap_pmap:map(fun(1, _) -> timer:sleep(2000), erlang:exit(terrible_error); (2, _) -> timer:sleep(100), 2 end, [1, 2], 2, 1000).


    {Index, PID, {W, WMax}}: {1,<0.3184.0>,{1,2}}
    {Index, PID, {W, WMax}}: {2,<0.3185.0>,{2,2}}
    got result: {2,<0.3185.0>,2}
    timeout: #{context => #{counter => 3,fn => #Fun<erl_eval.12.99386804>,
                            items => [1,2],
                            pids => #{<0.3184.0> => 1},
                            results => #{2 => 2},
                            timeout => 1000,total => 2,workers => 1,workers_max => 2}}
    kill: <0.3184.0>
    {error,timeout}

Well, finally ...


Cluster calculations


The test results look reasonable, but where does the cluster come in, the attentive reader may ask.
In fact, we already have almost everything we need to run the calculations on a cluster, where by a cluster we mean a set of connected Erlang nodes.


In a separate dmap_dmap module, we will define another function with the following signature:


    map({M, F}, Items, WorkersNodes, Timeout) -> {ok, [FnResult]} | {error, timeout}, throws {'EXIT', Reason}

where:



Implementation:


    -module(dmap_dmap).
    ...
    map({M, F}, Items, WorkersNodes, Timeout) ->
        %% The worker with number WorkerIndex calls M:F on the node
        %% at the same position in WorkersNodes.
        Fn = fun(Item, WorkerIndex) ->
            Node = lists:nth(WorkerIndex, WorkersNodes),
            rpc:call(Node, M, F, [Item])
        end,
        dmap_pmap:map(Fn, Items, length(WorkersNodes), Timeout).

The logic of this function is very simple: we reuse the dmap_pmap:map function from the previous section and pass it an anonymous function that, in turn, simply performs the calculation on the desired node.
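One detail worth keeping in mind: when the remote call fails (for example, the node is down), rpc:call returns {badrpc, Reason} as an ordinary value, so with the code as written it ends up in the result list rather than being treated as an error. If that is not what you want, a wrapper along these lines could be used inside the anonymous function. This is only a sketch; the module dmap_rpc_sketch and the function call_or_exit/3 are hypothetical and not part of the article's code.

```erlang
-module(dmap_rpc_sketch).
-export([call_or_exit/3]).

%% Hypothetical helper, not part of the article's project.
%% Calls M:F(Item) on Node and turns a failed RPC into an exit,
%% so that a surrounding `catch` sees {'EXIT', {badrpc, Node, Reason}}.
call_or_exit(Node, {M, F}, Item) ->
    case rpc:call(Node, M, F, [Item]) of
        {badrpc, Reason} -> exit({badrpc, Node, Reason});
        Result -> Result
    end.
```

Because the worker in map_loop wraps the call in catch, such an exit would arrive as {'EXIT', ...} and take the error branch.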


For the test, in a separate module, we will create a function that returns the name of its node:


    -module(dmap_test).
    %% Exported so that rpc:call can invoke it from another node.
    -export([test/1]).

    test(X) ->
        {ok, {node(), X}}.

Testing


To test this, start one node in each of two terminals, for example like this (from the project's working directory):


make run NODE_NAME=n1@127.0.0.1


make run NODE_NAME=n2@127.0.0.1
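Before running a distributed calculation it can help to check that the nodes actually see each other (they also need to share the same cookie to connect). A small sketch, assuming a hypothetical helper module dmap_nodes_sketch that is not part of the project; net_adm:ping/1 is the standard call and returns pong for a reachable node and pang otherwise.

```erlang
-module(dmap_nodes_sketch).
-export([reachable/1]).

%% Hypothetical helper, not part of the article's project.
%% Keeps only the nodes that answer a ping; the local node always counts.
reachable(Nodes) ->
    [N || N <- Nodes, N =:= node() orelse net_adm:ping(N) =:= pong].
```

On the first node, dmap_nodes_sketch:reachable(['n1@127.0.0.1', 'n2@127.0.0.1']) should return both names once the second node is up.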


Run the calculation on the first node:


(n1@127.0.0.1)1> dmap_dmap:map({dmap_test, test}, [1, 2], ['n1@127.0.0.1', 'n2@127.0.0.1'], 5000).


And we get the result:


    {Index, PID, {W, WMax}}: {1,<0.1400.0>,{1,2}}
    {Index, PID, {W, WMax}}: {2,<0.1401.0>,{2,2}}
    got result: {1,<0.1400.0>,{ok,{'n1@127.0.0.1',1}}}
    got result: {2,<0.1401.0>,{ok,{'n2@127.0.0.1',2}}}
    {ok,[{ok,{'n1@127.0.0.1',1}},{ok,{'n2@127.0.0.1',2}}]}

As you can see, the results came back from both nodes, just as we ordered.


Instead of conclusion


Our simple example shows that, within its area of application, Erlang makes it relatively easy to solve useful problems that are not so easy to solve in other programming languages.


Because of the article's short format, some questions about the code and about building the library were left out of scope.


Some details can be found on GitHub: here.


We promise to cover the rest of the details in the following articles.



Source: https://habr.com/ru/post/354628/

