cow_sse.erl 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. %% Copyright (c) 2017, Loïc Hoguin <essen@ninenines.eu>
  2. %%
  3. %% Permission to use, copy, modify, and/or distribute this software for any
  4. %% purpose with or without fee is hereby granted, provided that the above
  5. %% copyright notice and this permission notice appear in all copies.
  6. %%
  7. %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  8. %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  9. %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  10. %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  11. %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  12. %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  13. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  14. -module(cow_sse).
  15. -export([init/0]).
  16. -export([parse/2]).
  17. -record(state, {
  18. state_name = bom :: bom | events,
  19. buffer = <<>> :: binary(),
  20. last_event_id = <<>> :: binary(),
  21. event_type = <<>> :: binary(),
  22. data = [] :: iolist(),
  23. retry = undefined :: undefined | non_neg_integer()
  24. }).
  25. -type state() :: #state{}.
  26. -export_type([state/0]).
  27. -type event() :: #{
  28. last_event_id := binary(),
  29. event_type := binary(),
  30. data := iolist()
  31. }.
  32. -spec init() -> state().
  33. init() ->
  34. #state{}.
  35. %% @todo Add a function to retrieve the retry value from the state.
  36. -spec parse(binary(), state())
  37. -> {event, event(), State} | {more, State}.
  38. parse(Data0, State=#state{state_name=bom, buffer=Buffer}) ->
  39. Data1 = case Buffer of
  40. <<>> -> Data0;
  41. _ -> << Buffer/binary, Data0/binary >>
  42. end,
  43. case Data1 of
  44. %% Skip the BOM.
  45. << 16#fe, 16#ff, Data/bits >> ->
  46. parse_event(Data, State#state{state_name=events, buffer= <<>>});
  47. %% Not enough data to know wether we have a BOM.
  48. << 16#fe >> ->
  49. {more, State#state{buffer=Data1}};
  50. <<>> ->
  51. {more, State};
  52. %% No BOM.
  53. _ ->
  54. parse_event(Data1, State#state{state_name=events, buffer= <<>>})
  55. end;
  56. %% Try to process data from the buffer if there is no new input.
  57. parse(<<>>, State=#state{buffer=Buffer}) ->
  58. parse_event(Buffer, State#state{buffer= <<>>});
  59. %% Otherwise process the input data as-is.
  60. parse(Data, State) ->
  61. parse_event(Data, State).
  62. parse_event(Data0, State0=#state{buffer=Buffer}) ->
  63. Data = case Buffer of
  64. <<>> -> Data0;
  65. _ -> << Buffer/binary, Data0/binary >>
  66. end,
  67. case binary:split(Data, [<<"\r\n">>, <<"\r">>, <<"\n">>]) of
  68. [Line, Rest] ->
  69. case parse_line(Line, State0) of
  70. {ok, State} ->
  71. parse_event(Rest, State);
  72. {event, Event, State} ->
  73. {event, Event, State#state{buffer=Rest}}
  74. end;
  75. [_] ->
  76. {more, State0#state{buffer=Data}}
  77. end.
  78. %% Dispatch events on empty line.
  79. parse_line(<<>>, State) ->
  80. dispatch_event(State);
  81. %% Ignore comments.
  82. parse_line(<< $:, _/bits >>, State) ->
  83. {ok, State};
  84. %% Normal line.
  85. parse_line(Line, State) ->
  86. case binary:split(Line, [<<":\s">>, <<":">>]) of
  87. [Field, Value] ->
  88. process_field(Field, Value, State);
  89. [Field] ->
  90. process_field(Field, <<>>, State)
  91. end.
  92. process_field(<<"event">>, Value, State) ->
  93. {ok, State#state{event_type=Value}};
  94. process_field(<<"data">>, Value, State=#state{data=Data}) ->
  95. {ok, State#state{data=[<<$\n>>, Value|Data]}};
  96. process_field(<<"id">>, Value, State) ->
  97. {ok, State#state{last_event_id=Value}};
  98. process_field(<<"retry">>, Value, State) ->
  99. try
  100. {ok, State#state{retry=binary_to_integer(Value)}}
  101. catch _:_ ->
  102. {ok, State}
  103. end;
  104. process_field(_, _, State) ->
  105. {ok, State}.
  106. %% Data is an empty string; abort.
  107. dispatch_event(State=#state{data=[]}) ->
  108. {ok, State#state{event_type= <<>>}};
  109. %% Dispatch the event.
  110. %%
  111. %% Always remove the last linebreak from the data.
  112. dispatch_event(State=#state{last_event_id=LastEventID,
  113. event_type=EventType, data=[_|Data]}) ->
  114. {event, #{
  115. last_event_id => LastEventID,
  116. event_type => case EventType of
  117. <<>> -> <<"message">>;
  118. _ -> EventType
  119. end,
  120. data => lists:reverse(Data)
  121. }, State#state{event_type= <<>>, data=[]}}.
  122. -ifdef(TEST).
  123. parse_example1_test() ->
  124. {event, #{
  125. event_type := <<"message">>,
  126. last_event_id := <<>>,
  127. data := Data
  128. }, State} = parse(<<
  129. "data: YHOO\n"
  130. "data: +2\n"
  131. "data: 10\n"
  132. "\n">>, init()),
  133. <<"YHOO\n+2\n10">> = iolist_to_binary(Data),
  134. {more, _} = parse(<<>>, State),
  135. ok.
  136. parse_example2_test() ->
  137. {event, #{
  138. event_type := <<"message">>,
  139. last_event_id := <<"1">>,
  140. data := Data1
  141. }, State0} = parse(<<
  142. ": test stream\n"
  143. "\n"
  144. "data: first event\n"
  145. "id: 1\n"
  146. "\n"
  147. "data:second event\n"
  148. "id\n"
  149. "\n"
  150. "data: third event\n"
  151. "\n">>, init()),
  152. <<"first event">> = iolist_to_binary(Data1),
  153. {event, #{
  154. event_type := <<"message">>,
  155. last_event_id := <<>>,
  156. data := Data2
  157. }, State1} = parse(<<>>, State0),
  158. <<"second event">> = iolist_to_binary(Data2),
  159. {event, #{
  160. event_type := <<"message">>,
  161. last_event_id := <<>>,
  162. data := Data3
  163. }, State} = parse(<<>>, State1),
  164. <<" third event">> = iolist_to_binary(Data3),
  165. {more, _} = parse(<<>>, State),
  166. ok.
  167. parse_example3_test() ->
  168. {event, #{
  169. event_type := <<"message">>,
  170. last_event_id := <<>>,
  171. data := Data1
  172. }, State0} = parse(<<
  173. "data\n"
  174. "\n"
  175. "data\n"
  176. "data\n"
  177. "\n"
  178. "data:\n">>, init()),
  179. <<>> = iolist_to_binary(Data1),
  180. {event, #{
  181. event_type := <<"message">>,
  182. last_event_id := <<>>,
  183. data := Data2
  184. }, State} = parse(<<>>, State0),
  185. <<"\n">> = iolist_to_binary(Data2),
  186. {more, _} = parse(<<>>, State),
  187. ok.
  188. parse_example4_test() ->
  189. {event, Event, State0} = parse(<<
  190. "data:test\n"
  191. "\n"
  192. "data: test\n"
  193. "\n">>, init()),
  194. {event, Event, State} = parse(<<>>, State0),
  195. {more, _} = parse(<<>>, State),
  196. ok.
  197. -endif.