n2o_async.erl 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. -module(n2o_async).
  2. -author('Maxim Sokhatsky').
  3. -include_lib("n2o/include/wf.hrl").
  4. -behaviour(gen_server).
  5. -export([start_link/1]).
  6. -export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2,code_change/3]).
  7. -compile(export_all).
  8. % n2o_async API
  9. async(Fun) -> async(async,wf:temp_id(),Fun).
  10. async(Name, F) -> async(async,Name,F).
  11. async(Class,Name,F) ->
  12. Key = key(),
  13. Handler = #handler{module=?MODULE,class=async,group=n2o,
  14. name={Name,Key},config={F,?REQ},state=self()},
  15. case n2o_async:start(Handler) of
  16. {error,{already_started,P}} -> init(P,Class,{Name,Key}), {P,{Class,{Name,Key}}};
  17. {P,X} when is_pid(P) -> init(P,Class,X), {P,{Class,X}};
  18. Else -> Else end.
  19. init(Pid,Class,Name) when is_pid(Pid) -> wf:cache({Class,Name},Pid,infinity), send(Pid,{parent,self()}).
  20. send(Pid,Message) when is_pid(Pid) -> gen_server:call(Pid,Message);
  21. send(Name,Message) -> send(async,{Name,key()},Message).
  22. send(Class,Name,Message) -> gen_server:call(n2o_async:pid({Class,Name}),Message).
  23. pid({Class,Name}) -> wf:cache({Class,Name}).
  24. key() -> n2o_session:session_id().
  25. restart(Name) -> restart(async,{Name,key()}).
  26. restart(Class,Name) ->
  27. case stop(Class,Name) of #handler{}=Async -> start(Async); Error -> Error end.
  28. flush() -> A=wf:actions(), wf:actions([]), get(parent) ! {flush,A}.
  29. flush(Pool) -> A=wf:actions(), wf:actions([]), wf:send(Pool,{flush,A}).
  30. stop(Name) -> stop(async,{Name,key()}).
  31. stop(Class,Name) ->
  32. case n2o_async:pid({Class,Name}) of
  33. Pid when is_pid(Pid) ->
  34. #handler{group=Group} = Async = send(Pid,{get}),
  35. [ supervisor:F(Group,{Class,Name})||F<-[terminate_child,delete_child]],
  36. wf:cache({Class,Name},undefined), Async;
  37. Data -> {error,{not_pid,Data}} end.
  38. start(#handler{class=Class,name=Name,module=Module,group=Group} = Async) ->
  39. ChildSpec = {{Class,Name},{?MODULE,start_link,[Async]},transient,5000,worker,[Module]},
  40. wf:info(?MODULE,"Async Start Attempt ~p~n",[Async#handler{config=[]}]),
  41. case supervisor:start_child(Group,ChildSpec) of
  42. {ok,Pid} -> {Pid,Async#handler.name};
  43. {ok,Pid,_} -> {Pid,Async#handler.name};
  44. Else -> Else end.
  45. init_context(undefined) -> [];
  46. init_context(Req) ->
  47. Ctx = wf:init_context(Req),
  48. NewCtx = wf:fold(init, Ctx#cx.handlers, Ctx),
  49. wf:actions(NewCtx#cx.actions),
  50. wf:context(NewCtx),
  51. NewCtx.
  52. % Generic Async Server
  53. init(#handler{module=Mod,class=Class,name=Name}=Handler) -> wf:cache({Class,Name},self(),infinity), Mod:proc(init,Handler).
  54. handle_call({get},_,#handler{module=Mod}=Async) -> {reply,Async,Async};
  55. handle_call(Message,_,#handler{module=Mod}=Async) -> Mod:proc(Message,Async).
  56. handle_cast(Message, #handler{module=Mod}=Async) -> Mod:proc(Message,Async).
  57. handle_info(timeout, #handler{module=Mod}=Async) -> Mod:proc(timeout,Async);
  58. handle_info(Message, #handler{module=Mod}=Async) -> {noreply,element(3,Mod:proc(Message,Async))}.
  59. start_link(Parameters) -> gen_server:start_link(?MODULE, Parameters, []).
  60. code_change(_,State,_) -> {ok, State}.
  61. terminate(_Reason, #handler{name=Name,group=Group,class=Class}) ->
  62. spawn(fun() -> supervisor:delete_child(Group,{Class,Name}) end),
  63. wf:cache({Class,Name},undefined), ok.
  64. % wf:async page workers
  65. proc(init,#handler{class=Class,name=Name,config={F,Req},state=Parent}=Async) -> put(parent,Parent), try F(init) catch _:_ -> skip end, init_context(Req), {ok,Async};
  66. proc({parent,Parent},Async) -> {reply,put(parent,Parent),Async#handler{state=Parent}};
  67. proc(Message,#handler{config={F,Req}}=Async) -> {reply,F(Message),Async}.