cowboy_spdy.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. %% Copyright (c) 2013-2014, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cowboy_spdy).
  15. %% API.
  16. -export([start_link/4]).
  17. %% Internal.
  18. -export([init/5]).
  19. -export([system_continue/3]).
  20. -export([system_terminate/4]).
  21. -export([system_code_change/4]).
  22. %% Internal request process.
  23. -export([request_init/11]).
  24. -export([resume/5]).
  25. -export([reply/4]).
  26. -export([stream_reply/3]).
  27. -export([stream_data/2]).
  28. -export([stream_close/1]).
  29. %% Internal transport functions.
  30. -export([name/0]).
  31. -export([messages/0]).
  32. -export([recv/3]).
  33. -export([send/2]).
  34. -export([sendfile/2]).
  35. -export([setopts/2]).
  36. -type streamid() :: non_neg_integer().
  37. -type socket() :: {pid(), streamid()}.
  38. -record(child, {
  39. streamid :: streamid(),
  40. pid :: pid(),
  41. input = nofin :: fin | nofin,
  42. in_buffer = <<>> :: binary(),
  43. is_recv = false :: false | {active, socket(), pid()}
  44. | {passive, socket(), pid(), non_neg_integer(), reference()},
  45. output = nofin :: fin | nofin
  46. }).
  47. -record(state, {
  48. parent = undefined :: pid(),
  49. socket,
  50. transport,
  51. buffer = <<>> :: binary(),
  52. middlewares,
  53. env,
  54. onrequest,
  55. onresponse,
  56. peer,
  57. zdef,
  58. zinf,
  59. last_streamid = 0 :: streamid(),
  60. children = [] :: [#child{}]
  61. }).
  62. -type opts() :: [{env, cowboy_middleware:env()}
  63. | {middlewares, [module()]}
  64. | {onrequest, cowboy:onrequest_fun()}
  65. | {onresponse, cowboy:onresponse_fun()}].
  66. -export_type([opts/0]).
  67. %% API.
  68. -spec start_link(any(), inet:socket(), module(), any()) -> {ok, pid()}.
  69. start_link(Ref, Socket, Transport, Opts) ->
  70. proc_lib:start_link(?MODULE, init,
  71. [self(), Ref, Socket, Transport, Opts]).
  72. %% Internal.
  73. %% Faster alternative to proplists:get_value/3.
  74. get_value(Key, Opts, Default) ->
  75. case lists:keyfind(Key, 1, Opts) of
  76. {_, Value} -> Value;
  77. _ -> Default
  78. end.
  79. -spec init(pid(), ranch:ref(), inet:socket(), module(), opts()) -> ok.
  80. init(Parent, Ref, Socket, Transport, Opts) ->
  81. process_flag(trap_exit, true),
  82. ok = proc_lib:init_ack(Parent, {ok, self()}),
  83. {ok, Peer} = Transport:peername(Socket),
  84. Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]),
  85. Env = [{listener, Ref}|get_value(env, Opts, [])],
  86. OnRequest = get_value(onrequest, Opts, undefined),
  87. OnResponse = get_value(onresponse, Opts, undefined),
  88. Zdef = cow_spdy:deflate_init(),
  89. Zinf = cow_spdy:inflate_init(),
  90. ok = ranch:accept_ack(Ref),
  91. loop(#state{parent=Parent, socket=Socket, transport=Transport,
  92. middlewares=Middlewares, env=Env, onrequest=OnRequest,
  93. onresponse=OnResponse, peer=Peer, zdef=Zdef, zinf=Zinf}).
  94. loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
  95. buffer=Buffer, children=Children}) ->
  96. {OK, Closed, Error} = Transport:messages(),
  97. Transport:setopts(Socket, [{active, once}]),
  98. receive
  99. {OK, Socket, Data} ->
  100. parse_frame(State, << Buffer/binary, Data/binary >>);
  101. {Closed, Socket} ->
  102. terminate(State);
  103. {Error, Socket, _Reason} ->
  104. terminate(State);
  105. {recv, FromSocket = {Pid, StreamID}, FromPid, Length, Timeout}
  106. when Pid =:= self() ->
  107. Child = #child{in_buffer=InBuffer, is_recv=false}
  108. = get_child(StreamID, State),
  109. if
  110. Length =:= 0, InBuffer =/= <<>> ->
  111. FromPid ! {recv, FromSocket, {ok, InBuffer}},
  112. loop(replace_child(Child#child{in_buffer= <<>>}, State));
  113. byte_size(InBuffer) >= Length ->
  114. << Data:Length/binary, Rest/binary >> = InBuffer,
  115. FromPid ! {recv, FromSocket, {ok, Data}},
  116. loop(replace_child(Child#child{in_buffer=Rest}, State));
  117. true ->
  118. TRef = erlang:send_after(Timeout, self(),
  119. {recv_timeout, FromSocket}),
  120. loop(replace_child(Child#child{
  121. is_recv={passive, FromSocket, FromPid, Length, TRef}},
  122. State))
  123. end;
  124. {recv_timeout, {Pid, StreamID}}
  125. when Pid =:= self() ->
  126. Child = #child{is_recv={passive, FromSocket, FromPid, _, _}}
  127. = get_child(StreamID, State),
  128. FromPid ! {recv, FromSocket, {error, timeout}},
  129. loop(replace_child(Child, State));
  130. {reply, {Pid, StreamID}, Status, Headers}
  131. when Pid =:= self() ->
  132. Child = #child{output=nofin} = get_child(StreamID, State),
  133. syn_reply(State, StreamID, true, Status, Headers),
  134. loop(replace_child(Child#child{output=fin}, State));
  135. {reply, {Pid, StreamID}, Status, Headers, Body}
  136. when Pid =:= self() ->
  137. Child = #child{output=nofin} = get_child(StreamID, State),
  138. syn_reply(State, StreamID, false, Status, Headers),
  139. data(State, StreamID, true, Body),
  140. loop(replace_child(Child#child{output=fin}, State));
  141. {stream_reply, {Pid, StreamID}, Status, Headers}
  142. when Pid =:= self() ->
  143. #child{output=nofin} = get_child(StreamID, State),
  144. syn_reply(State, StreamID, false, Status, Headers),
  145. loop(State);
  146. {stream_data, {Pid, StreamID}, Data}
  147. when Pid =:= self() ->
  148. #child{output=nofin} = get_child(StreamID, State),
  149. data(State, StreamID, false, Data),
  150. loop(State);
  151. {stream_close, {Pid, StreamID}}
  152. when Pid =:= self() ->
  153. Child = #child{output=nofin} = get_child(StreamID, State),
  154. data(State, StreamID, true, <<>>),
  155. loop(replace_child(Child#child{output=fin}, State));
  156. {sendfile, {Pid, StreamID}, Filepath}
  157. when Pid =:= self() ->
  158. Child = #child{output=nofin} = get_child(StreamID, State),
  159. data_from_file(State, StreamID, Filepath),
  160. loop(replace_child(Child#child{output=fin}, State));
  161. {active, FromSocket = {Pid, StreamID}, FromPid} when Pid =:= self() ->
  162. Child = #child{in_buffer=InBuffer, is_recv=false}
  163. = get_child(StreamID, State),
  164. case InBuffer of
  165. <<>> ->
  166. loop(replace_child(Child#child{
  167. is_recv={active, FromSocket, FromPid}}, State));
  168. _ ->
  169. FromPid ! {spdy, FromSocket, InBuffer},
  170. loop(replace_child(Child#child{in_buffer= <<>>}, State))
  171. end;
  172. {passive, FromSocket = {Pid, StreamID}, FromPid} when Pid =:= self() ->
  173. Child = #child{is_recv=IsRecv} = get_child(StreamID, State),
  174. %% Make sure we aren't in the middle of a recv call.
  175. case IsRecv of false -> ok; {active, FromSocket, FromPid} -> ok end,
  176. loop(replace_child(Child#child{is_recv=false}, State));
  177. {'EXIT', Parent, Reason} ->
  178. exit(Reason);
  179. {'EXIT', Pid, _} ->
  180. %% @todo Report the error if any.
  181. loop(delete_child(Pid, State));
  182. {system, From, Request} ->
  183. sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
  184. %% Calls from the supervisor module.
  185. {'$gen_call', {To, Tag}, which_children} ->
  186. Workers = [{?MODULE, Pid, worker, [?MODULE]}
  187. || #child{pid=Pid} <- Children],
  188. To ! {Tag, Workers},
  189. loop(State);
  190. {'$gen_call', {To, Tag}, count_children} ->
  191. NbChildren = length(Children),
  192. Counts = [{specs, 1}, {active, NbChildren},
  193. {supervisors, 0}, {workers, NbChildren}],
  194. To ! {Tag, Counts},
  195. loop(State);
  196. {'$gen_call', {To, Tag}, _} ->
  197. To ! {Tag, {error, ?MODULE}},
  198. loop(State)
  199. after 60000 ->
  200. goaway(State, ok),
  201. terminate(State)
  202. end.
  203. -spec system_continue(_, _, #state{}) -> ok.
  204. system_continue(_, _, State) ->
  205. loop(State).
  206. -spec system_terminate(any(), _, _, _) -> no_return().
  207. system_terminate(Reason, _, _, _) ->
  208. exit(Reason).
  209. -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::#state{}.
  210. system_code_change(Misc, _, _, _) ->
  211. {ok, Misc}.
  212. parse_frame(State=#state{zinf=Zinf}, Data) ->
  213. case cow_spdy:split(Data) of
  214. {true, Frame, Rest} ->
  215. P = cow_spdy:parse(Frame, Zinf),
  216. case handle_frame(State#state{buffer = Rest}, P) of
  217. error ->
  218. terminate(State);
  219. State2 ->
  220. parse_frame(State2, Rest)
  221. end;
  222. false ->
  223. loop(State#state{buffer=Data})
  224. end.
  225. %% FLAG_UNIDIRECTIONAL can only be set by the server.
  226. handle_frame(State, {syn_stream, StreamID, _, _, true,
  227. _, _, _, _, _, _, _}) ->
  228. rst_stream(State, StreamID, protocol_error),
  229. State;
  230. %% We do not support Associated-To-Stream-ID.
  231. handle_frame(State, {syn_stream, StreamID, AssocToStreamID,
  232. _, _, _, _, _, _, _, _, _}) when AssocToStreamID =/= 0 ->
  233. rst_stream(State, StreamID, internal_error),
  234. State;
  235. %% SYN_STREAM.
  236. %%
  237. %% Erlang does not allow us to control the priority of processes
  238. %% so we ignore that value entirely.
  239. handle_frame(State=#state{middlewares=Middlewares, env=Env,
  240. onrequest=OnRequest, onresponse=OnResponse, peer=Peer},
  241. {syn_stream, StreamID, _, IsFin, _, _,
  242. Method, _, Host, Path, Version, Headers}) ->
  243. Pid = spawn_link(?MODULE, request_init, [
  244. {self(), StreamID}, Peer, OnRequest, OnResponse,
  245. Env, Middlewares, Method, Host, Path, Version, Headers
  246. ]),
  247. new_child(State, StreamID, Pid, IsFin);
  248. %% RST_STREAM.
  249. handle_frame(State, {rst_stream, StreamID, Status}) ->
  250. error_logger:error_msg("Received RST_STREAM frame ~p ~p",
  251. [StreamID, Status]),
  252. %% @todo Stop StreamID.
  253. State;
  254. %% PING initiated by the server; ignore, we don't send any.
  255. handle_frame(State, {ping, PingID}) when PingID rem 2 =:= 0 ->
  256. error_logger:error_msg("Ignored PING control frame: ~p~n", [PingID]),
  257. State;
  258. %% PING initiated by the client; send it back.
  259. handle_frame(State=#state{socket=Socket, transport=Transport},
  260. {ping, PingID}) ->
  261. Transport:send(Socket, cow_spdy:ping(PingID)),
  262. State;
  263. %% Data received for a stream.
  264. handle_frame(State, {data, StreamID, IsFin, Data}) ->
  265. Child = #child{input=nofin, in_buffer=Buffer, is_recv=IsRecv}
  266. = get_child(StreamID, State),
  267. Data2 = << Buffer/binary, Data/binary >>,
  268. IsFin2 = if IsFin -> fin; true -> nofin end,
  269. Child2 = case IsRecv of
  270. {active, FromSocket, FromPid} ->
  271. FromPid ! {spdy, FromSocket, Data},
  272. Child#child{input=IsFin2, is_recv=false};
  273. {passive, FromSocket, FromPid, 0, TRef} ->
  274. FromPid ! {recv, FromSocket, {ok, Data2}},
  275. cancel_recv_timeout(StreamID, TRef),
  276. Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false};
  277. {passive, FromSocket, FromPid, Length, TRef}
  278. when byte_size(Data2) >= Length ->
  279. << Data3:Length/binary, Rest/binary >> = Data2,
  280. FromPid ! {recv, FromSocket, {ok, Data3}},
  281. cancel_recv_timeout(StreamID, TRef),
  282. Child#child{input=IsFin2, in_buffer=Rest, is_recv=false};
  283. _ ->
  284. Child#child{input=IsFin2, in_buffer=Data2}
  285. end,
  286. replace_child(Child2, State);
  287. %% General error, can't recover.
  288. handle_frame(State, {error, badprotocol}) ->
  289. goaway(State, protocol_error),
  290. error;
  291. %% Ignore all other frames for now.
  292. handle_frame(State, Frame) ->
  293. error_logger:error_msg("Ignored frame ~p", [Frame]),
  294. State.
  295. cancel_recv_timeout(StreamID, TRef) ->
  296. _ = erlang:cancel_timer(TRef),
  297. receive
  298. {recv_timeout, {Pid, StreamID}}
  299. when Pid =:= self() ->
  300. ok
  301. after 0 ->
  302. ok
  303. end.
  304. %% @todo We must wait for the children to finish here,
  305. %% but only up to N milliseconds. Then we shutdown.
  306. terminate(_State) ->
  307. ok.
  308. syn_reply(#state{socket=Socket, transport=Transport, zdef=Zdef},
  309. StreamID, IsFin, Status, Headers) ->
  310. Transport:send(Socket, cow_spdy:syn_reply(Zdef, StreamID, IsFin,
  311. Status, <<"HTTP/1.1">>, Headers)).
  312. rst_stream(#state{socket=Socket, transport=Transport}, StreamID, Status) ->
  313. Transport:send(Socket, cow_spdy:rst_stream(StreamID, Status)).
  314. goaway(#state{socket=Socket, transport=Transport, last_streamid=LastStreamID},
  315. Status) ->
  316. Transport:send(Socket, cow_spdy:goaway(LastStreamID, Status)).
  317. data(#state{socket=Socket, transport=Transport}, StreamID, IsFin, Data) ->
  318. Transport:send(Socket, cow_spdy:data(StreamID, IsFin, Data)).
  319. data_from_file(#state{socket=Socket, transport=Transport},
  320. StreamID, Filepath) ->
  321. {ok, IoDevice} = file:open(Filepath, [read, binary, raw]),
  322. data_from_file(Socket, Transport, StreamID, IoDevice).
  323. data_from_file(Socket, Transport, StreamID, IoDevice) ->
  324. case file:read(IoDevice, 16#1fff) of
  325. eof ->
  326. _ = Transport:send(Socket, cow_spdy:data(StreamID, true, <<>>)),
  327. ok;
  328. {ok, Data} ->
  329. case Transport:send(Socket, cow_spdy:data(StreamID, false, Data)) of
  330. ok ->
  331. data_from_file(Socket, Transport, StreamID, IoDevice);
  332. {error, _} ->
  333. ok
  334. end
  335. end.
  336. %% Children.
  337. new_child(State=#state{children=Children}, StreamID, Pid, IsFin) ->
  338. IsFin2 = if IsFin -> fin; true -> nofin end,
  339. State#state{last_streamid=StreamID,
  340. children=[#child{streamid=StreamID,
  341. pid=Pid, input=IsFin2}|Children]}.
  342. get_child(StreamID, #state{children=Children}) ->
  343. lists:keyfind(StreamID, #child.streamid, Children).
  344. replace_child(Child=#child{streamid=StreamID},
  345. State=#state{children=Children}) ->
  346. Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child),
  347. State#state{children=Children2}.
  348. delete_child(Pid, State=#state{children=Children}) ->
  349. Children2 = lists:keydelete(Pid, #child.pid, Children),
  350. State#state{children=Children2}.
  351. %% Request process.
  352. -spec request_init(socket(), {inet:ip_address(), inet:port_number()},
  353. cowboy:onrequest_fun(), cowboy:onresponse_fun(),
  354. cowboy_middleware:env(), [module()],
  355. binary(), binary(), binary(), binary(), [{binary(), binary()}])
  356. -> ok.
  357. request_init(FakeSocket, Peer, OnRequest, OnResponse,
  358. Env, Middlewares, Method, Host, Path, Version, Headers) ->
  359. {Host2, Port} = cow_http:parse_fullhost(Host),
  360. {Path2, Qs} = cow_http:parse_fullpath(Path),
  361. Version2 = cow_http:parse_version(Version),
  362. Req = cowboy_req:new(FakeSocket, ?MODULE, Peer,
  363. Method, Path2, Qs, Version2, Headers,
  364. Host2, Port, <<>>, true, false, OnResponse),
  365. case OnRequest of
  366. undefined ->
  367. execute(Req, Env, Middlewares);
  368. _ ->
  369. Req2 = OnRequest(Req),
  370. case cowboy_req:get(resp_state, Req2) of
  371. waiting -> execute(Req2, Env, Middlewares);
  372. _ -> ok
  373. end
  374. end.
  375. -spec execute(cowboy_req:req(), cowboy_middleware:env(), [module()])
  376. -> ok.
  377. execute(Req, _, []) ->
  378. cowboy_req:ensure_response(Req, 204);
  379. execute(Req, Env, [Middleware|Tail]) ->
  380. case Middleware:execute(Req, Env) of
  381. {ok, Req2, Env2} ->
  382. execute(Req2, Env2, Tail);
  383. {suspend, Module, Function, Args} ->
  384. erlang:hibernate(?MODULE, resume,
  385. [Env, Tail, Module, Function, Args]);
  386. {halt, Req2} ->
  387. cowboy_req:ensure_response(Req2, 204);
  388. {error, Status, Req2} ->
  389. cowboy_req:maybe_reply(Status, Req2)
  390. end.
  391. -spec resume(cowboy_middleware:env(), [module()],
  392. module(), module(), [any()]) -> ok.
  393. resume(Env, Tail, Module, Function, Args) ->
  394. case apply(Module, Function, Args) of
  395. {ok, Req2, Env2} ->
  396. execute(Req2, Env2, Tail);
  397. {suspend, Module2, Function2, Args2} ->
  398. erlang:hibernate(?MODULE, resume,
  399. [Env, Tail, Module2, Function2, Args2]);
  400. {halt, Req2} ->
  401. cowboy_req:ensure_response(Req2, 204);
  402. {error, Status, Req2} ->
  403. cowboy_req:maybe_reply(Status, Req2)
  404. end.
  405. %% Reply functions used by cowboy_req.
  406. -spec reply(socket(), binary(), cowboy:http_headers(), iodata()) -> ok.
  407. reply(Socket = {Pid, _}, Status, Headers, Body) ->
  408. _ = case iolist_size(Body) of
  409. 0 -> Pid ! {reply, Socket, Status, Headers};
  410. _ -> Pid ! {reply, Socket, Status, Headers, Body}
  411. end,
  412. ok.
  413. -spec stream_reply(socket(), binary(), cowboy:http_headers()) -> ok.
  414. stream_reply(Socket = {Pid, _}, Status, Headers) ->
  415. _ = Pid ! {stream_reply, Socket, Status, Headers},
  416. ok.
  417. -spec stream_data(socket(), iodata()) -> ok.
  418. stream_data(Socket = {Pid, _}, Data) ->
  419. _ = Pid ! {stream_data, Socket, Data},
  420. ok.
  421. -spec stream_close(socket()) -> ok.
  422. stream_close(Socket = {Pid, _}) ->
  423. _ = Pid ! {stream_close, Socket},
  424. ok.
  425. %% Internal transport functions.
  426. -spec name() -> spdy.
  427. name() ->
  428. spdy.
  429. -spec messages() -> {spdy, spdy_closed, spdy_error}.
  430. messages() ->
  431. {spdy, spdy_closed, spdy_error}.
  432. -spec recv(socket(), non_neg_integer(), timeout())
  433. -> {ok, binary()} | {error, timeout}.
  434. recv(Socket = {Pid, _}, Length, Timeout) ->
  435. _ = Pid ! {recv, Socket, self(), Length, Timeout},
  436. receive
  437. {recv, Socket, Ret} ->
  438. Ret
  439. end.
  440. -spec send(socket(), iodata()) -> ok.
  441. send(Socket, Data) ->
  442. stream_data(Socket, Data).
  443. %% We don't wait for the result of the actual sendfile call,
  444. %% therefore we can't know how much was actually sent.
  445. %% This isn't a problem as we don't use this value in Cowboy.
  446. -spec sendfile(socket(), file:name_all()) -> {ok, undefined}.
  447. sendfile(Socket = {Pid, _}, Filepath) ->
  448. _ = Pid ! {sendfile, Socket, Filepath},
  449. {ok, undefined}.
  450. -spec setopts({pid(), _}, list()) -> ok.
  451. setopts(Socket = {Pid, _}, [{active, once}]) ->
  452. _ = Pid ! {active, Socket, self()},
  453. ok;
  454. setopts(Socket = {Pid, _}, [{active, false}]) ->
  455. _ = Pid ! {passive, Socket, self()},
  456. ok.