gproc_ps.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. %% ``The contents of this file are subject to the Erlang Public License,
  2. %% Version 1.1, (the "License"); you may not use this file except in
  3. %% compliance with the License. You should have received a copy of the
  4. %% Erlang Public License along with this software. If not, it can be
  5. %% retrieved via the world wide web at http://www.erlang.org/.
  6. %%
  7. %% Software distributed under the License is distributed on an "AS IS"
  8. %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. %% the License for the specific language governing rights and limitations
  10. %% under the License.
  11. %%
  12. %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
  13. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
  14. %% AB. All Rights Reserved.''
  15. %%
  16. %% @author Ulf Wiger <ulf@wiger.net>
  17. %%
  18. %% @doc Gproc Publish/Subscribe patterns
  19. %% This module implements a few convenient functions for publish/subscribe.
  20. %%
  21. %% Publish/subscribe with Gproc relies entirely on gproc properties and counters.
  22. %% This makes for a very concise implementation, as the monitoring of subscribers and
  23. %% removal of subscriptions comes for free with Gproc.
  24. %%
  25. %% Using this module instead of rolling your own (which is easy enough) brings the
  26. %% benefit of consistency, in tracing and debugging.
  27. %% The implementation can also serve to illustrate how to use gproc properties and
  28. %% counters to good effect.
  29. %%
  30. %% @type scope() = l | g.
  31. %% @type event() = any().
  32. %% @type msg() = any().
  33. %% @type status() = 1 | 0.
  34. %% @end
  35. -module(gproc_ps).
  36. -export([subscribe/2,
  37. subscribe_cond/3,
  38. change_cond/3,
  39. unsubscribe/2,
  40. publish/3,
  41. publish_cond/3,
  42. list_subs/2
  43. ]).
  44. -export([create_single/2,
  45. delete_single/2,
  46. disable_single/2,
  47. enable_single/2,
  48. tell_singles/3,
  49. notify_single_if_true/4,
  50. list_singles/2]).
  51. -define(ETag, gproc_ps_event).
  52. %% These types are duplicated above in EDoc syntax, since EDoc annoyingly doesn't pick up
  53. %% the type definitions, even if they are referred to in the -spec:s that EDoc does parse.
  54. -type scope() :: l | g.
  55. -type event() :: any().
  56. -type msg() :: any().
  57. -type status() :: 1 | 0.
  58. -spec subscribe(scope(), event()) -> true.
  59. %% @doc Subscribe to events of type `Event'
  60. %%
  61. %% Any messages published with `gproc_ps:publish(Scope, Event, Msg)' will be
  62. %% delivered to the current process, along with all other subscribers.
  63. %%
  64. %% This function creates a property, `{p,Scope,{gproc_ps_event,Event}}', which
  65. %% can be searched and displayed for debugging purposes.
  66. %%
  67. %% Note that, as with {@link gproc:reg/1}, this function will raise an
  68. %% exception if you try to subscribe to the same event twice from the same
  69. %% process.
  70. %% @end
  71. subscribe(Scope, Event) when Scope==l; Scope==g ->
  72. gproc:reg({p,Scope,{?ETag, Event}}).
  73. -spec subscribe_cond(scope(), event(), undefined | ets:match_spec()) -> true.
  74. %% @doc Subscribe conditionally to events of type `Event'
  75. %%
  76. %% This function is similar to {@link subscribe/2}, but adds a condition
  77. %% in the form of a match specification.
  78. %%
  79. %% The condition is tested by the {@link publish_cond/3} function
  80. %% and a message is delivered only if the condition is true. Specifically,
  81. %% the test is:
  82. %%
  83. %% `ets:match_spec_run([Msg], ets:match_spec_compile(Cond)) == [true]'
  84. %%
  85. %% In other words, if the match_spec returns true for a message, that message
  86. %% is sent to the subscriber. For any other result from the match_spec, the
  87. %% message is not sent. `Cond == undefined' means that all messages will be
  88. %% delivered (that is, `publish_cond/3' will treat 'normal' subscribers just
  89. %% like {@link publish/3} does, except that `publish/3' strictly speaking
  90. %% ignores the Value part of the property completely, whereas `publish_cond/3'
  91. %% expects it to be either undefined or a valid match spec).
  92. %%
  93. %% This means that `Cond=undefined' and ``Cond=[{'_',[],[true]}]'' are
  94. %% equivalent.
  95. %%
  96. %% Note that, as with {@link gproc:reg/1}, this function will raise an
  97. %% exception if you try to subscribe to the same event twice from the same
  98. %% process.
  99. %% @end
  100. subscribe_cond(Scope, Event, Spec) when Scope==l; Scope==g ->
  101. case Spec of
  102. undefined -> ok;
  103. [_|_] -> _ = ets:match_spec_compile(Spec); % validation
  104. _ -> error(badarg)
  105. end,
  106. gproc:reg({p,Scope,{?ETag, Event}}, Spec).
  107. -spec change_cond(scope(), event(), undefined | ets:match_spec()) -> true.
  108. %% @doc Change the condition specification of an existing subscription.
  109. %%
  110. %% This function atomically changes the condition spec of an existing
  111. %% subscription (see {@link subscribe_cond/3}). An exception is raised if
  112. %% the subscription doesn't already exist.
  113. %%
  114. %% Note that this function can also be used to change a conditional subscription
  115. %% to an unconditional one (by setting `Spec = undefined'), or a 'normal'
  116. %% subscription to a conditional one.
  117. %% @end
  118. change_cond(Scope, Event, Spec) when Scope==l; Scope==g ->
  119. case Spec of
  120. undefined -> ok;
  121. [_|_] -> _ = ets:match_spec_compile(Spec); % validation
  122. _ -> error(badarg)
  123. end,
  124. gproc:set_value({p,Scope,{?ETag, Event}}, Spec).
  125. -spec unsubscribe(scope(), event()) -> true.
  126. %% @doc Remove subscribtion created using `subscribe(Scope, Event)'
  127. %%
  128. %% This removes the property created through `subscribe/2'.
  129. %% @end
  130. unsubscribe(Scope, Event) when Scope==l; Scope==g ->
  131. gproc:unreg({p,Scope,{?ETag, Event}}).
  132. -spec publish(scope(), event(), msg()) -> ok.
  133. %% @doc Publish the message `Msg' to all subscribers of `Event'
  134. %%
  135. %% The message delivered to each subscriber will be of the form:
  136. %%
  137. %% `{gproc_ps_event, Event, Msg}'
  138. %%
  139. %% The function uses `gproc:send/2' to send a message to all processes which have a
  140. %% property `{p,Scope,{gproc_ps_event,Event}}'.
  141. %% @end
  142. publish(Scope, Event, Msg) when Scope==l; Scope==g ->
  143. gproc:send({p, Scope, {?ETag, Event}}, {?ETag, Event, Msg}).
  144. -spec publish_cond(scope(), event(), msg()) -> msg().
  145. %% @doc Publishes the message `Msg' to conditional subscribers of `Event'
  146. %%
  147. %% The message will be delivered to each subscriber provided their respective
  148. %% condition tests succeed.
  149. %%
  150. %% @see subscribe_cond/3.
  151. %%
  152. publish_cond(Scope, Event, Msg) when Scope==l; Scope==g ->
  153. Message = {?ETag, Event, Msg},
  154. lists:foreach(
  155. fun({Pid, undefined}) -> Pid ! Message;
  156. ({Pid, Spec}) ->
  157. try C = ets:match_spec_compile(Spec),
  158. case ets:match_spec_run([Msg], C) of
  159. [true] -> Pid ! Message;
  160. _ -> ok
  161. end
  162. catch
  163. error:_ ->
  164. ok
  165. end
  166. end, gproc:select({Scope,p}, [{ {{p,Scope,{?ETag,Event}}, '$1', '$2'},
  167. [], [{{'$1','$2'}}] }])).
  168. -spec list_subs(scope(), event()) -> [pid()].
  169. %% @doc List the pids of all processes subscribing to `Event'
  170. %%
  171. %% This function uses `gproc:select/2' to find all properties indicating a subscription.
  172. %% @end
  173. list_subs(Scope, Event) when Scope==l; Scope==g ->
  174. gproc:select({Scope,p}, [{ {{p,Scope,{?ETag,Event}}, '$1', '_'}, [], ['$1'] }]).
  175. -spec create_single(scope(), event()) -> true.
  176. %% @doc Creates a single-shot subscription entry for Event
  177. %%
  178. %% Single-shot subscriptions behave similarly to the `{active,once}' property of sockets.
  179. %% Once a message has been published, the subscription is disabled, and no more messages
  180. %% will be delivered to the subscriber unless the subscription is re-enabled using
  181. %% `enable_single/2'.
  182. %%
  183. %% The function creates a gproc counter entry, `{c,Scope,{gproc_ps_event,Event}}', which
  184. %% will have either of the values `0' (disabled) or `1' (enabled). Initially, the value
  185. %% is `1', meaning the subscription is enabled.
  186. %%
  187. %% Counters are used in this case, since they can be atomically updated by both the
  188. %% subscriber (owner) and publisher. The publisher sets the counter value to `0' as soon
  189. %% as it has delivered a message.
  190. %% @end
  191. create_single(Scope, Event) when Scope==l; Scope==g ->
  192. gproc:reg({c,Scope,{?ETag, Event}}, 1).
  193. -spec delete_single(scope(), event()) -> true.
  194. %% @doc Deletes the single-shot subscription for Event
  195. %%
  196. %% This function deletes the counter entry representing the single-shot description.
  197. %% An exception will be raised if there is no such subscription.
  198. %% @end
  199. delete_single(Scope, Event) when Scope==l; Scope==g ->
  200. gproc:unreg({c,Scope,{?ETag, Event}}).
  201. -spec disable_single(scope(), event()) -> integer().
  202. %% @doc Disables the single-shot subscription for Event
  203. %%
  204. %% This function changes the value of the corresponding gproc counter to `0' (disabled).
  205. %%
  206. %% The subscription remains (e.g. for debugging purposes), but with a 'disabled' status.
  207. %% This function is insensitive to concurrency, using 'wrapping' ets counter update ops.
  208. %% This guarantees that the counter will have either the value 1 or 0, depending on which
  209. %% update happened last.
  210. %%
  211. %% The return value indicates the previous status.
  212. %% @end
  213. disable_single(Scope, Event) when Scope==l; Scope==g ->
  214. gproc:update_counter({c,Scope,{?ETag,Event}}, {-1, 0, 0}).
  215. -spec enable_single(scope(), event()) -> integer().
  216. %% @doc Enables the single-shot subscription for Event
  217. %%
  218. %% This function changes the value of the corresponding gproc counter to `1' (enabled).
  219. %%
  220. %% After enabling, the subscriber will receive the next message published for `Event',
  221. %% after which the subscription is automatically disabled.
  222. %%
  223. %% This function is insensitive to concurrency, using 'wrapping' ets counter update ops.
  224. %% This guarantees that the counter will have either the value 1 or 0, depending on which
  225. %% update happened last.
  226. %%
  227. %% The return value indicates the previous status.
  228. %% @end
  229. enable_single(Scope, Event) when Scope==l; Scope==g ->
  230. gproc:update_counter({c,Scope,{?ETag,Event}}, {1, 1, 1}).
  231. -spec tell_singles(scope(), event(), msg()) -> [pid()].
  232. %% @doc Publish `Msg' to all single-shot subscribers of `Event'
  233. %%
  234. %% The subscriber status of each active subscriber is changed to `0' (disabled) before
  235. %% delivering the message. This reduces the risk that two different processes will be able
  236. %% to both deliver a message before disabling the subscribers. This could happen if the
  237. %% context switch happens just after the select operation (finding the active subscribers)
  238. %% and before the process is able to update the counters. In this case, it is possible
  239. %% that more than one can be delivered.
  240. %%
  241. %% The way to prevent this from happening is to ensure that only one process publishes
  242. %% for `Event'.
  243. %% @end
  244. tell_singles(Scope, Event, Msg) when Scope==l; Scope==g ->
  245. Subs = gproc:select(
  246. {Scope,c},
  247. [{ {{c,Scope,{?ETag,Event}}, '$1', 1}, [],
  248. [{{ {{c,Scope, {{?ETag,wrap(Event)}} }}, '$1', {{-1,0,0}} }}] }]),
  249. _ = gproc:update_counters(Scope, Subs),
  250. [begin P ! {?ETag, Event, Msg}, P end || {_,P,_} <- Subs].
  251. wrap(E) when is_tuple(E) ->
  252. {list_to_tuple([wrap(X) || X <- tuple_to_list(E)])};
  253. wrap(E) when is_list(E) ->
  254. [wrap(X) || X <- E];
  255. wrap(X) ->
  256. X.
  257. -spec list_singles(scope(), event()) -> [{pid(), status()}].
  258. %% @doc Lists all single-shot subscribers of Event, together with their status
  259. %% @end
  260. list_singles(Scope, Event) ->
  261. gproc:select({Scope,c}, [{ {{c,Scope,{?ETag,Event}}, '$1', '$2'},
  262. [], [{{'$1','$2'}}] }]).
  263. -spec notify_single_if_true(scope(), event(), fun(() -> boolean()), msg()) -> ok.
  264. %% @doc Create/enable a single subscription for event; notify at once if F() -> true
  265. %%
  266. %% This function is a convenience function, wrapping a single-shot pub/sub around a
  267. %% user-provided boolean test. `Msg' should be what the publisher will send later, if the
  268. %% immediate test returns `false'.
  269. %% @end
  270. notify_single_if_true(Scope, Event, F, Msg) ->
  271. try enable_single(Scope, Event)
  272. catch
  273. error:_ ->
  274. create_single(Scope, Event)
  275. end,
  276. case F() of
  277. true ->
  278. disable_single(Scope, Event),
  279. self() ! {?ETag, Event, Msg},
  280. ok;
  281. false ->
  282. ok
  283. end.