|
@@ -26,6 +26,8 @@
|
|
|
reg/1, reg/2, unreg/1,
|
|
|
reg_or_locate/3,
|
|
|
reg_shared/2, unreg_shared/1,
|
|
|
+ monitor/2,
|
|
|
+ demonitor/2,
|
|
|
set_attributes/2,
|
|
|
set_attributes_shared/2,
|
|
|
mreg/2,
|
|
@@ -126,6 +128,18 @@ reg_shared({_,g,_} = Key, Value) ->
|
|
|
reg_shared(_, _) ->
|
|
|
?THROW_GPROC_ERROR(badarg).
|
|
|
|
|
|
+monitor({_,g,_} = Key, Type) when Type==info;
|
|
|
+ Type==standby ->
|
|
|
+ leader_call({monitor, Key, self(), Type});
|
|
|
+monitor(_, _) ->
|
|
|
+ ?THROW_GPROC_ERROR(badarg).
|
|
|
+
|
|
|
+demonitor({_,g,_} = Key, Ref) ->
|
|
|
+ leader_call({demonitor, Key, self(), Ref});
|
|
|
+demonitor(_, _) ->
|
|
|
+ ?THROW_GPROC_ERROR(badarg).
|
|
|
+
|
|
|
+
|
|
|
set_attributes({_,g,_} = Key, Attrs) ->
|
|
|
leader_call({set_attributes, Key, Attrs, self()});
|
|
|
set_attributes(_, _) ->
|
|
@@ -336,6 +350,45 @@ handle_leader_call({reg, {_C,g,_Name} = K, Value, Pid}, _From, S, _E) ->
|
|
|
%% end,
|
|
|
{reply, true, [{insert, Vals}], S}
|
|
|
end;
|
|
|
+handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E) when T==n;
|
|
|
+ T==a ->
|
|
|
+ case ets:lookup(?TAB, {K, T}) of
|
|
|
+ [{_, Pid, _}] ->
|
|
|
+ Opts = get_opts(Pid, K),
|
|
|
+ Ref = make_ref(),
|
|
|
+ Opts1 = gproc_lib:add_monitor(Opts, MPid, Ref, Type),
|
|
|
+ _ = gproc_lib:ensure_monitor(MPid, g),
|
|
|
+ Obj = {{Pid,K}, Opts1},
|
|
|
+ Rev = {{MPid,K}, []},
|
|
|
+ ets:insert(?TAB, [Obj, Rev]),
|
|
|
+ {reply, Ref, [{insert, [Obj, Rev]}], S};
|
|
|
+ [] ->
|
|
|
+ Ref = make_ref(),
|
|
|
+ case Type of
|
|
|
+ standby ->
|
|
|
+ Obj = {{K,T}, MPid, undefined},
|
|
|
+ Rev = {{MPid,K}, []},
|
|
|
+ ets:insert(?TAB, [Obj, Rev]),
|
|
|
+ MPid ! {gproc, {failover,MPid}, Ref, K},
|
|
|
+ {reply, Ref, [{insert, [Obj, Rev]}], S};
|
|
|
+ _ ->
|
|
|
+ MPid ! {gproc, unreg, Ref, K},
|
|
|
+ {reply, Ref, S}
|
|
|
+ end
|
|
|
+ end;
|
|
|
+handle_leader_call({demonitor, {T,g,_} = K, MPid, Ref}, _From, S, _E) ->
|
|
|
+ case ets:lookup(?TAB, {K,T}) of
|
|
|
+ [{_, Pid, _}] ->
|
|
|
+ Opts = get_opts(Pid, K),
|
|
|
+ Opts1 = gproc_lib:remove_monitors(Opts, MPid, Ref),
|
|
|
+ Obj = {{Pid,K}, Opts1},
|
|
|
+ ets:insert(?TAB, Obj),
|
|
|
+ ets:delete(?TAB, {MPid, K}),
|
|
|
+ {reply, ok, [{delete, [{MPid,K}]},
|
|
|
+ {insert, [Obj]}], S};
|
|
|
+ _ ->
|
|
|
+ {reply, ok, S}
|
|
|
+ end;
|
|
|
handle_leader_call({set_attributes, {_,g,_} = K, Attrs, Pid}, _From, S, _E) ->
|
|
|
case gproc_lib:insert_attr(K, Attrs, Pid, g) of
|
|
|
false ->
|
|
@@ -369,39 +422,6 @@ handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P},
|
|
|
[{_, OtherPid, OtherVal}] ->
|
|
|
{reply, {OtherPid, OtherVal}, S}
|
|
|
end;
|
|
|
-handle_leader_call({monitor, {T,g,_} = Key, Pid}, _From, S, _E)
|
|
|
- when T==n; T==a ->
|
|
|
- Ref = make_ref(),
|
|
|
- case gproc:where(Key) of
|
|
|
- undefined ->
|
|
|
- Pid ! {gproc, unreg, Ref, Key},
|
|
|
- {reply, Ref, [], S};
|
|
|
- RegPid ->
|
|
|
- NewRev =
|
|
|
- case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
|
|
|
- r ->
|
|
|
- {K, [{monitor, [{Pid,Ref}]}]};
|
|
|
- Opts ->
|
|
|
- {K, gproc_lib:add_monitor(Opts, Pid, Ref)}
|
|
|
- end,
|
|
|
- ets:insert(?TAB, NewRev),
|
|
|
- {reply, Ref, [{insert, [NewRev]}], S}
|
|
|
- end;
|
|
|
-handle_leader_call({demonitor, {T,g,_} = Key, Ref, Pid}, _From, S, _E)
|
|
|
- when T==n; T==a ->
|
|
|
- case gproc:where(Key) of
|
|
|
- undefined ->
|
|
|
- {reply, ok, [], S};
|
|
|
- RegPid ->
|
|
|
- case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
|
|
|
- r ->
|
|
|
- {reply, ok, [], S};
|
|
|
- Opts ->
|
|
|
- NewRev = {K, gproc_lib:remove_monitor(Opts, Pid, Ref)},
|
|
|
- ets:insert(?TAB, NewRev),
|
|
|
- {reply, Ref, [{insert, [NewRev]}], S}
|
|
|
- end
|
|
|
- end;
|
|
|
handle_leader_call({update_counter, {T,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
|
|
|
when is_integer(Incr), T==c;
|
|
|
is_integer(Incr), T==n ->
|
|
@@ -570,7 +590,7 @@ handle_leader_cast({add_globals, Missing}, S, _E) ->
|
|
|
Update = insert_globals(Missing),
|
|
|
{ok, [{insert, Update}], S};
|
|
|
handle_leader_cast({remove_globals, Globals}, S, _E) ->
|
|
|
- delete_globals(Globals, []),
|
|
|
+ delete_globals(Globals),
|
|
|
{ok, S};
|
|
|
handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
|
|
|
case ets:lookup(?TAB, {Key, T}) of
|
|
@@ -619,7 +639,14 @@ mk_broadcast_insert_vals(Objs) ->
|
|
|
process_globals(Globals) ->
|
|
|
Modified =
|
|
|
lists:foldl(
|
|
|
- fun({{T,_,_} = Key, Pid}, A) ->
|
|
|
+ fun({{T,_,_} = Key, Pid}, A) when T==n; T==a ->
|
|
|
+ case ets:lookup(?TAB, {Pid,Key}) of
|
|
|
+ [{_, Opts}] when is_list(Opts) ->
|
|
|
+ maybe_failover(Key, Pid, Opts, A);
|
|
|
+ _ ->
|
|
|
+ A
|
|
|
+ end;
|
|
|
+ ({{T,_,_} = Key, Pid}, A) ->
|
|
|
A1 = case T of
|
|
|
c ->
|
|
|
Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
|
|
@@ -627,26 +654,64 @@ process_globals(Globals) ->
|
|
|
_ ->
|
|
|
A
|
|
|
end,
|
|
|
- K = ets_key(Key, Pid),
|
|
|
- ets:delete(?TAB, K),
|
|
|
- remove_rev_entry(Pid, Key, unreg),
|
|
|
+ remove_entry(Key, Pid, unreg),
|
|
|
A1
|
|
|
end, [], Globals),
|
|
|
[{insert, Modified} || Modified =/= []] ++
|
|
|
- [{delete, Globals, unreg} || Globals =/= []].
|
|
|
+ [{delete, Globals} || Globals =/= []].
|
|
|
|
|
|
-remove_rev_entry(Pid, {T,g,_} = K, Event) when T==n; T==a ->
|
|
|
+maybe_failover({T,_,_} = Key, Pid, Opts, Acc) ->
|
|
|
+ Opts = get_opts(Pid, Key),
|
|
|
+ case filter_standbys(gproc_lib:standbys(Opts)) of
|
|
|
+ [] ->
|
|
|
+ remove_entry(Key, Pid, unreg),
|
|
|
+ Acc;
|
|
|
+ [{ToPid,Ref,_}|_] ->
|
|
|
+ Value = case ets:lookup(?TAB, {Key,T}) of
|
|
|
+ [{_, _, V}] -> V;
|
|
|
+ _ -> undefined
|
|
|
+ end,
|
|
|
+ remove_rev_entry(Opts, Pid, Key, {failover, ToPid}),
|
|
|
+ Opts1 = gproc_lib:remove_monitor(Opts, ToPid, Ref),
|
|
|
+ _ = gproc_lib:ensure_monitor(ToPid, g),
|
|
|
+ NewReg = {{Key,T}, ToPid, Value},
|
|
|
+ NewRev = {{ToPid, Key}, Opts1},
|
|
|
+ ets:insert(?TAB, [NewReg, NewRev]),
|
|
|
+ [NewReg, NewRev | Acc]
|
|
|
+ end.
|
|
|
+
|
|
|
+filter_standbys(SBs) ->
|
|
|
+ filter_standbys(SBs, [node()|nodes()]).
|
|
|
+
|
|
|
+filter_standbys([{Pid,_,_} = H|T], Nodes) ->
|
|
|
+ case lists:member(node(Pid), Nodes) of
|
|
|
+ true ->
|
|
|
+ [H|T];
|
|
|
+ false ->
|
|
|
+ filter_standbys(T, Nodes)
|
|
|
+ end;
|
|
|
+filter_standbys([], _) ->
|
|
|
+ [].
|
|
|
+
|
|
|
+
|
|
|
+remove_entry(Key, Pid, Event) ->
|
|
|
+ K = ets_key(Key, Pid),
|
|
|
+ ets:delete(?TAB, K),
|
|
|
+ remove_rev_entry(get_opts(Pid, Key), Pid, Key, Event).
|
|
|
+
|
|
|
+remove_rev_entry(Opts, Pid, {T,g,_} = K, Event) when T==n; T==a ->
|
|
|
Key = {Pid, K},
|
|
|
- _ = case ets:lookup(?TAB, Key) of
|
|
|
- [] -> ok;
|
|
|
- [{_, r}] -> ok;
|
|
|
- [{_, Opts}] when is_list(Opts) ->
|
|
|
- gproc_lib:notify(Event, K, Opts)
|
|
|
- end,
|
|
|
+ gproc_lib:notify(Event, K, Opts),
|
|
|
ets:delete(?TAB, Key);
|
|
|
-remove_rev_entry(Pid, K, _Event) ->
|
|
|
+remove_rev_entry(_, Pid, K, _Event) ->
|
|
|
ets:delete(?TAB, {Pid, K}).
|
|
|
|
|
|
+get_opts(Pid, K) ->
|
|
|
+ case ets:lookup(?TAB, {Pid, K}) of
|
|
|
+ [] -> [];
|
|
|
+ [{_, r}] -> [];
|
|
|
+ [{_, Opts}] -> Opts
|
|
|
+ end.
|
|
|
|
|
|
code_change(_FromVsn, S, _Extra, _E) ->
|
|
|
{ok, S}.
|
|
@@ -659,21 +724,23 @@ from_leader({sync, Ref}, S, _E) ->
|
|
|
{ok, S};
|
|
|
from_leader(Ops, S, _E) ->
|
|
|
lists:foreach(
|
|
|
- fun({delete, Globals, Event}) ->
|
|
|
- delete_globals(Globals, Event);
|
|
|
+ fun({delete, Globals}) ->
|
|
|
+ delete_globals(Globals);
|
|
|
({insert, Globals}) ->
|
|
|
_ = insert_globals(Globals)
|
|
|
end, Ops),
|
|
|
{ok, S}.
|
|
|
|
|
|
insert_globals(Globals) ->
|
|
|
- ets:insert(?TAB, Globals),
|
|
|
lists:foldl(
|
|
|
- fun({{{_,_,_} = Key,Pid}, Pid, _}, A) ->
|
|
|
+ fun({{{_,_,_} = Key,Pid}, Pid, _} = Obj, A) ->
|
|
|
+ ets:insert(?TAB, Obj),
|
|
|
ets:insert_new(?TAB, {{Pid,Key}, []}),
|
|
|
gproc_lib:ensure_monitor(Pid,g),
|
|
|
A;
|
|
|
- ({{{_,_,_}, n}, Pid, _}, A) ->
|
|
|
+ ({{{_,_,_} = Key, n}, Pid, _} = Obj, A) ->
|
|
|
+ ets:insert(?TAB, Obj),
|
|
|
+ ets:insert_new(?TAB, {{Pid,Key}, []}),
|
|
|
gproc_lib:ensure_monitor(Pid,g),
|
|
|
A;
|
|
|
({{P,_K}, Opts} = Obj, A) when is_pid(P), is_list(Opts),Opts =/= [] ->
|
|
@@ -685,18 +752,14 @@ insert_globals(Globals) ->
|
|
|
end, Globals, Globals).
|
|
|
|
|
|
|
|
|
-delete_globals(Globals, Event) ->
|
|
|
+delete_globals(Globals) ->
|
|
|
lists:foreach(
|
|
|
fun({{_,g,_},T} = K) when is_atom(T) ->
|
|
|
ets:delete(?TAB, K);
|
|
|
({Key, Pid}) when is_pid(Pid); Pid==shared ->
|
|
|
- K = ets_key(Key,Pid),
|
|
|
- ets:delete(?TAB, K),
|
|
|
- remove_rev_entry(Pid, Key, Event);
|
|
|
+ ets:delete(?TAB, {Pid, Key});
|
|
|
({Pid, Key}) when is_pid(Pid); Pid==shared ->
|
|
|
- K = ets_key(Key, Pid),
|
|
|
- ets:delete(?TAB, K),
|
|
|
- remove_rev_entry(Pid, Key, Event)
|
|
|
+ ets:delete(?TAB, {Pid, Key})
|
|
|
end, Globals).
|
|
|
|
|
|
ets_key({T,_,_} = K, _) when T==n; T==a ->
|