cow_sse.erl 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. %% Copyright (c) 2017-2018, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cow_sse).
  15. -export([init/0]).
  16. -export([parse/2]).
  17. -export([events/1]).
  18. -export([event/1]).
  19. -record(state, {
  20. state_name = bom :: bom | events,
  21. buffer = <<>> :: binary(),
  22. last_event_id = <<>> :: binary(),
  23. last_event_id_set = false :: boolean(),
  24. event_type = <<>> :: binary(),
  25. data = [] :: iolist(),
  26. retry = undefined :: undefined | non_neg_integer()
  27. }).
  28. -type state() :: #state{}.
  29. -export_type([state/0]).
  30. -type parsed_event() :: #{
  31. last_event_id := binary(),
  32. event_type := binary(),
  33. data := iolist()
  34. }.
  35. -type event() :: #{
  36. comment => iodata(),
  37. data => iodata(),
  38. event => iodata() | atom(),
  39. id => iodata(),
  40. retry => non_neg_integer()
  41. }.
  42. -export_type([event/0]).
  43. -spec init() -> state().
  44. init() ->
  45. #state{}.
  46. %% @todo Add a function to retrieve the retry value from the state.
  47. -spec parse(binary(), state())
  48. -> {event, parsed_event(), State} | {more, State}.
  49. parse(Data0, State=#state{state_name=bom, buffer=Buffer}) ->
  50. Data1 = case Buffer of
  51. <<>> -> Data0;
  52. _ -> << Buffer/binary, Data0/binary >>
  53. end,
  54. case Data1 of
  55. %% Skip the BOM.
  56. << 16#fe, 16#ff, Data/bits >> ->
  57. parse_event(Data, State#state{state_name=events, buffer= <<>>});
  58. %% Not enough data to know wether we have a BOM.
  59. << 16#fe >> ->
  60. {more, State#state{buffer=Data1}};
  61. <<>> ->
  62. {more, State};
  63. %% No BOM.
  64. _ ->
  65. parse_event(Data1, State#state{state_name=events, buffer= <<>>})
  66. end;
  67. %% Try to process data from the buffer if there is no new input.
  68. parse(<<>>, State=#state{buffer=Buffer}) ->
  69. parse_event(Buffer, State#state{buffer= <<>>});
  70. %% Otherwise process the input data as-is.
  71. parse(Data0, State=#state{buffer=Buffer}) ->
  72. Data = case Buffer of
  73. <<>> -> Data0;
  74. _ -> << Buffer/binary, Data0/binary >>
  75. end,
  76. parse_event(Data, State).
  77. parse_event(Data, State0) ->
  78. case binary:split(Data, [<<"\r\n">>, <<"\r">>, <<"\n">>]) of
  79. [Line, Rest] ->
  80. case parse_line(Line, State0) of
  81. {ok, State} ->
  82. parse_event(Rest, State);
  83. {event, Event, State} ->
  84. {event, Event, State#state{buffer=Rest}}
  85. end;
  86. [_] ->
  87. {more, State0#state{buffer=Data}}
  88. end.
  89. %% Dispatch events on empty line.
  90. parse_line(<<>>, State) ->
  91. dispatch_event(State);
  92. %% Ignore comments.
  93. parse_line(<< $:, _/bits >>, State) ->
  94. {ok, State};
  95. %% Normal line.
  96. parse_line(Line, State) ->
  97. case binary:split(Line, [<<":\s">>, <<":">>]) of
  98. [Field, Value] ->
  99. process_field(Field, Value, State);
  100. [Field] ->
  101. process_field(Field, <<>>, State)
  102. end.
  103. process_field(<<"event">>, Value, State) ->
  104. {ok, State#state{event_type=Value}};
  105. process_field(<<"data">>, Value, State=#state{data=Data}) ->
  106. {ok, State#state{data=[<<$\n>>, Value|Data]}};
  107. process_field(<<"id">>, Value, State) ->
  108. {ok, State#state{last_event_id=Value, last_event_id_set=true}};
  109. process_field(<<"retry">>, Value, State) ->
  110. try
  111. {ok, State#state{retry=binary_to_integer(Value)}}
  112. catch _:_ ->
  113. {ok, State}
  114. end;
  115. process_field(_, _, State) ->
  116. {ok, State}.
  117. %% Data is an empty string; abort.
  118. dispatch_event(State=#state{last_event_id_set=false, data=[]}) ->
  119. {ok, State#state{event_type= <<>>}};
  120. %% Data is an empty string but we have a last_event_id:
  121. %% propagate it on its own so that the caller knows the
  122. %% most recent ID.
  123. dispatch_event(State=#state{last_event_id=LastEventID, data=[]}) ->
  124. {event, #{
  125. last_event_id => LastEventID
  126. }, State#state{last_event_id_set=false, event_type= <<>>}};
  127. %% Dispatch the event.
  128. %%
  129. %% Always remove the last linebreak from the data.
  130. dispatch_event(State=#state{last_event_id=LastEventID,
  131. event_type=EventType, data=[_|Data]}) ->
  132. {event, #{
  133. last_event_id => LastEventID,
  134. event_type => case EventType of
  135. <<>> -> <<"message">>;
  136. _ -> EventType
  137. end,
  138. data => lists:reverse(Data)
  139. }, State#state{last_event_id_set=false, event_type= <<>>, data=[]}}.
  140. -ifdef(TEST).
  141. parse_example1_test() ->
  142. {event, #{
  143. event_type := <<"message">>,
  144. last_event_id := <<>>,
  145. data := Data
  146. }, State} = parse(<<
  147. "data: YHOO\n"
  148. "data: +2\n"
  149. "data: 10\n"
  150. "\n">>, init()),
  151. <<"YHOO\n+2\n10">> = iolist_to_binary(Data),
  152. {more, _} = parse(<<>>, State),
  153. ok.
  154. parse_example2_test() ->
  155. {event, #{
  156. event_type := <<"message">>,
  157. last_event_id := <<"1">>,
  158. data := Data1
  159. }, State0} = parse(<<
  160. ": test stream\n"
  161. "\n"
  162. "data: first event\n"
  163. "id: 1\n"
  164. "\n"
  165. "data:second event\n"
  166. "id\n"
  167. "\n"
  168. "data: third event\n"
  169. "\n">>, init()),
  170. <<"first event">> = iolist_to_binary(Data1),
  171. {event, #{
  172. event_type := <<"message">>,
  173. last_event_id := <<>>,
  174. data := Data2
  175. }, State1} = parse(<<>>, State0),
  176. <<"second event">> = iolist_to_binary(Data2),
  177. {event, #{
  178. event_type := <<"message">>,
  179. last_event_id := <<>>,
  180. data := Data3
  181. }, State} = parse(<<>>, State1),
  182. <<" third event">> = iolist_to_binary(Data3),
  183. {more, _} = parse(<<>>, State),
  184. ok.
  185. parse_example3_test() ->
  186. {event, #{
  187. event_type := <<"message">>,
  188. last_event_id := <<>>,
  189. data := Data1
  190. }, State0} = parse(<<
  191. "data\n"
  192. "\n"
  193. "data\n"
  194. "data\n"
  195. "\n"
  196. "data:\n">>, init()),
  197. <<>> = iolist_to_binary(Data1),
  198. {event, #{
  199. event_type := <<"message">>,
  200. last_event_id := <<>>,
  201. data := Data2
  202. }, State} = parse(<<>>, State0),
  203. <<"\n">> = iolist_to_binary(Data2),
  204. {more, _} = parse(<<>>, State),
  205. ok.
  206. parse_example4_test() ->
  207. {event, Event, State0} = parse(<<
  208. "data:test\n"
  209. "\n"
  210. "data: test\n"
  211. "\n">>, init()),
  212. {event, Event, State} = parse(<<>>, State0),
  213. {more, _} = parse(<<>>, State),
  214. ok.
  215. parse_id_without_data_test() ->
  216. {event, Event1, State0} = parse(<<
  217. "id: 1\n"
  218. "\n"
  219. "data: data\n"
  220. "\n"
  221. "id: 2\n"
  222. "\n">>, init()),
  223. 1 = maps:size(Event1),
  224. #{last_event_id := <<"1">>} = Event1,
  225. {event, #{
  226. event_type := <<"message">>,
  227. last_event_id := <<"1">>,
  228. data := Data
  229. }, State1} = parse(<<>>, State0),
  230. <<"data">> = iolist_to_binary(Data),
  231. {event, Event2, State} = parse(<<>>, State1),
  232. 1 = maps:size(Event2),
  233. #{last_event_id := <<"2">>} = Event2,
  234. {more, _} = parse(<<>>, State),
  235. ok.
  236. parse_repeated_id_without_data_test() ->
  237. {event, Event1, State0} = parse(<<
  238. "id: 1\n"
  239. "\n"
  240. "event: message\n" %% This will be ignored since there's no data.
  241. "\n"
  242. "id: 1\n"
  243. "\n"
  244. "id: 2\n"
  245. "\n">>, init()),
  246. {event, Event1, State1} = parse(<<>>, State0),
  247. 1 = maps:size(Event1),
  248. #{last_event_id := <<"1">>} = Event1,
  249. {event, Event2, State} = parse(<<>>, State1),
  250. 1 = maps:size(Event2),
  251. #{last_event_id := <<"2">>} = Event2,
  252. {more, _} = parse(<<>>, State),
  253. ok.
  254. parse_split_event_test() ->
  255. {more, State} = parse(<<
  256. "data: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
  257. "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
  258. "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA">>, init()),
  259. {event, _, _} = parse(<<"==\n\n">>, State),
  260. ok.
  261. -endif.
  262. -spec events([event()]) -> iolist().
  263. events(Events) ->
  264. [event(Event) || Event <- Events].
  265. -spec event(event()) -> iolist().
  266. event(Event) ->
  267. [
  268. event_comment(Event),
  269. event_id(Event),
  270. event_name(Event),
  271. event_data(Event),
  272. event_retry(Event),
  273. $\n
  274. ].
  275. event_comment(#{comment := Comment}) ->
  276. prefix_lines(Comment, <<>>);
  277. event_comment(_) ->
  278. [].
  279. event_id(#{id := ID}) ->
  280. nomatch = binary:match(iolist_to_binary(ID), <<"\n">>),
  281. [<<"id: ">>, ID, $\n];
  282. event_id(_) ->
  283. [].
  284. event_name(#{event := Name0}) ->
  285. Name = if
  286. is_atom(Name0) -> atom_to_binary(Name0, utf8);
  287. true -> iolist_to_binary(Name0)
  288. end,
  289. nomatch = binary:match(Name, <<"\n">>),
  290. [<<"event: ">>, Name, $\n];
  291. event_name(_) ->
  292. [].
  293. event_data(#{data := Data}) ->
  294. prefix_lines(Data, <<"data">>);
  295. event_data(_) ->
  296. [].
  297. event_retry(#{retry := Retry}) ->
  298. [<<"retry: ">>, integer_to_binary(Retry), $\n];
  299. event_retry(_) ->
  300. [].
  301. prefix_lines(IoData, Prefix) ->
  302. Lines = binary:split(iolist_to_binary(IoData), <<"\n">>, [global]),
  303. [[Prefix, <<": ">>, Line, $\n] || Line <- Lines].
  304. -ifdef(TEST).
  305. event_test() ->
  306. _ = event(#{}),
  307. _ = event(#{comment => "test"}),
  308. _ = event(#{data => "test"}),
  309. _ = event(#{data => "test\ntest\ntest"}),
  310. _ = event(#{data => "test\ntest\ntest\n"}),
  311. _ = event(#{data => <<"test\ntest\ntest">>}),
  312. _ = event(#{data => [<<"test">>, $\n, <<"test">>, [$\n, "test"]]}),
  313. _ = event(#{event => test}),
  314. _ = event(#{event => "test"}),
  315. _ = event(#{id => "test"}),
  316. _ = event(#{retry => 5000}),
  317. _ = event(#{event => "test", data => "test"}),
  318. _ = event(#{id => "test", event => "test", data => "test"}),
  319. ok.
  320. -endif.