📜 ⬆️ ⬇️

Comet – application for Mochiweb with a load of 1 000 000 users. Part 3/3

Part 1 and Part 2 in this series of articles showed how to create an application using mochiweb, and how to send messages to connected users. We reduced the memory consumption to 8 KB for each connection. We repeated the c10k test. We made graphics. It was fun, but now it's time to repeat everything for 1 million connections.

This article covers the following:
• Using the Mnesia database;
• Generate a believable “friends” data set for a million users;
• Configure Mnesia and enter our data;
• Discovery of a million connections from one machine;
• Comparative test with 1 million users;
• Libevent + C for handling connections;
• Final conclusions.

One of the parts of this test is the ability to open 1,000,000 connections from a single test machine. Writing a server that is able to accept 1,000,000 connections is easier than actually creating 1,000,000 connections. Thus, a fair amount of this article is about the methods used to open 1,000,000 connections from a single machine.

Launch our Pubsub.


In Part 2, we used a router to send messages to specific users. This is fine for the chat / IM system, but there are more useful things we could do instead. Before we start a large-scale test, let's add another module - the subscriber database. We will create a repository with data about your friends, so it can provide you with all the events generated by people from your friends list.
')
My intention is to use it for Last.fm. That way, I can get a real-time channel of the songs that my friends are currently listening to. This can equally apply to other events generated on social networks. Photos uploaded to Flickr, Facebook news, Twitter, etc. FriendFeed even has a real-time API in beta, so this is definitely relevant.

Subscription Manager Implementation


We are implementing a simple subscription manager, but we will be subscribing people to all their friends automatically.

API:
• add_subscriptions ([{Subscriber, Subscribee}, ...])
• remove_subscriptions ([{Subscriber, Subscribee}, ...])
• get_subscribers (User)

-module(subsmanager). -behaviour(gen_server). -include("/usr/local/lib/erlang/lib/stdlib-1.15.4/include/qlc.hrl"). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([add_subscriptions/1, remove_subscriptions/1, get_subscribers/1, first_run/0, stop/0, start_link/0]). -record(subscription, {subscriber, subscribee}). -record(state, {}). % state is all in mnesia -define(SERVER, global:whereis_name(?MODULE)). start_link() -> gen_server:start_link({global, ?MODULE}, ?MODULE, [], []). stop() -> gen_server:call(?SERVER, {stop}). add_subscriptions(SubsList) -> gen_server:call(?SERVER, {add_subscriptions, SubsList}, infinity). remove_subscriptions(SubsList) -> gen_server:call(?SERVER, {remove_subscriptions, SubsList}, infinity). get_subscribers(User) -> gen_server:call(?SERVER, {get_subscribers, User}). %% init([]) -> ok = mnesia:start(), io:format("Waiting on mnesia tables..\n",[]), mnesia:wait_for_tables([subscription], 30000), Info = mnesia:table_info(subscription, all), io:format("OK. Subscription table info: \n~w\n\n",[Info]), {ok, #state{}}. handle_call({stop}, _From, State) -> {stop, stop, State}; handle_call({add_subscriptions, SubsList}, _From, State) -> % Transactionally is slower: % F = fun() -> % [ ok = mnesia:write(S) || S <- SubsList ] % end, % mnesia:transaction(F), [ mnesia:dirty_write(S) || S <- SubsList ], {reply, ok, State}; handle_call({remove_subscriptions, SubsList}, _From, State) -> F = fun() -> [ ok = mnesia:delete_object(S) || S <- SubsList ] end, mnesia:transaction(F), {reply, ok, State}; handle_call({get_subscribers, User}, From, State) -> F = fun() -> Subs = mnesia:dirty_match_object(#subscription{subscriber='_', subscribee=User}), Users = [Dude || #subscription{subscriber=Dude, subscribee=_} <- Subs], gen_server:reply(From, Users) end, spawn(F), {noreply, State}. handle_cast(_Msg, State) -> {noreply, State}. handle_info(_Msg, State) -> {noreply, State}. terminate(_Reason, _State) -> mnesia:stop(), ok. code_change(_OldVersion, State, _Extra) -> io:format("Reloading code for ?MODULE\n",[]), {ok, State}. %% first_run() -> mnesia:create_schema([node()]), ok = mnesia:start(), Ret = mnesia:create_table(subscription, [ {disc_copies, [node()]}, {attributes, record_info(fields, subscription)}, {index, [subscribee]}, %index subscribee too {type, bag} ]), Ret. 


Remarkable:
• I included qlc.hrl, necessary for Mnesia, using the absolute path. This is not good, but it did not work out differently.
• get_subscribers generates another process and delegates the creation of a response to this same process using gen_server: reply. This means that the gen_server loop will not block on this call if we often call a lookup.
• rr ("subsmanager.erl"). The example below allows you to use record definitions in the erl shell. Put your definitions in the records.hrl file and include it in your modules is the best style. I did it for short.

Now check. first_run () creates the Mnesia scheme, so it is important to call it first. Another potential bug with mnesia is that (by default) only the node that created it can access the database, so give the erl shell a name.
 $ mkdir /var/mnesia $ erl -boot start_sasl -mnesia dir '"/var/mnesia_data"' -sname subsman (subsman@localhost)1> c(subsmanager). {ok,subsmanager} (subsman@localhost)2> subsmanager:first_run(). ... {atomic,ok} (subsman@localhost)3> subsmanager:start_link(). Waiting on mnesia tables.. OK. Subscription table info: [{access_mode,read_write},{active_replicas,[subsman@localhost]},{arity,3},{attributes,[subscriber,subscribee]},{checkpoints,[]},{commit_work,[{index,bag,[{3,{ram,57378}}]}]},{cookie,{{1224,800064,900003},subsman@localhost}},{cstruct,{cstruct,subscription,bag,[],[subsman@localhost],[],0,read_write,[3],[],false,subscription,[subscriber,subscribee],[],[],{{1224,863164,904753},subsman@localhost},{{2,0},[]}}},{disc_copies,[subsman@localhost]},{disc_only_copies,[]},{frag_properties,[]},{index,[3]},{load_by_force,false},{load_node,subsman@localhost},{load_order,0},{load_reason,{dumper,create_table}},{local_content,false},{master_nodes,[]},{memory,288},{ram_copies,[]},{record_name,subscription},{record_validation,{subscription,3,bag}},{type,bag},{size,0},{snmp,[]},{storage_type,disc_copies},{subscribers,[]},{user_properties,[]},{version,{{2,0},[]}},{where_to_commit,[{subsman@localhost,disc_copies}]},{where_to_read,subsman@localhost},{where_to_write,[subsman@localhost]},{wild_pattern,{subscription,'_','_'}},{{index,3},57378}] {ok,<0.105.0>} (subsman@localhost)4> rr("subsmanager.erl"). [state,subscription] (subsman@localhost)5> subsmanager:add_subscriptions([ #subscription{subscriber=alice, subscribee=rj} ]). ok (subsman@localhost)6> subsmanager:add_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]). ok (subsman@localhost)7> subsmanager:get_subscribers(rj). [bob,alice] (subsman@localhost)8> subsmanager:remove_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]). ok (subsman@localhost)8> subsmanager:get_subscribers(rj). [alice] (subsman@localhost)10> subsmanager:get_subscribers(charlie). [] 

We will use integer Id to distinguish between users - but for this test I used atoms (rj, alice, bob) and assumed that alice and bob are friends of rj. It is great that mnesia (and ets / dets) do not care which types you used - any Erlang term is allowed. This means that updating to support various types will not be difficult.

Change router


Instead of addressing messages to specific users, that is, router: send (123, “Hello user 123”), we “mark” the messages — the person who generated the message — and there is a router that sends the message to each subscribed user. In other words, the API will work like this: router: send (123, “Hello everyone subscribed to user 123”).
 -module(router). -behaviour(gen_server). -export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([send/2, login/2, logout/1]). -define(SERVER, global:whereis_name(?MODULE)). % will hold bidirectional mapping between id <–> pid -record(state, {pid2id, id2pid}). start_link() -> gen_server:start_link({global, ?MODULE}, ?MODULE, [], []). % sends Msg to anyone subscribed to Id send(Id, Msg) -> gen_server:call(?SERVER, {send, Id, Msg}). login(Id, Pid) when is_pid(Pid) -> gen_server:call(?SERVER, {login, Id, Pid}). logout(Pid) when is_pid(Pid) -> gen_server:call(?SERVER, {logout, Pid}). %% init([]) -> % set this so we can catch death of logged in pids: process_flag(trap_exit, true), % use ets for routing tables {ok, #state{ pid2id = ets:new(?MODULE, [bag]), id2pid = ets:new(?MODULE, [bag]) } }. handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) -> ets:insert(State#state.pid2id, {Pid, Id}), ets:insert(State#state.id2pid, {Id, Pid}), link(Pid), % tell us if they exit, so we can log them out %io:format("~w logged in as ~w\n",[Pid, Id]), {reply, ok, State}; handle_call({logout, Pid}, _From, State) when is_pid(Pid) -> unlink(Pid), PidRows = ets:lookup(State#state.pid2id, Pid), case PidRows of [] -> ok; _ -> IdRows = [ {I,P} || {P,I} <- PidRows ], % invert tuples ets:delete(State#state.pid2id, Pid), % delete all pid->id entries [ ets:delete_object(State#state.id2pid, Obj) || Obj <- IdRows ] % and all id->pid end, %io:format("pid ~w logged out\n",[Pid]), {reply, ok, State}; handle_call({send, Id, Msg}, From, State) -> F = fun() -> % get users who are subscribed to Id: Users = subsmanager:get_subscribers(Id), io:format("Subscribers of ~w = ~w\n",[Id, Users]), % get pids of anyone logged in from Users list: Pids0 = lists:map( fun(U)-> [ P || { _I, P } <- ets:lookup(State#state.id2pid, U) ] end, [ Id | Users ] % we are always subscribed to ourselves ), Pids = lists:flatten(Pids0), io:format("Pids: ~w\n", [Pids]), % send Msg to them all M = {router_msg, Msg}, [ Pid ! M || Pid <- Pids ], % respond with how many users saw the message gen_server:reply(From, {ok, length(Pids)}) end, spawn(F), {noreply, State}. % handle death and cleanup of logged in processes handle_info(Info, State) -> case Info of {'EXIT', Pid, _Why} -> handle_call({logout, Pid}, blah, State); Wtf -> io:format("Caught unhandled message: ~w\n", [Wtf]) end, {noreply, State}. handle_cast(_Msg, State) -> {noreply, State}. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. 


A small test - I used atoms instead of id.
 (subsman@localhost)1> c(subsmanager), c(router), rr("subsmanager.erl"). (subsman@localhost)2> subsmanager:start_link(). (subsman@localhost)3> router:start_link(). (subsman@localhost)4> Subs = [#subscription{subscriber=alice, subscribee=rj}, #subscription{subscriber=bob, subscribee=rj}]. [#subscription{subscriber = alice,subscribee = rj}, #subscription{subscriber = bob,subscribee = rj}] (subsman@localhost)5> subsmanager:add_subscriptions(Subs). ok (subsman@localhost)6> router:send(rj, "RJ did something"). Subscribers of rj = [bob,alice] Pids: [] {ok,0} (subsman@localhost)7> router:login(alice, self()). ok (subsman@localhost)8> router:send(rj, "RJ did something"). Subscribers of rj = [bob,alice] Pids: [<0.46.0>] {ok,1} (subsman@localhost)9> receive {router_msg, M} -> io:format("~s\n",[M]) end. RJ did something ok 

It can be seen that alice can receive messages when someone to whom it is subscribed sends a message, even though the message was not sent directly to alice. The output shows that the router identified possible targets as [alice, bob], but sent a message to one person, alice, because bob was not authorized.

Generating a simple dataset


We can generate a lot of relationships at random, but this is not particularly realistic. Social networks usually have a few super popular users (some Twitter users have over 100,000 followers), and many people with just a handful of friends.
To generate the data set, I used the Python module from the excellent igraph library:
 import igraph g = igraph.Graph.Barabasi(1000000, 15, directed=False) print "Edges: " + str(g.ecount()) + " Verticies: " + str(g.vcount()) g.write_edgelist("fakefriends.txt") 


Data loading in Mnesia


This little module reads the fakefriends.txt file and creates a list of subscriptions.
 -module(readfriends). -export([load/1]). -record(subscription, {subscriber, subscribee}). load(Filename) -> for_each_line_in_file(Filename, fun(Line, Acc) -> [As, Bs] = string:tokens(string:strip(Line, right, $\n), " "), {A, _} = string:to_integer(As), {B, _} = string:to_integer(Bs), [ #subscription{subscriber=A, subscribee=B} | Acc ] end, [read], []). % via: http://www.trapexit.org/Reading_Lines_from_a_File for_each_line_in_file(Name, Proc, Mode, Accum0) -> {ok, Device} = file:open(Name, Mode), for_each_line(Device, Proc, Accum0). for_each_line(Device, Proc, Accum) -> case io:get_line(Device, "") of eof -> file:close(Device), Accum; Line -> NewAccum = Proc(Line, Accum), for_each_line(Device, Proc, NewAccum) end. 

Now in the subsmanager shell you can read from a text file and add subscriptions:
 $ erl -name router@minifeeds4.gs2 +K true +A 128 -setcookie secretcookie -mnesia dump_log_write_threshold 50000 -mnesia dc_dump_limit 40 erl> c(readfriends), c(subsmanager). erl> subsmanager:first_run(). erl> subsmanager:start_link(). erl> subsmanager:add_subscriptions( readfriends:load("fakefriends.txt") ). 

Tick ​​the additional parameters - they will help to avoid the messages "** WARNING ** Mnesia is overloaded". Mnesia documentation contains many other settings worth seeing.

1,000,000


Creating a million tcp connections from one node is nontrivial. I have a feeling that people who do it regularly, on selected small clusters to simulate a large number of connections, probably use a real tool like Tsung. Even with the configuration from Part 1, we still run into the hard ports limit. When creating a tcp connection, the client port is allocated from the range in / proc / sys / net / ipv4 / ip_local_port_range. It does not matter if you specify it manually, or use the automatic port. In Part 1, we set the range to “1024,65535”, i.e. we have 65535-1024 = 64511 unprivileged available ports. Some of them will be used by other processes, but we will never pass for 64511 clients, because we will exhaust ports.
The local port range is IP related, so if we make outgoing connections from different local IP addresses, we will be able to open more than 64511 outgoing connections.

So let's create 17 new IP addresses to make 62,000 connections from each — this will provide us with a total of 1,054,000 connections:
 $ for i in `seq 1 17`; do echo sudo ifconfig eth0:$i 10.0.0.$i up ; done 

If you check ifconfig now, you should see your virtual interfaces: eth0: 1, eth0: 2 ... eth0: 17, each with a different IP address.

All that remains now is to change the floodtest from Part 1 to select a local IP. Unfortunately, the erlang http client does not allow you to determine the source IP.

At this point, I considered another possibility: use 17 pairs of IP — one on the server and one on the client — each pair on their own isolated / 30 subnet. I think that if I then forced the client to connect to any server IP, it would force the local address to be the second of the pair, because only one of the local IPs would actually be able to reach the server's IP on a given subnet. In theory, this would mean declaring that a local source IP on the client machine would not be necessary (although the range of server IP addresses would have to be determined). I don't know if it would work or not — it sounded likely at the time. In the end, I decided that it would be too perverted.

gen_tcp allows you to specify the source address, so I ended up using a raw client:
 -module(floodtest2). -compile(export_all). -define(SERVERADDR, "10.1.2.3"). % where mochiweb is running -define(SERVERPORT, 8000). % Generate the config in bash like so (chose some available address space): % EACH=62000; for i in `seq 1 17`; do echo "{{10,0,0,$i}, $((($i-1)*$EACH+1)), $(($i*$EACH))}, "; done run(Interval) -> Config = [ {{10,0,0,1}, 1, 62000}, {{10,0,0,2}, 62001, 124000}, {{10,0,0,3}, 124001, 186000}, {{10,0,0,4}, 186001, 248000}, {{10,0,0,5}, 248001, 310000}, {{10,0,0,6}, 310001, 372000}, {{10,0,0,7}, 372001, 434000}, {{10,0,0,8}, 434001, 496000}, {{10,0,0,9}, 496001, 558000}, {{10,0,0,10}, 558001, 620000}, {{10,0,0,11}, 620001, 682000}, {{10,0,0,12}, 682001, 744000}, {{10,0,0,13}, 744001, 806000}, {{10,0,0,14}, 806001, 868000}, {{10,0,0,15}, 868001, 930000}, {{10,0,0,16}, 930001, 992000}, {{10,0,0,17}, 992001, 1054000}], start(Config, Interval). start(Config, Interval) -> Monitor = monitor(), AdjustedInterval = Interval / length(Config), [ spawn(fun start/5, [Lower, Upper, Ip, AdjustedInterval, Monitor]) || {Ip, Lower, Upper} <- Config ], ok. start(LowerID, UpperID, _, _, _) when LowerID == UpperID -> done; start(LowerID, UpperID, LocalIP, Interval, Monitor) -> spawn(fun connect/5, [?SERVERADDR, ?SERVERPORT, LocalIP, "/test/"++LowerID, Monitor]), receive after Interval -> start(LowerID + 1, UpperID, LocalIP, Interval, Monitor) end. connect(ServerAddr, ServerPort, ClientIP, Path, Monitor) -> Opts = [binary, {packet, 0}, {ip, ClientIP}, {reuseaddr, true}, {active, false}], {ok, Sock} = gen_tcp:connect(ServerAddr, ServerPort, Opts), Monitor ! open, ReqL = io_lib:format("GET ~s\r\nHost: ~s\r\n\r\n", [Path, ServerAddr]), Req = list_to_binary(ReqL), ok = gen_tcp:send(Sock, [Req]), do_recv(Sock, Monitor), (catch gen_tcp:close(Sock)), ok. do_recv(Sock, Monitor)-> case gen_tcp:recv(Sock, 0) of {ok, B} -> Monitor ! {bytes, size(B)}, io:format("Recvd ~s\n", [ binary_to_list(B)]), io:format("Recvd ~w bytes\n", [size(B)]), do_recv(Sock, Monitor); {error, closed} -> Monitor ! closed, closed; Other -> Monitor ! closed, io:format("Other:~w\n",[Other]) end. % Monitor process receives stats and reports how much data we received etc: monitor() -> Pid = spawn(?MODULE, monitor0, [{0,0,0,0}]), timer:send_interval(10000, Pid, report), Pid. monitor0({Open, Closed, Chunks, Bytes}=S) -> receive report -> io:format("{Open, Closed, Chunks, Bytes} = ~w\n",[S]); open -> monitor0({Open + 1, Closed, Chunks, Bytes}); closed -> monitor0({Open, Closed + 1, Chunks, Bytes}); chunk -> monitor0({Open, Closed, Chunks + 1, Bytes}); {bytes, B} -> monitor0({Open, Closed, Chunks, Bytes + B}) end. 

First I connected to the mochiweb application from Part 1 - it just sends one message to each client every 10 seconds.
 erl> c(floodtest2), floodtest2:run(20). 


It quickly ate all my memory


It turns out that opening a large number of connections using gen_tcp kills a lot of memory. I guess it would take ~ 36GB to make it work. I was not interested in trying to optimize my erlang http client, and the only machine with more than 32GB of memory I could get was one of our databases, and I could not find a good excuse to turn down Last.fm while I dabble :)

At this moment I decided to recall the tested libevent, which, has an HTTP API. Newer versions also have the evhttp_connection_set_local_address function in the http API.

Here is an http C client using libevent:
 #include <sys/types.h> #include <sys/time.h> #include <sys/queue.h> #include <stdlib.h> #include <err.h> #include <event.h> #include <evhttp.h> #include <unistd.h> #include <stdio.h> #include <sys/socket.h> #include <netinet/in.h> #include <time.h> #include <pthread.h> #define BUFSIZE 4096 #define NUMCONNS 62000 #define SERVERADDR "10.103.1.43" #define SERVERPORT 8000 #define SLEEP_MS 10 char buf[BUFSIZE]; int bytes_recvd = 0; int chunks_recvd = 0; int closed = 0; int connected = 0; // called per chunk received void chunkcb(struct evhttp_request * req, void * arg) { int s = evbuffer_remove( req->input_buffer, &buf, BUFSIZE ); //printf("Read %d bytes: %s\n", s, &buf); bytes_recvd += s; chunks_recvd++; if(connected >= NUMCONNS && chunks_recvd%10000==0) printf(">Chunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed); } // gets called when request completes void reqcb(struct evhttp_request * req, void * arg) { closed++; } int main(int argc, char **argv) { event_init(); struct evhttp *evhttp_connection; struct evhttp_request *evhttp_request; char addr[16]; char path[32]; // eg: "/test/123" int i,octet; for(octet=1; octet<=17; octet++){ sprintf(&addr, "10.224.0.%d", octet); for(i=1;i<=NUMCONNS;i++) { evhttp_connection = evhttp_connection_new(SERVERADDR, SERVERPORT); evhttp_connection_set_local_address(evhttp_connection, &addr); evhttp_set_timeout(evhttp_connection, 864000); // 10 day timeout evhttp_request = evhttp_request_new(reqcb, NULL); evhttp_request->chunk_cb = chunkcb; sprintf(&path, "/test/%d", ++connected); if(i%100==0) printf("Req: %s\t->\t%s\n", addr, &path); evhttp_make_request( evhttp_connection, evhttp_request, EVHTTP_REQ_GET, path ); evhttp_connection_set_timeout(evhttp_request->evcon, 864000); event_loop( EVLOOP_NONBLOCK ); if( connected % 200 == 0 ) printf("\nChunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed); usleep(SLEEP_MS*1000); } } event_dispatch(); return 0; } 

Most parameters are hardcoded as #define, so you can edit and recompile it:
 $ gcc -o httpclient httpclient.c -levent $ ./httpclient 


It is still unable to open more than 64,500 ports.


To open more than 64,500 connections, you must determine the local address and local port yourself, and manage them accordingly.

Unfortunately, the libevent HTTP API does not have an option to define a local port. I fixed libevent to add such a function:
 void evhttp_connection_set_local_port(struct evhttp_connection *evcon, u_short port);. 

It was an amazingly pleasant experience: libevent seems to be properly written, and the documentation is pretty decent.

With this modified libevent, I was able to add the following in the above code:
 evhttp_connection_set_local_port(evhttp_connection, 1024+i); 

Now multiple connections from different addresses could use the same local port number, specific to the local address. I recompiled the client and let it work for a while to make sure that it passes the barrier.

Netstat confirms this:
 # netstat -n | awk '/^tcp/ {t[$NF]++}END{for(state in t){print state, t[state]}}' TIME_WAIT 8 ESTABLISHED 118222 

This shows how many ports are open in different states. We finally managed to open more than 2 ^ 16 connections.

Now we have a tool capable of opening a million http connections from a single computer. It looks like it uses about 2 KB for each connection, plus what the kernel occupies. It's time to test our mochiweb server.

C1024K


For this test I used 4 different servers. The main difference between this test and the previous ones is a modified client written in C.
Server 1 - Quad-core 2GHz CPU, 16GB of RAM
• Starting subsmanager
• Data Download
• Starting the router
Server 2 - Dual Quad-core 2.8GHz CPU, 32GB of RAM
• Launch Mochiweb application
Server 3 - Quad-core 2GHz CPU, 16GB of RAM
• Creation of 17 IP addresses
• Install libevent
• Client Start: ./httpclient (100 connections per second)
Server 4 - Dual-core 2GHz, 2GB RAM
• Run msggen to send heaps of messages

Memory usage during connection opening and for some time:

HttpClient has a built-in 10ms delay between connections, so it took almost 3 hours to open a million connections. I demanded a total of 25GB of memory. This is what my server looks like through the eyes of Ganglia:

You can see that it takes about 38GB, and further paging begins. I suspect that the difference is mainly in the consumption of the core.

Messages were generated using 1000 processes, with an average time between messages of about 60 ms per process, giving about 16666 messages per second:
 erl> [ spawn( fun()->msggen:start(1000000, 10+random:uniform(100), 1000000) end) || I <- lists:seq(1,1000) ]. 

Server 4 in Ganglia:

About 10 MB per second - 16,666 messages.

When I started sending messages, the load on the first server remained low. CPU consumption on the second server increased:

Naturally, because processes exit hibernate () for processing messages, memory usage increases slightly. Having all open connections without any messages is optimal in memory usage. Not surprisingly, some actions require more memory.

So where is the memory flowing? Mochiweb requires 40 GB of RAM to keep 1,000,000 active connections open. Under load, up to 30GB of memory will be used by mochiweb, and the remaining 10 GB by the kernel. In other words, you need about 40Kb per connection.

During the various tests with a lot of connections, I ended up making some additional changes to sysctl.conf. I came to this through trial and error, and I do not know which values ​​to change. My policy was waiting for an error to check /var/log/kern.log and see what the mysterious error tells me. Here are the settings:
 net.core.rmem_max = 33554432 net.core.wmem_max = 33554432 net.ipv4.tcp_rmem = 4096 16384 33554432 net.ipv4.tcp_wmem = 4096 16384 33554432 net.ipv4.tcp_mem = 786432 1048576 26777216 net.ipv4.tcp_max_tw_buckets = 360000 net.core.netdev_max_backlog = 2500 vm.min_free_kbytes = 65536 vm.swappiness = 0 net.ipv4.ip_local_port_range = 1024 65535 

I would like to learn more about TCP Linux settings in order to make more substantiated explanations. Almost certainly, these settings are not optimal, but at least that was enough to reach 1,000,000 connections.

Erlang node on Libevent


After intervening in the HTTP API for libevent, it seemed perfectly reasonable to carry out the above test with libevent HTTPd written in C.

I would like to leave as much code as possible on erlang, so let's do a minimum of C - just handle HTTP connections.

Libevent has an asynchronous HTTP API, which makes the implementation of an HTTP server trivial. I also looked for a reason to try the C interface for Erlang. This is an HTTP server using libevent, which identifies users using a whole Id (like our mochiweb app), and also acts as an Erlang C node.

It connects to the designated Erlang node, listens for messages like {123, << “Hello user 123” >>}, sends “Hello user 123 ″ for user 123, if it is connected. Messages for users who are not connected are discarded, as before.
 #include <sys/types.h> #include <sys/time.h> #include <sys/queue.h> #include <stdlib.h> #include <err.h> #include <event.h> #include <evhttp.h> #include <stdio.h> #include <sys/socket.h> #include <netinet/in.h> #include "erl_interface.h" #include "ei.h" #include <pthread.h> #define BUFSIZE 1024 #define MAXUSERS (17*65536) // C1024K // List of current http requests by uid: struct evhttp_request * clients[MAXUSERS+1]; // Memory to store uids passed to the cleanup callback: int slots[MAXUSERS+1]; // called when user disconnects void cleanup(struct evhttp_connection *evcon, void *arg) { int *uidp = (int *) arg; fprintf(stderr, "disconnected uid %d\n", *uidp); clients[*uidp] = NULL; } // handles http connections, sets them up for chunked transfer, // extracts the user id and registers in the global connection table, // also sends a welcome chunk. void request_handler(struct evhttp_request *req, void *arg) { struct evbuffer *buf; buf = evbuffer_new(); if (buf == NULL){ err(1, "failed to create response buffer"); } evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8"); int uid = -1; if(strncmp(evhttp_request_uri(req), "/test/", 6) == 0){ uid = atoi( 6+evhttp_request_uri(req) ); } if(uid <= 0){ evbuffer_add_printf(buf, "User id not found, try /test/123 instead"); evhttp_send_reply(req, HTTP_NOTFOUND, "Not Found", buf); evbuffer_free(buf); return; } if(uid > MAXUSERS){ evbuffer_add_printf(buf, "Max uid allowed is %d", MAXUSERS); evhttp_send_reply(req, HTTP_SERVUNAVAIL, "We ran out of numbers", buf); evbuffer_free(buf); return; } evhttp_send_reply_start(req, HTTP_OK, "OK"); // Send welcome chunk: evbuffer_add_printf(buf, "Welcome, Url: '%s' Id: %d\n", evhttp_request_uri(req), uid); evhttp_send_reply_chunk(req, buf); evbuffer_free(buf); // put reference into global uid->connection table: clients[uid] = req; // set close callback evhttp_connection_set_closecb( req->evcon, cleanup, &slots[uid] ); } // runs in a thread – the erlang c-node stuff // expects msgs like {uid, msg} and sends aa 'msg' chunk to uid if connected void cnode_run() { int fd; /* fd to Erlang node */ int got; /* Result of receive */ unsigned char buf[BUFSIZE]; /* Buffer for incoming message */ ErlMessage emsg; /* Incoming message */ ETERM *uid, *msg; erl_init(NULL, 0); if (erl_connect_init(1, "secretcookie", 0) == -1) erl_err_quit("erl_connect_init"); if ((fd = erl_connect("httpdmaster@localhost")) < 0) erl_err_quit("erl_connect"); fprintf(stderr, "Connected to httpdmaster@localhost\n\r"); struct evbuffer *evbuf; while (1) { got = erl_receive_msg(fd, buf, BUFSIZE, &emsg); if (got == ERL_TICK) { continue; } else if (got == ERL_ERROR) { fprintf(stderr, "ERL_ERROR from erl_receive_msg.\n"); break; } else { if (emsg.type == ERL_REG_SEND) { // get uid and body data from eg: {123, <<"Hello">>} uid = erl_element(1, emsg.msg); msg = erl_element(2, emsg.msg); int userid = ERL_INT_VALUE(uid); char *body = (char *) ERL_BIN_PTR(msg); int body_len = ERL_BIN_SIZE(msg); // Is this userid connected? if(clients[userid]){ fprintf(stderr, "Sending %d bytes to uid %d\n", body_len, userid); evbuf = evbuffer_new(); evbuffer_add(evbuf, (const void*)body, (size_t) body_len); evhttp_send_reply_chunk(clients[userid], evbuf); evbuffer_free(evbuf); }else{ fprintf(stderr, "Discarding %d bytes to uid %d – user not connected\n", body_len, userid); // noop } erl_free_term(emsg.msg); erl_free_term(uid); erl_free_term(msg); } } } // if we got here, erlang connection died. // this thread is supposed to run forever // TODO – gracefully handle failure / reconnect / etc pthread_exit(0); } int main(int argc, char **argv) { // Launch the thread that runs the cnode: pthread_attr_t tattr; pthread_t helper; int status; pthread_create(&helper, NULL, cnode_run, NULL); int i; for(i=0;i<=MAXUSERS;i++) slots[i]=i; // Launch libevent httpd: struct evhttp *httpd; event_init(); httpd = evhttp_start("0.0.0.0", 8000); evhttp_set_gencb(httpd, request_handler, NULL); event_dispatch(); // Not reached, event_dispatch() shouldn't return evhttp_free(httpd); return 0; } 

The maximum number of users is set using #define, port 8000 is listened on and users are expected. The name of the Erlang cookie node is also hardcoded.

Start the node to which the server will connect:
 $ erl -setcookie secretcookie -sname httpdmaster@localhost 


Run C-node:
 $ gcc -o httpdcnode httpdcnode.c -lerl_interface -lei -levent $ ./httpdcnode 

Check that the node is visible:
 erl> nodes(hidden). [c1@localhost] 

Open localhost : 8000 / test / 123 in your browser . You should see the welcome message.
Next, send a message to the C node:
 erl> {any, c1@localhost} ! {123, <<"Hello Libevent World">>}. 

, Pid — {procname, node}. «any», , C-.

Erlang, libevent , Erlang.

, 1 000 000 httpdcnode , , . , 10 .

2GB:


2 .

:
 Mem: 32968672k total, 9636488k used, 23332184k free, 180k buffers 

kernel/ TCP 8 , , .

libevent-cnode . , “race conditions”, , .

, Erlang , C + libevent . C C-, Erlang API. .



, , Last.fm. 40 — , 40GB . 10GB .

Source: https://habr.com/ru/post/111600/


All Articles