relay_ng.erl 17 KB


  1. %%% -------------------------------------------------------------------
  2. %%% Author : Sergei Polkovnikov <serge.polkovnikov@gmail.com>
  3. %%% Description : The server for retranslating events of the table.
  4. %%%
  5. %%% Created : Feb 25, 2013
  6. %%% -------------------------------------------------------------------
  7. %% Table -> Relay requests:
  8. %% {register_player, UserId, PlayerId} -> ok
  9. %% {unregister_player, PlayerId, Reason} -> ok
  10. %% Table -> Relay messages:
  11. %% {publish, Event}
  12. %% {to_client, PlayerId, Event}
  13. %% {to_subscriber, SubscrId, Event}
  14. %% {allow_broadcast_for_player, PlayerId}
  15. %% stop
  16. %% Relay -> Table notifications:
  17. %% {player_disconnected, PlayerId}
  18. %% {player_connected, PlayerId}
  19. %% {subscriber_added, PlayerId, SubscriptionId} - it's a hack to retrive current game state
  20. %% Subscriber -> Relay requests:
  21. %% {subscribe, Pid, UserId, RegNum} -> {ok, SubscriptionId} | {error, Reason}
  22. %% {unsubscribe, SubscriptionId} -> ok | {error, Reason}
  23. %% Relay -> Subscribers notifications:
  24. %% {relay_kick, SubscrId, Reason}
  25. %% {relay_event, SubscrId, Event}
  26. -module(relay_ng).
  27. -behaviour(gen_server).
  28. %% --------------------------------------------------------------------
  29. %% Include files
  30. %% --------------------------------------------------------------------
  31. -include_lib("server/include/basic_types.hrl").
  32. -include_lib("server/include/log.hrl").
  33. %% --------------------------------------------------------------------
  34. %% External exports
  35. -export([start/1, start_link/1, table_message/2, table_request/2]).
  36. -export([subscribe/4, unsubscribe/2, publish/2]).
  37. %% gen_server callbacks
  38. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  39. -record(state, {subscribers,
  40. players,
  41. observers_allowed :: boolean(),
  42. table :: {atom(), pid()},
  43. table_mon_ref :: reference()
  44. }).
  45. -record(subscriber,
  46. {id :: reference(),
  47. pid :: pid(),
  48. user_id :: binary(),
  49. player_id :: integer(),
  50. mon_ref :: reference(),
  51. broadcast_allowed :: boolean()
  52. }).
  53. -record(player,
  54. {id :: integer(),
  55. user_id :: binary(),
  56. status :: online | offline
  57. }).
  58. %% ====================================================================
  59. %% External functions
  60. %% ====================================================================
  61. start(Params) ->
  62. gen_server:start(?MODULE, [Params], []).
  63. start_link(Params) ->
  64. gen_server:start_link(?MODULE, [Params], []).
  65. subscribe(Relay, Pid, UserId, RegNum) ->
  66. client_request(Relay, {subscribe, Pid, UserId, RegNum}).
  67. unsubscribe(Relay, SubscriptionId) ->
  68. client_request(Relay, {unsubscribe, SubscriptionId}).
  69. publish(Relay, Message) ->
  70. client_message(Relay, {publish, Message}).
  71. table_message(Relay, Message) ->
  72. gen_server:cast(Relay, {table_message, Message}).
  73. table_request(Relay, Request) ->
  74. table_request(Relay, Request, 5000).
  75. table_request(Relay, Request, Timeout) ->
  76. gen_server:call(Relay, {table_request, Request}, Timeout).
  77. client_message(Relay, Message) ->
  78. gen_server:cast(Relay, {client_message, Message}).
  79. client_request(Relay, Request) ->
  80. client_request(Relay, Request, 5000).
  81. client_request(Relay, Request, Timeout) ->
  82. gen_server:call(Relay, {client_request, Request}, Timeout).
  83. %% ====================================================================
  84. %% Server functions
  85. %% ====================================================================
  86. %% --------------------------------------------------------------------
  87. init([Params]) ->
  88. PlayersInfo = proplists:get_value(players, Params),
  89. ObserversAllowed = proplists:get_value(observers_allowed, Params),
  90. Table = {_, TablePid} = proplists:get_value(table, Params),
  91. Players = init_players(PlayersInfo),
  92. MonRef = erlang:monitor(process, TablePid),
  93. {ok, #state{subscribers = subscribers_init(),
  94. players = Players,
  95. observers_allowed = ObserversAllowed,
  96. table = Table,
  97. table_mon_ref = MonRef}}.
  98. %% --------------------------------------------------------------------
  99. handle_call({client_request, Request}, From, State) ->
  100. handle_client_request(Request, From, State);
  101. handle_call({table_request, Request}, From, State) ->
  102. handle_table_request(Request, From, State);
  103. handle_call(_Request, _From, State) ->
  104. Reply = ok,
  105. {reply, Reply, State}.
  106. %% --------------------------------------------------------------------
  107. handle_cast({client_message, Msg}, State) ->
  108. gas:info(?MODULE,"RELAY_NG Received client message: ~p", [Msg]),
  109. handle_client_message(Msg, State);
  110. handle_cast({table_message, Msg}, State) ->
  111. gas:info(?MODULE,"RELAY_NG Received table message: ~p", [Msg]),
  112. handle_table_message(Msg, State);
  113. handle_cast(_Msg, State) ->
  114. {noreply, State}.
  115. %% --------------------------------------------------------------------
  116. handle_info({'DOWN', TableMonRef, process, _Pid, _Info},
  117. #state{subscribers = Subscribers,
  118. table_mon_ref = TableMonRef} = State) ->
  119. gas:info(?MODULE,"RELAY_NG All The parent table is down. "
  120. "Disconnecting all subscribers and sutting down.", []),
  121. [begin
  122. erlang:demonitor(MonRef, [flush]),
  123. Pid ! {relay_kick, SubscrId, table_down}
  124. end || #subscriber{id = SubscrId, pid = Pid, mon_ref = MonRef}
  125. <- subscribers_to_list(Subscribers)],
  126. {stop, normal, State};
  127. handle_info({'DOWN', MonRef, process, _Pid, _Info},
  128. #state{subscribers = Subscribers, players = Players,
  129. table = {TableMod, TablePid}} = State) ->
  130. case find_subscribers_by_mon_ref(MonRef, Subscribers) of
  131. [#subscriber{player_id = undefined, id = SubscrId}] ->
  132. NewSubscribers = del_subscriber(SubscrId, Subscribers),
  133. {noreply, State#state{subscribers = NewSubscribers}};
  134. [#subscriber{player_id = PlayerId, user_id = UserId, id = SubscrId}] ->
  135. NewSubscribers = del_subscriber(SubscrId, Subscribers),
  136. case find_subscribers_by_player_id(PlayerId, NewSubscribers) of
  137. [] ->
  138. gas:info(?MODULE,"RELAY_NG All sessions of player <~p> (~p) are closed. "
  139. "Sending the notification to the table.", [PlayerId, UserId]),
  140. NewPlayers = update_player_status(PlayerId, offline, Players),
  141. TableMod:relay_message(TablePid, {player_disconnected, PlayerId}),
  142. {noreply, State#state{subscribers = NewSubscribers, players = NewPlayers}};
  143. _ ->
  144. {noreply, State#state{subscribers = NewSubscribers}}
  145. end;
  146. [] ->
  147. {noreply, State}
  148. end;
  149. handle_info(_Info, State) ->
  150. {noreply, State}.
  151. %% --------------------------------------------------------------------
  152. terminate(_Reason, _State) ->
  153. ok.
  154. %% --------------------------------------------------------------------
  155. code_change(_OldVsn, State, _Extra) ->
  156. {ok, State}.
  157. %% --------------------------------------------------------------------
  158. %%% Internal functions
  159. %% --------------------------------------------------------------------
  160. handle_client_request({subscribe, Pid, UserId, observer}, _From,
  161. #state{observers_allowed = ObserversAllowed,
  162. subscribers = Subscribers} = State) ->
  163. if ObserversAllowed ->
  164. MonRef = erlang:monitor(process, Pid),
  165. SubscrId = erlang:make_ref(),
  166. NewSubscribers = store_subscriber(SubscrId, Pid, UserId, undefined, MonRef, true, Subscribers),
  167. {reply, ok, State#state{subscribers = NewSubscribers}};
  168. true ->
  169. {reply, {error, observers_not_allowed}, State}
  170. end;
  171. handle_client_request({subscribe, Pid, UserId, PlayerId}, _From,
  172. #state{players = Players, subscribers = Subscribers,
  173. table = {TableMod, TablePid}} = State) ->
  174. gas:info(?MODULE,"RELAY_NG Subscription request from user ~p, PlayerId: <~p>", [UserId, PlayerId]),
  175. case find_player(PlayerId, Players) of
  176. {ok, #player{user_id = UserId, status = Status} = P} -> %% The user id is matched
  177. gas:info(?MODULE,"RELAY_NG User ~p is registered as player <~p>", [UserId, PlayerId]),
  178. gas:info(?MODULE,"RELAY_NG User ~p player info: ~p", [UserId, P]),
  179. MonRef = erlang:monitor(process, Pid),
  180. SubscrId = erlang:make_ref(),
  181. NewSubscribers = store_subscriber(SubscrId, Pid, UserId, PlayerId, MonRef,
  182. _BroadcastAllowed = false, Subscribers),
  183. NewPlayers = if Status == offline ->
  184. gas:info(?MODULE,"RELAY_NG Notifying the table about user ~p (<~p>).", [PlayerId, UserId]),
  185. TableMod:relay_message(TablePid, {player_connected, PlayerId}),
  186. update_player_status(PlayerId, online, Players);
  187. true ->
  188. gas:info(?MODULE,"RELAY_NG User ~p (<~p>) is already subscribed.", [PlayerId, UserId]),
  189. Players
  190. end,
  191. TableMod:relay_message(TablePid, {subscriber_added, PlayerId, SubscrId}),
  192. {reply, {ok, SubscrId}, State#state{players = NewPlayers, subscribers = NewSubscribers}};
  193. {ok, #player{}=P} ->
  194. gas:info(?MODULE,"RELAY_NG Subscription for user ~p rejected. There is another owner of the "
  195. "PlayerId <~p>: ~p", [UserId, PlayerId, P]),
  196. {reply, {error, not_player_id_owner}, State};
  197. error ->
  198. {reply, {error, unknown_player_id}, State}
  199. end;
  200. handle_client_request({unsubscribe, SubscrId}, _From,
  201. #state{subscribers = Subscribers, players = Players,
  202. table = {TableMod, TablePid}} = State) ->
  203. case get_subscriber(SubscrId, Subscribers) of
  204. error ->
  205. {reply, {error, not_subscribed}, State};
  206. {ok, #subscriber{id = SubscrId, mon_ref = MonRef, player_id = undefined}} ->
  207. erlang:demonitor(MonRef, [flush]),
  208. NewSubscribers = del_subscriber(SubscrId, Subscribers),
  209. {reply, ok, State#state{subscribers = NewSubscribers}};
  210. {ok, #subscriber{id = SubscrId, mon_ref = MonRef, player_id = PlayerId}} ->
  211. erlang:demonitor(MonRef, [flush]),
  212. NewSubscribers = del_subscriber(SubscrId, Subscribers),
  213. NewPlayers = case find_subscribers_by_player_id(PlayerId, Subscribers) of
  214. [] ->
  215. TableMod:relay_message(TablePid, {player_disconnected, PlayerId}),
  216. update_player_status(PlayerId, offline, Players);
  217. _ -> Players
  218. end,
  219. {reply, ok, State#state{subscribers = NewSubscribers,
  220. players = NewPlayers}}
  221. end;
  222. handle_client_request(_Request, _From, State) ->
  223. {reply, {error, unknown_request}, State}.
  224. %%===================================================================
  225. handle_table_request({register_player, UserId, PlayerId}, _From,
  226. #state{players = Players} = State) ->
  227. NewPlayers = store_player(PlayerId, UserId, offline, Players),
  228. {reply, ok, State#state{players = NewPlayers}};
  229. handle_table_request({unregister_player, PlayerId, Reason}, _From,
  230. #state{players = Players,
  231. subscribers = Subscribers} = State) ->
  232. NewPlayers = del_player(PlayerId, Players),
  233. UpdSubscribers = find_subscribers_by_player_id(PlayerId, Subscribers),
  234. F = fun(#subscriber{id = SubscrId, pid = Pid, mon_ref = MonRef}, Acc) ->
  235. erlang:demonitor(MonRef, [flush]),
  236. Pid ! {relay_kick, SubscrId, Reason},
  237. del_subscriber(SubscrId, Acc)
  238. end,
  239. NewSubscribers = lists:foldl(F, Subscribers, UpdSubscribers),
  240. {reply, ok, State#state{players = NewPlayers,
  241. subscribers = NewSubscribers}};
  242. handle_table_request(_Request, _From, State) ->
  243. Reply = ok,
  244. {reply, Reply, State}.
  245. %%===================================================================
  246. %% XXX: Unsecure because of spoofing posibility (a client session can send events by the
  247. %% name of the table). Legacy.
  248. handle_client_message({publish, Msg}, #state{subscribers = Subscribers} = State) ->
  249. Receipients = subscribers_to_list(Subscribers),
  250. [Pid ! {relay_event, SubscrId, Msg} ||
  251. #subscriber{id = SubscrId, pid = Pid, broadcast_allowed = true} <- Receipients],
  252. {noreply, State};
  253. handle_client_message(_Msg, State) ->
  254. {noreply, State}.
  255. %%===================================================================
  256. handle_table_message({publish, Msg}, #state{subscribers = Subscribers} = State) ->
  257. gas:info(?MODULE,"RELAY_NG The table publish message: ~p", [Msg]),
  258. Receipients = subscribers_to_list(Subscribers),
  259. [Pid ! {relay_event, SubscrId, Msg} ||
  260. #subscriber{id = SubscrId, pid = Pid, broadcast_allowed = true} <- Receipients],
  261. {noreply, State};
  262. handle_table_message({to_client, PlayerId, Msg}, #state{subscribers = Subscribers} = State) ->
  263. Recepients = find_subscribers_by_player_id(PlayerId, Subscribers),
  264. gas:info(?MODULE,"RELAY_NG Send table message to player's (~p) sessions: ~p. Message: ~p",
  265. [PlayerId, Recepients, Msg]),
  266. [Pid ! {relay_event, SubscrId, Msg} || #subscriber{id = SubscrId, pid = Pid} <- Recepients],
  267. {noreply, State};
  268. handle_table_message({to_subscriber, SubscrId, Msg}, #state{subscribers = Subscribers} = State) ->
  269. gas:info(?MODULE,"RELAY_NG Send table message to subscriber: ~p. Message: ~p", [SubscrId, Msg]),
  270. case get_subscriber(SubscrId, Subscribers) of
  271. {ok, #subscriber{pid = Pid}} -> Pid ! {relay_event, SubscrId, Msg};
  272. _ -> do_nothing
  273. end,
  274. {noreply, State};
  275. handle_table_message({allow_broadcast_for_player, PlayerId},
  276. #state{subscribers = Subscribers} = State) ->
  277. gas:info(?MODULE,"RELAY_NG Received directive to allow receiving published messages for player <~p>",
  278. [PlayerId]),
  279. PlSubscribers = find_subscribers_by_player_id(PlayerId, Subscribers),
  280. F = fun(Subscriber, Acc) ->
  281. store_subscriber_rec(Subscriber#subscriber{broadcast_allowed = true}, Acc)
  282. end,
  283. NewSubscribers = lists:foldl(F, Subscribers, PlSubscribers),
  284. {noreply, State#state{subscribers = NewSubscribers}};
  285. handle_table_message(stop, #state{subscribers = Subscribers} = State) ->
  286. [begin
  287. erlang:demonitor(MonRef, [flush]),
  288. Pid ! {relay_kick, SubscrId, table_closed}
  289. end || #subscriber{id = SubscrId, pid = Pid, mon_ref = MonRef}
  290. <- subscribers_to_list(Subscribers)],
  291. {stop, normal, State#state{subscribers = subscribers_init()}};
  292. handle_table_message(_Msg, State) ->
  293. {noreply, State}.
  294. %%===================================================================
  295. init_players(PlayersInfo) ->
  296. init_players(PlayersInfo, players_init()).
  297. init_players([], Players) ->
  298. Players;
  299. init_players([{PlayerId, UserId} | PlayersInfo], Players) ->
  300. NewPlayers = store_player(PlayerId, UserId, offline, Players),
  301. init_players(PlayersInfo, NewPlayers).
  302. %%===================================================================
  303. players_init() -> midict:new().
  304. store_player(PlayerId, UserId, Status, Players) ->
  305. store_player_rec(#player{id = PlayerId, user_id = UserId, status = Status}, Players).
  306. store_player_rec(#player{id = PlayerId, user_id = _UserId, status = _Status} = Rec, Players) ->
  307. midict:store(PlayerId, Rec, [], Players).
  308. del_player(PlayerId, Players) ->
  309. midict:erase(PlayerId, Players).
  310. fetch_player(PlayerId, Players) ->
  311. midict:fetch(PlayerId, Players).
  312. find_player(PlayerId, Players) ->
  313. midict:find(PlayerId, Players).
  314. update_player_status(PlayerId, Status, Players) ->
  315. Rec = fetch_player(PlayerId, Players),
  316. store_player_rec(Rec#player{status = Status}, Players).
  317. %%===================================================================
  318. subscribers_init() -> midict:new().
  319. store_subscriber(SubscrId, Pid, UserId, PlayerId, MonRef, BroadcastAllowed, Subscribers) ->
  320. store_subscriber_rec(#subscriber{id = SubscrId, pid = Pid, user_id = UserId,
  321. player_id = PlayerId, mon_ref = MonRef,
  322. broadcast_allowed = BroadcastAllowed}, Subscribers).
  323. store_subscriber_rec(#subscriber{id = SubscrId, pid = Pid, user_id = _UserId,
  324. player_id = PlayerId, mon_ref = MonRef} = Rec, Subscribers) ->
  325. midict:store(SubscrId, Rec, [{pid, Pid}, {player_id, PlayerId}, {mon_ref, MonRef}],
  326. Subscribers).
  327. %% del_subscriber(SubscrId, Subscribers) -> NewSubscribers
  328. del_subscriber(SubscrId, Subscribers) ->
  329. midict:erase(SubscrId, Subscribers).
  330. %% get_subscriber(Id, Subscribers) -> {ok, #subscriber{}} | error
  331. get_subscriber(Id, Subscribers) ->
  332. midict:find(Id, Subscribers).
  333. %% find_subscribers_by_player_id(PlayerId, Subscribers) -> list(#subscriber{})
  334. find_subscribers_by_player_id(PlayerId, Subscribers) ->
  335. midict:geti(PlayerId, player_id, Subscribers).
  336. %% find_subscribers_by_mon_ref(MonRef, Subscribers) -> list(#subscriber{})
  337. find_subscribers_by_mon_ref(MonRef, Subscribers) ->
  338. midict:geti(MonRef, mon_ref, Subscribers).
  339. %% subscribers_to_list(Subscribers) -> list(#subscriber{})
  340. subscribers_to_list(Subscribers) ->
  341. midict:all_values(Subscribers).