|
@@ -1,74 +1,148 @@
|
|
|
-module(n4u_async).
|
|
|
--include_lib("n4u/include/n4u.hrl").
|
|
|
-behaviour(gen_server).
|
|
|
--export([start_link/1]).
|
|
|
--export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2,code_change/3]).
|
|
|
--compile([export_all, nowarn_export_all]).
|
|
|
-
|
|
|
-% n4u_async API
|
|
|
-
|
|
|
-async(Fun) -> async(async,wf:temp_id(),Fun).
|
|
|
-async(Name, F) -> async(async,Name,F).
|
|
|
-async(Class,Name,F) ->
|
|
|
- Key = key(),
|
|
|
- Handler = #handler{module=?MODULE,class=async,group=n2o,
|
|
|
- name={Name,Key},config={F,?REQ},state=self()},
|
|
|
- case ?MODULE:start(Handler) of
|
|
|
- {error,{already_started,P}} -> init(P,Class,{Name,Key}), {P,{Class,{Name,Key}}};
|
|
|
- {P,X} when is_pid(P) -> init(P,Class,X), {P,{Class,X}};
|
|
|
- Else -> Else end.
|
|
|
-
|
|
|
-init(Pid,Class,Name) when is_pid(Pid) -> wf:cache({Class,Name},Pid,infinity), send(Pid,{parent,self()}).
|
|
|
-send(Pid,Message) when is_pid(Pid) -> gen_server:call(Pid,Message);
|
|
|
-send(Name,Message) -> send(async,{Name,key()},Message).
|
|
|
-send(Class,Name,Message) -> gen_server:call(?MODULE:pid({Class,Name}),Message).
|
|
|
-pid({Class,Name}) -> wf:cache({Class,Name}).
|
|
|
-key() -> n2o_session:session_id().
|
|
|
-restart(Name) -> restart(async,{Name,key()}).
|
|
|
-restart(Class,Name) ->
|
|
|
- case stop(Class,Name) of #handler{}=Async -> start(Async); Error -> Error end.
|
|
|
-flush() -> A=wf:actions(), wf:actions([]), get(parent) ! {flush,A}.
|
|
|
-flush(Pool) -> A=wf:actions(), wf:actions([]), wf:send(Pool,{flush,A}).
|
|
|
-stop(Name) -> stop(async,{Name,key()}).
|
|
|
-stop(Class,Name) ->
|
|
|
- case ?MODULE:pid({Class,Name}) of
|
|
|
- Pid when is_pid(Pid) ->
|
|
|
- #handler{group=Group} = Async = send(Pid,{get}),
|
|
|
- [ supervisor:F(Group,{Class,Name})||F<-[terminate_child,delete_child]],
|
|
|
- wf:cache({Class,Name},undefined), Async;
|
|
|
- Data -> {error,{not_pid,Data}} end.
|
|
|
-start(#handler{class=Class,name=Name,module=Module,group=Group} = Async) ->
|
|
|
- ChildSpec = {{Class,Name},{?MODULE,start_link,[Async]},transient,5000,worker,[Module]},
|
|
|
- wf:info(?MODULE,"Async Start Attempt ~p~n",[Async#handler{config=[]}]),
|
|
|
- case supervisor:start_child(Group,ChildSpec) of
|
|
|
- {ok,Pid} -> {Pid,Async#handler.name};
|
|
|
- {ok,Pid,_} -> {Pid,Async#handler.name};
|
|
|
- Else -> Else end.
|
|
|
+% gen_server wrapper
|
|
|
+
|
|
|
+-include_lib("n4u/include/n4u.hrl").
|
|
|
+
|
|
|
+-export([start_link/1, init/1, handle_call/3, handle_cast/2,
|
|
|
+ handle_info/2, terminate/2, code_change/3]).
|
|
|
+
|
|
|
+-export([start/1, stop/1, stop/2, send/2, send/3, pid/1, restart/2,
|
|
|
+ async/1, async/2, async/3, init/3, flush/0, flush/1, init_context/1, proc/2]).
|
|
|
+
|
|
|
+
|
|
|
+start(#handler{class=Class, name=Name, module=Module, group=Group} = Async) ->
|
|
|
+ ChildSpec = {{Class, Name}, {?MODULE, start_link, [Async]},
|
|
|
+ transient, 5000, worker, [Module]},
|
|
|
+ wf:info(?MODULE, "Async Start Attempt ~p~n", [Async#handler{config=[]}]),
|
|
|
+ case supervisor:start_child(Group, ChildSpec) of
|
|
|
+ {ok, Pid} -> {Pid, Async#handler.name};
|
|
|
+ {ok, Pid, _} -> {Pid, Async#handler.name};
|
|
|
+ {error, Reason} = Error -> Error
|
|
|
+ end.
|
|
|
+
|
|
|
+
|
|
|
+stop(Name) -> stop(async, {Name, erlang:get(session_id)}).
|
|
|
+stop(Class, Name) ->
|
|
|
+ case ?MODULE:pid({Class, Name}) of
|
|
|
+ Pid when erlang:is_pid(Pid) ->
|
|
|
+ #handler{group=Group} = Async = send(Pid, {'get'}),
|
|
|
+
|
|
|
+ %[ supervisor:F(Group, {Class, Name}) || F <-[terminate_child, delete_child]],
|
|
|
+ supervisor:terminate_child(Group, {Class, Name}),
|
|
|
+ supervisor:delete_child(Group, {Class, Name}),
|
|
|
+
|
|
|
+ wf:cache({Class, Name}, undefined),
|
|
|
+ Async;
|
|
|
+ Data -> {error, {not_pid, Data}}
|
|
|
+ end.
|
|
|
+
|
|
|
+
|
|
|
+send(Pid, Message) when erlang:is_pid(Pid) -> gen_server:call(Pid, Message);
|
|
|
+send(Name, Message) -> send(async, {Name, erlang:get(session_id)}, Message).
|
|
|
+send(Class, Name, Message) -> gen_server:call(?MODULE:pid({Class, Name}), Message).
|
|
|
+
|
|
|
+
|
|
|
+pid({Class, Name}) -> wf:cache({Class, Name}).
|
|
|
+
|
|
|
+
|
|
|
+restart(Name) -> restart(async, {Name, erlang:get(session_id)}).
|
|
|
+restart(Class, Name) ->
|
|
|
+ case stop(Class, Name) of
|
|
|
+ #handler{}=Async -> start(Async);
|
|
|
+ Error -> Error
|
|
|
+ end.
|
|
|
+
|
|
|
+
|
|
|
+async(Fun) -> async(async, wf:temp_id(), Fun).
|
|
|
+async(Name, F) -> async(async, Name, F).
|
|
|
+async(Class, Name, F) ->
|
|
|
+ Key = erlang:get(session_id),
|
|
|
+ Handler = #handler{module=?MODULE, class=async, group=n4u_sup, name={Name, Key},
|
|
|
+ config={F, ?REQ}, state=erlang:self()},
|
|
|
+
|
|
|
+ case ?MODULE:start(Handler) of
|
|
|
+ {error, {already_started, P}} ->
|
|
|
+ init(P, Class, {Name, Key}),
|
|
|
+ {P, {Class, {Name, Key}}};
|
|
|
+
|
|
|
+ {P, X} when erlang:is_pid(P) ->
|
|
|
+ init(P, Class, X),
|
|
|
+ {P, {Class, X}};
|
|
|
+ Else -> Else
|
|
|
+ end.
|
|
|
+
|
|
|
+
|
|
|
+init(Pid, Class, Name) when erlang:is_pid(Pid) ->
|
|
|
+ wf:cache({Class, Name}, Pid, infinity),
|
|
|
+ send(Pid, {parent, erlang:self()}).
|
|
|
+
|
|
|
+
|
|
|
+flush() ->
|
|
|
+ A = erlang:get(actions),
|
|
|
+ erlang:put(actions, []),
|
|
|
+ erlang:get(parent) ! {flush, A}.
|
|
|
+
|
|
|
+flush(Pool) ->
|
|
|
+ A = erlang:get(actions),
|
|
|
+ erlang:put(actions, []),
|
|
|
+ wf:send(Pool, {flush, A}).
|
|
|
|
|
|
init_context(undefined) -> [];
|
|
|
init_context(Req) ->
|
|
|
- Ctx = wf:init_context(Req),
|
|
|
- NewCtx = wf:fold(init, Ctx#cx.handlers, Ctx),
|
|
|
- wf:actions(NewCtx#cx.actions),
|
|
|
- wf:context(NewCtx),
|
|
|
- NewCtx.
|
|
|
+ Ctx = n2o_cx:init_context(Req),
|
|
|
+ NewCtx = n2o_cx:fold(init, Ctx#cx.handlers, Ctx),
|
|
|
+ erlang:put(actions, NewCtx#cx.actions),
|
|
|
+ erlang:put(context, NewCtx),
|
|
|
+ NewCtx.
|
|
|
+
|
|
|
|
|
|
% Generic Async Server
|
|
|
|
|
|
-init(#handler{module=Mod,class=Class,name=Name}=Handler) -> wf:cache({Class,Name},self(),infinity), Mod:proc(init,Handler).
|
|
|
-handle_call({get},_,#handler{module= _Mod}=Async) -> {reply,Async,Async};
|
|
|
-handle_call(Message,_,#handler{module=Mod}=Async) -> Mod:proc(Message,Async).
|
|
|
-handle_cast(Message, #handler{module=Mod}=Async) -> Mod:proc(Message,Async).
|
|
|
-handle_info(timeout, #handler{module=Mod}=Async) -> Mod:proc(timeout,Async);
|
|
|
-handle_info(Message, #handler{module=Mod}=Async) -> {noreply,element(3,Mod:proc(Message,Async))}.
|
|
|
start_link(Parameters) -> gen_server:start_link(?MODULE, Parameters, []).
|
|
|
-code_change(_,State,_) -> {ok, State}.
|
|
|
-terminate(_Reason, #handler{name=Name,group=Group,class=Class}) ->
|
|
|
- spawn(fun() -> supervisor:delete_child(Group,{Class,Name}) end),
|
|
|
- wf:cache({Class,Name},undefined), ok.
|
|
|
+
|
|
|
+
|
|
|
+init(#handler{module=Mod, class=Class, name=Name}=Handler) ->
|
|
|
+ wf:cache({Class, Name}, erlang:self(), infinity),
|
|
|
+ Mod:proc(init, Handler).
|
|
|
+
|
|
|
+
|
|
|
+handle_call({'get'}, _, #handler{module= _Mod} = Async) -> {reply, Async, Async};
|
|
|
+handle_call(Message, _, #handler{module=Mod} = Async) -> Mod:proc(Message, Async).
|
|
|
+
|
|
|
+
|
|
|
+handle_cast(Message, #handler{module=Mod} = Async) -> Mod:proc(Message, Async).
|
|
|
+
|
|
|
+
|
|
|
+handle_info(timeout, #handler{module=Mod} = Async) -> Mod:proc(timeout, Async);
|
|
|
+handle_info(Message, #handler{module=Mod} = Async) -> {noreply, erlang:element(3, Mod:proc(Message, Async))}.
|
|
|
+
|
|
|
+
|
|
|
+terminate(_Reason, #handler{name=Name, group=Group, class=Class}) ->
|
|
|
+ erlang:spawn(fun() ->
|
|
|
+ supervisor:delete_child(Group, {Class, Name})
|
|
|
+ end),
|
|
|
+ wf:cache({Class, Name}, undefined),
|
|
|
+ ok.
|
|
|
+
|
|
|
+
|
|
|
+code_change(_, State, _) -> {ok, State}.
|
|
|
+
|
|
|
|
|
|
% wf:async page workers
|
|
|
|
|
|
-proc(init,#handler{class= _Class,name= _Name,config={F, Req},state=Parent}=Async) -> put(parent,Parent), try F(init) catch _:_ -> skip end, init_context(Req), {ok,Async};
|
|
|
-proc({parent,Parent},Async) -> {reply,put(parent,Parent),Async#handler{state=Parent}};
|
|
|
-proc(Message,#handler{config={F, _Req}}=Async) -> {reply,F(Message),Async}.
|
|
|
+proc(init, #handler{class= _Class, name= _Name, config={F, Req}, state=Parent}=Async) ->
|
|
|
+ erlang:put(parent, Parent),
|
|
|
+
|
|
|
+ try F(init)
|
|
|
+ catch _:_ -> skip
|
|
|
+ end,
|
|
|
+ init_context(Req),
|
|
|
+ {ok, Async};
|
|
|
+
|
|
|
+proc({parent, Parent}, Async) ->
|
|
|
+ {reply, erlang:put(parent, Parent), Async#handler{state=Parent}};
|
|
|
+
|
|
|
+proc(Message, #handler{config={F, _Req}} = Async) ->
|
|
|
+ {reply, F(Message), Async}.
|
|
|
+
|