|
@@ -26,6 +26,8 @@
|
|
|
reg/1, reg/2, unreg/1,
|
|
|
reg_or_locate/3,
|
|
|
reg_shared/2, unreg_shared/1,
|
|
|
+ monitor/2,
|
|
|
+ demonitor/2,
|
|
|
set_attributes/2,
|
|
|
set_attributes_shared/2,
|
|
|
mreg/2,
|
|
@@ -126,6 +128,19 @@ reg_shared({_,g,_} = Key, Value) ->
|
|
|
reg_shared(_, _) ->
|
|
|
?THROW_GPROC_ERROR(badarg).
|
|
|
|
|
|
+monitor({_,g,_} = Key, Type) when Type==info;
|
|
|
+ Type==follow;
|
|
|
+ Type==standby ->
|
|
|
+ leader_call({monitor, Key, self(), Type});
|
|
|
+monitor(_, _) ->
|
|
|
+ ?THROW_GPROC_ERROR(badarg).
|
|
|
+
|
|
|
+demonitor({_,g,_} = Key, Ref) ->
|
|
|
+ leader_call({demonitor, Key, self(), Ref});
|
|
|
+demonitor(_, _) ->
|
|
|
+ ?THROW_GPROC_ERROR(badarg).
|
|
|
+
|
|
|
+
|
|
|
set_attributes({_,g,_} = Key, Attrs) ->
|
|
|
leader_call({set_attributes, Key, Attrs, self()});
|
|
|
set_attributes(_, _) ->
|
|
@@ -336,6 +351,68 @@ handle_leader_call({reg, {_C,g,_Name} = K, Value, Pid}, _From, S, _E) ->
|
|
|
%% end,
|
|
|
{reply, true, [{insert, Vals}], S}
|
|
|
end;
|
|
|
+handle_leader_call({monitor, {T,g,_} = K, MPid, Type}, _From, S, _E) when T==n;
|
|
|
+ T==a ->
|
|
|
+ case ets:lookup(?TAB, {K, T}) of
|
|
|
+ [{_, Pid, _}] ->
|
|
|
+ Opts = get_opts(Pid, K),
|
|
|
+ Ref = make_ref(),
|
|
|
+ Opts1 = gproc_lib:add_monitor(Opts, MPid, Ref, Type),
|
|
|
+ _ = gproc_lib:ensure_monitor(MPid, g),
|
|
|
+ Obj = {{Pid,K}, Opts1},
|
|
|
+ Rev = {{MPid,K}, []},
|
|
|
+ ets:insert(?TAB, [Obj, Rev]),
|
|
|
+ {reply, Ref, [{insert, [Obj, Rev]}], S};
|
|
|
+ LookupRes ->
|
|
|
+ Ref = make_ref(),
|
|
|
+ case Type of
|
|
|
+ standby ->
|
|
|
+ Event = {failover, MPid},
|
|
|
+ Msgs = insert_reg(LookupRes, K, undefined, MPid, Event),
|
|
|
+ Obj = {{K,T}, MPid, undefined},
|
|
|
+ Rev = {{MPid,K}, []},
|
|
|
+ ets:insert(?TAB, [Obj, Rev]),
|
|
|
+ MPid ! {gproc, {failover,MPid}, Ref, K},
|
|
|
+ {reply, Ref, [{insert, [Obj, Rev]},
|
|
|
+ {notify, Msgs}], S};
|
|
|
+ follow ->
|
|
|
+ case LookupRes of
|
|
|
+ [{_, Waiters}] ->
|
|
|
+ add_follow_to_waiters(Waiters, K, MPid, Ref, S);
|
|
|
+ [] ->
|
|
|
+ add_follow_to_waiters([], K, MPid, Ref, S);
|
|
|
+ [{_, Pid, _}] ->
|
|
|
+ case ets:lookup(?TAB, {Pid,K}) of
|
|
|
+ [{_, Opts}] when is_list(Opts) ->
|
|
|
+ Opts1 = gproc_lib:add_monitor(
|
|
|
+ Opts, MPid, Ref, follow),
|
|
|
+ ets:insert(?TAB, {{Pid,K}, Opts1}),
|
|
|
+ {reply, Ref,
|
|
|
+ [{insert, [{{Pid,K}, Opts1}]}], S}
|
|
|
+ end
|
|
|
+ end;
|
|
|
+ _ ->
|
|
|
+ MPid ! {gproc, unreg, Ref, K},
|
|
|
+ {reply, Ref, S}
|
|
|
+ end
|
|
|
+ end;
|
|
|
+handle_leader_call({demonitor, {T,g,_} = K, MPid, Ref}, _From, S, _E) ->
|
|
|
+ case ets:lookup(?TAB, {K,T}) of
|
|
|
+ [{_, Pid, _}] ->
|
|
|
+ Opts = get_opts(Pid, K),
|
|
|
+ Opts1 = gproc_lib:remove_monitors(Opts, MPid, Ref),
|
|
|
+ Obj = {{Pid,K}, Opts1},
|
|
|
+ ets:insert(?TAB, Obj),
|
|
|
+ ets:delete(?TAB, {MPid, K}),
|
|
|
+ {reply, ok, [{delete, [{MPid,K}]},
|
|
|
+ {insert, [Obj]}], S};
|
|
|
+ [{Key, Waiters}] ->
|
|
|
+ NewWaiters = [W || W <- Waiters,
|
|
|
+ W =/= {MPid, Ref, follow}],
|
|
|
+ {reply, ok, [{insert, [{Key, NewWaiters}]}], S};
|
|
|
+ _ ->
|
|
|
+ {reply, ok, S}
|
|
|
+ end;
|
|
|
handle_leader_call({set_attributes, {_,g,_} = K, Attrs, Pid}, _From, S, _E) ->
|
|
|
case gproc_lib:insert_attr(K, Attrs, Pid, g) of
|
|
|
false ->
|
|
@@ -369,39 +446,6 @@ handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P},
|
|
|
[{_, OtherPid, OtherVal}] ->
|
|
|
{reply, {OtherPid, OtherVal}, S}
|
|
|
end;
|
|
|
-handle_leader_call({monitor, {T,g,_} = Key, Pid}, _From, S, _E)
|
|
|
- when T==n; T==a ->
|
|
|
- Ref = make_ref(),
|
|
|
- case gproc:where(Key) of
|
|
|
- undefined ->
|
|
|
- Pid ! {gproc, unreg, Ref, Key},
|
|
|
- {reply, Ref, [], S};
|
|
|
- RegPid ->
|
|
|
- NewRev =
|
|
|
- case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
|
|
|
- r ->
|
|
|
- {K, [{monitor, [{Pid,Ref}]}]};
|
|
|
- Opts ->
|
|
|
- {K, gproc_lib:add_monitor(Opts, Pid, Ref)}
|
|
|
- end,
|
|
|
- ets:insert(?TAB, NewRev),
|
|
|
- {reply, Ref, [{insert, [NewRev]}], S}
|
|
|
- end;
|
|
|
-handle_leader_call({demonitor, {T,g,_} = Key, Ref, Pid}, _From, S, _E)
|
|
|
- when T==n; T==a ->
|
|
|
- case gproc:where(Key) of
|
|
|
- undefined ->
|
|
|
- {reply, ok, [], S};
|
|
|
- RegPid ->
|
|
|
- case ets:lookup_element(?TAB, K = {RegPid, Key}, 2) of
|
|
|
- r ->
|
|
|
- {reply, ok, [], S};
|
|
|
- Opts ->
|
|
|
- NewRev = {K, gproc_lib:remove_monitor(Opts, Pid, Ref)},
|
|
|
- ets:insert(?TAB, NewRev),
|
|
|
- {reply, Ref, [{insert, [NewRev]}], S}
|
|
|
- end
|
|
|
- end;
|
|
|
handle_leader_call({update_counter, {T,g,_Ctr} = Key, Incr, Pid}, _From, S, _E)
|
|
|
when is_integer(Incr), T==c;
|
|
|
is_integer(Incr), T==n ->
|
|
@@ -451,13 +495,14 @@ handle_leader_call({unreg, {T,g,Name} = K, Pid}, _From, S, _E) ->
|
|
|
case ets:lookup(?TAB, {{a,g,Name},a}) of
|
|
|
[Aggr] ->
|
|
|
%% updated by remove_reg/3
|
|
|
- {reply, true, [{delete,[Key, {Pid,K}], unreg},
|
|
|
+ {reply, true, [{delete,[Key, {Pid,K}]},
|
|
|
{insert, [Aggr]}], S};
|
|
|
[] ->
|
|
|
- {reply, true, [{delete, [Key, {Pid,K}], unreg}], S}
|
|
|
+ {reply, true, [{delete, [Key, {Pid,K}]}], S}
|
|
|
end;
|
|
|
true ->
|
|
|
- {reply, true, [{delete, [Key, {Pid,K}], unreg}], S}
|
|
|
+ {reply, true, [{notify, [{K, Pid, unreg}]},
|
|
|
+ {delete, [Key, {Pid,K}]}], S}
|
|
|
end;
|
|
|
false ->
|
|
|
{reply, badarg, S}
|
|
@@ -467,19 +512,27 @@ handle_leader_call({give_away, {T,g,_} = K, To, Pid}, _From, S, _E)
|
|
|
Key = {K, T},
|
|
|
case ets:lookup(?TAB, Key) of
|
|
|
[{_, Pid, Value}] ->
|
|
|
+ Opts = get_opts(Pid, K),
|
|
|
case pid_to_give_away_to(To) of
|
|
|
Pid ->
|
|
|
{reply, Pid, S};
|
|
|
ToPid when is_pid(ToPid) ->
|
|
|
ets:insert(?TAB, [{Key, ToPid, Value},
|
|
|
- {{ToPid,K}, []}]),
|
|
|
+ {{ToPid,K}, Opts}]),
|
|
|
_ = gproc_lib:ensure_monitor(ToPid, g),
|
|
|
- {reply, ToPid, [{delete, [Key, {Pid,K}], {migrated,ToPid}},
|
|
|
- {insert, [{Key, ToPid, Value}]}], S};
|
|
|
+ Rev = {Pid, K},
|
|
|
+ ets:delete(?TAB, Rev),
|
|
|
+ gproc_lib:notify({migrated, ToPid}, K, Opts),
|
|
|
+ {reply, ToPid, [{insert, [{Key, ToPid, Value}]},
|
|
|
+ {notify, [{K, Pid, {migrated, ToPid}}]},
|
|
|
+ {delete, [Rev]}], S};
|
|
|
undefined ->
|
|
|
ets:delete(?TAB, Key),
|
|
|
- Rev = gproc_lib:remove_reverse_mapping(unreg, Pid, K),
|
|
|
- {reply, undefined, [{delete, [Key, Rev], unreg}], S}
|
|
|
+ Rev = {Pid, K},
|
|
|
+ ets:delete(?TAB, Rev),
|
|
|
+ gproc_lib:notify(unreg, K, Opts),
|
|
|
+ {reply, undefined, [{notify, [{K, Pid, unreg}]},
|
|
|
+ {delete, [Key, Rev]}], S}
|
|
|
end;
|
|
|
_ ->
|
|
|
{reply, badarg, S}
|
|
@@ -499,7 +552,7 @@ handle_leader_call({munreg, T, g, L, Pid}, _From, S, _E) ->
|
|
|
[] ->
|
|
|
{reply, true, S};
|
|
|
Objs ->
|
|
|
- {reply, true, [{delete, Objs, unreg}], S}
|
|
|
+ {reply, true, [{delete, Objs}], S}
|
|
|
catch
|
|
|
error:_ -> {reply, badarg, S}
|
|
|
end;
|
|
@@ -570,7 +623,7 @@ handle_leader_cast({add_globals, Missing}, S, _E) ->
|
|
|
Update = insert_globals(Missing),
|
|
|
{ok, [{insert, Update}], S};
|
|
|
handle_leader_cast({remove_globals, Globals}, S, _E) ->
|
|
|
- delete_globals(Globals, []),
|
|
|
+ delete_globals(Globals),
|
|
|
{ok, S};
|
|
|
handle_leader_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S, _E) ->
|
|
|
case ets:lookup(?TAB, {Key, T}) of
|
|
@@ -617,36 +670,84 @@ mk_broadcast_insert_vals(Objs) ->
|
|
|
|
|
|
|
|
|
process_globals(Globals) ->
|
|
|
- Modified =
|
|
|
+ {Modified, Notifications} =
|
|
|
lists:foldl(
|
|
|
- fun({{T,_,_} = Key, Pid}, A) ->
|
|
|
- A1 = case T of
|
|
|
- c ->
|
|
|
- Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
|
|
|
- update_aggr_counter(Key, -Incr) ++ A;
|
|
|
- _ ->
|
|
|
- A
|
|
|
- end,
|
|
|
- K = ets_key(Key, Pid),
|
|
|
- ets:delete(?TAB, K),
|
|
|
- remove_rev_entry(Pid, Key, unreg),
|
|
|
- A1
|
|
|
- end, [], Globals),
|
|
|
+ fun({{T,_,_} = Key, Pid}, A) when T==n; T==a ->
|
|
|
+ case ets:lookup(?TAB, {Pid,Key}) of
|
|
|
+ [{_, Opts}] when is_list(Opts) ->
|
|
|
+ maybe_failover(Key, Pid, Opts, A);
|
|
|
+ _ ->
|
|
|
+ A
|
|
|
+ end;
|
|
|
+ ({{T,_,_} = Key, Pid}, {MA,NA}) ->
|
|
|
+ MA1 = case T of
|
|
|
+ c ->
|
|
|
+ Incr = ets:lookup_element(?TAB, {Key,Pid}, 3),
|
|
|
+ update_aggr_counter(Key, -Incr) ++ MA;
|
|
|
+ _ ->
|
|
|
+ MA
|
|
|
+ end,
|
|
|
+ N = remove_entry(Key, Pid, unreg),
|
|
|
+ {MA1, N ++ NA}
|
|
|
+ end, {[],[]}, Globals),
|
|
|
[{insert, Modified} || Modified =/= []] ++
|
|
|
- [{delete, Globals, unreg} || Globals =/= []].
|
|
|
+ [{notify, Notifications} || Notifications =/= []] ++
|
|
|
+ [{delete, Globals} || Globals =/= []].
|
|
|
+
|
|
|
+maybe_failover({T,_,_} = Key, Pid, Opts, {MAcc, NAcc}) ->
|
|
|
+ Opts = get_opts(Pid, Key),
|
|
|
+ case filter_standbys(gproc_lib:standbys(Opts)) of
|
|
|
+ [] ->
|
|
|
+ Notify = remove_entry(Key, Pid, unreg),
|
|
|
+ {MAcc, Notify ++ NAcc};
|
|
|
+ [{ToPid,Ref,_}|_] ->
|
|
|
+ Value = case ets:lookup(?TAB, {Key,T}) of
|
|
|
+ [{_, _, V}] -> V;
|
|
|
+ _ -> undefined
|
|
|
+ end,
|
|
|
+ Notify = remove_rev_entry(Opts, Pid, Key, {failover, ToPid}),
|
|
|
+ Opts1 = gproc_lib:remove_monitor(Opts, ToPid, Ref),
|
|
|
+ _ = gproc_lib:ensure_monitor(ToPid, g),
|
|
|
+ NewReg = {{Key,T}, ToPid, Value},
|
|
|
+ NewRev = {{ToPid, Key}, Opts1},
|
|
|
+ ets:insert(?TAB, [NewReg, NewRev]),
|
|
|
+ {[NewReg, NewRev | MAcc], Notify ++ NAcc}
|
|
|
+ end.
|
|
|
+
|
|
|
+filter_standbys(SBs) ->
|
|
|
+ filter_standbys(SBs, [node()|nodes()]).
|
|
|
+
|
|
|
+filter_standbys([{Pid,_,_} = H|T], Nodes) ->
|
|
|
+ case lists:member(node(Pid), Nodes) of
|
|
|
+ true ->
|
|
|
+ [H|T];
|
|
|
+ false ->
|
|
|
+ filter_standbys(T, Nodes)
|
|
|
+ end;
|
|
|
+filter_standbys([], _) ->
|
|
|
+ [].
|
|
|
|
|
|
-remove_rev_entry(Pid, {T,g,_} = K, Event) when T==n; T==a ->
|
|
|
- Key = {Pid, K},
|
|
|
- _ = case ets:lookup(?TAB, Key) of
|
|
|
- [] -> ok;
|
|
|
- [{_, r}] -> ok;
|
|
|
- [{_, Opts}] when is_list(Opts) ->
|
|
|
- gproc_lib:notify(Event, K, Opts)
|
|
|
- end,
|
|
|
- ets:delete(?TAB, Key);
|
|
|
-remove_rev_entry(Pid, K, _Event) ->
|
|
|
- ets:delete(?TAB, {Pid, K}).
|
|
|
|
|
|
+remove_entry(Key, Pid, Event) ->
|
|
|
+ K = ets_key(Key, Pid),
|
|
|
+ ets:delete(?TAB, K),
|
|
|
+ remove_rev_entry(get_opts(Pid, Key), Pid, Key, Event).
|
|
|
+
|
|
|
+remove_rev_entry(Opts, Pid, {T,g,_} = K, Event) when T==n; T==a ->
|
|
|
+ Key = {Pid, K},
|
|
|
+ gproc_lib:notify(Event, K, Opts),
|
|
|
+ ets:delete(?TAB, Key),
|
|
|
+ [{K, Pid, Event}];
|
|
|
+remove_rev_entry(_, Pid, K, _Event) ->
|
|
|
+ ets:delete(?TAB, {Pid, K}),
|
|
|
+ [].
|
|
|
+
|
|
|
+get_opts(Pid, K) ->
|
|
|
+ case ets:lookup(?TAB, {Pid, K}) of
|
|
|
+ [] -> [];
|
|
|
+ [{_, r}] -> [];
|
|
|
+ [{_, Opts}] -> Opts
|
|
|
+ end.
|
|
|
|
|
|
code_change(_FromVsn, S, _Extra, _E) ->
|
|
|
{ok, S}.
|
|
@@ -659,23 +760,22 @@ from_leader({sync, Ref}, S, _E) ->
|
|
|
{ok, S};
|
|
|
from_leader(Ops, S, _E) ->
|
|
|
lists:foreach(
|
|
|
- fun({delete, Globals, Event}) ->
|
|
|
- delete_globals(Globals, Event);
|
|
|
+ fun({delete, Globals}) ->
|
|
|
+ delete_globals(Globals);
|
|
|
({insert, Globals}) ->
|
|
|
- _ = insert_globals(Globals)
|
|
|
+ _ = insert_globals(Globals);
|
|
|
+ ({notify, Events}) ->
|
|
|
+ do_notify(Events)
|
|
|
end, Ops),
|
|
|
{ok, S}.
|
|
|
|
|
|
insert_globals(Globals) ->
|
|
|
- ets:insert(?TAB, Globals),
|
|
|
lists:foldl(
|
|
|
- fun({{{_,_,_} = Key,Pid}, Pid, _}, A) ->
|
|
|
+ fun({{{_,_,_} = Key,_}, Pid, _} = Obj, A) ->
|
|
|
+ ets:insert(?TAB, Obj),
|
|
|
ets:insert_new(?TAB, {{Pid,Key}, []}),
|
|
|
gproc_lib:ensure_monitor(Pid,g),
|
|
|
A;
|
|
|
- ({{{_,_,_}, n}, Pid, _}, A) ->
|
|
|
- gproc_lib:ensure_monitor(Pid,g),
|
|
|
- A;
|
|
|
({{P,_K}, Opts} = Obj, A) when is_pid(P), is_list(Opts),Opts =/= [] ->
|
|
|
ets:insert(?TAB, Obj),
|
|
|
gproc_lib:ensure_monitor(P,g),
|
|
@@ -685,20 +785,30 @@ insert_globals(Globals) ->
|
|
|
end, Globals, Globals).
|
|
|
|
|
|
|
|
|
-delete_globals(Globals, Event) ->
|
|
|
+delete_globals(Globals) ->
|
|
|
lists:foreach(
|
|
|
fun({{_,g,_},T} = K) when is_atom(T) ->
|
|
|
ets:delete(?TAB, K);
|
|
|
({Key, Pid}) when is_pid(Pid); Pid==shared ->
|
|
|
- K = ets_key(Key,Pid),
|
|
|
- ets:delete(?TAB, K),
|
|
|
- remove_rev_entry(Pid, Key, Event);
|
|
|
+ ets:delete(?TAB, {Pid, Key});
|
|
|
({Pid, Key}) when is_pid(Pid); Pid==shared ->
|
|
|
- K = ets_key(Key, Pid),
|
|
|
- ets:delete(?TAB, K),
|
|
|
- remove_rev_entry(Pid, Key, Event)
|
|
|
+ ets:delete(?TAB, {Pid, Key})
|
|
|
end, Globals).
|
|
|
|
|
|
+do_notify([{P, Msg}|T]) when is_pid(P) ->
|
|
|
+ P ! Msg,
|
|
|
+ do_notify(T);
|
|
|
+do_notify([{K, P, E}|T]) ->
|
|
|
+ case ets:lookup(?TAB, {P,K}) of
|
|
|
+ [{_, Opts}] when is_list(Opts) ->
|
|
|
+ gproc_lib:notify(E, K, Opts);
|
|
|
+ _ ->
|
|
|
+ do_notify(T)
|
|
|
+ end;
|
|
|
+do_notify([]) ->
|
|
|
+ ok.
|
|
|
+
|
|
|
+
|
|
|
ets_key({T,_,_} = K, _) when T==n; T==a ->
|
|
|
{K, T};
|
|
|
ets_key(K, Pid) ->
|
|
@@ -846,3 +956,37 @@ pid_to_give_away_to({T,g,_} = Key) when T==n; T==a ->
|
|
|
_ ->
|
|
|
undefined
|
|
|
end.
|
|
|
+
|
|
|
+insert_reg([{_, Waiters}], K, Val, Pid, Event) ->
|
|
|
+ gproc_lib:insert_reg(K, Val, Pid, g, []),
|
|
|
+ tell_waiters(Waiters, K, Pid, Val, Event).
|
|
|
+
|
|
|
+tell_waiters([{P,R}|T], K, Pid, V, Event) ->
|
|
|
+ Msg = {gproc, R, registered, {K, Pid, V}},
|
|
|
+ if node(P) == node() ->
|
|
|
+ P ! Msg;
|
|
|
+ true ->
|
|
|
+ [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
|
|
|
+ end;
|
|
|
+tell_waiters([{P,R,follow}|T], K, Pid, V, Event) ->
|
|
|
+ Msg = {gproc, Event, R, K},
|
|
|
+ if node(P) == node() ->
|
|
|
+ P ! Msg;
|
|
|
+ true ->
|
|
|
+ [{P, Msg} | tell_waiters(T, K, Pid, V, Event)]
|
|
|
+ end;
|
|
|
+tell_waiters([], _, _, _, _) ->
|
|
|
+ [].
|
|
|
+
|
|
|
+add_follow_to_waiters(Waiters, {T,_,_} = K, Pid, Ref, S) ->
|
|
|
+ Obj = {{K,T}, [{Pid, Ref, follow}|Waiters]},
|
|
|
+ Rev = {{Pid,K}, []},
|
|
|
+ ets:insert(?TAB, [Obj, Rev]),
|
|
|
+ Msg = {gproc, unreg, Ref, K},
|
|
|
+ if node(Pid) == node() ->
|
|
|
+ Pid ! Msg,
|
|
|
+ {reply, Ref, [{insert, [Obj, Rev]}], S};
|
|
|
+ true ->
|
|
|
+ {reply, Ref, [{insert, [Obj, Rev]},
|
|
|
+ {notify, [{Pid, Msg}]}]}
|
|
|
+ end.
|