n2o_file.erl 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. -module(n2o_file).
  2. -include_lib("n4u/include/n4u.hrl").
  3. -include_lib("kernel/include/file.hrl").
  4. -compile([export_all, nowarn_export_all]).
  5. -define(ROOT, wf:config(n2o,upload,code:priv_dir(n2o))).
  6. -define(NEXT, 256*1024). % 256K chunks for best 25MB/s speed
  7. -define(STOP, 0).
  8. % Callbacks
  9. filename(#ftp{sid=Sid,filename=FileName}) -> filename:join(wf:to_list(Sid),FileName).
  10. % N2O Protocols
  11. info(#ftp{status={event,_}}=FTP, Req, State) ->
  12. wf:info(?MODULE,"Event Message: ~p",[FTP#ftp{data= <<>>}]),
  13. Module=State#cx.module,
  14. Reply=try Module:event(FTP)
  15. catch E:R -> Error=wf:stack(E,R), wf:error(?MODULE,"Catch: ~p:~p~n~p",Error), Error end,
  16. {reply,wf:format({io,n4u_nitrogen:render_actions(wf:actions()),Reply}),Req,State};
  17. info(#ftp{id=Link,sid= _Sid,filename= _FileName,status= <<"init">>,block=Block,offset=Offset,size= _TotalSize}=FTP,Req,State) ->
  18. Root=?ROOT,
  19. RelPath=(wf:config(n2o,filename,n2o_file)):filename(FTP),
  20. FilePath=filename:join(Root,RelPath),
  21. ok=filelib:ensure_dir(FilePath),
  22. FileSize=case file:read_file_info(FilePath) of {ok,Fi} -> Fi#file_info.size; {error,_} -> 0 end,
  23. wf:info(?MODULE,"Info Init: ~p Offset: ~p Block: ~p~n",[FilePath,FileSize,Block]),
  24. % Name={Sid,filename:basename(FileName)},
  25. Block2=case Block of 0 -> ?STOP; _ -> ?NEXT end,
  26. Offset2=case FileSize >= Offset of true -> FileSize; false -> 0 end,
  27. FTP2=FTP#ftp{block=Block2,offset=Offset2,filename=RelPath,data= <<>>},
  28. n4u_async:stop(file,Link),
  29. n4u_async:start(#handler{module=?MODULE,class=file,group=n2o,state=FTP2,name=Link}),
  30. {reply,wf:format(FTP2),Req,State};
  31. info(#ftp{id=Link,sid= _Sid,filename= _FileName,status= <<"send">>}=FTP,Req,State) ->
  32. wf:info(?MODULE,"Info Send: ~p",[FTP#ftp{data= <<>>}]),
  33. Reply=try gen_server:call(n4u_async:pid({file,Link}),FTP)
  34. catch _E:_R -> wf:error(?MODULE,"Info Error call the sync: ~p~n",[FTP#ftp{data= <<>>}]),
  35. FTP#ftp{data= <<>>,block=?STOP} end,
  36. wf:info(?MODULE,"reply ~p",[Reply#ftp{data= <<>>}]),
  37. {reply,wf:format(Reply),Req,State};
  38. info(#ftp{status= <<"recv">>}=FTP,Req,State) -> {reply,wf:format(FTP),Req,State};
  39. info(#ftp{status= <<"relay">>}=FTP,Req,State) -> {reply,wf:format(FTP),Req, State};
  40. info(Message,Req,State) -> wf:info(?MODULE, "Info Unknown message: ~p",[Message]),
  41. {unknown,Message,Req,State}.
  42. % N2O Handlers
  43. proc(init,#handler{state=#ftp{sid=Sid}=FTP}=Async) ->
  44. wf:info(?MODULE,"Proc Init: ~p",[FTP#ftp{data= <<>>}]),
  45. wf:send(Sid,FTP#ftp{data= <<>>,status={event,init}}),
  46. {ok,Async};
  47. proc(#ftp{id=Link,sid=Sid,data=Data,filename= _FileName,status= <<"send">>,block=Block}=FTP,
  48. #handler{state=#ftp{data= _State,size=TotalSize,offset=Offset,filename=RelPath}}=Async) when Offset+Block >= TotalSize ->
  49. wf:info(?MODULE,"Proc Stop ~p, last piece size: ~p", [FTP#ftp{data= <<>>},byte_size(Data)]),
  50. case file:write_file(filename:join(?ROOT,RelPath),<<Data/binary>>,[append,raw]) of
  51. {error,Reason} -> {reply,{error,Reason},Async};
  52. ok ->
  53. FTP2=FTP#ftp{data= <<>>,block=?STOP},
  54. wf:send(Sid,FTP2#ftp{status={event,stop},filename=RelPath}),
  55. spawn(fun() -> n4u_async:stop(file,Link) end),
  56. {stop,normal,FTP2,Async#handler{state=FTP2}} end;
  57. proc(#ftp{sid= _Sid,data=Data,block=Block}=FTP,
  58. #handler{state=#ftp{data= _State,offset=Offset,filename=RelPath}}=Async) ->
  59. FTP2=FTP#ftp{status= <<"send">>,offset=Offset+Block },
  60. wf:info(?MODULE,"Proc Process ~p",[FTP2#ftp{data= <<>>}]),
  61. case file:write_file(filename:join(?ROOT,RelPath),<<Data/binary>>,[append,raw]) of
  62. {error,Reason} -> {reply,{error,Reason},Async};
  63. ok -> {reply,FTP2#ftp{data= <<>>},Async#handler{state=FTP2#ftp{filename=RelPath}}} end.