cowboy_spdy.erl 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587
  1. %% Copyright (c) 2013, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. %% @doc SPDY protocol handler.
  15. %%
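%% The available options are:
%% <dl>
%%  <dt>env</dt><dd>Initial middleware environment. The listener reference
%%   is always prepended to it. Defaults to [].</dd>
%%  <dt>middlewares</dt><dd>List of middlewares to execute for every
%%   request. Defaults to [cowboy_router, cowboy_handler].</dd>
%%  <dt>onrequest</dt><dd>Optional fun called with the request object
%%   before the middlewares are executed. Defaults to undefined.</dd>
%%  <dt>onresponse</dt><dd>Optional value handed to cowboy_req when the
%%   request object is created. Defaults to undefined.</dd>
%% </dl>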
  19. %%
  20. %% Note that there is no need to monitor these processes when using Cowboy as
  21. %% an application as it already supervises them under the listener supervisor.
  22. -module(cowboy_spdy).
  23. %% API.
  24. -export([start_link/4]).
  25. %% Internal.
  26. -export([init/5]).
  27. -export([system_continue/3]).
  28. -export([system_terminate/4]).
  29. -export([system_code_change/4]).
  30. %% Internal request process.
  31. -export([request_init/9]).
  32. -export([resume/5]).
  33. -export([reply/4]).
  34. -export([stream_reply/3]).
  35. -export([stream_data/2]).
  36. -export([stream_close/1]).
  37. %% Internal transport functions.
  38. -export([name/0]).
  39. -export([send/2]).
  40. -export([sendfile/2]).
  41. -record(child, {
  42. streamid :: non_neg_integer(),
  43. pid :: pid(),
  44. input = nofin :: fin | nofin,
  45. output = nofin :: fin | nofin
  46. }).
  47. -record(state, {
  48. parent = undefined :: pid(),
  49. socket,
  50. transport,
  51. buffer = <<>> :: binary(),
  52. middlewares,
  53. env,
  54. onrequest,
  55. onresponse,
  56. peer,
  57. zdef,
  58. zinf,
  59. last_streamid = 0 :: non_neg_integer(),
  60. children = [] :: [#child{}]
  61. }).
  62. -record(special_headers, {
  63. method,
  64. path,
  65. version,
  66. host,
  67. scheme %% @todo We don't use it.
  68. }).
  69. -type opts() :: [].
  70. -export_type([opts/0]).
  71. -include("cowboy_spdy.hrl").
  72. %% API.
  73. %% @doc Start a SPDY protocol process.
  74. -spec start_link(any(), inet:socket(), module(), any()) -> {ok, pid()}.
  75. start_link(Ref, Socket, Transport, Opts) ->
  76. proc_lib:start_link(?MODULE, init,
  77. [self(), Ref, Socket, Transport, Opts]).
  78. %% Internal.
  79. %% @doc Faster alternative to proplists:get_value/3.
  80. %% @private
  81. get_value(Key, Opts, Default) ->
  82. case lists:keyfind(Key, 1, Opts) of
  83. {_, Value} -> Value;
  84. _ -> Default
  85. end.
  86. %% @private
  87. -spec init(pid(), ranch:ref(), inet:socket(), module(), opts()) -> ok.
  88. init(Parent, Ref, Socket, Transport, Opts) ->
  89. process_flag(trap_exit, true),
  90. ok = proc_lib:init_ack(Parent, {ok, self()}),
  91. {ok, Peer} = Transport:peername(Socket),
  92. Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]),
  93. Env = [{listener, Ref}|get_value(env, Opts, [])],
  94. OnRequest = get_value(onrequest, Opts, undefined),
  95. OnResponse = get_value(onresponse, Opts, undefined),
  96. Zdef = zlib:open(),
  97. ok = zlib:deflateInit(Zdef),
  98. _ = zlib:deflateSetDictionary(Zdef, ?ZDICT),
  99. Zinf = zlib:open(),
  100. ok = zlib:inflateInit(Zinf),
  101. ok = ranch:accept_ack(Ref),
  102. loop(#state{parent=Parent, socket=Socket, transport=Transport,
  103. middlewares=Middlewares, env=Env, onrequest=OnRequest,
  104. onresponse=OnResponse, peer=Peer, zdef=Zdef, zinf=Zinf}).
  105. loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
  106. buffer=Buffer, children=Children}) ->
  107. {OK, Closed, Error} = Transport:messages(),
  108. Transport:setopts(Socket, [{active, once}]),
  109. receive
  110. {OK, Socket, Data} ->
  111. Data2 = << Buffer/binary, Data/binary >>,
  112. case Data2 of
  113. << _:40, Length:24, _/bits >>
  114. when byte_size(Data2) >= Length + 8 ->
  115. Length2 = Length + 8,
  116. << Frame:Length2/binary, Rest/bits >> = Data2,
  117. control_frame(State#state{buffer=Rest}, Frame);
  118. Rest ->
  119. loop(State#state{buffer=Rest})
  120. end;
  121. {Closed, Socket} ->
  122. terminate(State);
  123. {Error, Socket, _Reason} ->
  124. terminate(State);
  125. {reply, {Pid, StreamID}, Status, Headers}
  126. when Pid =:= self() ->
  127. Child = #child{output=nofin} = lists:keyfind(StreamID,
  128. #child.streamid, Children),
  129. syn_reply(State, fin, StreamID, Status, Headers),
  130. Children2 = lists:keyreplace(StreamID,
  131. #child.streamid, Children, Child#child{output=fin}),
  132. loop(State#state{children=Children2});
  133. {reply, {Pid, StreamID}, Status, Headers, Body}
  134. when Pid =:= self() ->
  135. Child = #child{output=nofin} = lists:keyfind(StreamID,
  136. #child.streamid, Children),
  137. syn_reply(State, nofin, StreamID, Status, Headers),
  138. data(State, fin, StreamID, Body),
  139. Children2 = lists:keyreplace(StreamID,
  140. #child.streamid, Children, Child#child{output=fin}),
  141. loop(State#state{children=Children2});
  142. {stream_reply, {Pid, StreamID}, Status, Headers}
  143. when Pid =:= self() ->
  144. #child{output=nofin} = lists:keyfind(StreamID,
  145. #child.streamid, Children),
  146. syn_reply(State, nofin, StreamID, Status, Headers),
  147. loop(State);
  148. {stream_data, {Pid, StreamID}, Data}
  149. when Pid =:= self() ->
  150. #child{output=nofin} = lists:keyfind(StreamID,
  151. #child.streamid, Children),
  152. data(State, nofin, StreamID, Data),
  153. loop(State);
  154. {stream_close, {Pid, StreamID}}
  155. when Pid =:= self() ->
  156. Child = #child{output=nofin} = lists:keyfind(StreamID,
  157. #child.streamid, Children),
  158. data(State, fin, StreamID),
  159. Children2 = lists:keyreplace(StreamID,
  160. #child.streamid, Children, Child#child{output=fin}),
  161. loop(State#state{children=Children2});
  162. {sendfile, {Pid, StreamID}, Filepath}
  163. when Pid =:= self() ->
  164. Child = #child{output=nofin} = lists:keyfind(StreamID,
  165. #child.streamid, Children),
  166. data_from_file(State, StreamID, Filepath),
  167. Children2 = lists:keyreplace(StreamID,
  168. #child.streamid, Children, Child#child{output=fin}),
  169. loop(State#state{children=Children2});
  170. {'EXIT', Parent, Reason} ->
  171. exit(Reason);
  172. {'EXIT', Pid, _} ->
  173. Children2 = lists:keydelete(Pid, #child.pid, Children),
  174. loop(State#state{children=Children2});
  175. {system, From, Request} ->
  176. sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
  177. %% Calls from the supervisor module.
  178. {'$gen_call', {To, Tag}, which_children} ->
  179. Children = [{?MODULE, Pid, worker, [?MODULE]}
  180. || #child{pid=Pid} <- Children],
  181. To ! {Tag, Children},
  182. loop(State);
  183. {'$gen_call', {To, Tag}, count_children} ->
  184. NbChildren = length(Children),
  185. Counts = [{specs, 1}, {active, NbChildren},
  186. {supervisors, 0}, {workers, NbChildren}],
  187. To ! {Tag, Counts},
  188. loop(State);
  189. {'$gen_call', {To, Tag}, _} ->
  190. To ! {Tag, {error, ?MODULE}},
  191. loop(State)
  192. after 60000 ->
  193. goaway(State, ok),
  194. terminate(State)
  195. end.
  196. system_continue(_, _, State) ->
  197. loop(State).
  198. -spec system_terminate(any(), _, _, _) -> no_return().
  199. system_terminate(Reason, _, _, _) ->
  200. exit(Reason).
  201. system_code_change(Misc, _, _, _) ->
  202. {ok, Misc}.
  203. %% We do not support SYN_STREAM with FLAG_UNIDIRECTIONAL set.
  204. control_frame(State, << 1:1, 3:15, 1:16, _:6, 1:1, _:26,
  205. StreamID:31, _/bits >>) ->
  206. rst_stream(State, StreamID, internal_error),
  207. loop(State);
  208. %% We do not support Associated-To-Stream-ID and CREDENTIAL Slot.
  209. control_frame(State, << 1:1, 3:15, 1:16, _:33, StreamID:31, _:1,
  210. AssocToStreamID:31, _:8, Slot:8, _/bits >>)
  211. when AssocToStreamID =/= 0; Slot =/= 0 ->
  212. rst_stream(State, StreamID, internal_error),
  213. loop(State);
  214. %% SYN_STREAM
  215. %%
  216. %% Erlang does not allow us to control the priority of processes
  217. %% so we ignore that value entirely.
  218. control_frame(State=#state{middlewares=Middlewares, env=Env,
  219. onrequest=OnRequest, onresponse=OnResponse, peer=Peer,
  220. zinf=Zinf, children=Children},
  221. << 1:1, 3:15, 1:16, Flags:8, _:25, StreamID:31,
  222. _:32, _Priority:3, _:13, Rest/bits >>) ->
  223. IsFin = case Flags of
  224. 1 -> fin;
  225. 0 -> nofin
  226. end,
  227. [<< NbHeaders:32, Rest2/bits >>] = try
  228. zlib:inflate(Zinf, Rest)
  229. catch _:_ ->
  230. ok = zlib:inflateSetDictionary(Zinf, ?ZDICT),
  231. zlib:inflate(Zinf, <<>>)
  232. end,
  233. case syn_stream_headers(Rest2, NbHeaders, [], #special_headers{}) of
  234. {ok, Headers, Special} ->
  235. Pid = spawn_link(?MODULE, request_init,
  236. [self(), StreamID, Peer, Headers,
  237. OnRequest, OnResponse, Env, Middlewares, Special]),
  238. loop(State#state{last_streamid=StreamID,
  239. children=[#child{streamid=StreamID, pid=Pid,
  240. input=IsFin, output=nofin}|Children]});
  241. {error, badname} ->
  242. rst_stream(State, StreamID, protocol_error),
  243. loop(State#state{last_streamid=StreamID});
  244. {error, special} ->
  245. rst_stream(State, StreamID, protocol_error),
  246. loop(State#state{last_streamid=StreamID})
  247. end;
  248. %% SYN_REPLY
  249. control_frame(State, << 1:1, 3:15, 2:16, _/bits >>) ->
  250. error_logger:error_msg("Ignored SYN_REPLY control frame~n"),
  251. loop(State);
  252. %% RST_STREAM
  253. control_frame(State, << 1:1, 3:15, 3:16, _Flags:8, _Length:24,
  254. _:1, _StreamID:31, StatusCode:32 >>) ->
  255. Status = case StatusCode of
  256. 1 -> protocol_error;
  257. 2 -> invalid_stream;
  258. 3 -> refused_stream;
  259. 4 -> unsupported_version;
  260. 5 -> cancel;
  261. 6 -> internal_error;
  262. 7 -> flow_control_error;
  263. 8 -> stream_in_use;
  264. 9 -> stream_already_closed;
  265. 10 -> invalid_credentials;
  266. 11 -> frame_too_large
  267. end,
  268. error_logger:error_msg("Received RST_STREAM control frame: ~p~n", [Status]),
  269. %% @todo Stop StreamID.
  270. loop(State);
  271. %% SETTINGS
  272. control_frame(State, << 1:1, 3:15, 4:16, 0:8, _:24,
  273. NbEntries:32, Rest/bits >>) ->
  274. Settings = [begin
  275. Name = case ID of
  276. 1 -> upload_bandwidth;
  277. 2 -> download_bandwidth;
  278. 3 -> round_trip_time;
  279. 4 -> max_concurrent_streams;
  280. 5 -> current_cwnd;
  281. 6 -> download_retrans_rate;
  282. 7 -> initial_window_size;
  283. 8 -> client_certificate_vector_size
  284. end,
  285. {Flags, Name, Value}
  286. end || << Flags:8, ID:24, Value:32 >> <= Rest],
  287. if
  288. NbEntries =/= length(Settings) ->
  289. goaway(State, protocol_error),
  290. terminate(State);
  291. true ->
  292. error_logger:error_msg("Ignored SETTINGS control frame: ~p~n",
  293. [Settings]),
  294. loop(State)
  295. end;
  296. %% PING initiated by the server; ignore, we don't send any
  297. control_frame(State, << 1:1, 3:15, 6:16, 0:8, 4:24, PingID:32 >>)
  298. when PingID rem 2 =:= 0 ->
  299. error_logger:error_msg("Ignored PING control frame: ~p~n", [PingID]),
  300. loop(State);
  301. %% PING initiated by the client; send it back
  302. control_frame(State=#state{socket=Socket, transport=Transport},
  303. Data = << 1:1, 3:15, 6:16, 0:8, 4:24, _:32 >>) ->
  304. Transport:send(Socket, Data),
  305. loop(State);
  306. %% GOAWAY
  307. control_frame(State, << 1:1, 3:15, 7:16, _/bits >>) ->
  308. error_logger:error_msg("Ignored GOAWAY control frame~n"),
  309. loop(State);
  310. %% HEADERS
  311. control_frame(State, << 1:1, 3:15, 8:16, _/bits >>) ->
  312. error_logger:error_msg("Ignored HEADERS control frame~n"),
  313. loop(State);
  314. %% WINDOW_UPDATE
  315. control_frame(State, << 1:1, 3:15, 9:16, 0:8, _/bits >>) ->
  316. error_logger:error_msg("Ignored WINDOW_UPDATE control frame~n"),
  317. loop(State);
  318. %% CREDENTIAL
  319. control_frame(State, << 1:1, 3:15, 10:16, _/bits >>) ->
  320. error_logger:error_msg("Ignored CREDENTIAL control frame~n"),
  321. loop(State);
  322. %% ???
  323. control_frame(State, _) ->
  324. goaway(State, protocol_error),
  325. terminate(State).
  326. %% @todo We must wait for the children to finish here,
  327. %% but only up to N milliseconds. Then we shutdown.
  328. terminate(_State) ->
  329. ok.
  330. syn_stream_headers(<<>>, 0, Acc, Special=#special_headers{
  331. method=Method, path=Path, version=Version, host=Host, scheme=Scheme}) ->
  332. if
  333. Method =:= undefined; Path =:= undefined; Version =:= undefined;
  334. Host =:= undefined; Scheme =:= undefined ->
  335. {error, special};
  336. true ->
  337. {ok, lists:reverse(Acc), Special}
  338. end;
  339. syn_stream_headers(<< 0:32, _Rest/bits >>, _NbHeaders, _Acc, _Special) ->
  340. {error, badname};
  341. syn_stream_headers(<< NameLen:32, Rest/bits >>, NbHeaders, Acc, Special) ->
  342. << Name:NameLen/binary, ValueLen:32, Rest2/bits >> = Rest,
  343. << Value:ValueLen/binary, Rest3/bits >> = Rest2,
  344. case Name of
  345. <<":host">> ->
  346. syn_stream_headers(Rest3, NbHeaders - 1,
  347. [{<<"host">>, Value}|Acc],
  348. Special#special_headers{host=Value});
  349. <<":method">> ->
  350. syn_stream_headers(Rest3, NbHeaders - 1, Acc,
  351. Special#special_headers{method=Value});
  352. <<":path">> ->
  353. syn_stream_headers(Rest3, NbHeaders - 1, Acc,
  354. Special#special_headers{path=Value});
  355. <<":version">> ->
  356. syn_stream_headers(Rest3, NbHeaders - 1, Acc,
  357. Special#special_headers{version=Value});
  358. <<":scheme">> ->
  359. syn_stream_headers(Rest3, NbHeaders - 1, Acc,
  360. Special#special_headers{scheme=Value});
  361. _ ->
  362. syn_stream_headers(Rest3, NbHeaders - 1,
  363. [{Name, Value}|Acc], Special)
  364. end.
  365. syn_reply(#state{socket=Socket, transport=Transport, zdef=Zdef},
  366. IsFin, StreamID, Status, Headers) ->
  367. Headers2 = [{<<":status">>, Status},
  368. {<<":version">>, <<"HTTP/1.1">>}|Headers],
  369. NbHeaders = length(Headers2),
  370. HeaderBlock = [begin
  371. NameLen = byte_size(Name),
  372. ValueLen = iolist_size(Value),
  373. [<< NameLen:32, Name/binary, ValueLen:32 >>, Value]
  374. end || {Name, Value} <- Headers2],
  375. HeaderBlock2 = [<< NbHeaders:32 >>, HeaderBlock],
  376. HeaderBlock3 = zlib:deflate(Zdef, HeaderBlock2, full),
  377. Flags = case IsFin of
  378. fin -> 1;
  379. nofin -> 0
  380. end,
  381. Len = 4 + iolist_size(HeaderBlock3),
  382. Transport:send(Socket, [
  383. << 1:1, 3:15, 2:16, Flags:8, Len:24, 0:1, StreamID:31 >>,
  384. HeaderBlock3]).
  385. rst_stream(#state{socket=Socket, transport=Transport}, StreamID, Status) ->
  386. StatusCode = case Status of
  387. protocol_error -> 1;
  388. %% invalid_stream -> 2;
  389. %% refused_stream -> 3;
  390. %% unsupported_version -> 4;
  391. %% cancel -> 5;
  392. internal_error -> 6
  393. %% flow_control_error -> 7;
  394. %% stream_in_use -> 8;
  395. %% stream_already_closed -> 9;
  396. %% invalid_credentials -> 10;
  397. %% frame_too_large -> 11
  398. end,
  399. Transport:send(Socket, << 1:1, 3:15, 3:16, 0:8, 8:24,
  400. 0:1, StreamID:31, StatusCode:32 >>).
  401. goaway(#state{socket=Socket, transport=Transport, last_streamid=LastStreamID},
  402. Status) ->
  403. StatusCode = case Status of
  404. ok -> 0;
  405. protocol_error -> 1
  406. %% internal_error -> 2
  407. end,
  408. Transport:send(Socket, << 1:1, 3:15, 7:16, 0:8, 8:24,
  409. 0:1, LastStreamID:31, StatusCode:32 >>).
  410. data(#state{socket=Socket, transport=Transport}, fin, StreamID) ->
  411. Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>).
  412. data(#state{socket=Socket, transport=Transport}, IsFin, StreamID, Data) ->
  413. Flags = case IsFin of
  414. fin -> 1;
  415. nofin -> 0
  416. end,
  417. Len = iolist_size(Data),
  418. Transport:send(Socket, [
  419. << 0:1, StreamID:31, Flags:8, Len:24 >>,
  420. Data]).
  421. data_from_file(#state{socket=Socket, transport=Transport},
  422. StreamID, Filepath) ->
  423. {ok, IoDevice} = file:open(Filepath, [read, binary, raw]),
  424. data_from_file(Socket, Transport, StreamID, IoDevice).
  425. data_from_file(Socket, Transport, StreamID, IoDevice) ->
  426. case file:read(IoDevice, 16#1fff) of
  427. eof ->
  428. _ = Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>),
  429. ok;
  430. {ok, Data} ->
  431. Len = byte_size(Data),
  432. Data2 = [<< 0:1, StreamID:31, 0:8, Len:24 >>, Data],
  433. case Transport:send(Socket, Data2) of
  434. ok ->
  435. data_from_file(Socket, Transport, StreamID, IoDevice);
  436. {error, _} ->
  437. ok
  438. end
  439. end.
  440. %% Request process.
  441. request_init(Parent, StreamID, Peer,
  442. Headers, OnRequest, OnResponse, Env, Middlewares,
  443. #special_headers{method=Method, path=Path, version=Version,
  444. host=Host}) ->
  445. Version2 = parse_version(Version),
  446. {Host2, Port} = cowboy_protocol:parse_host(Host, <<>>),
  447. {Path2, Query} = parse_path(Path, <<>>),
  448. Req = cowboy_req:new({Parent, StreamID}, ?MODULE, Peer,
  449. Method, Path2, Query, Version2, Headers,
  450. Host2, Port, <<>>, true, false, OnResponse),
  451. case OnRequest of
  452. undefined ->
  453. execute(Req, Env, Middlewares);
  454. _ ->
  455. Req2 = OnRequest(Req),
  456. case cowboy_req:get(resp_state, Req2) of
  457. waiting -> execute(Req2, Env, Middlewares);
  458. _ -> ok
  459. end
  460. end.
  461. parse_version(<<"HTTP/1.1">>) ->
  462. 'HTTP/1.1';
  463. parse_version(<<"HTTP/1.0">>) ->
  464. 'HTTP/1.0'.
  465. parse_path(<<>>, Path) ->
  466. {Path, <<>>};
  467. parse_path(<< $?, Rest/binary >>, Path) ->
  468. parse_query(Rest, Path, <<>>);
  469. parse_path(<< C, Rest/binary >>, SoFar) ->
  470. parse_path(Rest, << SoFar/binary, C >>).
  471. parse_query(<<>>, Path, Query) ->
  472. {Path, Query};
  473. parse_query(<< C, Rest/binary >>, Path, SoFar) ->
  474. parse_query(Rest, Path, << SoFar/binary, C >>).
  475. -spec execute(cowboy_req:req(), cowboy_middleware:env(), [module()])
  476. -> ok.
  477. execute(Req, _, []) ->
  478. cowboy_req:ensure_response(Req, 204);
  479. execute(Req, Env, [Middleware|Tail]) ->
  480. case Middleware:execute(Req, Env) of
  481. {ok, Req2, Env2} ->
  482. execute(Req2, Env2, Tail);
  483. {suspend, Module, Function, Args} ->
  484. erlang:hibernate(?MODULE, resume,
  485. [Env, Tail, Module, Function, Args]);
  486. {halt, Req2} ->
  487. cowboy_req:ensure_response(Req2, 204);
  488. {error, Code, Req2} ->
  489. error_terminate(Code, Req2)
  490. end.
  491. %% @private
  492. -spec resume(cowboy_middleware:env(), [module()],
  493. module(), module(), [any()]) -> ok.
  494. resume(Env, Tail, Module, Function, Args) ->
  495. case apply(Module, Function, Args) of
  496. {ok, Req2, Env2} ->
  497. execute(Req2, Env2, Tail);
  498. {suspend, Module2, Function2, Args2} ->
  499. erlang:hibernate(?MODULE, resume,
  500. [Env, Tail, Module2, Function2, Args2]);
  501. {halt, Req2} ->
  502. cowboy_req:ensure_response(Req2, 204);
  503. {error, Code, Req2} ->
  504. error_terminate(Code, Req2)
  505. end.
  506. %% Only send an error reply if there is no resp_sent message.
  507. -spec error_terminate(cowboy:http_status(), cowboy_req:req()) -> ok.
  508. error_terminate(Code, Req) ->
  509. receive
  510. {cowboy_req, resp_sent} -> ok
  511. after 0 ->
  512. _ = cowboy_req:reply(Code, Req),
  513. ok
  514. end.
  515. %% Reply functions used by cowboy_req.
  516. reply(Socket = {Pid, _}, Status, Headers, Body) ->
  517. _ = case iolist_size(Body) of
  518. 0 -> Pid ! {reply, Socket, Status, Headers};
  519. _ -> Pid ! {reply, Socket, Status, Headers, Body}
  520. end,
  521. ok.
  522. stream_reply(Socket = {Pid, _}, Status, Headers) ->
  523. _ = Pid ! {stream_reply, Socket, Status, Headers},
  524. ok.
  525. stream_data(Socket = {Pid, _}, Data) ->
  526. _ = Pid ! {stream_data, Socket, Data},
  527. ok.
  528. stream_close(Socket = {Pid, _}) ->
  529. _ = Pid ! {stream_close, Socket},
  530. ok.
  531. %% Internal transport functions.
  532. %% @todo recv
  533. name() ->
  534. spdy.
  535. send(Socket, Data) ->
  536. stream_data(Socket, Data).
  537. %% We don't wait for the result of the actual sendfile call,
  538. %% therefore we can't know how much was actually sent.
  539. sendfile(Socket = {Pid, _}, Filepath) ->
  540. _ = Pid ! {sendfile, Socket, Filepath},
  541. {ok, undefined}.