|
@@ -44,7 +44,7 @@
|
|
|
-export([init/1,
|
|
|
handle_cast/3,
|
|
|
handle_call/4,
|
|
|
- handle_info/2,
|
|
|
+ handle_info/2, handle_info/3,
|
|
|
handle_leader_call/4,
|
|
|
handle_leader_cast/3,
|
|
|
handle_DOWN/3,
|
|
@@ -208,11 +208,15 @@ handle_call(_, _, S, _) ->
|
|
|
{reply, badarg, S}.
|
|
|
|
|
|
handle_info({'DOWN', _MRef, process, Pid, _}, S) ->
|
|
|
+ ets:delete(?TAB, {{Pid, g}}),
|
|
|
leader_cast({pid_is_DOWN, Pid}),
|
|
|
{ok, S};
|
|
|
handle_info(_, S) ->
|
|
|
{ok, S}.
|
|
|
|
|
|
+handle_info(Msg, S, _E) ->
|
|
|
+ handle_info(Msg, S).
|
|
|
+
|
|
|
|
|
|
elected(S, _E) ->
|
|
|
{ok, {globals,globs()}, S#state{is_leader = true}}.
|
|
@@ -233,8 +237,14 @@ elected(S, _E, _Node) ->
|
|
|
end.
|
|
|
|
|
|
globs() ->
|
|
|
- ets:select(?TAB, [{{{{'_',g,'_'},'_'},'_','_'},[],['$_']}]).
|
|
|
+ Gs = ets:select(?TAB, [{{{{'_',g,'_'},'_'},'_','_'},[],['$_']}]),
|
|
|
+ _ = [gproc_lib:ensure_monitor(Pid, g) || {_, Pid, _} <- Gs],
|
|
|
+ Gs.
|
|
|
|
|
|
+surrendered(#state{is_leader = true} = S, {globals, Globs}, _E) ->
|
|
|
+ %% Leader conflict!
|
|
|
+ surrendered_1(Globs),
|
|
|
+ {ok, S#state{is_leader = false}};
|
|
|
surrendered(S, {globals, Globs}, _E) ->
|
|
|
%% globals from this node should be more correct in our table than
|
|
|
%% in the leader's
|
|
@@ -276,22 +286,23 @@ handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) ->
|
|
|
gen_leader:broadcast({from_leader, {sync, From}}, Alive, E),
|
|
|
{noreply, S#state{sync_requests = [{From, Alive}|SReqs]}}
|
|
|
end;
|
|
|
-handle_leader_call({reg, {C,g,Name} = K, Value, Pid}, _From, S, _E) ->
|
|
|
+handle_leader_call({reg, {_C,g,_Name} = K, Value, Pid}, _From, S, _E) ->
|
|
|
case gproc_lib:insert_reg(K, Value, Pid, g) of
|
|
|
false ->
|
|
|
{reply, badarg, S};
|
|
|
true ->
|
|
|
_ = gproc_lib:ensure_monitor(Pid,g),
|
|
|
- Vals =
|
|
|
- if C == a ->
|
|
|
- ets:lookup(?TAB, {K,a});
|
|
|
- C == c ->
|
|
|
- [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})];
|
|
|
- C == n ->
|
|
|
- [{{K,n},Pid,Value}];
|
|
|
- true ->
|
|
|
- [{{K,Pid},Pid,Value}]
|
|
|
- end,
|
|
|
+ Vals = mk_broadcast_insert_vals([{K, Pid, Value}]),
|
|
|
+ %% Vals =
|
|
|
+ %% if C == a ->
|
|
|
+ %% ets:lookup(?TAB, {K,a});
|
|
|
+ %% C == c ->
|
|
|
+ %% [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})];
|
|
|
+ %% C == n ->
|
|
|
+ %% [{{K,n},Pid,Value}];
|
|
|
+ %% true ->
|
|
|
+ %% [{{K,Pid},Pid,Value}]
|
|
|
+ %% end,
|
|
|
{reply, true, [{insert, Vals}], S}
|
|
|
end;
|
|
|
handle_leader_call({reg_or_locate, {n,g,_} = K, Value, Pid}, _From, S, _E) ->
|
|
@@ -500,8 +511,8 @@ handle_leader_cast({add_globals, Missing}, S, _E) ->
|
|
|
%% This is an audit message: a peer (non-leader) had info about granted
|
|
|
%% global resources that we didn't know of when we became leader.
|
|
|
%% This could happen due to a race condition when the old leader died.
|
|
|
- ets:insert(?TAB, Missing),
|
|
|
- {ok, [{insert, Missing}], S};
|
|
|
+ Update = insert_globals(Missing),
|
|
|
+ {ok, [{insert, Update}], S};
|
|
|
handle_leader_cast({remove_globals, Globals}, S, _E) ->
|
|
|
delete_globals(Globals, []),
|
|
|
{ok, S};
|
|
@@ -533,6 +544,22 @@ handle_leader_cast({pid_is_DOWN, Pid}, S, _E) ->
|
|
|
{ok, Broadcast, S}
|
|
|
end.
|
|
|
|
|
|
+mk_broadcast_insert_vals(Objs) ->
|
|
|
+ lists:flatmap(
|
|
|
+ fun({{C, g, Name} = K, Pid, Value}) ->
|
|
|
+ if C == a ->
|
|
|
+ ets:lookup(?TAB, {K,a}) ++ ets:lookup(?TAB, {Pid,K});
|
|
|
+ C == c ->
|
|
|
+ [{{K,Pid},Pid,Value} | ets:lookup(?TAB,{{a,g,Name},a})]
|
|
|
+ ++ ets:lookup(?TAB, {Pid,K});
|
|
|
+ C == n ->
|
|
|
+ [{{K,n},Pid,Value}| ets:lookup(?TAB, {Pid,K})];
|
|
|
+ true ->
|
|
|
+ [{{K,Pid},Pid,Value} | ets:lookup(?TAB, {Pid,K})]
|
|
|
+ end
|
|
|
+ end, Objs).
|
|
|
+
|
|
|
+
|
|
|
process_globals(Globals) ->
|
|
|
Modified =
|
|
|
lists:foldl(
|
|
@@ -579,20 +606,36 @@ from_leader(Ops, S, _E) ->
|
|
|
fun({delete, Globals, Event}) ->
|
|
|
delete_globals(Globals, Event);
|
|
|
({insert, Globals}) ->
|
|
|
- ets:insert(?TAB, Globals),
|
|
|
- lists:foreach(
|
|
|
- fun({{{_,g,_}=Key,_}, P, _}) ->
|
|
|
- ets:insert_new(?TAB, {{P,Key}, []}),
|
|
|
- gproc_lib:ensure_monitor(P,g);
|
|
|
- ({{P,_K}, _Opts} = Obj) when is_pid(P) ->
|
|
|
- ets:insert(?TAB, Obj),
|
|
|
- gproc_lib:ensure_monitor(P,g);
|
|
|
- (_) ->
|
|
|
- skip
|
|
|
- end, Globals)
|
|
|
+ _ = insert_globals(Globals)
|
|
|
end, Ops),
|
|
|
{ok, S}.
|
|
|
|
|
|
+insert_globals(Globals) ->
|
|
|
+ ets:insert(?TAB, Globals),
|
|
|
+ lists:foldl(
|
|
|
+ fun({{{T,_,_} = Key,Pid}, Pid, _}, A) ->
|
|
|
+ A1 = case T of
|
|
|
+ c ->
|
|
|
+ Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
|
|
|
+ update_aggr_counter(Key, -Incr) ++ A;
|
|
|
+ _ ->
|
|
|
+ A
|
|
|
+ end,
|
|
|
+ ets:insert_new(?TAB, {{Pid,Key}, []}),
|
|
|
+ gproc_lib:ensure_monitor(Pid,g),
|
|
|
+ A1;
|
|
|
+ ({{{_,_,_}, n}, Pid, _}, A) ->
|
|
|
+ gproc_lib:ensure_monitor(Pid,g),
|
|
|
+ A;
|
|
|
+ ({{P,_K}, Opts} = Obj, A) when is_pid(P), is_list(Opts),Opts =/= [] ->
|
|
|
+ ets:insert(?TAB, Obj),
|
|
|
+ gproc_lib:ensure_monitor(P,g),
|
|
|
+ [Obj] ++ A;
|
|
|
+ (_Other, A) ->
|
|
|
+ A
|
|
|
+ end, Globals, Globals).
|
|
|
+
|
|
|
+
|
|
|
delete_globals(Globals, Event) ->
|
|
|
lists:foreach(
|
|
|
fun({{_,g,_},T} = K) when is_atom(T) ->
|
|
@@ -629,10 +672,11 @@ init(Opts) ->
|
|
|
|
|
|
surrendered_1(Globs) ->
|
|
|
My_local_globs =
|
|
|
- ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '_'},
|
|
|
+ ets:select(?TAB, [{{{{'_',g,'_'},'_'},'$1', '$2'},
|
|
|
[{'==', {node,'$1'}, node()}],
|
|
|
- ['$_']}]),
|
|
|
- %% remove all remote globals - we don't have monitors on them.
|
|
|
+ [{{ {element,1,{element,1,'$_'}}, '$1', '$2' }}]}]),
|
|
|
+ _ = [gproc_lib:ensure_monitor(Pid, g) || {_, Pid, _} <- My_local_globs],
|
|
|
+ %% remove all remote globals.
|
|
|
ets:select_delete(?TAB, [{{{{'_',g,'_'},'_'}, '$1', '_'},
|
|
|
[{'=/=', {node,'$1'}, node()}],
|
|
|
[true]}]),
|
|
@@ -642,7 +686,8 @@ surrendered_1(Globs) ->
|
|
|
lists:foldl(
|
|
|
fun({{Key,_}=K, Pid, V}, Acc) when node(Pid) =/= node() ->
|
|
|
ets:insert(?TAB, {K, Pid, V}),
|
|
|
- ets:insert_new(?TAB, {{Pid,Key}, r}),
|
|
|
+ _ = gproc_lib:ensure_monitor(Pid, g),
|
|
|
+ ets:insert_new(?TAB, {{Pid,Key}, []}),
|
|
|
Acc;
|
|
|
({{Pid,_}=K, Opts}, Acc) when node(Pid) =/= node() ->
|
|
|
ets:insert(?TAB, {K, Opts}),
|
|
@@ -658,7 +703,7 @@ surrendered_1(Globs) ->
|
|
|
ok;
|
|
|
[_|_] = Missing ->
|
|
|
%% This is very unlikely, I think
|
|
|
- leader_cast({add_globals, Missing})
|
|
|
+ leader_cast({add_globals, mk_broadcast_insert_vals(Missing)})
|
|
|
end,
|
|
|
case [{K,P} || {K,P,_} <- Ldr_local_globs,
|
|
|
is_pid(P) andalso
|