
Processes in Elixir (and, of course, in Erlang) are identified by a unique process identifier, the pid.
We use pids to interact with processes. Messages are addressed to a pid, and the virtual machine takes care of delivering them to the correct process.
Sometimes, however, relying too heavily on pids can lead to significant problems.
For example, we may be holding the pid of a process that is already dead, or we may be using a Supervisor, which abstracts process creation away from us, so we don't even know what the pids are (translator's note: a Supervisor can also restart a crashed process under a different pid, and we will have no way of knowing).
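To make the stale-pid problem concrete, here is a minimal sketch that is not part of the chat project. It uses an Agent purely as a stand-in process (an assumption for illustration) and shows that a stored pid outlives its process: send/2 to it fails silently, while a synchronous call exits.

```elixir
# Start an unlinked Agent so that stopping it does not affect the caller.
{:ok, pid} = Agent.start(fn -> [] end)

# Agent.stop/1 returns only after the process has terminated.
:ok = Agent.stop(pid)

# The variable still holds the old pid, but nothing lives behind it.
alive = Process.alive?(pid)

# send/2 to a dead pid does not raise; the message is silently dropped.
sent = send(pid, :hello)

# A synchronous call, on the other hand, exits with :noproc.
call_result =
  try do
    Agent.get(pid, fn state -> state end)
  catch
    :exit, {reason, _details} -> {:exit, reason}
  end
```

Here `alive` is `false`, and `call_result` is `{:exit, :noproc}`. The silent failure of send/2 is the more dangerous of the two, because nothing tells us that the message was lost.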
Let's create a simple application and see what problems we may face and how to solve them.
For the first example, let's create a simple chat. Let's start by creating a mix project:
$ mix new chat

Let's create an absolutely standard GenServer, which we will use throughout the examples in this article:
# ./lib/chat/server.ex
defmodule Chat.Server do
  use GenServer

  # API

  def start_link do
    GenServer.start_link(__MODULE__, [])
  end

  def add_message(pid, message) do
    GenServer.cast(pid, {:add_message, message})
  end

  def get_messages(pid) do
    GenServer.call(pid, :get_messages)
  end

  # SERVER

  def init(messages) do
    {:ok, messages}
  end

  def handle_cast({:add_message, new_message}, messages) do
    {:noreply, [new_message | messages]}
  end

  def handle_call(:get_messages, _from, messages) do
    {:reply, messages, messages}
  end
end

If this code seems unfamiliar or incomprehensible to you, read the Elixir getting started guide, which has excellent chapters on OTP.

Now let's start an iex session with the mix environment and try out our server:
$ iex -S mix
iex> {:ok, pid} = Chat.Server.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Server.add_message(pid, "foo")
:ok
iex> Chat.Server.add_message(pid, "bar")
:ok
iex> Chat.Server.get_messages(pid)
["bar", "foo"]

The code for this stage is in this commit.
At this stage, everything looks just wonderful. We get the pid of the process, then pass this pid with every message we want to send (add_message/2 and get_messages/1), and everything works so predictably that it is even boring.
However, the fun begins when we try to add a Supervisor...
Supervisor!

Suppose that, for some reason, our Chat.Server process dies. We are left alone in an empty and cold iex session, and we have no choice but to start a new process, get its pid and send messages to this new pid. So let's create a Supervisor, and we will not have to worry about such trifles!
# ./lib/chat/supervisor.ex
defmodule Chat.Supervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [])
  end

  def init(_) do
    children = [
      worker(Chat.Server, [])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

Well, creating a Supervisor is easy. But now we have a problem if we keep using our server the same way as before. After all, we no longer start the Chat.Server process ourselves; the Supervisor does it for us. And so we have no access to the pid of the process!
This is not a bug, but a feature of the Supervisor OTP pattern. We cannot rely on the pids of its child processes, because the Supervisor can unexpectedly (but, of course, only when necessary) restart a process, which in fact means killing it and creating a new one with a new pid.
To reach our Chat.Server process, we need a way to point to it that is not a pid. We need a pointer that survives even when the process is restarted by the Supervisor (translator's note: that is, even when the pid changes).
And this pointer is called a name!
To begin, change Chat.Server:

# ./lib/chat/server.ex
defmodule Chat.Server do
  use GenServer

  def start_link do
    # We now start the GenServer with a `name` option.
    GenServer.start_link(__MODULE__, [], name: :chat_room)
  end

  # And our functions don't need to receive the pid anymore,
  # as we can reference the process by its unique name.
  def add_message(message) do
    GenServer.cast(:chat_room, {:add_message, message})
  end

  def get_messages do
    GenServer.call(:chat_room, :get_messages)
  end

  # ...
end

The changes are in this commit.
Now everything should work the same, only better, since we no longer have to pass the pid around everywhere:
$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.94.0>}
iex> Chat.Server.add_message("foo")
:ok
iex> Chat.Server.add_message("bar")
:ok
iex> Chat.Server.get_messages
["bar", "foo"]

Even if the process restarts, we will still be able to access it in the same way:
iex> Process.whereis(:chat_room)
#PID<0.111.0>
iex> Process.whereis(:chat_room) |> Process.exit(:kill)
true
iex> Process.whereis(:chat_room)
#PID<0.114.0>
iex> Chat.Server.add_message "foo"
:ok
iex> Chat.Server.get_messages
["foo"]

Well, for our current needs the problem seems to be solved, but let's try something more complicated (and closer to real-world problems).
Imagine that we need to support multiple chat rooms. A client can create a new room with a given name and expects to be able to send messages to whichever room they want. The interface should then look roughly like this:
iex> Chat.Supervisor.start_room("first room")
iex> Chat.Supervisor.start_room("second room")
iex> Chat.Server.add_message("first room", "foo")
iex> Chat.Server.add_message("second room", "bar")
iex> Chat.Server.get_messages("first room")
["foo"]
iex> Chat.Server.get_messages("second room")
["bar"]

Let's start from the top and change the Supervisor to support all this:
# ./lib/chat/supervisor.ex
defmodule Chat.Supervisor do
  use Supervisor

  def start_link do
    # We are now registering our supervisor process with a name
    # so we can reference it in the `start_room/1` function
    Supervisor.start_link(__MODULE__, [], name: :chat_supervisor)
  end

  def start_room(name) do
    # And we use `start_child/2` to start a new Chat.Server process
    Supervisor.start_child(:chat_supervisor, [name])
  end

  def init(_) do
    children = [
      worker(Chat.Server, [])
    ]

    # We also changed the `strategy` to `simple_one_for_one`.
    # With this strategy, we define just a "template" for a child;
    # no process is started during the Supervisor initialization,
    # only when we call `start_child/2`
    supervise(children, strategy: :simple_one_for_one)
  end
end

And let's make our Chat.Server accept a name in the start_link function:
# ./lib/chat/server.ex
defmodule Chat.Server do
  use GenServer

  # Just accept a `name` parameter here for now
  def start_link(name) do
    GenServer.start_link(__MODULE__, [], name: :chat_room)
  end

  # ...
end

The changes are in this commit.
And here is the problem! We can have several Chat.Server processes, and they cannot all be named :chat_room. Trouble...

$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Supervisor.start_room "foo"
{:ok, #PID<0.109.0>}
iex> Chat.Supervisor.start_room "bar"
{:error, {:already_started, #PID<0.109.0>}}

To be fair, the VM is quite eloquent here. We are trying to create a second process, but a process with the same name already exists, and that is exactly what the runtime tells us. We need to come up with some other way, but which one?
Unfortunately, the type of the name argument is defined quite strictly. We cannot use something like {:chat_room, "room name"}. Let's turn to the documentation:
The supported values are:

- an atom - the GenServer is registered locally with the given name using Process.register/2.
- {:global, term} - the GenServer is registered globally with the given term using the functions in the :global module.
- {:via, module, term} - the GenServer is registered with the given mechanism and name.
The first option, an atom, is the one we have already used, and we know for sure that it does not fit our tricky case.
The second option is used to register a process globally across a cluster of nodes. It relies on a local ETS table. In addition, it requires constant synchronization between the nodes in the cluster, which slows the program down. So use it only when you really need it.
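For comparison, here is a minimal sketch of the {:global, term} form. It assumes a single, non-distributed node and uses an Agent as a stand-in for Chat.Server; the room key is a hypothetical example.

```elixir
# Hypothetical room key; :global accepts any term as a name.
name = {:global, {:chat_room, "lobby"}}

{:ok, pid} = Agent.start_link(fn -> [] end, name: name)

# The :global module resolves the term back to the pid.
found = :global.whereis_name({:chat_room, "lobby"})

# The same {:global, term} tuple can be used wherever a pid is expected.
:ok = Agent.update(name, fn messages -> ["hello" | messages] end)
messages = Agent.get(name, fn messages -> messages end)
```

This does allow arbitrary terms as names, but it pays the cluster-wide synchronization cost described above even when every room lives on one node.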
The third and last option takes a :via tuple, and this is exactly what we need to solve our problem! This is what the documentation says:
The :via option expects a module that exports register_name/2, unregister_name/1, whereis_name/1 and send/2.
Not clear at all? Same here! So let's see this approach in action.
:via

So, the :via tuple is a way to tell Elixir that we are going to use a separate module to register our processes. This module should do the following things:

- register a name, which can be any term, using the register_name/2 function;
- unregister a name, using the unregister_name/1 function;
- find a pid by name, using whereis_name/1;
- send messages to a process, using send/2.

For this to work, these functions must return their responses in a specific format defined by OTP, just as handle_call/3 and handle_cast/2 follow certain rules.
Let's try to define a module that can do all this:
# ./lib/chat/registry.ex
defmodule Chat.Registry do
  use GenServer

  # API

  def start_link do
    # We register our registry (yeah, I know) with a simple name,
    # just so we can reference it in the other functions.
    GenServer.start_link(__MODULE__, nil, name: :registry)
  end

  def whereis_name(room_name) do
    GenServer.call(:registry, {:whereis_name, room_name})
  end

  def register_name(room_name, pid) do
    GenServer.call(:registry, {:register_name, room_name, pid})
  end

  def unregister_name(room_name) do
    GenServer.cast(:registry, {:unregister_name, room_name})
  end

  def send(room_name, message) do
    # If we try to send a message to a process
    # that is not registered, we return a tuple in the format
    # {:badarg, {process_name, error_message}}.
    # Otherwise, we just forward the message to the pid of this
    # room.
    case whereis_name(room_name) do
      :undefined ->
        {:badarg, {room_name, message}}

      pid ->
        Kernel.send(pid, message)
        pid
    end
  end

  # SERVER

  def init(_) do
    # We will use a simple Map to store our processes in
    # the format %{"room name" => pid}
    {:ok, Map.new}
  end

  def handle_call({:whereis_name, room_name}, _from, state) do
    {:reply, Map.get(state, room_name, :undefined), state}
  end

  def handle_call({:register_name, room_name, pid}, _from, state) do
    # Registering a name is just a matter of putting it in our Map.
    # Our response tuple includes a `:no` or `:yes` indicating whether
    # the process was added or was already present.
    case Map.get(state, room_name) do
      nil ->
        {:reply, :yes, Map.put(state, room_name, pid)}

      _ ->
        {:reply, :no, state}
    end
  end

  def handle_cast({:unregister_name, room_name}, state) do
    # And unregistering is as simple as deleting an entry
    # from our Map
    {:noreply, Map.delete(state, room_name)}
  end
end

Again: it is up to us to decide how our registry works internally. Here we use a simple Map to associate a name with a pid. This code is quite straightforward, especially if you know how GenServer works. Only the values returned by the functions may look unfamiliar.
It's time to try our registry in the iex session:
$ iex -S mix
iex> {:ok, pid} = Chat.Server.start_link("room1")
{:ok, #PID<0.107.0>}
iex> Chat.Registry.start_link
{:ok, #PID<0.109.0>}
iex> Chat.Registry.whereis_name("room1")
:undefined
iex> Chat.Registry.register_name("room1", pid)
:yes
iex> Chat.Registry.register_name("room1", pid)
:no
iex> Chat.Registry.whereis_name("room1")
#PID<0.107.0>
iex> Chat.Registry.unregister_name("room1")
:ok
iex> Chat.Registry.whereis_name("room1")
:undefined

So far, so good! The registry works as it should: it registers names and removes registrations. Let's try to use it for our chat rooms.
Our problem was that we had several Chat.Server processes started through the Supervisor. To send a message to a specific room, we would call Chat.Server.add_message("room1", "my message"), so we would have to register the servers under names like {:chat_room, "room1"} and {:chat_room, "room2"}. Here is how this is done with a :via tuple:
# ./lib/chat/server.ex
defmodule Chat.Server do
  use GenServer

  # API

  def start_link(name) do
    # Instead of passing an atom to the `name` option, we send
    # a tuple. Here we extract this tuple to a private function
    # called `via_tuple` that can be reused in every function
    GenServer.start_link(__MODULE__, [], name: via_tuple(name))
  end

  def add_message(room_name, message) do
    # And the `GenServer` functions will accept this tuple the
    # same way they accept a pid or an atom.
    GenServer.cast(via_tuple(room_name), {:add_message, message})
  end

  def get_messages(room_name) do
    GenServer.call(via_tuple(room_name), :get_messages)
  end

  defp via_tuple(room_name) do
    # And the tuple always follows the same format:
    # {:via, module_name, term}
    {:via, Chat.Registry, {:chat_room, room_name}}
  end

  # SERVER (no changes required here)
  # ...
end

The changes are in this commit.
Here is what happens: every time we send a message to Chat.Server with a room name, it looks up the pid of the target process using the module we passed in the :via tuple (in this case, Chat.Registry).
This solves our problem: now we can run any number of Chat.Server processes (well, until we run out of ideas for names), and we never need to know their pids. At all.
However, this solution has another problem. Can you guess what it is?
Exactly! Our registry is unaware of processes that have crashed and must be restarted by the Supervisor. This means that when a crash happens, the registry keeps the pid of the dead process and refuses to create a new entry under the same name.
In theory, the solution to this problem is not too complicated. We will make our registry monitor every process whose pid it stores. As soon as such a monitored process goes down, we simply remove it from the registry.
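As a standalone illustration of the mechanism (a sketch outside the chat project, using a throwaway spawned process), this is what monitoring looks like from the monitoring side:

```elixir
# A throwaway process that does nothing until it is killed.
pid = spawn(fn -> Process.sleep(:infinity) end)

# Process.monitor/1 returns a reference that identifies this monitor.
ref = Process.monitor(pid)

Process.exit(pid, :kill)

# The monitoring process receives a :DOWN message carrying the
# monitor reference, the pid and the exit reason.
reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :timeout
  end
```

Here `reason` is `:killed`. Unlike a link, a monitor is one-directional: the monitoring process is only notified and does not die along with the monitored one. That is what we want for a registry.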
# in lib/chat/registry.ex
defmodule Chat.Registry do
  # ...

  def handle_call({:register_name, room_name, pid}, _from, state) do
    case Map.get(state, room_name) do
      nil ->
        # When a new process is registered, we start monitoring it.
        Process.monitor(pid)
        {:reply, :yes, Map.put(state, room_name, pid)}

      _ ->
        {:reply, :no, state}
    end
  end

  def handle_info({:DOWN, _, :process, pid, _}, state) do
    # When a monitored process dies, we will receive a
    # `:DOWN` message that we can use to remove the
    # dead pid from our registry.
    {:noreply, remove_pid(state, pid)}
  end

  def remove_pid(state, pid_to_remove) do
    # And here we just filter out the dead pid
    remove = fn {_key, pid} -> pid != pid_to_remove end
    Enum.filter(state, remove) |> Enum.into(%{})
  end
end

The changes are in this commit.
Make sure everything works:
$ iex -S mix
iex> Chat.Registry.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Supervisor.start_link
{:ok, #PID<0.109.0>}
iex> Chat.Supervisor.start_room("room1")
{:ok, #PID<0.111.0>}
iex> Chat.Server.add_message("room1", "message")
:ok
iex> Chat.Server.get_messages("room1")
["message"]
iex> Chat.Registry.whereis_name({:chat_room, "room1"}) |> Process.exit(:kill)
true
iex> Chat.Server.add_message("room1", "message")
:ok
iex> Chat.Server.get_messages("room1")
["message"]

Well, now it doesn't matter how many times the Supervisor restarts a Chat.Server process: as soon as we send a message to the room, it is delivered to the correct pid.
gproc

We could stop here with our chat, but I would like to show one more tool that simplifies registration through the :via tuple: gproc, an Erlang library.
We will teach Chat.Server to use gproc instead of our Chat.Registry, and then get rid of Chat.Registry altogether.
Let's start with the dependencies. Add gproc to mix.exs:
# ./mix.exs
defmodule Chat.Mixfile do
  # ...

  def application do
    [applications: [:logger, :gproc]]
  end

  defp deps do
    [{:gproc, "0.3.1"}]
  end
end

Then we fetch the dependencies with:
$ mix deps.get

Now we can change our :via tuple registration so that it uses gproc instead of Chat.Registry:
# ./lib/chat/server.ex
defmodule Chat.Server do
  # ...

  # The only thing we need to change is the `via_tuple/1` function,
  # to make it use `gproc` instead of `Chat.Registry`
  defp via_tuple(room_name) do
    {:via, :gproc, {:n, :l, {:chat_room, room_name}}}
  end

  # ...
end

gproc uses keys consisting of three values: {type, scope, key}.
In our case, we use:

- :n - this means name, that is, no more than one process can be registered under such a key;
- :l - this means local, that is, the process is registered only on our node;
- {:chat_room, room_name} - the key itself, as a tuple.

For more information on the options gproc supports, look here.
After these changes, we can throw away our Chat.Registry entirely and check in an iex session that everything still works:
$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.190.0>}
iex> Chat.Supervisor.start_room("room1")
{:ok, #PID<0.192.0>}
iex> Chat.Supervisor.start_room("room2")
{:ok, #PID<0.194.0>}
iex> Chat.Server.add_message("room1", "first message")
:ok
iex> Chat.Server.add_message("room2", "second message")
:ok
iex> Chat.Server.get_messages("room1")
["first message"]
iex> Chat.Server.get_messages("room2")
["second message"]
iex> :gproc.where({:n, :l, {:chat_room, "room1"}}) |> Process.exit(:kill)
true
iex> Chat.Server.add_message("room1", "first message")
:ok
iex> Chat.Server.get_messages("room1")
["first message"]

The changes are in this commit.
We have worked through a number of tricky questions. The main conclusions:

- be careful when using pids directly: they change as soon as the process restarts;
- use the :via tuple to provide your own registry;
- ready-made solutions already exist (for example, gproc), and if you use them, you will not have to reinvent the wheel.

Of course, this is not all. If you need global registration across all nodes in a cluster, other tools may be a good fit too. Erlang has the :global module for global registration and pg2 for process groups, and gproc can help you there as well.
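One more ready-made option is worth a hedged mention, although the original walkthrough does not use it: since version 1.4, Elixir's standard library ships a Registry module that can serve as the module in a :via tuple. The sketch below rebuilds the chat room on top of it. The Sketch.Room and Sketch.RoomRegistry names are hypothetical and chosen only for this illustration.

```elixir
defmodule Sketch.Room do
  use GenServer

  # API

  def start_link(room_name) do
    GenServer.start_link(__MODULE__, [], name: via_tuple(room_name))
  end

  def add_message(room_name, message) do
    GenServer.cast(via_tuple(room_name), {:add_message, message})
  end

  def get_messages(room_name) do
    GenServer.call(via_tuple(room_name), :get_messages)
  end

  # With the built-in Registry, the term is {registry_name, key}.
  def via_tuple(room_name) do
    {:via, Registry, {Sketch.RoomRegistry, {:chat_room, room_name}}}
  end

  # SERVER

  @impl true
  def init(messages), do: {:ok, messages}

  @impl true
  def handle_cast({:add_message, message}, messages) do
    {:noreply, [message | messages]}
  end

  @impl true
  def handle_call(:get_messages, _from, messages) do
    {:reply, messages, messages}
  end
end

# A registry with unique keys: at most one process per room name.
{:ok, _registry} = Registry.start_link(keys: :unique, name: Sketch.RoomRegistry)

{:ok, room1} = Sketch.Room.start_link("room1")
{:ok, _room2} = Sketch.Room.start_link("room2")

Sketch.Room.add_message("room1", "first message")
Sketch.Room.add_message("room2", "second message")

room1_messages = Sketch.Room.get_messages("room1")

# GenServer.whereis/1 resolves a :via tuple to the current pid.
resolved = GenServer.whereis(Sketch.Room.via_tuple("room1"))

# Starting a second process under the same name is rejected.
duplicate = Sketch.Room.start_link("room1")
```

Here `room1_messages` is `["first message"]`, `resolved` is the pid of the first room, and `duplicate` is an `{:error, {:already_started, pid}}` tuple, the same response we saw earlier with the atom name.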
If this article interested you, read Elixir in Action by Saša Jurić.
And here is the repository with the source code.
Source: https://habr.com/ru/post/306500/