gproc_ps.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
  2. %% --------------------------------------------------
  3. %% This file is provided to you under the Apache License,
  4. %% Version 2.0 (the "License"); you may not use this file
  5. %% except in compliance with the License. You may obtain
  6. %% a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing,
  11. %% software distributed under the License is distributed on an
  12. %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  13. %% KIND, either express or implied. See the License for the
  14. %% specific language governing permissions and limitations
  15. %% under the License.
  16. %% --------------------------------------------------
  17. %%
  18. %% @author Ulf Wiger <ulf@wiger.net>
  19. %%
  20. %% @doc Gproc Publish/Subscribe patterns
  21. %% This module implements a few convenient functions for publish/subscribe.
  22. %%
  23. %% Publish/subscribe with Gproc relies entirely on gproc properties and counters.
  24. %% This makes for a very concise implementation, as the monitoring of subscribers and
  25. %% removal of subscriptions comes for free with Gproc.
  26. %%
  27. %% Using this module instead of rolling your own (which is easy enough) brings the
  28. %% benefit of consistency, in tracing and debugging.
  29. %% The implementation can also serve to illustrate how to use gproc properties and
  30. %% counters to good effect.
  31. %%
  32. %% @type scope() = l | g.
  33. %% @type event() = any().
  34. %% @type msg() = any().
  35. %% @type status() = 1 | 0.
  36. %% @end
  37. -module(gproc_ps).
  38. -export([subscribe/2,
  39. subscribe_cond/3,
  40. change_cond/3,
  41. unsubscribe/2,
  42. publish/3,
  43. publish_cond/3,
  44. list_subs/2
  45. ]).
  46. -export([create_single/2,
  47. delete_single/2,
  48. disable_single/2,
  49. enable_single/2,
  50. tell_singles/3,
  51. notify_single_if_true/4,
  52. list_singles/2]).
  53. -define(ETag, gproc_ps_event).
  54. %% These types are duplicated above in EDoc syntax, since EDoc annoyingly doesn't pick up
  55. %% the type definitions, even if they are referred to in the -spec:s that EDoc does parse.
  56. -type scope() :: l | g.
  57. -type event() :: any().
  58. -type msg() :: any().
  59. -type status() :: 1 | 0.
  60. -spec subscribe(scope(), event()) -> true.
  61. %% @doc Subscribe to events of type `Event'
  62. %%
  63. %% Any messages published with `gproc_ps:publish(Scope, Event, Msg)' will be
  64. %% delivered to the current process, along with all other subscribers.
  65. %%
  66. %% This function creates a property, `{p,Scope,{gproc_ps_event,Event}}', which
  67. %% can be searched and displayed for debugging purposes.
  68. %%
  69. %% Note that, as with {@link gproc:reg/1}, this function will raise an
  70. %% exception if you try to subscribe to the same event twice from the same
  71. %% process.
  72. %% @end
  73. subscribe(Scope, Event) when Scope==l; Scope==g ->
  74. gproc:reg({p,Scope,{?ETag, Event}}).
  75. -spec subscribe_cond(scope(), event(), undefined | ets:match_spec()) -> true.
  76. %% @doc Subscribe conditionally to events of type `Event'
  77. %%
  78. %% This function is similar to {@link subscribe/2}, but adds a condition
  79. %% in the form of a match specification.
  80. %%
  81. %% The condition is tested by the {@link publish_cond/3} function
  82. %% and a message is delivered only if the condition is true. Specifically,
  83. %% the test is:
  84. %%
  85. %% `ets:match_spec_run([Msg], ets:match_spec_compile(Cond)) == [true]'
  86. %%
  87. %% In other words, if the match_spec returns true for a message, that message
  88. %% is sent to the subscriber. For any other result from the match_spec, the
  89. %% message is not sent. `Cond == undefined' means that all messages will be
  90. %% delivered (that is, `publish_cond/3' will treat 'normal' subscribers just
  91. %% like {@link publish/3} does, except that `publish/3' strictly speaking
  92. %% ignores the Value part of the property completely, whereas `publish_cond/3'
  93. %% expects it to be either undefined or a valid match spec).
  94. %%
  95. %% This means that `Cond=undefined' and ``Cond=[{'_',[],[true]}]'' are
  96. %% equivalent.
  97. %%
  98. %% Note that, as with {@link gproc:reg/1}, this function will raise an
  99. %% exception if you try to subscribe to the same event twice from the same
  100. %% process.
  101. %% @end
  102. subscribe_cond(Scope, Event, Spec) when Scope==l; Scope==g ->
  103. case Spec of
  104. undefined -> ok;
  105. [_|_] -> _ = ets:match_spec_compile(Spec); % validation
  106. _ -> error(badarg)
  107. end,
  108. gproc:reg({p,Scope,{?ETag, Event}}, Spec).
  109. -spec change_cond(scope(), event(), undefined | ets:match_spec()) -> true.
  110. %% @doc Change the condition specification of an existing subscription.
  111. %%
  112. %% This function atomically changes the condition spec of an existing
  113. %% subscription (see {@link subscribe_cond/3}). An exception is raised if
  114. %% the subscription doesn't already exist.
  115. %%
  116. %% Note that this function can also be used to change a conditional subscription
  117. %% to an unconditional one (by setting `Spec = undefined'), or a 'normal'
  118. %% subscription to a conditional one.
  119. %% @end
  120. change_cond(Scope, Event, Spec) when Scope==l; Scope==g ->
  121. case Spec of
  122. undefined -> ok;
  123. [_|_] -> _ = ets:match_spec_compile(Spec); % validation
  124. _ -> error(badarg)
  125. end,
  126. gproc:set_value({p,Scope,{?ETag, Event}}, Spec).
  127. -spec unsubscribe(scope(), event()) -> true.
  128. %% @doc Remove subscribtion created using `subscribe(Scope, Event)'
  129. %%
  130. %% This removes the property created through `subscribe/2'.
  131. %% @end
  132. unsubscribe(Scope, Event) when Scope==l; Scope==g ->
  133. gproc:unreg({p,Scope,{?ETag, Event}}).
  134. -spec publish(scope(), event(), msg()) -> ok.
  135. %% @doc Publish the message `Msg' to all subscribers of `Event'
  136. %%
  137. %% The message delivered to each subscriber will be of the form:
  138. %%
  139. %% `{gproc_ps_event, Event, Msg}'
  140. %%
  141. %% The function uses `gproc:send/2' to send a message to all processes which have a
  142. %% property `{p,Scope,{gproc_ps_event,Event}}'.
  143. %% @end
  144. publish(Scope, Event, Msg) when Scope==l; Scope==g ->
  145. gproc:send({p, Scope, {?ETag, Event}}, {?ETag, Event, Msg}).
  146. -spec publish_cond(scope(), event(), msg()) -> msg().
  147. %% @doc Publishes the message `Msg' to conditional subscribers of `Event'
  148. %%
  149. %% The message will be delivered to each subscriber provided their respective
  150. %% condition tests succeed.
  151. %%
  152. %% @see subscribe_cond/3.
  153. %%
  154. publish_cond(Scope, Event, Msg) when Scope==l; Scope==g ->
  155. Message = {?ETag, Event, Msg},
  156. lists:foreach(
  157. fun({Pid, undefined}) -> Pid ! Message;
  158. ({Pid, Spec}) ->
  159. try C = ets:match_spec_compile(Spec),
  160. case ets:match_spec_run([Msg], C) of
  161. [true] -> Pid ! Message;
  162. _ -> ok
  163. end
  164. catch
  165. error:_ ->
  166. ok
  167. end
  168. end, gproc:select({Scope,p}, [{ {{p,Scope,{?ETag,Event}}, '$1', '$2'},
  169. [], [{{'$1','$2'}}] }])).
  170. -spec list_subs(scope(), event()) -> [pid()].
  171. %% @doc List the pids of all processes subscribing to `Event'
  172. %%
  173. %% This function uses `gproc:select/2' to find all properties indicating a subscription.
  174. %% @end
  175. list_subs(Scope, Event) when Scope==l; Scope==g ->
  176. gproc:select({Scope,p}, [{ {{p,Scope,{?ETag,Event}}, '$1', '_'}, [], ['$1'] }]).
  177. -spec create_single(scope(), event()) -> true.
  178. %% @doc Creates a single-shot subscription entry for Event
  179. %%
  180. %% Single-shot subscriptions behave similarly to the `{active,once}' property of sockets.
  181. %% Once a message has been published, the subscription is disabled, and no more messages
  182. %% will be delivered to the subscriber unless the subscription is re-enabled using
  183. %% `enable_single/2'.
  184. %%
  185. %% The function creates a gproc counter entry, `{c,Scope,{gproc_ps_event,Event}}', which
  186. %% will have either of the values `0' (disabled) or `1' (enabled). Initially, the value
  187. %% is `1', meaning the subscription is enabled.
  188. %%
  189. %% Counters are used in this case, since they can be atomically updated by both the
  190. %% subscriber (owner) and publisher. The publisher sets the counter value to `0' as soon
  191. %% as it has delivered a message.
  192. %% @end
  193. create_single(Scope, Event) when Scope==l; Scope==g ->
  194. gproc:reg({c,Scope,{?ETag, Event}}, 1).
  195. -spec delete_single(scope(), event()) -> true.
  196. %% @doc Deletes the single-shot subscription for Event
  197. %%
  198. %% This function deletes the counter entry representing the single-shot description.
  199. %% An exception will be raised if there is no such subscription.
  200. %% @end
  201. delete_single(Scope, Event) when Scope==l; Scope==g ->
  202. gproc:unreg({c,Scope,{?ETag, Event}}).
  203. -spec disable_single(scope(), event()) -> integer().
  204. %% @doc Disables the single-shot subscription for Event
  205. %%
  206. %% This function changes the value of the corresponding gproc counter to `0' (disabled).
  207. %%
  208. %% The subscription remains (e.g. for debugging purposes), but with a 'disabled' status.
  209. %% This function is insensitive to concurrency, using 'wrapping' ets counter update ops.
  210. %% This guarantees that the counter will have either the value 1 or 0, depending on which
  211. %% update happened last.
  212. %%
  213. %% The return value indicates the previous status.
  214. %% @end
  215. disable_single(Scope, Event) when Scope==l; Scope==g ->
  216. gproc:update_counter({c,Scope,{?ETag,Event}}, {-1, 0, 0}).
  217. -spec enable_single(scope(), event()) -> integer().
  218. %% @doc Enables the single-shot subscription for Event
  219. %%
  220. %% This function changes the value of the corresponding gproc counter to `1' (enabled).
  221. %%
  222. %% After enabling, the subscriber will receive the next message published for `Event',
  223. %% after which the subscription is automatically disabled.
  224. %%
  225. %% This function is insensitive to concurrency, using 'wrapping' ets counter update ops.
  226. %% This guarantees that the counter will have either the value 1 or 0, depending on which
  227. %% update happened last.
  228. %%
  229. %% The return value indicates the previous status.
  230. %% @end
  231. enable_single(Scope, Event) when Scope==l; Scope==g ->
  232. gproc:update_counter({c,Scope,{?ETag,Event}}, {1, 1, 1}).
  233. -spec tell_singles(scope(), event(), msg()) -> [pid()].
  234. %% @doc Publish `Msg' to all single-shot subscribers of `Event'
  235. %%
  236. %% The subscriber status of each active subscriber is changed to `0' (disabled) before
  237. %% delivering the message. This reduces the risk that two different processes will be able
  238. %% to both deliver a message before disabling the subscribers. This could happen if the
  239. %% context switch happens just after the select operation (finding the active subscribers)
  240. %% and before the process is able to update the counters. In this case, it is possible
  241. %% that more than one can be delivered.
  242. %%
  243. %% The way to prevent this from happening is to ensure that only one process publishes
  244. %% for `Event'.
  245. %% @end
  246. tell_singles(Scope, Event, Msg) when Scope==l; Scope==g ->
  247. Subs = gproc:select(
  248. {Scope,c},
  249. [{ {{c,Scope,{?ETag,Event}}, '$1', 1}, [],
  250. [{{ {{c,Scope, {{?ETag,wrap(Event)}} }}, '$1', {{-1,0,0}} }}] }]),
  251. _ = gproc:update_counters(Scope, Subs),
  252. [begin P ! {?ETag, Event, Msg}, P end || {_,P,_} <- Subs].
  253. wrap(E) when is_tuple(E) ->
  254. {list_to_tuple([wrap(X) || X <- tuple_to_list(E)])};
  255. wrap(E) when is_list(E) ->
  256. [wrap(X) || X <- E];
  257. wrap(X) ->
  258. X.
  259. -spec list_singles(scope(), event()) -> [{pid(), status()}].
  260. %% @doc Lists all single-shot subscribers of Event, together with their status
  261. %% @end
  262. list_singles(Scope, Event) ->
  263. gproc:select({Scope,c}, [{ {{c,Scope,{?ETag,Event}}, '$1', '$2'},
  264. [], [{{'$1','$2'}}] }]).
  265. -spec notify_single_if_true(scope(), event(), fun(() -> boolean()), msg()) -> ok.
  266. %% @doc Create/enable a single subscription for event; notify at once if F() -> true
  267. %%
  268. %% This function is a convenience function, wrapping a single-shot pub/sub around a
  269. %% user-provided boolean test. `Msg' should be what the publisher will send later, if the
  270. %% immediate test returns `false'.
  271. %% @end
  272. notify_single_if_true(Scope, Event, F, Msg) ->
  273. try enable_single(Scope, Event)
  274. catch
  275. error:_ ->
  276. create_single(Scope, Event)
  277. end,
  278. case F() of
  279. true ->
  280. disable_single(Scope, Event),
  281. self() ! {?ETag, Event, Msg},
  282. ok;
  283. false ->
  284. ok
  285. end.