Browse Source

Add meta to processes in Groups.

[ostinelli/syn#7]
Roberto Ostinelli 8 years ago
parent
commit
50b4c270ed
5 changed files with 99 additions and 18 deletions
  1. 22 4
      README.md
  2. 2 1
      include/syn.hrl
  3. 10 2
      src/syn.erl
  4. 34 9
      src/syn_groups.erl
  5. 31 2
      test/syn_groups_SUITE.erl

+ 22 - 4
README.md

@@ -233,15 +233,22 @@ Types:
 To add a process to a group:
 
 ```erlang
-syn:join(Name, Pid) -> ok.
+syn:join(Name, Pid) ->
+	syn:join(Name, Pid, undefined).
+```
+
+```erlang
+syn:join(Name, Pid, Meta) -> ok.
 
 Types:
 	Name = any()
 	Pid = pid()
+	meta = any()
 ```
 
-> A process can join multiple groups. Also, a process may join the same group multiple times, though it will still be listed only once in it.
-> When a process joins a group, Syn will automatically monitor it.
+> A process can join multiple groups. When a process joins a group, Syn will automatically monitor it.
+> 
+> A process may join the same group multiple times, for example if you need to update its metadata, though it will still be listed only once in it.
 
 To remove a process from a group:
 
@@ -265,6 +272,17 @@ Types:
 	Name = any()
 ```
 
+To get a list of the members of a group with their metadata:
+
+```erlang
+syn:get_members(Name, with_meta) ->
+	[{pid(), Meta}].
+
+Types:
+	Name = any()
+	Meta = any()
+```
+
 > The order of member pids in the returned array is guaranteed to be the same on every node, however it is not guaranteed to match the order of joins.
 
 To know if a process is a member of a group:
@@ -451,4 +469,4 @@ Ensure that  proper testing is included. To run Syn tests you simply have to be
 
 ```bash
 $ make tests
-```
+```

+ 2 - 1
include/syn.hrl

@@ -33,5 +33,6 @@
 -record(syn_groups_table, {
     name = undefined :: any(),
     pid = undefined :: undefined | pid() | atom(),
-    node = undefined :: atom()
+    node = undefined :: atom(),
+    meta = undefined :: any()
 }).

+ 10 - 2
src/syn.erl

@@ -43,10 +43,10 @@
 -export([send/2]).
 
 %% groups
--export([join/2]).
+-export([join/2, join/3]).
 -export([leave/2]).
 -export([member/2]).
--export([get_members/1]).
+-export([get_members/1, get_members/2]).
 -export([publish/2]).
 -export([multi_call/2, multi_call/3]).
 -export([multi_call_reply/2]).
@@ -134,6 +134,10 @@ send(Name, Message) ->
 join(Name, Pid) ->
     syn_groups:join(Name, Pid).
 
+-spec join(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
+join(Name, Pid, Meta) ->
+    syn_groups:join(Name, Pid, Meta).
+
 -spec leave(Name :: any(), Pid :: pid()) -> ok | {error, pid_not_in_group}.
 leave(Name, Pid) ->
     syn_groups:leave(Name, Pid).
@@ -146,6 +150,10 @@ member(Pid, Name) ->
 get_members(Name) ->
     syn_groups:get_members(Name).
 
+-spec get_members(Name :: any(), with_meta) -> [{pid(), Meta :: any()}].
+get_members(Name, with_meta) ->
+    syn_groups:get_members(Name, with_meta).
+
 -spec publish(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
 publish(Name, Message) ->
     syn_groups:publish(Name, Message).

+ 34 - 9
src/syn_groups.erl

@@ -28,10 +28,10 @@
 
 %% API
 -export([start_link/0]).
--export([join/2]).
+-export([join/2, join/3]).
 -export([leave/2]).
 -export([member/2]).
--export([get_members/1]).
+-export([get_members/1, get_members/2]).
 -export([publish/2]).
 -export([multi_call/2, multi_call/3]).
 -export([multi_call_reply/2]).
@@ -61,9 +61,13 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], Options).
 
 -spec join(Name :: any(), Pid :: pid()) -> ok.
-join(Name, Pid) when is_pid(Pid) ->
+join(Name, Pid) ->
+    join(Name, Pid, undefined).
+
+-spec join(Name :: any(), Pid :: pid(), Meta :: any()) -> ok.
+join(Name, Pid, Meta) when is_pid(Pid) ->
     Node = node(Pid),
-    gen_server:call({?MODULE, Node}, {join, Name, Pid}).
+    gen_server:call({?MODULE, Node}, {join, Name, Pid, Meta}).
 
 -spec leave(Name :: any(), Pid :: pid()) -> ok | {error, pid_not_in_group}.
 leave(Name, Pid) when is_pid(Pid) ->
@@ -78,6 +82,10 @@ member(Pid, Name) when is_pid(Pid) ->
 get_members(Name) ->
     i_get_members(Name).
 
+-spec get_members(Name :: any(), with_meta) -> [{pid(), Meta :: any()}].
+get_members(Name, with_meta) ->
+    i_get_members(Name, with_meta).
+
 -spec publish(Name :: any(), Message :: any()) -> {ok, RecipientCount :: non_neg_integer()}.
 publish(Name, Message) ->
     MemberPids = i_get_members(Name),
@@ -121,7 +129,7 @@ multi_call_reply(CallerPid, Reply) ->
 init([]) ->
     %% trap linked processes signal
     process_flag(trap_exit, true),
-
+    
     %% build state
     {ok, #state{}}.
 
@@ -136,12 +144,21 @@ init([]) ->
     {stop, Reason :: any(), Reply :: any(), #state{}} |
     {stop, Reason :: any(), #state{}}.
 
-handle_call({join, Name, Pid}, _From, State) ->
+handle_call({join, Name, Pid, Meta}, _From, State) ->
+    %% check if pid is already in group
+    case find_by_pid_and_name(Pid, Name) of
+        undefined ->
+            ok;
+        Process ->
+            %% remove old reference
+            mnesia:dirty_delete_object(Process)
+    end,
     %% add to group
     mnesia:dirty_write(#syn_groups_table{
         name = Name,
         pid = Pid,
-        node = node()
+        node = node(),
+        meta = Meta
     }),
     %% link
     erlang:link(Pid),
@@ -196,7 +213,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
                 _ ->
                     error_logger:error_msg("Received an exit message from an unlinked process ~p with reason: ~p", [Pid, Reason])
             end;
-
+        
         Processes ->
             F = fun(Process) ->
                 %% get group
@@ -272,6 +289,14 @@ i_get_members(Name) ->
     end, Processes),
     lists:sort(Pids).
 
+-spec i_get_members(Name :: any(), with_meta) -> [{pid(), Meta :: any()}].
+i_get_members(Name, with_meta) ->
+    Processes = mnesia:dirty_read(syn_groups_table, Name),
+    PidsWithMeta = lists:map(fun(Process) ->
+        {Process#syn_groups_table.pid, Process#syn_groups_table.meta}
+    end, Processes),
+    lists:keysort(1, PidsWithMeta).
+
 -spec find_groups_by_pid(Pid :: pid()) -> [Process :: #syn_groups_table{}].
 find_groups_by_pid(Pid) ->
     mnesia:dirty_index_read(syn_groups_table, Pid, #syn_groups_table.pid).
@@ -289,7 +314,7 @@ remove_process(Process) ->
 multi_call_and_receive(CollectorPid, Pid, Message, Timeout) ->
     MonitorRef = monitor(process, Pid),
     Pid ! {syn_multi_call, self(), Message},
-
+    
     receive
         {syn_multi_call_reply, Pid, Reply} ->
             CollectorPid ! {reply, Pid, Reply};

+ 31 - 2
test/syn_groups_SUITE.erl

@@ -38,7 +38,8 @@
     single_node_kill/1,
     single_node_publish/1,
     single_node_multi_call/1,
-    single_node_multi_call_when_recipient_crashes/1
+    single_node_multi_call_when_recipient_crashes/1,
+    single_node_meta/1
 ]).
 -export([
     two_nodes_kill/1,
@@ -91,7 +92,8 @@ groups() ->
             single_node_kill,
             single_node_publish,
             single_node_multi_call,
-            single_node_multi_call_when_recipient_crashes
+            single_node_multi_call_when_recipient_crashes,
+            single_node_meta
         ]},
         {two_nodes_process_groups, [shuffle], [
             two_nodes_kill,
@@ -336,6 +338,33 @@ single_node_multi_call_when_recipient_crashes(_Config) ->
     syn_test_suite_helper:kill_process(Pid1),
     syn_test_suite_helper:kill_process(Pid2).
 
+single_node_meta(Config) ->
+    %% set schema location
+    application:set_env(mnesia, schema_location, ram),
+    %% start
+    ok = syn:start(),
+    ok = syn:init(),
+    %% start process
+    Pid = syn_test_suite_helper:start_process(),
+    %% retrieve
+    [] = syn:get_members(<<"my group">>, with_meta),
+    false = syn:member(Pid, <<"my group">>),
+    %% join
+    ok = syn:join(<<"my group">>, Pid, {some, meta}),
+    %% retrieve
+    [{Pid, {some, meta}}] = syn:get_members(<<"my group">>, with_meta),
+    %% allow to rejoin to update meta
+    ok = syn:join(<<"my group">>, Pid, {updated, meta}),
+    %% retrieve
+    [{Pid, {updated, meta}}] = syn:get_members(<<"my group">>, with_meta),
+    %% leave
+    ok = syn:leave(<<"my group">>, Pid),
+    %% retrieve
+    [] = syn:get_members(<<"my group">>),
+    false = syn:member(Pid, <<"my group">>),
+    %% kill process
+    syn_test_suite_helper:kill_process(Pid).
+
 two_nodes_kill(Config) ->
     %% get slave
     SlaveNode = proplists:get_value(slave_node, Config),