123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- %%% -------------------------------------------------------------------
- %%% Author : Sergei Polkovnikov <serge.polkovnikov@gmail.com>
- %%% Description : The server for retranslating events of the table.
- %%%
- %%% Created : Feb 25, 2013
- %%% -------------------------------------------------------------------
- %% Table -> Relay requests:
- %% {register_player, UserId, PlayerId} -> ok
- %% {unregister_player, PlayerId, Reason} -> ok
- %% Table -> Relay messages:
- %% {publish, Event}
- %% {to_client, PlayerId, Event}
- %% {to_subscriber, SubscrId, Event}
- %% {allow_broadcast_for_player, PlayerId}
- %% stop
- %% Relay -> Table notifications:
- %% {player_disconnected, PlayerId}
- %% {player_connected, PlayerId}
- %% {subscriber_added, PlayerId, SubscriptionId} - it's a hack to retrive current game state
- %% Subscriber -> Relay requests:
- %% {subscribe, Pid, UserId, RegNum} -> {ok, SubscriptionId} | {error, Reason}
- %% {unsubscribe, SubscriptionId} -> ok | {error, Reason}
- %% Relay -> Subscribers notifications:
- %% {relay_kick, SubscrId, Reason}
- %% {relay_event, SubscrId, Event}
- -module(relay_ng).
- -behaviour(gen_server).
- %% --------------------------------------------------------------------
- %% Include files
- %% --------------------------------------------------------------------
- -include_lib("server/include/basic_types.hrl").
- -include_lib("server/include/log.hrl").
- %% --------------------------------------------------------------------
- %% External exports
- -export([start/1, start_link/1, table_message/2, table_request/2]).
- -export([subscribe/4, unsubscribe/2, publish/2]).
- %% gen_server callbacks
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
- -record(state, {subscribers,
- players,
- observers_allowed :: boolean(),
- table :: {atom(), pid()},
- table_mon_ref :: reference()
- }).
- -record(subscriber,
- {id :: reference(),
- pid :: pid(),
- user_id :: binary(),
- player_id :: integer(),
- mon_ref :: reference(),
- broadcast_allowed :: boolean()
- }).
- -record(player,
- {id :: integer(),
- user_id :: binary(),
- status :: online | offline
- }).
- %% ====================================================================
- %% External functions
- %% ====================================================================
- start(Params) ->
- gen_server:start(?MODULE, [Params], []).
- start_link(Params) ->
- gen_server:start_link(?MODULE, [Params], []).
- subscribe(Relay, Pid, UserId, RegNum) ->
- client_request(Relay, {subscribe, Pid, UserId, RegNum}).
- unsubscribe(Relay, SubscriptionId) ->
- client_request(Relay, {unsubscribe, SubscriptionId}).
- publish(Relay, Message) ->
- client_message(Relay, {publish, Message}).
- table_message(Relay, Message) ->
- gen_server:cast(Relay, {table_message, Message}).
- table_request(Relay, Request) ->
- table_request(Relay, Request, 5000).
- table_request(Relay, Request, Timeout) ->
- gen_server:call(Relay, {table_request, Request}, Timeout).
- client_message(Relay, Message) ->
- gen_server:cast(Relay, {client_message, Message}).
- client_request(Relay, Request) ->
- client_request(Relay, Request, 5000).
- client_request(Relay, Request, Timeout) ->
- gen_server:call(Relay, {client_request, Request}, Timeout).
- %% ====================================================================
- %% Server functions
- %% ====================================================================
- %% --------------------------------------------------------------------
- init([Params]) ->
- PlayersInfo = proplists:get_value(players, Params),
- ObserversAllowed = proplists:get_value(observers_allowed, Params),
- Table = {_, TablePid} = proplists:get_value(table, Params),
- Players = init_players(PlayersInfo),
- MonRef = erlang:monitor(process, TablePid),
- {ok, #state{subscribers = subscribers_init(),
- players = Players,
- observers_allowed = ObserversAllowed,
- table = Table,
- table_mon_ref = MonRef}}.
- %% --------------------------------------------------------------------
- handle_call({client_request, Request}, From, State) ->
- handle_client_request(Request, From, State);
- handle_call({table_request, Request}, From, State) ->
- handle_table_request(Request, From, State);
- handle_call(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
- %% --------------------------------------------------------------------
- handle_cast({client_message, Msg}, State) ->
- gas:info(?MODULE,"RELAY_NG Received client message: ~p", [Msg]),
- handle_client_message(Msg, State);
- handle_cast({table_message, Msg}, State) ->
- gas:info(?MODULE,"RELAY_NG Received table message: ~p", [Msg]),
- handle_table_message(Msg, State);
- handle_cast(_Msg, State) ->
- {noreply, State}.
- %% --------------------------------------------------------------------
- handle_info({'DOWN', TableMonRef, process, _Pid, _Info},
- #state{subscribers = Subscribers,
- table_mon_ref = TableMonRef} = State) ->
- gas:info(?MODULE,"RELAY_NG All The parent table is down. "
- "Disconnecting all subscribers and sutting down.", []),
- [begin
- erlang:demonitor(MonRef, [flush]),
- Pid ! {relay_kick, SubscrId, table_down}
- end || #subscriber{id = SubscrId, pid = Pid, mon_ref = MonRef}
- <- subscribers_to_list(Subscribers)],
- {stop, normal, State};
- handle_info({'DOWN', MonRef, process, _Pid, _Info},
- #state{subscribers = Subscribers, players = Players,
- table = {TableMod, TablePid}} = State) ->
- case find_subscribers_by_mon_ref(MonRef, Subscribers) of
- [#subscriber{player_id = undefined, id = SubscrId}] ->
- NewSubscribers = del_subscriber(SubscrId, Subscribers),
- {noreply, State#state{subscribers = NewSubscribers}};
- [#subscriber{player_id = PlayerId, user_id = UserId, id = SubscrId}] ->
- NewSubscribers = del_subscriber(SubscrId, Subscribers),
- case find_subscribers_by_player_id(PlayerId, NewSubscribers) of
- [] ->
- gas:info(?MODULE,"RELAY_NG All sessions of player <~p> (~p) are closed. "
- "Sending the notification to the table.", [PlayerId, UserId]),
- NewPlayers = update_player_status(PlayerId, offline, Players),
- TableMod:relay_message(TablePid, {player_disconnected, PlayerId}),
- {noreply, State#state{subscribers = NewSubscribers, players = NewPlayers}};
- _ ->
- {noreply, State#state{subscribers = NewSubscribers}}
- end;
- [] ->
- {noreply, State}
- end;
- handle_info(_Info, State) ->
- {noreply, State}.
- %% --------------------------------------------------------------------
- terminate(_Reason, _State) ->
- ok.
- %% --------------------------------------------------------------------
- code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
- %% --------------------------------------------------------------------
- %%% Internal functions
- %% --------------------------------------------------------------------
- handle_client_request({subscribe, Pid, UserId, observer}, _From,
- #state{observers_allowed = ObserversAllowed,
- subscribers = Subscribers} = State) ->
- if ObserversAllowed ->
- MonRef = erlang:monitor(process, Pid),
- SubscrId = erlang:make_ref(),
- NewSubscribers = store_subscriber(SubscrId, Pid, UserId, undefined, MonRef, true, Subscribers),
- {reply, ok, State#state{subscribers = NewSubscribers}};
- true ->
- {reply, {error, observers_not_allowed}, State}
- end;
- handle_client_request({subscribe, Pid, UserId, PlayerId}, _From,
- #state{players = Players, subscribers = Subscribers,
- table = {TableMod, TablePid}} = State) ->
- gas:info(?MODULE,"RELAY_NG Subscription request from user ~p, PlayerId: <~p>", [UserId, PlayerId]),
- case find_player(PlayerId, Players) of
- {ok, #player{user_id = UserId, status = Status} = P} -> %% The user id is matched
- gas:info(?MODULE,"RELAY_NG User ~p is registered as player <~p>", [UserId, PlayerId]),
- gas:info(?MODULE,"RELAY_NG User ~p player info: ~p", [UserId, P]),
- MonRef = erlang:monitor(process, Pid),
- SubscrId = erlang:make_ref(),
- NewSubscribers = store_subscriber(SubscrId, Pid, UserId, PlayerId, MonRef,
- _BroadcastAllowed = false, Subscribers),
- NewPlayers = if Status == offline ->
- gas:info(?MODULE,"RELAY_NG Notifying the table about user ~p (<~p>).", [PlayerId, UserId]),
- TableMod:relay_message(TablePid, {player_connected, PlayerId}),
- update_player_status(PlayerId, online, Players);
- true ->
- gas:info(?MODULE,"RELAY_NG User ~p (<~p>) is already subscribed.", [PlayerId, UserId]),
- Players
- end,
- TableMod:relay_message(TablePid, {subscriber_added, PlayerId, SubscrId}),
- {reply, {ok, SubscrId}, State#state{players = NewPlayers, subscribers = NewSubscribers}};
- {ok, #player{}=P} ->
- gas:info(?MODULE,"RELAY_NG Subscription for user ~p rejected. There is another owner of the "
- "PlayerId <~p>: ~p", [UserId, PlayerId, P]),
- {reply, {error, not_player_id_owner}, State};
- error ->
- {reply, {error, unknown_player_id}, State}
- end;
- handle_client_request({unsubscribe, SubscrId}, _From,
- #state{subscribers = Subscribers, players = Players,
- table = {TableMod, TablePid}} = State) ->
- case get_subscriber(SubscrId, Subscribers) of
- error ->
- {reply, {error, not_subscribed}, State};
- {ok, #subscriber{id = SubscrId, mon_ref = MonRef, player_id = undefined}} ->
- erlang:demonitor(MonRef, [flush]),
- NewSubscribers = del_subscriber(SubscrId, Subscribers),
- {reply, ok, State#state{subscribers = NewSubscribers}};
- {ok, #subscriber{id = SubscrId, mon_ref = MonRef, player_id = PlayerId}} ->
- erlang:demonitor(MonRef, [flush]),
- NewSubscribers = del_subscriber(SubscrId, Subscribers),
- NewPlayers = case find_subscribers_by_player_id(PlayerId, Subscribers) of
- [] ->
- TableMod:relay_message(TablePid, {player_disconnected, PlayerId}),
- update_player_status(PlayerId, offline, Players);
- _ -> Players
- end,
- {reply, ok, State#state{subscribers = NewSubscribers,
- players = NewPlayers}}
- end;
- handle_client_request(_Request, _From, State) ->
- {reply, {error, unknown_request}, State}.
- %%===================================================================
- handle_table_request({register_player, UserId, PlayerId}, _From,
- #state{players = Players} = State) ->
- NewPlayers = store_player(PlayerId, UserId, offline, Players),
- {reply, ok, State#state{players = NewPlayers}};
- handle_table_request({unregister_player, PlayerId, Reason}, _From,
- #state{players = Players,
- subscribers = Subscribers} = State) ->
- NewPlayers = del_player(PlayerId, Players),
- UpdSubscribers = find_subscribers_by_player_id(PlayerId, Subscribers),
- F = fun(#subscriber{id = SubscrId, pid = Pid, mon_ref = MonRef}, Acc) ->
- erlang:demonitor(MonRef, [flush]),
- Pid ! {relay_kick, SubscrId, Reason},
- del_subscriber(SubscrId, Acc)
- end,
- NewSubscribers = lists:foldl(F, Subscribers, UpdSubscribers),
- {reply, ok, State#state{players = NewPlayers,
- subscribers = NewSubscribers}};
- handle_table_request(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
- %%===================================================================
- %% XXX: Unsecure because of spoofing posibility (a client session can send events by the
- %% name of the table). Legacy.
- handle_client_message({publish, Msg}, #state{subscribers = Subscribers} = State) ->
- Receipients = subscribers_to_list(Subscribers),
- [Pid ! {relay_event, SubscrId, Msg} ||
- #subscriber{id = SubscrId, pid = Pid, broadcast_allowed = true} <- Receipients],
- {noreply, State};
- handle_client_message(_Msg, State) ->
- {noreply, State}.
- %%===================================================================
- handle_table_message({publish, Msg}, #state{subscribers = Subscribers} = State) ->
- gas:info(?MODULE,"RELAY_NG The table publish message: ~p", [Msg]),
- Receipients = subscribers_to_list(Subscribers),
- [Pid ! {relay_event, SubscrId, Msg} ||
- #subscriber{id = SubscrId, pid = Pid, broadcast_allowed = true} <- Receipients],
- {noreply, State};
- handle_table_message({to_client, PlayerId, Msg}, #state{subscribers = Subscribers} = State) ->
- Recepients = find_subscribers_by_player_id(PlayerId, Subscribers),
- gas:info(?MODULE,"RELAY_NG Send table message to player's (~p) sessions: ~p. Message: ~p",
- [PlayerId, Recepients, Msg]),
- [Pid ! {relay_event, SubscrId, Msg} || #subscriber{id = SubscrId, pid = Pid} <- Recepients],
- {noreply, State};
- handle_table_message({to_subscriber, SubscrId, Msg}, #state{subscribers = Subscribers} = State) ->
- gas:info(?MODULE,"RELAY_NG Send table message to subscriber: ~p. Message: ~p", [SubscrId, Msg]),
- case get_subscriber(SubscrId, Subscribers) of
- {ok, #subscriber{pid = Pid}} -> Pid ! {relay_event, SubscrId, Msg};
- _ -> do_nothing
- end,
- {noreply, State};
- handle_table_message({allow_broadcast_for_player, PlayerId},
- #state{subscribers = Subscribers} = State) ->
- gas:info(?MODULE,"RELAY_NG Received directive to allow receiving published messages for player <~p>",
- [PlayerId]),
- PlSubscribers = find_subscribers_by_player_id(PlayerId, Subscribers),
- F = fun(Subscriber, Acc) ->
- store_subscriber_rec(Subscriber#subscriber{broadcast_allowed = true}, Acc)
- end,
- NewSubscribers = lists:foldl(F, Subscribers, PlSubscribers),
- {noreply, State#state{subscribers = NewSubscribers}};
- handle_table_message(stop, #state{subscribers = Subscribers} = State) ->
- [begin
- erlang:demonitor(MonRef, [flush]),
- Pid ! {relay_kick, SubscrId, table_closed}
- end || #subscriber{id = SubscrId, pid = Pid, mon_ref = MonRef}
- <- subscribers_to_list(Subscribers)],
- {stop, normal, State#state{subscribers = subscribers_init()}};
- handle_table_message(_Msg, State) ->
- {noreply, State}.
- %%===================================================================
- init_players(PlayersInfo) ->
- init_players(PlayersInfo, players_init()).
- init_players([], Players) ->
- Players;
- init_players([{PlayerId, UserId} | PlayersInfo], Players) ->
- NewPlayers = store_player(PlayerId, UserId, offline, Players),
- init_players(PlayersInfo, NewPlayers).
- %%===================================================================
- players_init() -> midict:new().
- store_player(PlayerId, UserId, Status, Players) ->
- store_player_rec(#player{id = PlayerId, user_id = UserId, status = Status}, Players).
- store_player_rec(#player{id = PlayerId, user_id = _UserId, status = _Status} = Rec, Players) ->
- midict:store(PlayerId, Rec, [], Players).
- del_player(PlayerId, Players) ->
- midict:erase(PlayerId, Players).
- fetch_player(PlayerId, Players) ->
- midict:fetch(PlayerId, Players).
- find_player(PlayerId, Players) ->
- midict:find(PlayerId, Players).
- update_player_status(PlayerId, Status, Players) ->
- Rec = fetch_player(PlayerId, Players),
- store_player_rec(Rec#player{status = Status}, Players).
- %%===================================================================
- subscribers_init() -> midict:new().
- store_subscriber(SubscrId, Pid, UserId, PlayerId, MonRef, BroadcastAllowed, Subscribers) ->
- store_subscriber_rec(#subscriber{id = SubscrId, pid = Pid, user_id = UserId,
- player_id = PlayerId, mon_ref = MonRef,
- broadcast_allowed = BroadcastAllowed}, Subscribers).
- store_subscriber_rec(#subscriber{id = SubscrId, pid = Pid, user_id = _UserId,
- player_id = PlayerId, mon_ref = MonRef} = Rec, Subscribers) ->
- midict:store(SubscrId, Rec, [{pid, Pid}, {player_id, PlayerId}, {mon_ref, MonRef}],
- Subscribers).
- %% del_subscriber(SubscrId, Subscribers) -> NewSubscribers
- del_subscriber(SubscrId, Subscribers) ->
- midict:erase(SubscrId, Subscribers).
- %% get_subscriber(Id, Subscribers) -> {ok, #subscriber{}} | error
- get_subscriber(Id, Subscribers) ->
- midict:find(Id, Subscribers).
- %% find_subscribers_by_player_id(PlayerId, Subscribers) -> list(#subscriber{})
- find_subscribers_by_player_id(PlayerId, Subscribers) ->
- midict:geti(PlayerId, player_id, Subscribers).
- %% find_subscribers_by_mon_ref(MonRef, Subscribers) -> list(#subscriber{})
- find_subscribers_by_mon_ref(MonRef, Subscribers) ->
- midict:geti(MonRef, mon_ref, Subscribers).
- %% subscribers_to_list(Subscribers) -> list(#subscriber{})
- subscribers_to_list(Subscribers) ->
- midict:all_values(Subscribers).
|