relay_ng.erl 17 KB


  1. %%% -------------------------------------------------------------------
  2. %%% Author : Sergei Polkovnikov <serge.polkovnikov@gmail.com>
  3. %%% Description : A server that relays the events of a table to its subscribers.
  4. %%%
  5. %%% Created : Feb 25, 2013
  6. %%% -------------------------------------------------------------------
  7. %% Table -> Relay requests:
  8. %% {register_player, UserId, PlayerId} -> ok
  9. %% {unregister_player, PlayerId, Reason} -> ok
  10. %% Table -> Relay messages:
  11. %% {publish, Event}
  12. %% {to_client, PlayerId, Event}
  13. %% {to_subscriber, SubscrId, Event}
  14. %% {allow_broadcast_for_player, PlayerId}
  15. %% stop
  16. %% Relay -> Table notifications:
  17. %% {player_disconnected, PlayerId}
  18. %% {player_connected, PlayerId}
  19. %% {subscriber_added, PlayerId, SubscriptionId} - it's a hack to retrieve the current game state
  20. %% Subscriber -> Relay requests:
  21. %% {subscribe, Pid, UserId, RegNum} -> {ok, SubscriptionId} | {error, Reason}
  22. %% {unsubscribe, SubscriptionId} -> ok | {error, Reason}
  23. %% Relay -> Subscribers notifications:
  24. %% {relay_kick, SubscrId, Reason}
  25. %% {relay_event, SubscrId, Event}
  26. -module(relay_ng).
  27. -behaviour(gen_server).
  28. %% --------------------------------------------------------------------
  29. %% Include files
  30. %% --------------------------------------------------------------------
  31. -include_lib("server/include/basic_types.hrl").
  32. %% --------------------------------------------------------------------
  33. %% External exports
  34. -export([start/1, start_link/1, table_message/2, table_request/2]).
  35. -export([subscribe/4, unsubscribe/2, publish/2]).
  36. %% gen_server callbacks
  37. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  %% State of the relay server process.
  -record(state, {
      %% Subscriber storage; created with subscribers_init/0.
      subscribers,
      %% Player storage; built from the `players' start parameter.
      players,
      %% When true, observer subscriptions are accepted.
      observers_allowed :: boolean(),
      %% {CallbackModule, Pid} of the table this relay serves.
      table :: {atom(), pid()},
      %% Monitor reference on the table process.
      table_mon_ref :: reference()
  }).
  %% One subscription, i.e. one process that receives relay messages.
  -record(subscriber, {
      %% Subscription id (created with erlang:make_ref/0).
      id :: reference(),
      %% Process that receives {relay_event, ...} / {relay_kick, ...}.
      pid :: pid(),
      %% User who owns the subscription.
      user_id :: binary(),
      %% Player seat of the subscription; observers are stored with `undefined'.
      player_id :: integer(),
      %% Monitor reference on `pid'.
      mon_ref :: reference(),
      %% Whether published (broadcast) messages are delivered to this subscriber.
      broadcast_allowed :: boolean()
  }).
  %% A player registered at the table.
  -record(player, {
      %% Player id; used as the storage key.
      id :: integer(),
      %% User that is allowed to subscribe as this player.
      user_id :: binary(),
      %% `online' while at least one session of the player is subscribed.
      status :: online | offline
  }).
  57. %% ====================================================================
  58. %% External functions
  59. %% ====================================================================
  %% Starts a relay server that is not linked to the caller.
  %% Params is a property list; init/1 reads the keys `players',
  %% `observers_allowed' and `table' from it.
  %% Returns the result of gen_server:start/3.
  start(Params) ->
      InitArgs = [Params],
      gen_server:start(?MODULE, InitArgs, []).
  %% Starts a relay server linked to the caller.
  %% Takes the same Params property list as start/1.
  %% Returns the result of gen_server:start_link/3.
  start_link(Params) ->
      InitArgs = [Params],
      gen_server:start_link(?MODULE, InitArgs, []).
  %% Asks the relay to subscribe Pid on behalf of UserId.
  %% RegNum is either the atom `observer' or a player id.
  %% Synchronous client request with a 5000 ms timeout.
  subscribe(Relay, Pid, UserId, RegNum) ->
      Request = {subscribe, Pid, UserId, RegNum},
      gen_server:call(Relay, {client_request, Request}, 5000).
  %% Asks the relay to drop the subscription identified by SubscriptionId.
  %% Synchronous client request with a 5000 ms timeout.
  unsubscribe(Relay, SubscriptionId) ->
      Request = {unsubscribe, SubscriptionId},
      gen_server:call(Relay, {client_request, Request}, 5000).
  %% Sends Message to the relay as an asynchronous client `publish' message.
  %% Returns ok immediately (gen_server:cast/2).
  publish(Relay, Message) ->
      ClientMessage = {publish, Message},
      gen_server:cast(Relay, {client_message, ClientMessage}).
  %% Delivers an asynchronous message from the table to the relay.
  %% Returns ok immediately (gen_server:cast/2).
  table_message(Relay, Message) ->
      Tagged = {table_message, Message},
      gen_server:cast(Relay, Tagged).
  %% Sends a synchronous request from the table to the relay,
  %% waiting up to 5000 ms for the reply.
  table_request(Relay, Request) ->
      table_request(Relay, Request, 5000).

  %% Same as table_request/2 with an explicit Timeout, passed
  %% unchanged to gen_server:call/3.
  table_request(Relay, Request, Timeout) ->
      Tagged = {table_request, Request},
      gen_server:call(Relay, Tagged, Timeout).
  %% Delivers an asynchronous message from a client session to the relay.
  %% Returns ok immediately (gen_server:cast/2).
  client_message(Relay, Message) ->
      Tagged = {client_message, Message},
      gen_server:cast(Relay, Tagged).
  %% Sends a synchronous request from a client session to the relay,
  %% waiting up to 5000 ms for the reply.
  client_request(Relay, Request) ->
      client_request(Relay, Request, 5000).

  %% Same as client_request/2 with an explicit Timeout, passed
  %% unchanged to gen_server:call/3.
  client_request(Relay, Request, Timeout) ->
      Tagged = {client_request, Request},
      gen_server:call(Relay, Tagged, Timeout).
  82. %% ====================================================================
  83. %% Server functions
  84. %% ====================================================================
  85. %% --------------------------------------------------------------------
  %% gen_server callback. Builds the initial state from the Params property list:
  %%   players           - list of {PlayerId, UserId}; every player starts offline
  %%   observers_allowed - whether observer subscriptions are accepted
  %%   table             - {Module, Pid} of the owning table
  %% The table process is monitored so the relay can shut down when it dies.
  init([Params]) ->
      Opt = fun(Key) -> proplists:get_value(Key, Params) end,
      PlayersInfo = Opt(players),
      AllowObservers = Opt(observers_allowed),
      %% Crashes with badmatch when `table' is missing or is not a 2-tuple.
      {_TableMod, TablePid} = Table = Opt(table),
      Players = init_players(PlayersInfo),
      TableMonRef = erlang:monitor(process, TablePid),
      InitialState = #state{subscribers = subscribers_init(),
                            players = Players,
                            observers_allowed = AllowObservers,
                            table = Table,
                            table_mon_ref = TableMonRef},
      {ok, InitialState}.
  97. %% --------------------------------------------------------------------
  %% gen_server callback. Dispatches synchronous calls by origin:
  %% client requests and table requests go to their own handlers,
  %% any other call is answered with ok and leaves the state untouched.
  handle_call({client_request, ClientReq}, From, State) ->
      handle_client_request(ClientReq, From, State);
  handle_call({table_request, TableReq}, From, State) ->
      handle_table_request(TableReq, From, State);
  handle_call(_Other, _From, State) ->
      {reply, ok, State}.
  105. %% --------------------------------------------------------------------
  %% gen_server callback. Logs and dispatches asynchronous messages by origin;
  %% casts of any other shape are silently ignored.
  handle_cast({client_message, ClientMsg}, State) ->
      gas:info(?MODULE, "RELAY_NG Received client message: ~p", [ClientMsg]),
      handle_client_message(ClientMsg, State);
  handle_cast({table_message, TableMsg}, State) ->
      gas:info(?MODULE, "RELAY_NG Received table message: ~p", [TableMsg]),
      handle_table_message(TableMsg, State);
  handle_cast(_Other, State) ->
      {noreply, State}.
  114. %% --------------------------------------------------------------------
  %% gen_server callback for monitor notifications.
  %%
  %% Clause 1: the monitored table process went down. Every subscriber is
  %%           demonitored and kicked with reason `table_down', then the relay stops.
  %% Clause 2: some other monitored process went down. If it belongs to a
  %%           subscriber, that subscription is removed.
  %% Clause 3: any other message is ignored.
  handle_info({'DOWN', TableMonRef, process, _Pid, _Info},
              #state{subscribers = Subscribers,
                     table_mon_ref = TableMonRef} = State) ->
      gas:info(?MODULE,"RELAY_NG All The parent table is down. "
               "Disconnecting all subscribers and sutting down.", []),
      Kick = fun(#subscriber{id = SubscrId, pid = Pid, mon_ref = MonRef}) ->
                     erlang:demonitor(MonRef, [flush]),
                     Pid ! {relay_kick, SubscrId, table_down}
             end,
      lists:foreach(Kick, subscribers_to_list(Subscribers)),
      {stop, normal, State};
  handle_info({'DOWN', MonRef, process, _Pid, _Info},
              #state{subscribers = Subscribers, players = _Players,
                     table = {_TableMod, _TablePid}} = State) ->
      case find_subscribers_by_mon_ref(MonRef, Subscribers) of
          [] ->
              %% Not one of our subscribers; nothing to clean up.
              {noreply, State};
          [#subscriber{} = Gone] ->
              {noreply, on_subscriber_down(Gone, State)}
      end;
  handle_info(_Info, State) ->
      {noreply, State}.

  %% Removes a subscriber whose process has died and returns the new state.
  %% For a player subscriber, when no other session of the same player is left,
  %% the player is marked offline and the table gets {player_disconnected, PlayerId}.
  on_subscriber_down(#subscriber{id = SubscrId, player_id = undefined},
                     #state{subscribers = Subscribers} = State) ->
      State#state{subscribers = del_subscriber(SubscrId, Subscribers)};
  on_subscriber_down(#subscriber{id = SubscrId, player_id = PlayerId, user_id = UserId},
                     #state{subscribers = Subscribers, players = Players,
                            table = {TableMod, TablePid}} = State) ->
      Remaining = del_subscriber(SubscrId, Subscribers),
      case find_subscribers_by_player_id(PlayerId, Remaining) of
          [] ->
              gas:info(?MODULE,"RELAY_NG All sessions of player <~p> (~p) are closed. "
                       "Sending the notification to the table.", [PlayerId, UserId]),
              OfflinePlayers = update_player_status(PlayerId, offline, Players),
              TableMod:relay_message(TablePid, {player_disconnected, PlayerId}),
              State#state{subscribers = Remaining, players = OfflinePlayers};
          _StillConnected ->
              State#state{subscribers = Remaining}
      end.
  150. %% --------------------------------------------------------------------
  %% gen_server callback. The relay holds no resources that need explicit
  %% cleanup here, so termination is a no-op.
  terminate(_Reason, _State) -> ok.
  153. %% --------------------------------------------------------------------
  %% gen_server callback. The state needs no conversion between code
  %% versions; it is returned unchanged.
  code_change(_OldVsn, State, _Extra) -> {ok, State}.
  156. %% --------------------------------------------------------------------
  157. %%% Internal functions
  158. %% --------------------------------------------------------------------
  %% Handles synchronous requests coming from client sessions.
  %%
  %%   {subscribe, Pid, UserId, observer} -> {ok, SubscrId}
  %%                                       | {error, observers_not_allowed}
  %%   {subscribe, Pid, UserId, PlayerId} -> {ok, SubscrId}
  %%                                       | {error, not_player_id_owner}
  %%                                       | {error, unknown_player_id}
  %%   {unsubscribe, SubscrId}            -> ok | {error, not_subscribed}
  %%   anything else                      -> {error, unknown_request}
  %%
  %% Every accepted subscription monitors Pid, so a dying session is cleaned
  %% up by handle_info/2.
  handle_client_request({subscribe, Pid, UserId, observer}, _From,
                        #state{observers_allowed = true,
                               subscribers = Subscribers} = State) ->
      MonRef = erlang:monitor(process, Pid),
      SubscrId = erlang:make_ref(),
      %% Observers are not bound to a player (player_id = undefined) and are
      %% allowed to receive broadcasts immediately.
      NewSubscribers = store_subscriber(SubscrId, Pid, UserId, undefined, MonRef,
                                        _BroadcastAllowed = true, Subscribers),
      %% Return the subscription id as the documented protocol requires;
      %% without it the observer can neither unsubscribe nor correlate
      %% {relay_event, SubscrId, _} messages.
      {reply, {ok, SubscrId}, State#state{subscribers = NewSubscribers}};
  handle_client_request({subscribe, _Pid, _UserId, observer}, _From, State) ->
      {reply, {error, observers_not_allowed}, State};
  handle_client_request({subscribe, Pid, UserId, PlayerId}, _From,
                        #state{players = Players, subscribers = Subscribers,
                               table = {TableMod, TablePid}} = State) ->
      gas:info(?MODULE,"RELAY_NG Subscription request from user ~p, PlayerId: <~p>", [UserId, PlayerId]),
      case find_player(PlayerId, Players) of
          {ok, #player{user_id = UserId, status = Status} = P} -> %% The user id is matched
              gas:info(?MODULE,"RELAY_NG User ~p is registered as player <~p>", [UserId, PlayerId]),
              gas:info(?MODULE,"RELAY_NG User ~p player info: ~p", [UserId, P]),
              MonRef = erlang:monitor(process, Pid),
              SubscrId = erlang:make_ref(),
              %% Player sessions get broadcasts only after the table sends
              %% allow_broadcast_for_player.
              NewSubscribers = store_subscriber(SubscrId, Pid, UserId, PlayerId, MonRef,
                                                _BroadcastAllowed = false, Subscribers),
              NewPlayers =
                  case Status of
                      offline ->
                          %% First session of this player: tell the table.
                          gas:info(?MODULE,"RELAY_NG Notifying the table about user ~p (<~p>).",
                                   [UserId, PlayerId]),
                          TableMod:relay_message(TablePid, {player_connected, PlayerId}),
                          update_player_status(PlayerId, online, Players);
                      _ ->
                          gas:info(?MODULE,"RELAY_NG User ~p (<~p>) is already subscribed.",
                                   [UserId, PlayerId]),
                          Players
                  end,
              TableMod:relay_message(TablePid, {subscriber_added, PlayerId, SubscrId}),
              {reply, {ok, SubscrId}, State#state{players = NewPlayers, subscribers = NewSubscribers}};
          {ok, #player{} = P} ->
              gas:info(?MODULE,"RELAY_NG Subscription for user ~p rejected. There is another owner of the "
                       "PlayerId <~p>: ~p", [UserId, PlayerId, P]),
              {reply, {error, not_player_id_owner}, State};
          error ->
              {reply, {error, unknown_player_id}, State}
      end;
  handle_client_request({unsubscribe, SubscrId}, _From,
                        #state{subscribers = Subscribers, players = Players,
                               table = {TableMod, TablePid}} = State) ->
      case get_subscriber(SubscrId, Subscribers) of
          error ->
              {reply, {error, not_subscribed}, State};
          {ok, #subscriber{id = SubscrId, mon_ref = MonRef, player_id = undefined}} ->
              erlang:demonitor(MonRef, [flush]),
              NewSubscribers = del_subscriber(SubscrId, Subscribers),
              {reply, ok, State#state{subscribers = NewSubscribers}};
          {ok, #subscriber{id = SubscrId, mon_ref = MonRef, player_id = PlayerId}} ->
              erlang:demonitor(MonRef, [flush]),
              NewSubscribers = del_subscriber(SubscrId, Subscribers),
              %% Look for other sessions of the player in the storage AFTER the
              %% removal; the old storage still contains this subscription, so
              %% checking it would never report the player as disconnected.
              NewPlayers =
                  case find_subscribers_by_player_id(PlayerId, NewSubscribers) of
                      [] ->
                          TableMod:relay_message(TablePid, {player_disconnected, PlayerId}),
                          update_player_status(PlayerId, offline, Players);
                      _ ->
                          Players
                  end,
              {reply, ok, State#state{subscribers = NewSubscribers,
                                      players = NewPlayers}}
      end;
  handle_client_request(_Request, _From, State) ->
      {reply, {error, unknown_request}, State}.
  223. %%===================================================================
  %% Handles synchronous requests coming from the table.
  %%
  %%   {register_player, UserId, PlayerId}   - stores the player as offline.
  %%   {unregister_player, PlayerId, Reason} - removes the player and kicks all
  %%                                           of its subscribers with Reason.
  %% Every request, including unknown ones, is answered with ok.
  handle_table_request({register_player, UserId, PlayerId}, _From,
                       #state{players = Players} = State) ->
      {reply, ok, State#state{players = store_player(PlayerId, UserId, offline, Players)}};
  handle_table_request({unregister_player, PlayerId, Reason}, _From,
                       #state{players = Players,
                              subscribers = Subscribers} = State) ->
      RemainingPlayers = del_player(PlayerId, Players),
      Kicked = find_subscribers_by_player_id(PlayerId, Subscribers),
      %% Notify every session of the player first ...
      lists:foreach(fun(#subscriber{id = SubscrId, pid = Pid, mon_ref = MonRef}) ->
                            erlang:demonitor(MonRef, [flush]),
                            Pid ! {relay_kick, SubscrId, Reason}
                    end, Kicked),
      %% ... then drop them from the storage.
      RemainingSubscribers =
          lists:foldl(fun(#subscriber{id = SubscrId}, Acc) ->
                              del_subscriber(SubscrId, Acc)
                      end, Subscribers, Kicked),
      {reply, ok, State#state{players = RemainingPlayers,
                              subscribers = RemainingSubscribers}};
  handle_table_request(_Request, _From, State) ->
      {reply, ok, State}.
  244. %%===================================================================
  %% Handles asynchronous messages coming from client sessions.
  %% {publish, Msg} is sent as {relay_event, SubscrId, Msg} to every subscriber
  %% that has broadcast_allowed = true; any other message is ignored.
  %%
  %% XXX: Insecure because of the spoofing possibility (a client session can
  %% send events in the name of the table). Legacy.
  handle_client_message({publish, Msg}, #state{subscribers = Subscribers} = State) ->
      Deliver = fun(#subscriber{id = SubscrId, pid = Pid, broadcast_allowed = true}) ->
                        Pid ! {relay_event, SubscrId, Msg};
                   (_NotAllowed) ->
                        skip
                end,
      lists:foreach(Deliver, subscribers_to_list(Subscribers)),
      {noreply, State};
  handle_client_message(_Msg, State) ->
      {noreply, State}.
  254. %%===================================================================
  %% Handles asynchronous messages coming from the table.
  %%
  %%   {publish, Msg}                 - broadcast to subscribers with broadcast_allowed = true
  %%   {to_client, PlayerId, Msg}     - send to every session of the player
  %%   {to_subscriber, SubscrId, Msg} - send to one subscription, if it exists
  %%   {allow_broadcast_for_player, PlayerId}
  %%                                  - enable broadcasts for the player's sessions
  %%   stop                           - kick everybody with `table_closed' and stop
  %% Any other message is ignored.
  handle_table_message({publish, Msg}, #state{subscribers = Subscribers} = State) ->
      gas:info(?MODULE,"RELAY_NG The table publish message: ~p", [Msg]),
      Deliver = fun(#subscriber{id = SubscrId, pid = Pid, broadcast_allowed = true}) ->
                        Pid ! {relay_event, SubscrId, Msg};
                   (_NotAllowed) ->
                        skip
                end,
      lists:foreach(Deliver, subscribers_to_list(Subscribers)),
      {noreply, State};
  handle_table_message({to_client, PlayerId, Msg}, #state{subscribers = Subscribers} = State) ->
      Sessions = find_subscribers_by_player_id(PlayerId, Subscribers),
      gas:info(?MODULE,"RELAY_NG Send table message to player's (~p) sessions: ~p. Message: ~p",
               [PlayerId, Sessions, Msg]),
      %% Direct messages are delivered regardless of broadcast_allowed.
      lists:foreach(fun(#subscriber{id = SubscrId, pid = Pid}) ->
                            Pid ! {relay_event, SubscrId, Msg}
                    end, Sessions),
      {noreply, State};
  handle_table_message({to_subscriber, SubscrId, Msg}, #state{subscribers = Subscribers} = State) ->
      gas:info(?MODULE,"RELAY_NG Send table message to subscriber: ~p. Message: ~p", [SubscrId, Msg]),
      case get_subscriber(SubscrId, Subscribers) of
          {ok, #subscriber{pid = Pid}} ->
              Pid ! {relay_event, SubscrId, Msg};
          _Unknown ->
              %% The subscription no longer exists; drop the message.
              do_nothing
      end,
      {noreply, State};
  handle_table_message({allow_broadcast_for_player, PlayerId},
                       #state{subscribers = Subscribers} = State) ->
      gas:info(?MODULE,"RELAY_NG Received directive to allow receiving published messages for player <~p>",
               [PlayerId]),
      Sessions = find_subscribers_by_player_id(PlayerId, Subscribers),
      Updated = lists:foldl(
                  fun(Session, Acc) ->
                          Allowed = Session#subscriber{broadcast_allowed = true},
                          store_subscriber_rec(Allowed, Acc)
                  end, Subscribers, Sessions),
      {noreply, State#state{subscribers = Updated}};
  handle_table_message(stop, #state{subscribers = Subscribers} = State) ->
      lists:foreach(fun(#subscriber{id = SubscrId, pid = Pid, mon_ref = MonRef}) ->
                            erlang:demonitor(MonRef, [flush]),
                            Pid ! {relay_kick, SubscrId, table_closed}
                    end, subscribers_to_list(Subscribers)),
      {stop, normal, State#state{subscribers = subscribers_init()}};
  handle_table_message(_Msg, State) ->
      {noreply, State}.
  294. %%===================================================================
  %% Builds the player storage from a list of {PlayerId, UserId} pairs.
  %% Every player starts with status `offline'.
  init_players(PlayersInfo) ->
      init_players(PlayersInfo, players_init()).

  %% Adds the {PlayerId, UserId} pairs to an existing player storage.
  init_players(PlayersInfo, Players) ->
      lists:foldl(fun({PlayerId, UserId}, Acc) ->
                          store_player(PlayerId, UserId, offline, Acc)
                  end, Players, PlayersInfo).
  302. %%===================================================================
  %% ---- Player storage (a midict keyed by player id, no extra indices) ----

  %% Creates an empty player storage.
  players_init() ->
      midict:new().

  %% Stores (or replaces) a player built from its fields.
  store_player(PlayerId, UserId, Status, Players) ->
      Player = #player{id = PlayerId, user_id = UserId, status = Status},
      store_player_rec(Player, Players).

  %% Stores (or replaces) a #player{} record under its id.
  store_player_rec(#player{id = PlayerId} = Player, Players) ->
      midict:store(PlayerId, Player, [], Players).

  %% Removes the player with the given id.
  del_player(PlayerId, Players) ->
      midict:erase(PlayerId, Players).

  %% Returns the #player{} for PlayerId via midict:fetch/2.
  %% NOTE(review): presumably raises when the id is absent - confirm against midict.
  fetch_player(PlayerId, Players) ->
      midict:fetch(PlayerId, Players).

  %% find_player(PlayerId, Players) -> {ok, #player{}} | error
  find_player(PlayerId, Players) ->
      midict:find(PlayerId, Players).

  %% Sets the status of an already stored player and returns the new storage.
  update_player_status(PlayerId, Status, Players) ->
      Player = fetch_player(PlayerId, Players),
      Updated = Player#player{status = Status},
      store_player_rec(Updated, Players).
  317. %%===================================================================
  %% ---- Subscriber storage (a midict keyed by subscription id) ----

  %% Creates an empty subscriber storage.
  subscribers_init() ->
      midict:new().

  %% Stores (or replaces) a subscriber built from its fields.
  store_subscriber(SubscrId, Pid, UserId, PlayerId, MonRef, BroadcastAllowed, Subscribers) ->
      Subscriber = #subscriber{id = SubscrId,
                               pid = Pid,
                               user_id = UserId,
                               player_id = PlayerId,
                               mon_ref = MonRef,
                               broadcast_allowed = BroadcastAllowed},
      store_subscriber_rec(Subscriber, Subscribers).

  %% Stores (or replaces) a #subscriber{} record under its id, tagged with its
  %% pid, player id and monitor reference.
  %% NOTE(review): the tag list presumably defines the secondary indices that
  %% midict:geti/3 queries - confirm against midict.
  store_subscriber_rec(#subscriber{id = SubscrId, pid = Pid,
                                   player_id = PlayerId, mon_ref = MonRef} = Subscriber,
                       Subscribers) ->
      Tags = [{pid, Pid}, {player_id, PlayerId}, {mon_ref, MonRef}],
      midict:store(SubscrId, Subscriber, Tags, Subscribers).

  %% del_subscriber(SubscrId, Subscribers) -> NewSubscribers
  del_subscriber(SubscrId, Subscribers) ->
      midict:erase(SubscrId, Subscribers).

  %% get_subscriber(Id, Subscribers) -> {ok, #subscriber{}} | error
  get_subscriber(Id, Subscribers) ->
      midict:find(Id, Subscribers).

  %% find_subscribers_by_player_id(PlayerId, Subscribers) -> list(#subscriber{})
  find_subscribers_by_player_id(PlayerId, Subscribers) ->
      midict:geti(PlayerId, player_id, Subscribers).

  %% find_subscribers_by_mon_ref(MonRef, Subscribers) -> list(#subscriber{})
  find_subscribers_by_mon_ref(MonRef, Subscribers) ->
      midict:geti(MonRef, mon_ref, Subscribers).

  %% subscribers_to_list(Subscribers) -> list(#subscriber{})
  subscribers_to_list(Subscribers) ->
      midict:all_values(Subscribers).