|
@@ -23,32 +23,32 @@
|
|
|
-behaviour(gen_leader).
|
|
|
|
|
|
-export([start_link/0, start_link/1,
|
|
|
- reg/1, reg/2, unreg/1,
|
|
|
- mreg/2,
|
|
|
- munreg/2,
|
|
|
- set_value/2,
|
|
|
- give_away/2,
|
|
|
- update_counter/2]).
|
|
|
+ reg/1, reg/2, unreg/1,
|
|
|
+ mreg/2,
|
|
|
+ munreg/2,
|
|
|
+ set_value/2,
|
|
|
+ give_away/2,
|
|
|
+ update_counter/2]).
|
|
|
|
|
|
-export([leader_call/1,
|
|
|
- leader_cast/1,
|
|
|
- sync/0,
|
|
|
- get_leader/0]).
|
|
|
+ leader_cast/1,
|
|
|
+ sync/0,
|
|
|
+ get_leader/0]).
|
|
|
|
|
|
%%% internal exports
|
|
|
-export([init/1,
|
|
|
- handle_cast/3,
|
|
|
- handle_call/4,
|
|
|
- handle_info/2,
|
|
|
- handle_leader_call/4,
|
|
|
- handle_leader_cast/3,
|
|
|
- handle_DOWN/3,
|
|
|
+ handle_cast/3,
|
|
|
+ handle_call/4,
|
|
|
+ handle_info/2,
|
|
|
+ handle_leader_call/4,
|
|
|
+ handle_leader_cast/3,
|
|
|
+ handle_DOWN/3,
|
|
|
elected/2, % original version
|
|
|
- elected/3,
|
|
|
- surrendered/3,
|
|
|
- from_leader/3,
|
|
|
- code_change/4,
|
|
|
- terminate/2]).
|
|
|
+ elected/3,
|
|
|
+ surrendered/3,
|
|
|
+ from_leader/3,
|
|
|
+ code_change/4,
|
|
|
+ terminate/2]).
|
|
|
|
|
|
-include("gproc.hrl").
|
|
|
|
|
@@ -57,8 +57,10 @@
|
|
|
-record(state, {
|
|
|
always_broadcast = false,
|
|
|
is_leader,
|
|
|
- sync_requests = []}).
|
|
|
+ sync_requests = []}).
|
|
|
|
|
|
+%% ==========================================================
|
|
|
+%% Start functions
|
|
|
|
|
|
start_link() ->
|
|
|
start_link({[node()|nodes()], []}).
|
|
@@ -70,9 +72,9 @@ start_link(Nodes) when is_list(Nodes) ->
|
|
|
start_link({Nodes, Opts}) ->
|
|
|
gen_leader:start_link(
|
|
|
?SERVER, Nodes, Opts, ?MODULE, [], []).
|
|
|
-
|
|
|
-%% ?SERVER, Nodes, [],?MODULE, [], [{debug,[trace]}]).
|
|
|
|
|
|
+%% ==========================================================
|
|
|
+%% API
|
|
|
|
|
|
%% {@see gproc:reg/1}
|
|
|
%%
|
|
@@ -103,19 +105,17 @@ munreg(T, Keys) ->
|
|
|
if is_list(Keys) -> leader_call({munreg, T, g, Keys, self()});
|
|
|
true -> erlang:error(badarg)
|
|
|
end.
|
|
|
-
|
|
|
|
|
|
unreg({_,g,_} = Key) ->
|
|
|
leader_call({unreg, Key, self()});
|
|
|
unreg(_) ->
|
|
|
erlang:error(badarg).
|
|
|
|
|
|
-
|
|
|
set_value({T,g,_} = Key, Value) when T==a; T==c ->
|
|
|
if is_integer(Value) ->
|
|
|
- leader_call({set, Key, Value});
|
|
|
+ leader_call({set, Key, Value});
|
|
|
true ->
|
|
|
- erlang:error(badarg)
|
|
|
+ erlang:error(badarg)
|
|
|
end;
|
|
|
set_value({_,g,_} = Key, Value) ->
|
|
|
leader_call({set, Key, Value, self()});
|
|
@@ -153,8 +153,8 @@ sync() ->
|
|
|
get_leader() ->
|
|
|
gen_leader:call(?MODULE, get_leader).
|
|
|
|
|
|
-%%% ==========================================================
|
|
|
-
|
|
|
+%% ==========================================================
|
|
|
+%% Server-side
|
|
|
|
|
|
handle_cast(_Msg, S, _) ->
|
|
|
{stop, unknown_cast, S}.
|
|
@@ -166,9 +166,6 @@ handle_call(_, _, S, _) ->
|
|
|
|
|
|
handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
|
|
|
leader_cast({pid_is_DOWN, Pid}),
|
|
|
-%% ets:select_delete(?TAB, [{{{Pid,'_'}}, [], [true]}]),
|
|
|
-%% ets:delete(?TAB, Pid),
|
|
|
-%% lists:foreach(fun(Key) -> gproc_lib:remove_reg_1(Key, Pid) end, Keys),
|
|
|
{ok, S};
|
|
|
handle_info(_, S) ->
|
|
|
{ok, S}.
|
|
@@ -214,47 +211,45 @@ handle_DOWN(Node, S, _E) ->
|
|
|
Broadcast ->
|
|
|
{ok, Broadcast, S1}
|
|
|
end.
|
|
|
-%% ets:select_delete(?TAB, [{Head, Gs, [true]}]),
|
|
|
-%% {ok, [{delete, Globs}], S}.
|
|
|
|
|
|
check_sync_requests(Node, #state{sync_requests = SReqs} = S) ->
|
|
|
SReqs1 = lists:flatmap(
|
|
|
- fun({From, Ns}) ->
|
|
|
- case Ns -- [Node] of
|
|
|
- [] ->
|
|
|
- gen_leader:reply(From, {leader, reply, true}),
|
|
|
- [];
|
|
|
- Ns1 ->
|
|
|
- [{From, Ns1}]
|
|
|
- end
|
|
|
- end, SReqs),
|
|
|
+ fun({From, Ns}) ->
|
|
|
+ case Ns -- [Node] of
|
|
|
+ [] ->
|
|
|
+ gen_leader:reply(From, {leader, reply, true}),
|
|
|
+ [];
|
|
|
+ Ns1 ->
|
|
|
+ [{From, Ns1}]
|
|
|
+ end
|
|
|
+ end, SReqs),
|
|
|
S#state{sync_requests = SReqs1}.
|
|
|
|
|
|
handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
|
|
|
case gen_leader:alive(E) -- [node()] of
|
|
|
- [] ->
|
|
|
- {reply, true, S};
|
|
|
- Alive ->
|
|
|
- gen_leader:broadcast({from_leader, {sync, From}}, Alive, E),
|
|
|
- {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
|
|
|
+ [] ->
|
|
|
+ {reply, true, S};
|
|
|
+ Alive ->
|
|
|
+ gen_leader:broadcast({from_leader, {sync, From}}, Alive, E),
|
|
|
+ {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
|
|
|
end;
|
|
|
handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
|
|
|
case gproc_lib:insert_reg(K, Value, Pid, g) of
|
|
|
- false ->
|
|
|
- {reply, badarg, S};
|
|
|
- true ->
|
|
|
- gproc_lib:ensure_monitor(Pid,g),
|
|
|
- Vals =
|
|
|
- if C == a ->
|
|
|
- ets:lookup(?TAB, {K,a});
|
|
|
- C == c ->
|
|
|
+ false ->
|
|
|
+ {reply, badarg, S};
|
|
|
+ true ->
|
|
|
+ _ = gproc_lib:ensure_monitor(Pid,g),
|
|
|
+ Vals =
|
|
|
+ if C == a ->
|
|
|
+ ets:lookup(?TAB, {K,a});
|
|
|
+ C == c ->
|
|
|
[{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})];
|
|
|
- C == n ->
|
|
|
- [{{K,n},Pid,Value}];
|
|
|
- true ->
|
|
|
- [{{K,Pid},Pid,Value}]
|
|
|
- end,
|
|
|
- {reply, true, [{insert, Vals}], S}
|
|
|
+ C == n ->
|
|
|
+ [{{K,n},Pid,Value}];
|
|
|
+ true ->
|
|
|
+ [{{K,Pid},Pid,Value}]
|
|
|
+ end,
|
|
|
+ {reply, true, [{insert, Vals}], S}
|
|
|
end;
|
|
|
handle_leader_call({update_counter, {c,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
|
|
|
when is_integer(Incr) ->
|
|
@@ -267,102 +262,102 @@ handle_leader_call({update_counter, {c,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
|
|
|
end;
|
|
|
handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
|
|
|
Key = if T == n; T == a -> {K,T};
|
|
|
- true -> {K, Pid}
|
|
|
- end,
|
|
|
+ true -> {K, Pid}
|
|
|
+ end,
|
|
|
case ets:member(?TAB, Key) of
|
|
|
- true ->
|
|
|
- gproc_lib:remove_reg(K, Pid),
|
|
|
- if T == c ->
|
|
|
- case ets:lookup(?TAB, {{a,g,Name},a}) of
|
|
|
- [Aggr] ->
|
|
|
- %% updated by remove_reg/2
|
|
|
- {reply, true, [{delete,[Key, {Pid,K}]},
|
|
|
- {insert, [Aggr]}], S};
|
|
|
- [] ->
|
|
|
- {reply, true, [{delete, [Key, {Pid,K}]}], S}
|
|
|
- end;
|
|
|
- true ->
|
|
|
- {reply, true, [{delete, [Key]}], S}
|
|
|
- end;
|
|
|
- false ->
|
|
|
- {reply, badarg, S}
|
|
|
+ true ->
|
|
|
+ _ = gproc_lib:remove_reg(K, Pid),
|
|
|
+ if T == c ->
|
|
|
+ case ets:lookup(?TAB, {{a,g,Name},a}) of
|
|
|
+ [Aggr] ->
|
|
|
+ %% updated by remove_reg/2
|
|
|
+ {reply, true, [{delete,[Key, {Pid,K}]},
|
|
|
+ {insert, [Aggr]}], S};
|
|
|
+ [] ->
|
|
|
+ {reply, true, [{delete, [Key, {Pid,K}]}], S}
|
|
|
+ end;
|
|
|
+ true ->
|
|
|
+ {reply, true, [{delete, [Key]}], S}
|
|
|
+ end;
|
|
|
+ false ->
|
|
|
+ {reply, badarg, S}
|
|
|
end;
|
|
|
handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
|
|
|
when T == a; T == n ->
|
|
|
Key = {K, T},
|
|
|
case ets:lookup(?TAB, Key) of
|
|
|
- [{_, Pid, Value}] ->
|
|
|
- case pid_to_give_away_to(To) of
|
|
|
- Pid ->
|
|
|
- {reply, Pid, S};
|
|
|
- ToPid when is_pid(ToPid) ->
|
|
|
- ets:insert(?TAB, [{Key, ToPid, Value},
|
|
|
- {{ToPid,K}, r}]),
|
|
|
- gproc_lib:ensure_monitor(ToPid, g),
|
|
|
- {reply, ToPid, [{delete, [Key, {Pid,K}]},
|
|
|
- {insert, [{Key, ToPid, Value}]}], S};
|
|
|
- undefined ->
|
|
|
- ets:delete(?TAB, Key),
|
|
|
- ets:delete(?TAB, {Pid, K}),
|
|
|
- {reply, undefined, [{delete, [Key, {Pid,K}]}], S}
|
|
|
- end;
|
|
|
- _ ->
|
|
|
- {reply, badarg, S}
|
|
|
+ [{_, Pid, Value}] ->
|
|
|
+ case pid_to_give_away_to(To) of
|
|
|
+ Pid ->
|
|
|
+ {reply, Pid, S};
|
|
|
+ ToPid when is_pid(ToPid) ->
|
|
|
+ ets:insert(?TAB, [{Key, ToPid, Value},
|
|
|
+ {{ToPid,K}, r}]),
|
|
|
+ _ = gproc_lib:ensure_monitor(ToPid, g),
|
|
|
+ {reply, ToPid, [{delete, [Key, {Pid,K}]},
|
|
|
+ {insert, [{Key, ToPid, Value}]}], S};
|
|
|
+ undefined ->
|
|
|
+ ets:delete(?TAB, Key),
|
|
|
+ ets:delete(?TAB, {Pid, K}),
|
|
|
+ {reply, undefined, [{delete, [Key, {Pid,K}]}], S}
|
|
|
+ end;
|
|
|
+ _ ->
|
|
|
+ {reply, badarg, S}
|
|
|
end;
|
|
|
handle_leader_call({mreg, T, g, L, Pid}, _From, S, _E) ->
|
|
|
if T==p; T==n ->
|
|
|
- try gproc_lib:insert_many(T, g, L, Pid) of
|
|
|
- {true,Objs} -> {reply, true, [{insert,Objs}], S};
|
|
|
- false -> {reply, badarg, S}
|
|
|
- catch
|
|
|
- error:_ -> {reply, badarg, S}
|
|
|
- end;
|
|
|
+ try gproc_lib:insert_many(T, g, L, Pid) of
|
|
|
+ {true,Objs} -> {reply, true, [{insert,Objs}], S};
|
|
|
+ false -> {reply, badarg, S}
|
|
|
+ catch
|
|
|
+ error:_ -> {reply, badarg, S}
|
|
|
+ end;
|
|
|
true -> {reply, badarg, S}
|
|
|
end;
|
|
|
handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
|
|
|
try gproc_lib:remove_many(T, g, L, Pid) of
|
|
|
- [] ->
|
|
|
- {reply, true, S};
|
|
|
- Objs ->
|
|
|
- {reply, true, [{delete, Objs}], S}
|
|
|
+ [] ->
|
|
|
+ {reply, true, S};
|
|
|
+ Objs ->
|
|
|
+ {reply, true, [{delete, Objs}], S}
|
|
|
catch
|
|
|
- error:_ -> {reply, badarg, S}
|
|
|
+ error:_ -> {reply, badarg, S}
|
|
|
end;
|
|
|
handle_leader_call({set,{T,g,N} =K,V,Pid}, _From, S, _E) ->
|
|
|
if T == a ->
|
|
|
- if is_integer(V) ->
|
|
|
- case gproc_lib:do_set_value(K, V, Pid) of
|
|
|
- true -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
|
|
|
- false -> {reply, badarg, S}
|
|
|
- end
|
|
|
- end;
|
|
|
+ if is_integer(V) ->
|
|
|
+ case gproc_lib:do_set_value(K, V, Pid) of
|
|
|
+ true -> {reply, true, [{insert,[{{K,T},Pid,V}]}], S};
|
|
|
+ false -> {reply, badarg, S}
|
|
|
+ end
|
|
|
+ end;
|
|
|
T == c ->
|
|
|
- try gproc_lib:do_set_counter_value(K, V, Pid),
|
|
|
- AKey = {{a,g,N},a},
|
|
|
- Aggr = ets:lookup(?TAB, AKey), % may be []
|
|
|
- {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
|
|
|
- catch
|
|
|
- error:_ ->
|
|
|
- {reply, badarg, S}
|
|
|
- end;
|
|
|
+ try gproc_lib:do_set_counter_value(K, V, Pid),
|
|
|
+ AKey = {{a,g,N},a},
|
|
|
+ Aggr = ets:lookup(?TAB, AKey), % may be []
|
|
|
+ {reply, true, [{insert, [{{K,Pid},Pid,V} | Aggr]}], S}
|
|
|
+ catch
|
|
|
+ error:_ ->
|
|
|
+ {reply, badarg, S}
|
|
|
+ end;
|
|
|
true ->
|
|
|
- case gproc_lib:do_set_value(K, V, Pid) of
|
|
|
- true ->
|
|
|
- Obj = if T==n -> {{K, T}, Pid, V};
|
|
|
- true -> {{K, Pid}, Pid, V}
|
|
|
- end,
|
|
|
- {reply, true, [{insert,[Obj]}], S};
|
|
|
- false ->
|
|
|
- {reply, badarg, S}
|
|
|
- end
|
|
|
+ case gproc_lib:do_set_value(K, V, Pid) of
|
|
|
+ true ->
|
|
|
+ Obj = if T==n -> {{K, T}, Pid, V};
|
|
|
+ true -> {{K, Pid}, Pid, V}
|
|
|
+ end,
|
|
|
+ {reply, true, [{insert,[Obj]}], S};
|
|
|
+ false ->
|
|
|
+ {reply, badarg, S}
|
|
|
+ end
|
|
|
end;
|
|
|
handle_leader_call({await, Key, Pid}, {_,Ref} = From, S, _E) ->
|
|
|
%% The pid in _From is of the gen_leader instance that forwarded the
|
|
|
%% call - not of the client. This is why the Pid is explicitly passed.
|
|
|
%% case gproc_lib:await(Key, {Pid,Ref}) of
|
|
|
case gproc_lib:await(Key, Pid, From) of
|
|
|
- {reply, {Ref, {K, P, V}}} ->
|
|
|
- {reply, {Ref, {K, P, V}}, S};
|
|
|
+ {reply, {Ref, {K, P, V}}} ->
|
|
|
+ {reply, {Ref, {K, P, V}}, S};
|
|
|
{reply, Reply, Insert} ->
|
|
|
{reply, Reply, [{insert, Insert}], S}
|
|
|
end;
|
|
@@ -372,21 +367,21 @@ handle_leader_call(_, _, S, _E) ->
|
|
|
handle_leader_cast({sync_reply, Node, Ref}, S, _E) ->
|
|
|
#state{sync_requests = SReqs} = S,
|
|
|
case lists:keyfind(Ref, 1, SReqs) of
|
|
|
- false ->
|
|
|
- %% This should never happen, except perhaps if the leader who
|
|
|
- %% received the sync request died, and the new leader gets the
|
|
|
- %% sync reply. In that case, we trust that the client has been notified
|
|
|
- %% anyway, and ignore the message.
|
|
|
- {ok, S};
|
|
|
- {_, Ns} ->
|
|
|
- case lists:delete(Node, Ns) of
|
|
|
- [] ->
|
|
|
- gen_leader:reply(Ref, {leader, reply, true}),
|
|
|
- {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
|
|
|
- Ns1 ->
|
|
|
- SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
|
|
|
- {ok, S#state{sync_requests = SReqs1}}
|
|
|
- end
|
|
|
+ false ->
|
|
|
+ %% This should never happen, except perhaps if the leader who
|
|
|
+ %% received the sync request died, and the new leader gets the
|
|
|
+ %% sync reply. In that case, we trust that the client has been notified
|
|
|
+ %% anyway, and ignore the message.
|
|
|
+ {ok, S};
|
|
|
+ {_, Ns} ->
|
|
|
+ case lists:delete(Node, Ns) of
|
|
|
+ [] ->
|
|
|
+ gen_leader:reply(Ref, {leader, reply, true}),
|
|
|
+ {ok, S#state{sync_requests = lists:keydelete(Ref,1,SReqs)}};
|
|
|
+ Ns1 ->
|
|
|
+ SReqs1 = lists:keyreplace(Ref, 1, SReqs, {Ref, Ns1}),
|
|
|
+ {ok, S#state{sync_requests = SReqs1}}
|
|
|
+ end
|
|
|
end;
|
|
|
handle_leader_cast({add_globals, Missing}, S, _E) ->
|
|
|
%% This is an audit message: a peer (non-leader) had info about granted
|
|
@@ -399,13 +394,13 @@ handle_leader_cast({remove_globals, Globals}, S, _E) ->
|
|
|
{ok, S};
|
|
|
handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
|
|
|
Globals = ets:select(?TAB, [{{{Pid,'$1'},r},
|
|
|
- [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
|
|
|
+ [{'==',{element,2,'$1'},g}],[{{'$1',Pid}}]}]),
|
|
|
ets:delete(?TAB, {Pid,g}),
|
|
|
case process_globals(Globals) of
|
|
|
- [] ->
|
|
|
- {ok, S};
|
|
|
- Broadcast ->
|
|
|
- {ok, Broadcast, S}
|
|
|
+ [] ->
|
|
|
+ {ok, S};
|
|
|
+ Broadcast ->
|
|
|
+ {ok, Broadcast, S}
|
|
|
end.
|
|
|
|
|
|
process_globals(Globals) ->
|
|
@@ -427,69 +422,58 @@ process_globals(Globals) ->
|
|
|
[{Op,Objs} || {Op,Objs} <- [{insert,Modified},
|
|
|
{delete,Globals}], Objs =/= []].
|
|
|
|
|
|
-
|
|
|
code_change(_FromVsn, S, _Extra, _E) ->
|
|
|
{ok, S}.
|
|
|
|
|
|
terminate(_Reason, _S) ->
|
|
|
ok.
|
|
|
|
|
|
-
|
|
|
-
|
|
|
from_leader({sync, Ref}, S, _E) ->
|
|
|
gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}),
|
|
|
{ok, S};
|
|
|
from_leader(Ops, S, _E) ->
|
|
|
lists:foreach(
|
|
|
fun({delete, Globals}) ->
|
|
|
- delete_globals(Globals);
|
|
|
- ({insert, Globals}) ->
|
|
|
- ets:insert(?TAB, Globals),
|
|
|
- lists:foreach(
|
|
|
- fun({{{_,g,_}=Key,_}, P, _}) ->
|
|
|
- ets:insert(?TAB, {{P,Key},r}),
|
|
|
- gproc_lib:ensure_monitor(P,g);
|
|
|
+ delete_globals(Globals);
|
|
|
+ ({insert, Globals}) ->
|
|
|
+ ets:insert(?TAB, Globals),
|
|
|
+ lists:foreach(
|
|
|
+ fun({{{_,g,_}=Key,_}, P, _}) ->
|
|
|
+ ets:insert(?TAB, {{P,Key},r}),
|
|
|
+ gproc_lib:ensure_monitor(P,g);
|
|
|
({{P,_K},r}) ->
|
|
|
gproc_lib:ensure_monitor(P,g);
|
|
|
(_) ->
|
|
|
skip
|
|
|
- end, Globals)
|
|
|
+ end, Globals)
|
|
|
end, Ops),
|
|
|
{ok, S}.
|
|
|
|
|
|
delete_globals(Globals) ->
|
|
|
lists:foreach(
|
|
|
fun({{_,g,_},T} = K) when is_atom(T) ->
|
|
|
- ets:delete(?TAB, K);
|
|
|
- ({Key, Pid}) when is_pid(Pid) ->
|
|
|
+ ets:delete(?TAB, K);
|
|
|
+ ({Key, Pid}) when is_pid(Pid) ->
|
|
|
K = ets_key(Key,Pid),
|
|
|
- ets:delete(?TAB, K),
|
|
|
- ets:delete(?TAB, {Pid, Key});
|
|
|
- ({Pid, K}) when is_pid(Pid) ->
|
|
|
- ets:delete(?TAB, {Pid, K})
|
|
|
- %% case node(Pid) =:= node() of
|
|
|
- %% true ->
|
|
|
- %% ets:delete(?TAB, {Pid,g});
|
|
|
- %% _ -> ok
|
|
|
- %% end
|
|
|
+ ets:delete(?TAB, K),
|
|
|
+ ets:delete(?TAB, {Pid, Key});
|
|
|
+ ({Pid, K}) when is_pid(Pid) ->
|
|
|
+ ets:delete(?TAB, {Pid, K})
|
|
|
end, Globals).
|
|
|
|
|
|
ets_key({T,_,_} = K, _) when T==n; T==a ->
|
|
|
{K, T};
|
|
|
ets_key(K, Pid) ->
|
|
|
{K, Pid}.
|
|
|
-
|
|
|
|
|
|
leader_call(Req) ->
|
|
|
case gen_leader:leader_call(?MODULE, Req) of
|
|
|
- badarg -> erlang:error(badarg, Req);
|
|
|
- Reply -> Reply
|
|
|
+ badarg -> erlang:error(badarg, Req);
|
|
|
+ Reply -> Reply
|
|
|
end.
|
|
|
|
|
|
leader_cast(Msg) ->
|
|
|
gen_leader:leader_cast(?MODULE, Msg).
|
|
|
-
|
|
|
-
|
|
|
|
|
|
init(Opts) ->
|
|
|
S0 = #state{},
|
|
@@ -497,44 +481,42 @@ init(Opts) ->
|
|
|
S0#state.always_broadcast),
|
|
|
{ok, #state{always_broadcast = AlwaysBcast}}.
|
|
|
|
|
|
-
|
|
|
surrendered_1(Globs) ->
|
|
|
My_local_globs =
|
|
|
- ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '_'},
|
|
|
- [{'==', {node,'$1'}, node()}],
|
|
|
- ['$_']}]),
|
|
|
+ ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '_'},
|
|
|
+ [{'==', {node,'$1'}, node()}],
|
|
|
+ ['$_']}]),
|
|
|
%% remove all remote globals - we don't have monitors on them.
|
|
|
ets:select_delete(?TAB, [{{{{'_',g,'_'},'_'}, '$1', '_'},
|
|
|
- [{'=/=', {node,'$1'}, node()}],
|
|
|
- [true]}]),
|
|
|
+ [{'=/=', {node,'$1'}, node()}],
|
|
|
+ [true]}]),
|
|
|
%% insert new non-local globals, collect the leader's version of
|
|
|
%% what my globals are
|
|
|
Ldr_local_globs =
|
|
|
- lists:foldl(
|
|
|
- fun({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
|
|
|
- ets:insert(?TAB, [{K, Pid, V}, {{Pid,Key}}]),
|
|
|
- Acc;
|
|
|
- ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
|
|
|
- [Obj|Acc]
|
|
|
- end, [], Globs),
|
|
|
+ lists:foldl(
|
|
|
+ fun({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
|
|
|
+ ets:insert(?TAB, [{K, Pid, V}, {{Pid,Key}}]),
|
|
|
+ Acc;
|
|
|
+ ({_, Pid, _} = Obj, Acc) when node(Pid) == node() ->
|
|
|
+ [Obj|Acc]
|
|
|
+ end, [], Globs),
|
|
|
case [{K,P,V} || {K,P,V} <- My_local_globs,
|
|
|
- not(lists:keymember(K, 1, Ldr_local_globs))] of
|
|
|
- [] ->
|
|
|
- %% phew! We have the same picture
|
|
|
- ok;
|
|
|
- [_|_] = Missing ->
|
|
|
- %% This is very unlikely, I think
|
|
|
- leader_cast({add_globals, Missing})
|
|
|
+ not(lists:keymember(K, 1, Ldr_local_globs))] of
|
|
|
+ [] ->
|
|
|
+ %% phew! We have the same picture
|
|
|
+ ok;
|
|
|
+ [_|_] = Missing ->
|
|
|
+ %% This is very unlikely, I think
|
|
|
+ leader_cast({add_globals, Missing})
|
|
|
end,
|
|
|
case [{K,P} || {K,P,_} <- Ldr_local_globs,
|
|
|
- not(lists:keymember(K, 1, My_local_globs))] of
|
|
|
- [] ->
|
|
|
- ok;
|
|
|
- [_|_] = Remove ->
|
|
|
- leader_cast({remove_globals, Remove})
|
|
|
+ not(lists:keymember(K, 1, My_local_globs))] of
|
|
|
+ [] ->
|
|
|
+ ok;
|
|
|
+ [_|_] = Remove ->
|
|
|
+ leader_cast({remove_globals, Remove})
|
|
|
end.
|
|
|
|
|
|
-
|
|
|
update_aggr_counter({c,g,Ctr}, Incr) ->
|
|
|
Key = {{a,g,Ctr},a},
|
|
|
case ets:lookup(?TAB, Key) of
|
|
@@ -555,154 +537,3 @@ pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
|
|
|
_ ->
|
|
|
undefined
|
|
|
end.
|
|
|
-
|
|
|
-%% -ifdef(TEST).
|
|
|
-
|
|
|
-%% dist_test_() ->
|
|
|
-%% {timeout, 60,
|
|
|
-%% [{foreach,
|
|
|
-%% fun() ->
|
|
|
-%% Ns = start_slaves([n1, n2]),
|
|
|
-%% %% dbg:tracer(),
|
|
|
-%% %% [dbg:n(N) || N <- Ns],
|
|
|
-%% %% dbg:tpl(gproc_dist, x),
|
|
|
-%% %% dbg:p(all,[c]),
|
|
|
-%% Ns
|
|
|
-%% end,
|
|
|
-%% fun(Ns) ->
|
|
|
-%% [rpc:call(N, init, stop, []) || N <- Ns]
|
|
|
-%% end,
|
|
|
-%% [
|
|
|
-%% {with, [fun(Ns) -> {in_parallel, [fun(X) -> t_simple_reg(X) end,
|
|
|
-%% fun(X) -> t_await_reg(X) end,
|
|
|
-%% fun(X) -> t_give_away(X) end]
|
|
|
-%% }
|
|
|
-%% end]}
|
|
|
-%% ]}
|
|
|
-%% ]}.
|
|
|
-
|
|
|
-%% -define(T_NAME, {n, g, {?MODULE, ?LINE}}).
|
|
|
-
|
|
|
-%% t_simple_reg([H|_] = Ns) ->
|
|
|
-%% ?debugMsg(t_simple_reg),
|
|
|
-%% Name = ?T_NAME,
|
|
|
-%% P = t_spawn_reg(H, Name),
|
|
|
-%% ?assertMatch(ok, t_lookup_everywhere(Name, Ns, P)),
|
|
|
-%% ?assertMatch(true, t_call(P, {apply, gproc, unreg, [Name]})),
|
|
|
-%% ?assertMatch(ok, t_lookup_everywhere(Name, Ns, undefined)),
|
|
|
-%% ?assertMatch(ok, t_call(P, die)).
|
|
|
-
|
|
|
-%% t_await_reg([A,B|_]) ->
|
|
|
-%% ?debugMsg(t_await_reg),
|
|
|
-%% Name = ?T_NAME,
|
|
|
-%% P = t_spawn(A),
|
|
|
-%% P ! {self(), {apply, gproc, await, [Name]}},
|
|
|
-%% P1 = t_spawn_reg(B, Name),
|
|
|
-%% ?assert(P1 == receive
|
|
|
-%% {P, Res} ->
|
|
|
-%% element(1, Res)
|
|
|
-%% end),
|
|
|
-%% ?assertMatch(ok, t_call(P, die)),
|
|
|
-%% ?assertMatch(ok, t_call(P1, die)).
|
|
|
-
|
|
|
-%% t_give_away([A,B|_] = Ns) ->
|
|
|
-%% ?debugMsg(t_give_away),
|
|
|
-%% Na = ?T_NAME,
|
|
|
-%% Nb = ?T_NAME,
|
|
|
-%% Pa = t_spawn_reg(A, Na),
|
|
|
-%% Pb = t_spawn_reg(B, Nb),
|
|
|
-%% ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
|
|
|
-%% ?assertMatch(ok, t_lookup_everywhere(Nb, Ns, Pb)),
|
|
|
-%% %% ?debugHere,
|
|
|
-%% ?assertMatch(Pb, t_call(Pa, {apply, {gproc, give_away, [Na, Nb]}})),
|
|
|
-%% ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pb)),
|
|
|
-%% %% ?debugHere,
|
|
|
-%% ?assertMatch(Pa, t_call(Pa, {apply, {gproc, give_away, [Na, Pa]}})),
|
|
|
-%% ?assertMatch(ok, t_lookup_everywhere(Na, Ns, Pa)),
|
|
|
-%% %% ?debugHere,
|
|
|
-%% ?assertMatch(ok, t_call(Pa, die)),
|
|
|
-%% ?assertMatch(ok, t_call(Pb, die)).
|
|
|
-
|
|
|
-%% t_sleep() ->
|
|
|
-%% timer:sleep(1000).
|
|
|
-
|
|
|
-%% t_lookup_everywhere(Key, Nodes, Exp) ->
|
|
|
-%% t_lookup_everywhere(Key, Nodes, Exp, 3).
|
|
|
-
|
|
|
-%% t_lookup_everywhere(Key, _, Exp, 0) ->
|
|
|
-%% {lookup_failed, Key, Exp};
|
|
|
-%% t_lookup_everywhere(Key, Nodes, Exp, I) ->
|
|
|
-%% Expected = [{N, Exp} || N <- Nodes],
|
|
|
-%% Found = [{N,rpc:call(N, gproc, where, [Key])} || N <- Nodes],
|
|
|
-%% if Expected =/= Found ->
|
|
|
-%% ?debugFmt("lookup ~p failed (~p), retrying...~n", [Key, Found]),
|
|
|
-%% t_sleep(),
|
|
|
-%% t_lookup_everywhere(Key, Nodes, Exp, I-1);
|
|
|
-%% true ->
|
|
|
-%% ok
|
|
|
-%% end.
|
|
|
-
|
|
|
-
|
|
|
-%% t_spawn(Node) ->
|
|
|
-%% Me = self(),
|
|
|
-%% P = spawn(Node, fun() ->
|
|
|
-%% Me ! {self(), ok},
|
|
|
-%% t_loop()
|
|
|
-%% end),
|
|
|
-%% receive
|
|
|
-%% {P, ok} -> P
|
|
|
-%% end.
|
|
|
-
|
|
|
-%% t_spawn_reg(Node, Name) ->
|
|
|
-%% Me = self(),
|
|
|
-%% spawn(Node, fun() ->
|
|
|
-%% ?assertMatch(true, gproc:reg(Name)),
|
|
|
-%% Me ! {self(), ok},
|
|
|
-%% t_loop()
|
|
|
-%% end),
|
|
|
-%% receive
|
|
|
-%% {P, ok} -> P
|
|
|
-%% end.
|
|
|
-
|
|
|
-%% t_call(P, Req) ->
|
|
|
-%% P ! {self(), Req},
|
|
|
-%% receive
|
|
|
-%% {P, Res} ->
|
|
|
-%% Res
|
|
|
-%% end.
|
|
|
-
|
|
|
-%% t_loop() ->
|
|
|
-%% receive
|
|
|
-%% {From, die} ->
|
|
|
-%% From ! {self(), ok};
|
|
|
-%% {From, {apply, M, F, A}} ->
|
|
|
-%% From ! {self(), apply(M, F, A)},
|
|
|
-%% t_loop()
|
|
|
-%% end.
|
|
|
-
|
|
|
-%% start_slaves(Ns) ->
|
|
|
-%% [H|T] = Nodes = [start_slave(N) || N <- Ns],
|
|
|
-%% %% ?debugVal([pong = rpc:call(H, net, ping, [N]) || N <- T]),
|
|
|
-%% %% ?debugVal(rpc:multicall(Nodes, application, start, [gproc])),
|
|
|
-%% Nodes.
|
|
|
-
|
|
|
-%% start_slave(Name) ->
|
|
|
-%% case node() of
|
|
|
-%% nonode@nohost ->
|
|
|
-%% os:cmd("epmd -daemon"),
|
|
|
-%% {ok, _} = net_kernel:start([gproc_master, shortnames]);
|
|
|
-%% _ ->
|
|
|
-%% ok
|
|
|
-%% end,
|
|
|
-%% {ok, Node} = slave:start(
|
|
|
-%% host(), Name,
|
|
|
-%% "-pa . -pz ../ebin -pa ../deps/gen_leader/ebin "
|
|
|
-%% "-gproc gproc_dist all"),
|
|
|
-%% %% io:fwrite(user, "Slave node: ~p~n", [Node]),
|
|
|
-%% Node.
|
|
|
-
|
|
|
-%% host() ->
|
|
|
-%% [Name, Host] = re:split(atom_to_list(node()), "@", [{return, list}]),
|
|
|
-%% list_to_atom(Host).
|
|
|
-
|
|
|
-%% -endif.
|