cowboy_websocket.erl 32 KB


  1. %% Copyright (c) 2011-2013, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. %% @doc Websocket protocol implementation.
  15. %%
  16. %% Cowboy supports versions 7 through 17 of the Websocket drafts.
  17. %% It also supports RFC6455, the proposed standard for Websocket.
  18. -module(cowboy_websocket).
  19. -behaviour(cowboy_sub_protocol).
  20. %% Ignore the deprecation warning for crypto:sha/1.
  21. %% @todo Remove when we support only R16B+.
  22. -compile(nowarn_deprecated_function).
  23. %% API.
  24. -export([upgrade/4]).
  25. %% Internal.
  26. -export([handler_loop/4]).
  27. -type close_code() :: 1000..4999.
  28. -export_type([close_code/0]).
  29. -type frame() :: close | ping | pong
  30. | {text | binary | close | ping | pong, iodata()}
  31. | {close, close_code(), iodata()}.
  32. -export_type([frame/0]).
  33. -type opcode() :: 0 | 1 | 2 | 8 | 9 | 10.
  34. -type mask_key() :: 0..16#ffffffff.
  35. -type frag_state() :: undefined
  36. | {nofin, opcode(), binary()} | {fin, opcode(), binary()}.
  37. -type rsv() :: << _:3 >>.
  38. -record(state, {
  39. env :: cowboy_middleware:env(),
  40. socket = undefined :: inet:socket(),
  41. transport = undefined :: module(),
  42. handler :: module(),
  43. key = undefined :: undefined | binary(),
  44. timeout = infinity :: timeout(),
  45. timeout_ref = undefined :: undefined | reference(),
  46. messages = undefined :: undefined | {atom(), atom(), atom()},
  47. hibernate = false :: boolean(),
  48. frag_state = undefined :: frag_state(),
  49. utf8_state = <<>> :: binary(),
  50. deflate_frame = false :: boolean(),
  51. inflate_state :: undefined | port(),
  52. deflate_state :: undefined | port()
  53. }).
  54. %% @doc Upgrade an HTTP request to the Websocket protocol.
  55. %%
  56. %% You do not need to call this function manually. To upgrade to the Websocket
  57. %% protocol, you simply need to return <em>{upgrade, protocol, {@module}}</em>
  58. %% in your <em>cowboy_http_handler:init/3</em> handler function.
  59. -spec upgrade(Req, Env, module(), any())
  60. -> {ok, Req, Env} | {error, 400, Req}
  61. | {suspend, module(), atom(), [any()]}
  62. when Req::cowboy_req:req(), Env::cowboy_middleware:env().
  63. upgrade(Req, Env, Handler, HandlerOpts) ->
  64. {_, Ref} = lists:keyfind(listener, 1, Env),
  65. ranch:remove_connection(Ref),
  66. [Socket, Transport] = cowboy_req:get([socket, transport], Req),
  67. State = #state{env=Env, socket=Socket, transport=Transport,
  68. handler=Handler},
  69. try websocket_upgrade(State, Req) of
  70. {ok, State2, Req2} ->
  71. handler_init(State2, Req2, HandlerOpts)
  72. catch _:_ ->
  73. cowboy_req:maybe_reply(400, Req),
  74. exit(normal)
  75. end.
  76. -spec websocket_upgrade(#state{}, Req)
  77. -> {ok, #state{}, Req} when Req::cowboy_req:req().
  78. websocket_upgrade(State, Req) ->
  79. {ok, ConnTokens, Req2}
  80. = cowboy_req:parse_header(<<"connection">>, Req),
  81. true = lists:member(<<"upgrade">>, ConnTokens),
  82. %% @todo Should probably send a 426 if the Upgrade header is missing.
  83. {ok, [<<"websocket">>], Req3}
  84. = cowboy_req:parse_header(<<"upgrade">>, Req2),
  85. {Version, Req4} = cowboy_req:header(<<"sec-websocket-version">>, Req3),
  86. IntVersion = list_to_integer(binary_to_list(Version)),
  87. true = (IntVersion =:= 7) orelse (IntVersion =:= 8)
  88. orelse (IntVersion =:= 13),
  89. {Key, Req5} = cowboy_req:header(<<"sec-websocket-key">>, Req4),
  90. false = Key =:= undefined,
  91. websocket_extensions(State#state{key=Key},
  92. cowboy_req:set_meta(websocket_version, IntVersion, Req5)).
  93. -spec websocket_extensions(#state{}, Req)
  94. -> {ok, #state{}, Req} when Req::cowboy_req:req().
  95. websocket_extensions(State, Req) ->
  96. case cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req) of
  97. {ok, Extensions, Req2} when Extensions =/= undefined ->
  98. [Compress] = cowboy_req:get([resp_compress], Req),
  99. case lists:keyfind(<<"x-webkit-deflate-frame">>, 1, Extensions) of
  100. {<<"x-webkit-deflate-frame">>, []} when Compress =:= true ->
  101. Inflate = zlib:open(),
  102. Deflate = zlib:open(),
  103. % Since we are negotiating an unconstrained deflate-frame
  104. % then we must be willing to accept frames using the
  105. % maximum window size which is 2^15. The negative value
  106. % indicates that zlib headers are not used.
  107. ok = zlib:inflateInit(Inflate, -15),
  108. % Initialize the deflater with a window size of 2^15 bits and disable
  109. % the zlib headers.
  110. ok = zlib:deflateInit(Deflate, best_compression, deflated, -15, 8, default),
  111. {ok, State#state{
  112. deflate_frame = true,
  113. inflate_state = Inflate,
  114. deflate_state = Deflate
  115. }, cowboy_req:set_meta(websocket_compress, true, Req2)};
  116. _ ->
  117. {ok, State, cowboy_req:set_meta(websocket_compress, false, Req2)}
  118. end;
  119. _ ->
  120. {ok, State, cowboy_req:set_meta(websocket_compress, false, Req)}
  121. end.
  122. -spec handler_init(#state{}, Req, any())
  123. -> {ok, Req, cowboy_middleware:env()} | {error, 400, Req}
  124. | {suspend, module(), atom(), [any()]}
  125. when Req::cowboy_req:req().
  126. handler_init(State=#state{env=Env, transport=Transport,
  127. handler=Handler}, Req, HandlerOpts) ->
  128. try Handler:websocket_init(Transport:name(), Req, HandlerOpts) of
  129. {ok, Req2, HandlerState} ->
  130. websocket_handshake(State, Req2, HandlerState);
  131. {ok, Req2, HandlerState, hibernate} ->
  132. websocket_handshake(State#state{hibernate=true},
  133. Req2, HandlerState);
  134. {ok, Req2, HandlerState, Timeout} ->
  135. websocket_handshake(State#state{timeout=Timeout},
  136. Req2, HandlerState);
  137. {ok, Req2, HandlerState, Timeout, hibernate} ->
  138. websocket_handshake(State#state{timeout=Timeout,
  139. hibernate=true}, Req2, HandlerState);
  140. {shutdown, Req2} ->
  141. cowboy_req:ensure_response(Req2, 400),
  142. {ok, Req2, [{result, closed}|Env]}
  143. catch Class:Reason ->
  144. cowboy_req:maybe_reply(400, Req),
  145. erlang:Class([
  146. {reason, Reason},
  147. {mfa, {Handler, websocket_init, 3}},
  148. {stacktrace, erlang:get_stacktrace()},
  149. {req, cowboy_req:to_list(Req)},
  150. {opts, HandlerOpts}
  151. ])
  152. end.
  153. -spec websocket_handshake(#state{}, Req, any())
  154. -> {ok, Req, cowboy_middleware:env()}
  155. | {suspend, module(), atom(), [any()]}
  156. when Req::cowboy_req:req().
  157. websocket_handshake(State=#state{
  158. transport=Transport, key=Key, deflate_frame=DeflateFrame},
  159. Req, HandlerState) ->
  160. %% @todo Change into crypto:hash/2 for R17B+ or when supporting only R16B+.
  161. Challenge = base64:encode(crypto:sha(
  162. << Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)),
  163. Extensions = case DeflateFrame of
  164. false -> [];
  165. true -> [{<<"sec-websocket-extensions">>, <<"x-webkit-deflate-frame">>}]
  166. end,
  167. {ok, Req2} = cowboy_req:upgrade_reply(
  168. 101,
  169. [{<<"upgrade">>, <<"websocket">>},
  170. {<<"sec-websocket-accept">>, Challenge}|
  171. Extensions],
  172. Req),
  173. %% Flush the resp_sent message before moving on.
  174. receive {cowboy_req, resp_sent} -> ok after 0 -> ok end,
  175. State2 = handler_loop_timeout(State),
  176. handler_before_loop(State2#state{key=undefined,
  177. messages=Transport:messages()}, Req2, HandlerState, <<>>).
  178. -spec handler_before_loop(#state{}, Req, any(), binary())
  179. -> {ok, Req, cowboy_middleware:env()}
  180. | {suspend, module(), atom(), [any()]}
  181. when Req::cowboy_req:req().
  182. handler_before_loop(State=#state{
  183. socket=Socket, transport=Transport, hibernate=true},
  184. Req, HandlerState, SoFar) ->
  185. Transport:setopts(Socket, [{active, once}]),
  186. {suspend, ?MODULE, handler_loop,
  187. [State#state{hibernate=false}, Req, HandlerState, SoFar]};
  188. handler_before_loop(State=#state{socket=Socket, transport=Transport},
  189. Req, HandlerState, SoFar) ->
  190. Transport:setopts(Socket, [{active, once}]),
  191. handler_loop(State, Req, HandlerState, SoFar).
  192. -spec handler_loop_timeout(#state{}) -> #state{}.
  193. handler_loop_timeout(State=#state{timeout=infinity}) ->
  194. State#state{timeout_ref=undefined};
  195. handler_loop_timeout(State=#state{timeout=Timeout, timeout_ref=PrevRef}) ->
  196. _ = case PrevRef of undefined -> ignore; PrevRef ->
  197. erlang:cancel_timer(PrevRef) end,
  198. TRef = erlang:start_timer(Timeout, self(), ?MODULE),
  199. State#state{timeout_ref=TRef}.
  200. %% @private
  201. -spec handler_loop(#state{}, Req, any(), binary())
  202. -> {ok, Req, cowboy_middleware:env()}
  203. | {suspend, module(), atom(), [any()]}
  204. when Req::cowboy_req:req().
  205. handler_loop(State=#state{socket=Socket, messages={OK, Closed, Error},
  206. timeout_ref=TRef}, Req, HandlerState, SoFar) ->
  207. receive
  208. {OK, Socket, Data} ->
  209. State2 = handler_loop_timeout(State),
  210. websocket_data(State2, Req, HandlerState,
  211. << SoFar/binary, Data/binary >>);
  212. {Closed, Socket} ->
  213. handler_terminate(State, Req, HandlerState, {error, closed});
  214. {Error, Socket, Reason} ->
  215. handler_terminate(State, Req, HandlerState, {error, Reason});
  216. {timeout, TRef, ?MODULE} ->
  217. websocket_close(State, Req, HandlerState, {normal, timeout});
  218. {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
  219. handler_loop(State, Req, HandlerState, SoFar);
  220. Message ->
  221. handler_call(State, Req, HandlerState,
  222. SoFar, websocket_info, Message, fun handler_before_loop/4)
  223. end.
  224. %% All frames passing through this function are considered valid,
  225. %% with the only exception of text and close frames with a payload
  226. %% which may still contain errors.
  227. -spec websocket_data(#state{}, Req, any(), binary())
  228. -> {ok, Req, cowboy_middleware:env()}
  229. | {suspend, module(), atom(), [any()]}
  230. when Req::cowboy_req:req().
  231. %% RSV bits MUST be 0 unless an extension is negotiated
  232. %% that defines meanings for non-zero values.
  233. websocket_data(State, Req, HandlerState, << _:1, Rsv:3, _/bits >>)
  234. when Rsv =/= 0, State#state.deflate_frame =:= false ->
  235. websocket_close(State, Req, HandlerState, {error, badframe});
  236. %% Invalid opcode. Note that these opcodes may be used by extensions.
  237. websocket_data(State, Req, HandlerState, << _:4, Opcode:4, _/bits >>)
  238. when Opcode > 2, Opcode =/= 8, Opcode =/= 9, Opcode =/= 10 ->
  239. websocket_close(State, Req, HandlerState, {error, badframe});
  240. %% Control frames MUST NOT be fragmented.
  241. websocket_data(State, Req, HandlerState, << 0:1, _:3, Opcode:4, _/bits >>)
  242. when Opcode >= 8 ->
  243. websocket_close(State, Req, HandlerState, {error, badframe});
  244. %% A frame MUST NOT use the zero opcode unless fragmentation was initiated.
  245. websocket_data(State=#state{frag_state=undefined}, Req, HandlerState,
  246. << _:4, 0:4, _/bits >>) ->
  247. websocket_close(State, Req, HandlerState, {error, badframe});
  248. %% Non-control opcode when expecting control message or next fragment.
  249. websocket_data(State=#state{frag_state={nofin, _, _}}, Req, HandlerState,
  250. << _:4, Opcode:4, _/bits >>)
  251. when Opcode =/= 0, Opcode < 8 ->
  252. websocket_close(State, Req, HandlerState, {error, badframe});
  253. %% Close control frame length MUST be 0 or >= 2.
  254. websocket_data(State, Req, HandlerState, << _:4, 8:4, _:1, 1:7, _/bits >>) ->
  255. websocket_close(State, Req, HandlerState, {error, badframe});
  256. %% Close control frame with incomplete close code. Need more data.
  257. websocket_data(State, Req, HandlerState,
  258. Data = << _:4, 8:4, 1:1, Len:7, _/bits >>)
  259. when Len > 1, byte_size(Data) < 8 ->
  260. handler_before_loop(State, Req, HandlerState, Data);
  261. %% 7 bits payload length.
  262. websocket_data(State, Req, HandlerState, << Fin:1, Rsv:3/bits, Opcode:4, 1:1,
  263. Len:7, MaskKey:32, Rest/bits >>)
  264. when Len < 126 ->
  265. websocket_data(State, Req, HandlerState,
  266. Opcode, Len, MaskKey, Rest, Rsv, Fin);
  267. %% 16 bits payload length.
  268. websocket_data(State, Req, HandlerState, << Fin:1, Rsv:3/bits, Opcode:4, 1:1,
  269. 126:7, Len:16, MaskKey:32, Rest/bits >>)
  270. when Len > 125, Opcode < 8 ->
  271. websocket_data(State, Req, HandlerState,
  272. Opcode, Len, MaskKey, Rest, Rsv, Fin);
  273. %% 63 bits payload length.
  274. websocket_data(State, Req, HandlerState, << Fin:1, Rsv:3/bits, Opcode:4, 1:1,
  275. 127:7, 0:1, Len:63, MaskKey:32, Rest/bits >>)
  276. when Len > 16#ffff, Opcode < 8 ->
  277. websocket_data(State, Req, HandlerState,
  278. Opcode, Len, MaskKey, Rest, Rsv, Fin);
  279. %% When payload length is over 63 bits, the most significant bit MUST be 0.
  280. websocket_data(State, Req, HandlerState, << _:8, 1:1, 127:7, 1:1, _:7, _/binary >>) ->
  281. websocket_close(State, Req, HandlerState, {error, badframe});
  282. %% All frames sent from the client to the server are masked.
  283. websocket_data(State, Req, HandlerState, << _:8, 0:1, _/bits >>) ->
  284. websocket_close(State, Req, HandlerState, {error, badframe});
  285. %% For the next two clauses, it can be one of the following:
  286. %%
  287. %% * The minimal number of bytes MUST be used to encode the length
  288. %% * All control frames MUST have a payload length of 125 bytes or less
  289. websocket_data(State, Req, HandlerState, << _:9, 126:7, _:48, _/bits >>) ->
  290. websocket_close(State, Req, HandlerState, {error, badframe});
  291. websocket_data(State, Req, HandlerState, << _:9, 127:7, _:96, _/bits >>) ->
  292. websocket_close(State, Req, HandlerState, {error, badframe});
  293. %% Need more data.
  294. websocket_data(State, Req, HandlerState, Data) ->
  295. handler_before_loop(State, Req, HandlerState, Data).
  296. %% Initialize or update fragmentation state.
  297. -spec websocket_data(#state{}, Req, any(),
  298. opcode(), non_neg_integer(), mask_key(), binary(), rsv(), 0 | 1)
  299. -> {ok, Req, cowboy_middleware:env()}
  300. | {suspend, module(), atom(), [any()]}
  301. when Req::cowboy_req:req().
  302. %% The opcode is only included in the first frame fragment.
  303. websocket_data(State=#state{frag_state=undefined}, Req, HandlerState,
  304. Opcode, Len, MaskKey, Data, Rsv, 0) ->
  305. websocket_payload(State#state{frag_state={nofin, Opcode, <<>>}},
  306. Req, HandlerState, 0, Len, MaskKey, <<>>, 0, Data, Rsv);
  307. %% Subsequent frame fragments.
  308. websocket_data(State=#state{frag_state={nofin, _, _}}, Req, HandlerState,
  309. 0, Len, MaskKey, Data, Rsv, 0) ->
  310. websocket_payload(State, Req, HandlerState,
  311. 0, Len, MaskKey, <<>>, 0, Data, Rsv);
  312. %% Final frame fragment.
  313. websocket_data(State=#state{frag_state={nofin, Opcode, SoFar}},
  314. Req, HandlerState, 0, Len, MaskKey, Data, Rsv, 1) ->
  315. websocket_payload(State#state{frag_state={fin, Opcode, SoFar}},
  316. Req, HandlerState, 0, Len, MaskKey, <<>>, 0, Data, Rsv);
  317. %% Unfragmented frame.
  318. websocket_data(State, Req, HandlerState, Opcode, Len, MaskKey, Data, Rsv, 1) ->
  319. websocket_payload(State, Req, HandlerState,
  320. Opcode, Len, MaskKey, <<>>, 0, Data, Rsv).
  321. -spec websocket_payload(#state{}, Req, any(),
  322. opcode(), non_neg_integer(), mask_key(), binary(), non_neg_integer(),
  323. binary(), rsv())
  324. -> {ok, Req, cowboy_middleware:env()}
  325. | {suspend, module(), atom(), [any()]}
  326. when Req::cowboy_req:req().
  327. %% Close control frames with a payload MUST contain a valid close code.
  328. websocket_payload(State, Req, HandlerState,
  329. Opcode=8, Len, MaskKey, <<>>, 0,
  330. << MaskedCode:2/binary, Rest/bits >>, Rsv) ->
  331. Unmasked = << Code:16 >> = websocket_unmask(MaskedCode, MaskKey, <<>>),
  332. if Code < 1000; Code =:= 1004; Code =:= 1005; Code =:= 1006;
  333. (Code > 1011) and (Code < 3000); Code > 4999 ->
  334. websocket_close(State, Req, HandlerState, {error, badframe});
  335. true ->
  336. websocket_payload(State, Req, HandlerState,
  337. Opcode, Len - 2, MaskKey, Unmasked, byte_size(MaskedCode),
  338. Rest, Rsv)
  339. end;
  340. %% Text frames and close control frames MUST have a payload that is valid UTF-8.
  341. websocket_payload(State=#state{utf8_state=Incomplete},
  342. Req, HandlerState, Opcode, Len, MaskKey, Unmasked, UnmaskedLen,
  343. Data, Rsv)
  344. when (byte_size(Data) < Len) andalso ((Opcode =:= 1) orelse
  345. ((Opcode =:= 8) andalso (Unmasked =/= <<>>))) ->
  346. Unmasked2 = websocket_unmask(Data,
  347. rotate_mask_key(MaskKey, UnmaskedLen), <<>>),
  348. {Unmasked3, State2} = websocket_inflate_frame(Unmasked2, Rsv, false, State),
  349. case is_utf8(<< Incomplete/binary, Unmasked3/binary >>) of
  350. false ->
  351. websocket_close(State2, Req, HandlerState, {error, badencoding});
  352. Utf8State ->
  353. websocket_payload_loop(State2#state{utf8_state=Utf8State},
  354. Req, HandlerState, Opcode, Len - byte_size(Data), MaskKey,
  355. << Unmasked/binary, Unmasked3/binary >>,
  356. UnmaskedLen + byte_size(Data), Rsv)
  357. end;
  358. websocket_payload(State=#state{utf8_state=Incomplete},
  359. Req, HandlerState, Opcode, Len, MaskKey, Unmasked, UnmaskedLen,
  360. Data, Rsv)
  361. when Opcode =:= 1; (Opcode =:= 8) and (Unmasked =/= <<>>) ->
  362. << End:Len/binary, Rest/bits >> = Data,
  363. Unmasked2 = websocket_unmask(End,
  364. rotate_mask_key(MaskKey, UnmaskedLen), <<>>),
  365. {Unmasked3, State2} = websocket_inflate_frame(Unmasked2, Rsv, true, State),
  366. case is_utf8(<< Incomplete/binary, Unmasked3/binary >>) of
  367. <<>> ->
  368. websocket_dispatch(State2#state{utf8_state= <<>>},
  369. Req, HandlerState, Rest, Opcode,
  370. << Unmasked/binary, Unmasked3/binary >>);
  371. _ ->
  372. websocket_close(State2, Req, HandlerState, {error, badencoding})
  373. end;
  374. %% Fragmented text frames may cut payload in the middle of UTF-8 codepoints.
  375. websocket_payload(State=#state{frag_state={_, 1, _}, utf8_state=Incomplete},
  376. Req, HandlerState, Opcode=0, Len, MaskKey, Unmasked, UnmaskedLen,
  377. Data, Rsv)
  378. when byte_size(Data) < Len ->
  379. Unmasked2 = websocket_unmask(Data,
  380. rotate_mask_key(MaskKey, UnmaskedLen), <<>>),
  381. {Unmasked3, State2} = websocket_inflate_frame(Unmasked2, Rsv, false, State),
  382. case is_utf8(<< Incomplete/binary, Unmasked3/binary >>) of
  383. false ->
  384. websocket_close(State2, Req, HandlerState, {error, badencoding});
  385. Utf8State ->
  386. websocket_payload_loop(State2#state{utf8_state=Utf8State},
  387. Req, HandlerState, Opcode, Len - byte_size(Data), MaskKey,
  388. << Unmasked/binary, Unmasked3/binary >>,
  389. UnmaskedLen + byte_size(Data), Rsv)
  390. end;
  391. websocket_payload(State=#state{frag_state={Fin, 1, _}, utf8_state=Incomplete},
  392. Req, HandlerState, Opcode=0, Len, MaskKey, Unmasked, UnmaskedLen,
  393. Data, Rsv) ->
  394. << End:Len/binary, Rest/bits >> = Data,
  395. Unmasked2 = websocket_unmask(End,
  396. rotate_mask_key(MaskKey, UnmaskedLen), <<>>),
  397. {Unmasked3, State2} = websocket_inflate_frame(Unmasked2, Rsv, Fin =:= fin, State),
  398. case is_utf8(<< Incomplete/binary, Unmasked3/binary >>) of
  399. <<>> ->
  400. websocket_dispatch(State2#state{utf8_state= <<>>},
  401. Req, HandlerState, Rest, Opcode,
  402. << Unmasked/binary, Unmasked3/binary >>);
  403. Utf8State when is_binary(Utf8State), Fin =:= nofin ->
  404. websocket_dispatch(State2#state{utf8_state=Utf8State},
  405. Req, HandlerState, Rest, Opcode,
  406. << Unmasked/binary, Unmasked3/binary >>);
  407. _ ->
  408. websocket_close(State, Req, HandlerState, {error, badencoding})
  409. end;
  410. %% Other frames have a binary payload.
  411. websocket_payload(State, Req, HandlerState,
  412. Opcode, Len, MaskKey, Unmasked, UnmaskedLen, Data, Rsv)
  413. when byte_size(Data) < Len ->
  414. Unmasked2 = websocket_unmask(Data,
  415. rotate_mask_key(MaskKey, UnmaskedLen), <<>>),
  416. {Unmasked3, State2} = websocket_inflate_frame(Unmasked2, Rsv, false, State),
  417. websocket_payload_loop(State2, Req, HandlerState,
  418. Opcode, Len - byte_size(Data), MaskKey,
  419. << Unmasked/binary, Unmasked3/binary >>, UnmaskedLen + byte_size(Data),
  420. Rsv);
  421. websocket_payload(State, Req, HandlerState,
  422. Opcode, Len, MaskKey, Unmasked, UnmaskedLen, Data, Rsv) ->
  423. << End:Len/binary, Rest/bits >> = Data,
  424. Unmasked2 = websocket_unmask(End,
  425. rotate_mask_key(MaskKey, UnmaskedLen), <<>>),
  426. {Unmasked3, State2} = websocket_inflate_frame(Unmasked2, Rsv, true, State),
  427. websocket_dispatch(State2, Req, HandlerState, Rest, Opcode,
  428. << Unmasked/binary, Unmasked3/binary >>).
  429. -spec websocket_inflate_frame(binary(), rsv(), boolean(), #state{}) ->
  430. {binary(), #state{}}.
  431. websocket_inflate_frame(Data, << Rsv1:1, _:2 >>, _,
  432. #state{deflate_frame = DeflateFrame} = State)
  433. when DeflateFrame =:= false orelse Rsv1 =:= 0 ->
  434. {Data, State};
  435. websocket_inflate_frame(Data, << 1:1, _:2 >>, false, State) ->
  436. Result = zlib:inflate(State#state.inflate_state, Data),
  437. {iolist_to_binary(Result), State};
  438. websocket_inflate_frame(Data, << 1:1, _:2 >>, true, State) ->
  439. Result = zlib:inflate(State#state.inflate_state,
  440. << Data/binary, 0:8, 0:8, 255:8, 255:8 >>),
  441. {iolist_to_binary(Result), State}.
  442. -spec websocket_unmask(B, mask_key(), B) -> B when B::binary().
  443. websocket_unmask(<<>>, _, Unmasked) ->
  444. Unmasked;
  445. websocket_unmask(<< O:32, Rest/bits >>, MaskKey, Acc) ->
  446. T = O bxor MaskKey,
  447. websocket_unmask(Rest, MaskKey, << Acc/binary, T:32 >>);
  448. websocket_unmask(<< O:24 >>, MaskKey, Acc) ->
  449. << MaskKey2:24, _:8 >> = << MaskKey:32 >>,
  450. T = O bxor MaskKey2,
  451. << Acc/binary, T:24 >>;
  452. websocket_unmask(<< O:16 >>, MaskKey, Acc) ->
  453. << MaskKey2:16, _:16 >> = << MaskKey:32 >>,
  454. T = O bxor MaskKey2,
  455. << Acc/binary, T:16 >>;
  456. websocket_unmask(<< O:8 >>, MaskKey, Acc) ->
  457. << MaskKey2:8, _:24 >> = << MaskKey:32 >>,
  458. T = O bxor MaskKey2,
  459. << Acc/binary, T:8 >>.
  460. %% Because we unmask on the fly we need to continue from the right mask byte.
  461. -spec rotate_mask_key(mask_key(), non_neg_integer()) -> mask_key().
  462. rotate_mask_key(MaskKey, UnmaskedLen) ->
  463. Left = UnmaskedLen rem 4,
  464. Right = 4 - Left,
  465. (MaskKey bsl (Left * 8)) + (MaskKey bsr (Right * 8)).
  466. %% Returns <<>> if the argument is valid UTF-8, false if not,
  467. %% or the incomplete part of the argument if we need more data.
  468. -spec is_utf8(binary()) -> false | binary().
  469. is_utf8(Valid = <<>>) ->
  470. Valid;
  471. is_utf8(<< _/utf8, Rest/binary >>) ->
  472. is_utf8(Rest);
  473. %% 2 bytes. Codepages C0 and C1 are invalid; fail early.
  474. is_utf8(<< 2#1100000:7, _/bits >>) ->
  475. false;
  476. is_utf8(Incomplete = << 2#110:3, _:5 >>) ->
  477. Incomplete;
  478. %% 3 bytes.
  479. is_utf8(Incomplete = << 2#1110:4, _:4 >>) ->
  480. Incomplete;
  481. is_utf8(Incomplete = << 2#1110:4, _:4, 2#10:2, _:6 >>) ->
  482. Incomplete;
  483. %% 4 bytes. Codepage F4 may have invalid values greater than 0x10FFFF.
  484. is_utf8(<< 2#11110100:8, 2#10:2, High:6, _/bits >>) when High >= 2#10000 ->
  485. false;
  486. is_utf8(Incomplete = << 2#11110:5, _:3 >>) ->
  487. Incomplete;
  488. is_utf8(Incomplete = << 2#11110:5, _:3, 2#10:2, _:6 >>) ->
  489. Incomplete;
  490. is_utf8(Incomplete = << 2#11110:5, _:3, 2#10:2, _:6, 2#10:2, _:6 >>) ->
  491. Incomplete;
  492. %% Invalid.
  493. is_utf8(_) ->
  494. false.
  495. -spec websocket_payload_loop(#state{}, Req, any(),
  496. opcode(), non_neg_integer(), mask_key(), binary(),
  497. non_neg_integer(), rsv())
  498. -> {ok, Req, cowboy_middleware:env()}
  499. | {suspend, module(), atom(), [any()]}
  500. when Req::cowboy_req:req().
  501. websocket_payload_loop(State=#state{socket=Socket, transport=Transport,
  502. messages={OK, Closed, Error}, timeout_ref=TRef},
  503. Req, HandlerState, Opcode, Len, MaskKey, Unmasked, UnmaskedLen, Rsv) ->
  504. Transport:setopts(Socket, [{active, once}]),
  505. receive
  506. {OK, Socket, Data} ->
  507. State2 = handler_loop_timeout(State),
  508. websocket_payload(State2, Req, HandlerState,
  509. Opcode, Len, MaskKey, Unmasked, UnmaskedLen, Data, Rsv);
  510. {Closed, Socket} ->
  511. handler_terminate(State, Req, HandlerState, {error, closed});
  512. {Error, Socket, Reason} ->
  513. handler_terminate(State, Req, HandlerState, {error, Reason});
  514. {timeout, TRef, ?MODULE} ->
  515. websocket_close(State, Req, HandlerState, {normal, timeout});
  516. {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
  517. websocket_payload_loop(State, Req, HandlerState,
  518. Opcode, Len, MaskKey, Unmasked, UnmaskedLen, Rsv);
  519. Message ->
  520. handler_call(State, Req, HandlerState,
  521. <<>>, websocket_info, Message,
  522. fun (State2, Req2, HandlerState2, _) ->
  523. websocket_payload_loop(State2, Req2, HandlerState2,
  524. Opcode, Len, MaskKey, Unmasked, UnmaskedLen, Rsv)
  525. end)
  526. end.
  527. -spec websocket_dispatch(#state{}, Req, any(), binary(), opcode(), binary())
  528. -> {ok, Req, cowboy_middleware:env()}
  529. | {suspend, module(), atom(), [any()]}
  530. when Req::cowboy_req:req().
  531. %% Continuation frame.
  532. websocket_dispatch(State=#state{frag_state={nofin, Opcode, SoFar}},
  533. Req, HandlerState, RemainingData, 0, Payload) ->
  534. websocket_data(State#state{frag_state={nofin, Opcode,
  535. << SoFar/binary, Payload/binary >>}}, Req, HandlerState, RemainingData);
  536. %% Last continuation frame.
  537. websocket_dispatch(State=#state{frag_state={fin, Opcode, SoFar}},
  538. Req, HandlerState, RemainingData, 0, Payload) ->
  539. websocket_dispatch(State#state{frag_state=undefined}, Req, HandlerState,
  540. RemainingData, Opcode, << SoFar/binary, Payload/binary >>);
  541. %% Text frame.
  542. websocket_dispatch(State, Req, HandlerState, RemainingData, 1, Payload) ->
  543. handler_call(State, Req, HandlerState, RemainingData,
  544. websocket_handle, {text, Payload}, fun websocket_data/4);
  545. %% Binary frame.
  546. websocket_dispatch(State, Req, HandlerState, RemainingData, 2, Payload) ->
  547. handler_call(State, Req, HandlerState, RemainingData,
  548. websocket_handle, {binary, Payload}, fun websocket_data/4);
  549. %% Close control frame.
  550. websocket_dispatch(State, Req, HandlerState, _RemainingData, 8, <<>>) ->
  551. websocket_close(State, Req, HandlerState, {remote, closed});
  552. websocket_dispatch(State, Req, HandlerState, _RemainingData, 8,
  553. << Code:16, Payload/bits >>) ->
  554. websocket_close(State, Req, HandlerState, {remote, Code, Payload});
  555. %% Ping control frame. Send a pong back and forward the ping to the handler.
  556. websocket_dispatch(State=#state{socket=Socket, transport=Transport},
  557. Req, HandlerState, RemainingData, 9, Payload) ->
  558. Len = payload_length_to_binary(byte_size(Payload)),
  559. Transport:send(Socket, << 1:1, 0:3, 10:4, 0:1, Len/bits, Payload/binary >>),
  560. handler_call(State, Req, HandlerState, RemainingData,
  561. websocket_handle, {ping, Payload}, fun websocket_data/4);
  562. %% Pong control frame.
  563. websocket_dispatch(State, Req, HandlerState, RemainingData, 10, Payload) ->
  564. handler_call(State, Req, HandlerState, RemainingData,
  565. websocket_handle, {pong, Payload}, fun websocket_data/4).
  566. -spec handler_call(#state{}, Req, any(), binary(), atom(), any(), fun())
  567. -> {ok, Req, cowboy_middleware:env()}
  568. | {suspend, module(), atom(), [any()]}
  569. when Req::cowboy_req:req().
  570. handler_call(State=#state{handler=Handler}, Req, HandlerState,
  571. RemainingData, Callback, Message, NextState) ->
  572. try Handler:Callback(Message, Req, HandlerState) of
  573. {ok, Req2, HandlerState2} ->
  574. NextState(State, Req2, HandlerState2, RemainingData);
  575. {ok, Req2, HandlerState2, hibernate} ->
  576. NextState(State#state{hibernate=true},
  577. Req2, HandlerState2, RemainingData);
  578. {reply, Payload, Req2, HandlerState2}
  579. when is_list(Payload) ->
  580. case websocket_send_many(Payload, State) of
  581. {ok, State2} ->
  582. NextState(State2, Req2, HandlerState2, RemainingData);
  583. {shutdown, State2} ->
  584. handler_terminate(State2, Req2, HandlerState2,
  585. {normal, shutdown});
  586. {{error, _} = Error, State2} ->
  587. handler_terminate(State2, Req2, HandlerState2, Error)
  588. end;
  589. {reply, Payload, Req2, HandlerState2, hibernate}
  590. when is_list(Payload) ->
  591. case websocket_send_many(Payload, State) of
  592. {ok, State2} ->
  593. NextState(State2#state{hibernate=true},
  594. Req2, HandlerState2, RemainingData);
  595. {shutdown, State2} ->
  596. handler_terminate(State2, Req2, HandlerState2,
  597. {normal, shutdown});
  598. {{error, _} = Error, State2} ->
  599. handler_terminate(State2, Req2, HandlerState2, Error)
  600. end;
  601. {reply, Payload, Req2, HandlerState2} ->
  602. case websocket_send(Payload, State) of
  603. {ok, State2} ->
  604. NextState(State2, Req2, HandlerState2, RemainingData);
  605. {shutdown, State2} ->
  606. handler_terminate(State2, Req2, HandlerState2,
  607. {normal, shutdown});
  608. {{error, _} = Error, State2} ->
  609. handler_terminate(State2, Req2, HandlerState2, Error)
  610. end;
  611. {reply, Payload, Req2, HandlerState2, hibernate} ->
  612. case websocket_send(Payload, State) of
  613. {ok, State2} ->
  614. NextState(State2#state{hibernate=true},
  615. Req2, HandlerState2, RemainingData);
  616. {shutdown, State2} ->
  617. handler_terminate(State2, Req2, HandlerState2,
  618. {normal, shutdown});
  619. {{error, _} = Error, State2} ->
  620. handler_terminate(State2, Req2, HandlerState2, Error)
  621. end;
  622. {shutdown, Req2, HandlerState2} ->
  623. websocket_close(State, Req2, HandlerState2, {normal, shutdown})
  624. catch Class:Reason ->
  625. _ = websocket_close(State, Req, HandlerState, {error, handler}),
  626. erlang:Class([
  627. {reason, Reason},
  628. {mfa, {Handler, Callback, 3}},
  629. {stacktrace, erlang:get_stacktrace()},
  630. {msg, Message},
  631. {req, cowboy_req:to_list(Req)},
  632. {state, HandlerState}
  633. ])
  634. end.
  635. websocket_opcode(text) -> 1;
  636. websocket_opcode(binary) -> 2;
  637. websocket_opcode(close) -> 8;
  638. websocket_opcode(ping) -> 9;
  639. websocket_opcode(pong) -> 10.
  640. -spec websocket_deflate_frame(opcode(), binary(), #state{}) ->
  641. {binary(), rsv(), #state{}}.
  642. websocket_deflate_frame(Opcode, Payload,
  643. State=#state{deflate_frame = DeflateFrame})
  644. when DeflateFrame =:= false orelse Opcode >= 8 ->
  645. {Payload, << 0:3 >>, State};
  646. websocket_deflate_frame(_, Payload, State=#state{deflate_state = Deflate}) ->
  647. Deflated = iolist_to_binary(zlib:deflate(Deflate, Payload, sync)),
  648. DeflatedBodyLength = erlang:size(Deflated) - 4,
  649. Deflated1 = case Deflated of
  650. << Body:DeflatedBodyLength/binary, 0:8, 0:8, 255:8, 255:8 >> -> Body;
  651. _ -> Deflated
  652. end,
  653. {Deflated1, << 1:1, 0:2 >>, State}.
  654. -spec websocket_send(frame(), #state{})
  655. -> {ok, #state{}} | {shutdown, #state{}} | {{error, atom()}, #state{}}.
  656. websocket_send(Type, State=#state{socket=Socket, transport=Transport})
  657. when Type =:= close ->
  658. Opcode = websocket_opcode(Type),
  659. case Transport:send(Socket, << 1:1, 0:3, Opcode:4, 0:8 >>) of
  660. ok -> {shutdown, State};
  661. Error -> {Error, State}
  662. end;
  663. websocket_send(Type, State=#state{socket=Socket, transport=Transport})
  664. when Type =:= ping; Type =:= pong ->
  665. Opcode = websocket_opcode(Type),
  666. {Transport:send(Socket, << 1:1, 0:3, Opcode:4, 0:8 >>), State};
  667. websocket_send({close, Payload}, State) ->
  668. websocket_send({close, 1000, Payload}, State);
  669. websocket_send({Type = close, StatusCode, Payload}, State=#state{
  670. socket=Socket, transport=Transport}) ->
  671. Opcode = websocket_opcode(Type),
  672. Len = 2 + iolist_size(Payload),
  673. %% Control packets must not be > 125 in length.
  674. true = Len =< 125,
  675. BinLen = payload_length_to_binary(Len),
  676. Transport:send(Socket,
  677. [<< 1:1, 0:3, Opcode:4, 0:1, BinLen/bits, StatusCode:16 >>, Payload]),
  678. {shutdown, State};
  679. websocket_send({Type, Payload0}, State=#state{socket=Socket, transport=Transport}) ->
  680. Opcode = websocket_opcode(Type),
  681. {Payload, Rsv, State2} = websocket_deflate_frame(Opcode, iolist_to_binary(Payload0), State),
  682. Len = iolist_size(Payload),
  683. %% Control packets must not be > 125 in length.
  684. true = if Type =:= ping; Type =:= pong ->
  685. Len =< 125;
  686. true ->
  687. true
  688. end,
  689. BinLen = payload_length_to_binary(Len),
  690. {Transport:send(Socket,
  691. [<< 1:1, Rsv/bits, Opcode:4, 0:1, BinLen/bits >>, Payload]), State2}.
  692. -spec websocket_send_many([frame()], #state{})
  693. -> {ok, #state{}} | {shutdown, #state{}} | {{error, atom()}, #state{}}.
  694. websocket_send_many([], State) ->
  695. {ok, State};
  696. websocket_send_many([Frame|Tail], State) ->
  697. case websocket_send(Frame, State) of
  698. {ok, State2} -> websocket_send_many(Tail, State2);
  699. {shutdown, State2} -> {shutdown, State2};
  700. {Error, State2} -> {Error, State2}
  701. end.
  702. -spec websocket_close(#state{}, Req, any(),
  703. {atom(), atom()} | {remote, close_code(), binary()})
  704. -> {ok, Req, cowboy_middleware:env()}
  705. when Req::cowboy_req:req().
  706. websocket_close(State=#state{socket=Socket, transport=Transport},
  707. Req, HandlerState, Reason) ->
  708. case Reason of
  709. {normal, _} ->
  710. Transport:send(Socket, << 1:1, 0:3, 8:4, 0:1, 2:7, 1000:16 >>);
  711. {error, badframe} ->
  712. Transport:send(Socket, << 1:1, 0:3, 8:4, 0:1, 2:7, 1002:16 >>);
  713. {error, badencoding} ->
  714. Transport:send(Socket, << 1:1, 0:3, 8:4, 0:1, 2:7, 1007:16 >>);
  715. {error, handler} ->
  716. Transport:send(Socket, << 1:1, 0:3, 8:4, 0:1, 2:7, 1011:16 >>);
  717. {remote, closed} ->
  718. Transport:send(Socket, << 1:1, 0:3, 8:4, 0:8 >>);
  719. {remote, Code, _} ->
  720. Transport:send(Socket, << 1:1, 0:3, 8:4, 0:1, 2:7, Code:16 >>)
  721. end,
  722. handler_terminate(State, Req, HandlerState, Reason).
  723. -spec handler_terminate(#state{}, Req, any(), atom() | {atom(), atom()})
  724. -> {ok, Req, cowboy_middleware:env()}
  725. when Req::cowboy_req:req().
  726. handler_terminate(#state{env=Env, handler=Handler},
  727. Req, HandlerState, TerminateReason) ->
  728. try
  729. Handler:websocket_terminate(TerminateReason, Req, HandlerState)
  730. catch Class:Reason ->
  731. erlang:Class([
  732. {reason, Reason},
  733. {mfa, {Handler, websocket_terminate, 3}},
  734. {stacktrace, erlang:get_stacktrace()},
  735. {req, cowboy_req:to_list(Req)},
  736. {state, HandlerState},
  737. {terminate_reason, TerminateReason}
  738. ])
  739. end,
  740. {ok, Req, [{result, closed}|Env]}.
  741. -spec payload_length_to_binary(0..16#7fffffffffffffff)
  742. -> << _:7 >> | << _:23 >> | << _:71 >>.
  743. payload_length_to_binary(N) ->
  744. case N of
  745. N when N =< 125 -> << N:7 >>;
  746. N when N =< 16#ffff -> << 126:7, N:16 >>;
  747. N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >>
  748. end.