%% Subscription manager.
%%
%% A globally registered gen_server that keeps "subscriber follows subscribee"
%% relationships in the mnesia table `subscription' (a disc_copies bag table
%% with an extra index on `subscribee', created by first_run/0).
-module(subsmanager).
-behaviour(gen_server).
%% Resolved through the code path instead of a hard-coded, version-specific
%% absolute path, so the module compiles on any OTP installation.
-include_lib("stdlib/include/qlc.hrl").

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).
-export([add_subscriptions/1, remove_subscriptions/1, get_subscribers/1,
         first_run/0, stop/0, start_link/0]).

-record(subscription, {subscriber, subscribee}).
-record(state, {}). % state is all in mnesia

%% Server reference understood directly by gen_server; an unregistered name
%% fails with noproc rather than calling the atom 'undefined'.
-define(SERVER, {global, ?MODULE}).

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------

%% Starts the server and registers it globally under the module name.
start_link() ->
    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

%% Stops the server. Returns ok once the server has acknowledged the request.
stop() ->
    gen_server:call(?SERVER, {stop}).

%% Writes every #subscription{} record in SubsList. Returns ok.
%% No call timeout: bulk loads can take a long time.
add_subscriptions(SubsList) ->
    gen_server:call(?SERVER, {add_subscriptions, SubsList}, infinity).

%% Deletes every #subscription{} record in SubsList in one transaction.
%% Returns ok, or {error, Reason} if the transaction aborted.
remove_subscriptions(SubsList) ->
    gen_server:call(?SERVER, {remove_subscriptions, SubsList}, infinity).

%% Returns the list of subscribers whose subscribee is User.
get_subscribers(User) ->
    gen_server:call(?SERVER, {get_subscribers, User}).

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

%% Starts mnesia and waits up to 30 seconds for the subscription table.
%% Refuses to start if the table does not become available.
init([]) ->
    ok = mnesia:start(),
    io:format("Waiting on mnesia tables..\n", []),
    case mnesia:wait_for_tables([subscription], 30000) of
        ok ->
            Info = mnesia:table_info(subscription, all),
            io:format("OK. Subscription table info: \n~w\n\n", [Info]),
            {ok, #state{}};
        {timeout, Missing} ->
            {stop, {timeout_waiting_for_tables, Missing}};
        {error, Reason} ->
            {stop, Reason}
    end.

%% Reply before stopping and use reason 'normal': returning
%% {stop, stop, State} sent no reply, so stop/0 crashed its caller.
handle_call({stop}, _From, State) ->
    {stop, normal, ok, State};

%% Dirty writes on purpose: the original author measured the transactional
%% variant as slower for bulk loads.
handle_call({add_subscriptions, SubsList}, _From, State) ->
    lists:foreach(fun(S) -> ok = mnesia:dirty_write(S) end, SubsList),
    {reply, ok, State};

handle_call({remove_subscriptions, SubsList}, _From, State) ->
    F = fun() ->
            [ok = mnesia:delete_object(S) || S <- SubsList]
        end,
    Reply = case mnesia:transaction(F) of
                {atomic, _}       -> ok;
                {aborted, Reason} -> {error, Reason}
            end,
    {reply, Reply, State};

%% The lookup runs in a separate process and answers the caller itself via
%% gen_server:reply/2, so the server loop is not blocked by reads.
handle_call({get_subscribers, User}, From, State) ->
    F = fun() ->
            Pattern = #subscription{subscriber = '_', subscribee = User},
            Subs = mnesia:dirty_match_object(Pattern),
            Users = [Dude || #subscription{subscriber = Dude} <- Subs],
            gen_server:reply(From, Users)
        end,
    spawn(F),
    {noreply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    mnesia:stop(),
    ok.

%% Macros are not expanded inside string literals, so the module name has to
%% be passed as a format argument.
code_change(_OldVersion, State, _Extra) ->
    io:format("Reloading code for ~w\n", [?MODULE]),
    {ok, State}.

%%--------------------------------------------------------------------
%% One-off setup
%%--------------------------------------------------------------------

%% Creates the schema on this node and the subscription table.
%% Returns the result of mnesia:create_table/2. The create_schema/1 result
%% is deliberately not matched, so this proceeds even if it returns an error.
first_run() ->
    mnesia:create_schema([node()]),
    ok = mnesia:start(),
    mnesia:create_table(subscription,
                        [{disc_copies, [node()]},
                         {attributes, record_info(fields, subscription)},
                         {index, [subscribee]}, % index subscribee too
                         {type, bag}]).
$ mkdir /var/mnesia_data $ erl -boot start_sasl -mnesia dir '"/var/mnesia_data"' -sname subsman (subsman@localhost)1> c(subsmanager). {ok,subsmanager} (subsman@localhost)2> subsmanager:first_run(). ... {atomic,ok} (subsman@localhost)3> subsmanager:start_link(). Waiting on mnesia tables.. OK. Subscription table info: [{access_mode,read_write},{active_replicas,[subsman@localhost]},{arity,3},{attributes,[subscriber,subscribee]},{checkpoints,[]},{commit_work,[{index,bag,[{3,{ram,57378}}]}]},{cookie,{{1224,800064,900003},subsman@localhost}},{cstruct,{cstruct,subscription,bag,[],[subsman@localhost],[],0,read_write,[3],[],false,subscription,[subscriber,subscribee],[],[],{{1224,863164,904753},subsman@localhost},{{2,0},[]}}},{disc_copies,[subsman@localhost]},{disc_only_copies,[]},{frag_properties,[]},{index,[3]},{load_by_force,false},{load_node,subsman@localhost},{load_order,0},{load_reason,{dumper,create_table}},{local_content,false},{master_nodes,[]},{memory,288},{ram_copies,[]},{record_name,subscription},{record_validation,{subscription,3,bag}},{type,bag},{size,0},{snmp,[]},{storage_type,disc_copies},{subscribers,[]},{user_properties,[]},{version,{{2,0},[]}},{where_to_commit,[{subsman@localhost,disc_copies}]},{where_to_read,subsman@localhost},{where_to_write,[subsman@localhost]},{wild_pattern,{subscription,'_','_'}},{{index,3},57378}] {ok,<0.105.0>} (subsman@localhost)4> rr("subsmanager.erl"). [state,subscription] (subsman@localhost)5> subsmanager:add_subscriptions([ #subscription{subscriber=alice, subscribee=rj} ]). ok (subsman@localhost)6> subsmanager:add_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]). ok (subsman@localhost)7> subsmanager:get_subscribers(rj). [bob,alice] (subsman@localhost)8> subsmanager:remove_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]). ok (subsman@localhost)9> subsmanager:get_subscribers(rj). [alice] (subsman@localhost)10> subsmanager:get_subscribers(charlie). []
%% Message router.
%%
%% A globally registered gen_server that tracks which pids are logged in
%% under which ids (two ETS bag tables, one per lookup direction) and
%% delivers {router_msg, Msg} to the pids of everyone subscribed to an id.
-module(router).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).
-export([send/2, login/2, logout/1]).

%% Server reference understood directly by gen_server; an unregistered name
%% fails with noproc rather than calling the atom 'undefined'.
-define(SERVER, {global, ?MODULE}).

% will hold bidirectional mapping between id <-> pid
-record(state, {pid2id, id2pid}).

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------

start_link() ->
    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

%% Sends Msg to anyone subscribed to Id (and to Id itself).
%% Returns {ok, N} where N is the number of pids the message was sent to.
send(Id, Msg) ->
    gen_server:call(?SERVER, {send, Id, Msg}).

%% Registers Pid as logged in under Id. Returns ok.
login(Id, Pid) when is_pid(Pid) ->
    gen_server:call(?SERVER, {login, Id, Pid}).

%% Removes every id registered for Pid. Returns ok.
logout(Pid) when is_pid(Pid) ->
    gen_server:call(?SERVER, {logout, Pid}).

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

init([]) ->
    % set this so we can catch death of logged in pids:
    process_flag(trap_exit, true),
    % use ets for routing tables
    {ok, #state{pid2id = ets:new(?MODULE, [bag]),
                id2pid = ets:new(?MODULE, [bag])}}.

handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) ->
    ets:insert(State#state.pid2id, {Pid, Id}),
    ets:insert(State#state.id2pid, {Id, Pid}),
    link(Pid), % tell us if they exit, so we can log them out
    {reply, ok, State};

handle_call({logout, Pid}, _From, State) when is_pid(Pid) ->
    ok = do_logout(Pid, State),
    {reply, ok, State};

%% The fan-out runs in a separate process and answers the caller itself via
%% gen_server:reply/2, so the router is not blocked by the subscription
%% lookup.
handle_call({send, Id, Msg}, From, State) ->
    Id2Pid = State#state.id2pid,
    F = fun() ->
            % get users who are subscribed to Id:
            Users = subsmanager:get_subscribers(Id),
            io:format("Subscribers of ~w = ~w\n", [Id, Users]),
            % get pids of anyone logged in from Users list;
            % we are always subscribed to ourselves
            Pids = [P || U <- [Id | Users],
                         {_I, P} <- ets:lookup(Id2Pid, U)],
            io:format("Pids: ~w\n", [Pids]),
            % send Msg to them all
            M = {router_msg, Msg},
            [Pid ! M || Pid <- Pids],
            % respond with how many users saw the message
            gen_server:reply(From, {ok, length(Pids)})
        end,
    spawn(F),
    {noreply, State}.

% handle death and cleanup of logged in processes
handle_info({'EXIT', Pid, _Why}, State) ->
    ok = do_logout(Pid, State),
    {noreply, State};
handle_info(Wtf, State) ->
    io:format("Caught unhandled message: ~w\n", [Wtf]),
    {noreply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%% Internal
%%--------------------------------------------------------------------

%% Unlinks Pid and removes all of its entries from both routing tables.
%% Shared by the explicit logout call and the 'EXIT' handler.
do_logout(Pid, #state{pid2id = Pid2Id, id2pid = Id2Pid}) ->
    unlink(Pid),
    case ets:lookup(Pid2Id, Pid) of
        [] ->
            ok;
        PidRows ->
            ets:delete(Pid2Id, Pid), % delete all pid->id entries
            % and all id->pid entries (same tuples, inverted)
            lists:foreach(fun({P, I}) -> ets:delete_object(Id2Pid, {I, P}) end,
                          PidRows),
            ok
    end.
(subsman@localhost)1> c(subsmanager), c(router), rr("subsmanager.erl"). (subsman@localhost)2> subsmanager:start_link(). (subsman@localhost)3> router:start_link(). (subsman@localhost)4> Subs = [#subscription{subscriber=alice, subscribee=rj}, #subscription{subscriber=bob, subscribee=rj}]. [#subscription{subscriber = alice,subscribee = rj}, #subscription{subscriber = bob,subscribee = rj}] (subsman@localhost)5> subsmanager:add_subscriptions(Subs). ok (subsman@localhost)6> router:send(rj, "RJ did something"). Subscribers of rj = [bob,alice] Pids: [] {ok,0} (subsman@localhost)7> router:login(alice, self()). ok (subsman@localhost)8> router:send(rj, "RJ did something"). Subscribers of rj = [bob,alice] Pids: [<0.46.0>] {ok,1} (subsman@localhost)9> receive {router_msg, M} -> io:format("~s\n",[M]) end. RJ did something ok
import igraph

# Build an undirected graph with igraph's Barabasi generator (1,000,000
# vertices; the second argument, 15, is passed through unchanged) and dump it
# as an edge list to fakefriends.txt, one edge per line.
g = igraph.Graph.Barabasi(1000000, 15, directed=False)
# print() with a single string argument behaves the same on Python 2 and 3;
# the bare print statement is a SyntaxError on Python 3.
print("Edges: " + str(g.ecount()) + " Verticies: " + str(g.vcount()))
g.write_edgelist("fakefriends.txt")
%% Loads an edge-list file ("A B" per line, whitespace separated integers)
%% into a list of #subscription{} records.
-module(readfriends).
-export([load/1]).

-record(subscription, {subscriber, subscribee}).

%% Reads Filename and returns one #subscription{subscriber = A,
%% subscribee = B} per non-blank line. Records are accumulated by prepending,
%% so the result is in reverse file order.
%% Crashes (badmatch / badarg) if the file cannot be opened or a line is not
%% exactly two integers.
load(Filename) ->
    for_each_line_in_file(Filename, fun parse_line/2, [read], []).

%% Turns one line into a record and prepends it to Acc; blank lines are
%% skipped. list_to_integer/1 is used so that a non-numeric token crashes:
%% `{A, _} = string:to_integer(S)' also matches {error, no_integer} and
%% would silently store the atom 'error' as a user id.
parse_line(Line, Acc) ->
    case string:tokens(Line, " \t\r\n") of
        [] ->
            Acc;
        [As, Bs] ->
            [#subscription{subscriber = list_to_integer(As),
                           subscribee = list_to_integer(Bs)} | Acc]
    end.

% via: http://www.trapexit.org/Reading_Lines_from_a_File
%% Folds Proc(Line, Acc) over every line of the file Name, opened with Mode.
%% The file is closed even if Proc crashes.
for_each_line_in_file(Name, Proc, Mode, Accum0) ->
    {ok, Device} = file:open(Name, Mode),
    try
        for_each_line(Device, Proc, Accum0)
    after
        file:close(Device)
    end.

for_each_line(Device, Proc, Accum) ->
    case io:get_line(Device, "") of
        eof ->
            Accum;
        {error, Reason} ->
            % a read error is not a line; do not hand it to Proc
            erlang:error({read_failed, Reason});
        Line ->
            for_each_line(Device, Proc, Proc(Line, Accum))
    end.
$ erl -name router@minifeeds4.gs2 +K true +A 128 -setcookie secretcookie -mnesia dump_log_write_threshold 50000 -mnesia dc_dump_limit 40 erl> c(readfriends), c(subsmanager). erl> subsmanager:first_run(). erl> subsmanager:start_link(). erl> subsmanager:add_subscriptions( readfriends:load("fakefriends.txt") ).
$ for i in `seq 1 17`; do echo sudo ifconfig eth0:$i 10.0.0.$i up ; done
%% HTTP flood-test client: opens many long-lived connections to the server,
%% each bound to one of several local IP addresses, and reports connection
%% and traffic counters every 10 seconds.
-module(floodtest2).
%% Every function stays exported (the module previously used export_all);
%% monitor0/1 must be exported because it is spawned via ?MODULE.
-export([run/1, start/2, start/5, connect/5, do_recv/2, monitor/0, monitor0/1]).

-define(SERVERADDR, "10.1.2.3"). % where mochiweb is running
-define(SERVERPORT, 8000).

% Generate the config in bash like so (chose some available address space):
% EACH=62000; for i in `seq 1 17`; do echo "{{10,0,0,$i}, $((($i-1)*$EACH+1)), $(($i*$EACH))}, "; done

%% Runs the test with the built-in 17-address configuration.
%% Interval is the delay in milliseconds between new connections overall;
%% it is divided evenly between the per-address spawners.
run(Interval) ->
    Config = [{{10,0,0,1}, 1, 62000},
              {{10,0,0,2}, 62001, 124000},
              {{10,0,0,3}, 124001, 186000},
              {{10,0,0,4}, 186001, 248000},
              {{10,0,0,5}, 248001, 310000},
              {{10,0,0,6}, 310001, 372000},
              {{10,0,0,7}, 372001, 434000},
              {{10,0,0,8}, 434001, 496000},
              {{10,0,0,9}, 496001, 558000},
              {{10,0,0,10}, 558001, 620000},
              {{10,0,0,11}, 620001, 682000},
              {{10,0,0,12}, 682001, 744000},
              {{10,0,0,13}, 744001, 806000},
              {{10,0,0,14}, 806001, 868000},
              {{10,0,0,15}, 868001, 930000},
              {{10,0,0,16}, 930001, 992000},
              {{10,0,0,17}, 992001, 1054000}],
    start(Config, Interval).

%% Config is a list of {LocalIp, LowerId, UpperId}. Spawns one connection
%% spawner per entry. Returns ok.
start(Config, Interval) ->
    % `receive ... after' only accepts integers, so round the per-spawner
    % delay instead of passing the float produced by `/'.
    AdjustedInterval = round(Interval / length(Config)),
    Monitor = monitor(),
    % spawn/2 is spawn(Node, Fun), so the arguments are closed over instead
    % of being passed as a list.
    [spawn(fun() -> start(Lower, Upper, Ip, AdjustedInterval, Monitor) end)
     || {Ip, Lower, Upper} <- Config],
    ok.

%% Opens one connection for every id in LowerID..UpperID (inclusive),
%% waiting Interval milliseconds between them.
start(LowerID, UpperID, _, _, _) when LowerID > UpperID ->
    done;
start(LowerID, UpperID, LocalIP, Interval, Monitor) ->
    Path = "/test/" ++ integer_to_list(LowerID),
    spawn(fun() -> connect(?SERVERADDR, ?SERVERPORT, LocalIP, Path, Monitor) end),
    receive
    after Interval ->
        start(LowerID + 1, UpperID, LocalIP, Interval, Monitor)
    end.

%% Connects from ClientIP, sends a GET for Path and reads until the socket
%% closes. Crashes this (per-connection) process if connect or send fails.
connect(ServerAddr, ServerPort, ClientIP, Path, Monitor) ->
    Opts = [binary, {packet, 0}, {ip, ClientIP}, {reuseaddr, true}, {active, false}],
    {ok, Sock} = gen_tcp:connect(ServerAddr, ServerPort, Opts),
    Monitor ! open,
    % NOTE(review): the request line carries no HTTP version; left as-is
    % because the server side is not visible here.
    ReqL = io_lib:format("GET ~s\r\nHost: ~s\r\n\r\n", [Path, ServerAddr]),
    Req = list_to_binary(ReqL),
    ok = gen_tcp:send(Sock, [Req]),
    do_recv(Sock, Monitor),
    (catch gen_tcp:close(Sock)),
    ok.

%% Receive loop: reports every received block to Monitor as one chunk plus
%% its byte count, and reports `closed' when the socket ends.
do_recv(Sock, Monitor) ->
    case gen_tcp:recv(Sock, 0) of
        {ok, B} ->
            Monitor ! chunk,
            Monitor ! {bytes, size(B)},
            io:format("Recvd ~s\n", [binary_to_list(B)]),
            io:format("Recvd ~w bytes\n", [size(B)]),
            do_recv(Sock, Monitor);
        {error, closed} ->
            Monitor ! closed,
            closed;
        Other ->
            Monitor ! closed,
            io:format("Other:~w\n", [Other])
    end.

% Monitor process receives stats and reports how much data we received etc:
%% Starts the monitor and a 10 second `report' timer. Returns the pid.
monitor() ->
    Pid = spawn(?MODULE, monitor0, [{0,0,0,0}]),
    timer:send_interval(10000, Pid, report),
    Pid.

%% Counter loop. State is {Open, Closed, Chunks, Bytes}.
monitor0({Open, Closed, Chunks, Bytes}=S) ->
    receive
        report ->
            io:format("{Open, Closed, Chunks, Bytes} = ~w\n", [S]),
            % keep looping, otherwise the monitor dies after its first report
            monitor0(S);
        open ->
            monitor0({Open + 1, Closed, Chunks, Bytes});
        closed ->
            monitor0({Open, Closed + 1, Chunks, Bytes});
        chunk ->
            monitor0({Open, Closed, Chunks + 1, Bytes});
        {bytes, B} ->
            monitor0({Open, Closed, Chunks, Bytes + B})
    end.
erl> c(floodtest2), floodtest2:run(20).
#include <sys/types.h> #include <sys/time.h> #include <sys/queue.h> #include <stdlib.h> #include <err.h> #include <event.h> #include <evhttp.h> #include <unistd.h> #include <stdio.h> #include <sys/socket.h> #include <netinet/in.h> #include <time.h> #include <pthread.h> #define BUFSIZE 4096 #define NUMCONNS 62000 #define SERVERADDR "10.103.1.43" #define SERVERPORT 8000 #define SLEEP_MS 10 char buf[BUFSIZE]; int bytes_recvd = 0; int chunks_recvd = 0; int closed = 0; int connected = 0; // called per chunk received void chunkcb(struct evhttp_request * req, void * arg) { int s = evbuffer_remove( req->input_buffer, &buf, BUFSIZE ); //printf("Read %d bytes: %s\n", s, &buf); bytes_recvd += s; chunks_recvd++; if(connected >= NUMCONNS && chunks_recvd%10000==0) printf(">Chunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed); } // gets called when request completes void reqcb(struct evhttp_request * req, void * arg) { closed++; } int main(int argc, char **argv) { event_init(); struct evhttp *evhttp_connection; struct evhttp_request *evhttp_request; char addr[16]; char path[32]; // eg: "/test/123" int i,octet; for(octet=1; octet<=17; octet++){ sprintf(&addr, "10.224.0.%d", octet); for(i=1;i<=NUMCONNS;i++) { evhttp_connection = evhttp_connection_new(SERVERADDR, SERVERPORT); evhttp_connection_set_local_address(evhttp_connection, &addr); evhttp_set_timeout(evhttp_connection, 864000); // 10 day timeout evhttp_request = evhttp_request_new(reqcb, NULL); evhttp_request->chunk_cb = chunkcb; sprintf(&path, "/test/%d", ++connected); if(i%100==0) printf("Req: %s\t->\t%s\n", addr, &path); evhttp_make_request( evhttp_connection, evhttp_request, EVHTTP_REQ_GET, path ); evhttp_connection_set_timeout(evhttp_request->evcon, 864000); event_loop( EVLOOP_NONBLOCK ); if( connected % 200 == 0 ) printf("\nChunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed); usleep(SLEEP_MS*1000); } } event_dispatch(); return 0; }
$ gcc -o httpclient httpclient.c -levent $ ./httpclient
void evhttp_connection_set_local_port(struct evhttp_connection *evcon, u_short port);
evhttp_connection_set_local_port(evhttp_connection, 1024+i);
# netstat -n | awk '/^tcp/ {t[$NF]++}END{for(state in t){print state, t[state]}}' TIME_WAIT 8 ESTABLISHED 118222
erl> [ spawn( fun()->msggen:start(1000000, 10+random:uniform(100), 1000000) end) || I <- lists:seq(1,1000) ].
net.core.rmem_max = 33554432 net.core.wmem_max = 33554432 net.ipv4.tcp_rmem = 4096 16384 33554432 net.ipv4.tcp_wmem = 4096 16384 33554432 net.ipv4.tcp_mem = 786432 1048576 26777216 net.ipv4.tcp_max_tw_buckets = 360000 net.core.netdev_max_backlog = 2500 vm.min_free_kbytes = 65536 vm.swappiness = 0 net.ipv4.ip_local_port_range = 1024 65535
#include <sys/types.h> #include <sys/time.h> #include <sys/queue.h> #include <stdlib.h> #include <err.h> #include <event.h> #include <evhttp.h> #include <stdio.h> #include <sys/socket.h> #include <netinet/in.h> #include "erl_interface.h" #include "ei.h" #include <pthread.h> #define BUFSIZE 1024 #define MAXUSERS (17*65536) // C1024K // List of current http requests by uid: struct evhttp_request * clients[MAXUSERS+1]; // Memory to store uids passed to the cleanup callback: int slots[MAXUSERS+1]; // called when user disconnects void cleanup(struct evhttp_connection *evcon, void *arg) { int *uidp = (int *) arg; fprintf(stderr, "disconnected uid %d\n", *uidp); clients[*uidp] = NULL; } // handles http connections, sets them up for chunked transfer, // extracts the user id and registers in the global connection table, // also sends a welcome chunk. void request_handler(struct evhttp_request *req, void *arg) { struct evbuffer *buf; buf = evbuffer_new(); if (buf == NULL){ err(1, "failed to create response buffer"); } evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8"); int uid = -1; if(strncmp(evhttp_request_uri(req), "/test/", 6) == 0){ uid = atoi( 6+evhttp_request_uri(req) ); } if(uid <= 0){ evbuffer_add_printf(buf, "User id not found, try /test/123 instead"); evhttp_send_reply(req, HTTP_NOTFOUND, "Not Found", buf); evbuffer_free(buf); return; } if(uid > MAXUSERS){ evbuffer_add_printf(buf, "Max uid allowed is %d", MAXUSERS); evhttp_send_reply(req, HTTP_SERVUNAVAIL, "We ran out of numbers", buf); evbuffer_free(buf); return; } evhttp_send_reply_start(req, HTTP_OK, "OK"); // Send welcome chunk: evbuffer_add_printf(buf, "Welcome, Url: '%s' Id: %d\n", evhttp_request_uri(req), uid); evhttp_send_reply_chunk(req, buf); evbuffer_free(buf); // put reference into global uid->connection table: clients[uid] = req; // set close callback evhttp_connection_set_closecb( req->evcon, cleanup, &slots[uid] ); } // runs in a thread – the erlang c-node stuff // 
expects msgs like {uid, msg} and sends aa 'msg' chunk to uid if connected void cnode_run() { int fd; /* fd to Erlang node */ int got; /* Result of receive */ unsigned char buf[BUFSIZE]; /* Buffer for incoming message */ ErlMessage emsg; /* Incoming message */ ETERM *uid, *msg; erl_init(NULL, 0); if (erl_connect_init(1, "secretcookie", 0) == -1) erl_err_quit("erl_connect_init"); if ((fd = erl_connect("httpdmaster@localhost")) < 0) erl_err_quit("erl_connect"); fprintf(stderr, "Connected to httpdmaster@localhost\n\r"); struct evbuffer *evbuf; while (1) { got = erl_receive_msg(fd, buf, BUFSIZE, &emsg); if (got == ERL_TICK) { continue; } else if (got == ERL_ERROR) { fprintf(stderr, "ERL_ERROR from erl_receive_msg.\n"); break; } else { if (emsg.type == ERL_REG_SEND) { // get uid and body data from eg: {123, <<"Hello">>} uid = erl_element(1, emsg.msg); msg = erl_element(2, emsg.msg); int userid = ERL_INT_VALUE(uid); char *body = (char *) ERL_BIN_PTR(msg); int body_len = ERL_BIN_SIZE(msg); // Is this userid connected? if(clients[userid]){ fprintf(stderr, "Sending %d bytes to uid %d\n", body_len, userid); evbuf = evbuffer_new(); evbuffer_add(evbuf, (const void*)body, (size_t) body_len); evhttp_send_reply_chunk(clients[userid], evbuf); evbuffer_free(evbuf); }else{ fprintf(stderr, "Discarding %d bytes to uid %d – user not connected\n", body_len, userid); // noop } erl_free_term(emsg.msg); erl_free_term(uid); erl_free_term(msg); } } } // if we got here, erlang connection died. 
// this thread is supposed to run forever // TODO – gracefully handle failure / reconnect / etc pthread_exit(0); } int main(int argc, char **argv) { // Launch the thread that runs the cnode: pthread_attr_t tattr; pthread_t helper; int status; pthread_create(&helper, NULL, cnode_run, NULL); int i; for(i=0;i<=MAXUSERS;i++) slots[i]=i; // Launch libevent httpd: struct evhttp *httpd; event_init(); httpd = evhttp_start("0.0.0.0", 8000); evhttp_set_gencb(httpd, request_handler, NULL); event_dispatch(); // Not reached, event_dispatch() shouldn't return evhttp_free(httpd); return 0; }
$ erl -setcookie secretcookie -sname httpdmaster@localhost
$ gcc -o httpdcnode httpdcnode.c -lerl_interface -lei -levent $ ./httpdcnode
erl> nodes(hidden). [c1@localhost]
erl> {any, c1@localhost} ! {123, <<"Hello Libevent World">>}.
Mem: 32968672k total, 9636488k used, 23332184k free, 180k buffers
Source: https://habr.com/ru/post/111600/
All Articles